package ControllerWorker;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;

import org.apache.log4j.Logger;
import org.gearman.client.GearmanClient;
import org.gearman.client.GearmanClientImpl;
import org.gearman.client.GearmanJob;
import org.gearman.client.GearmanJobImpl;
import org.gearman.client.GearmanJobResult;
import org.gearman.client.GearmanJobResultImpl;
import org.gearman.client.GearmanJobStatus;
import org.gearman.common.GearmanJobServerConnection;
import org.gearman.common.GearmanNIOJobServerConnection;
import org.gearman.util.ByteUtils;
import org.gearman.worker.AbstractGearmanFunction;
import org.json.simple.JSONArray;
import org.json.simple.JSONValue;

/**
 * Gearman function "somePing": fans a list of addresses out to the "ping"
 * function and returns the individual results joined into one string.
 *
 * <p>The job payload is decoded as UTF-8 and parsed as a JSON array. For every
 * non-null element, its {@code toString()} value is submitted as the data of a
 * separate "ping" job on the job server at 127.0.0.1:4730. The function then
 * polls until every submitted job has finished and returns the results joined
 * with {@code "; "}, in the order in which the jobs were observed to finish.
 */
public class SomePingWorker extends AbstractGearmanFunction {

    private static final Logger logger = ControllerWorkerMain.getLogger();

    /** Job server the sub-jobs are submitted to. */
    private static final String JOB_SERVER_HOST = "127.0.0.1";
    private static final int JOB_SERVER_PORT = 4730;

    /** Separator placed between individual ping results in the reply. */
    private static final String RESULT_SEPARATOR = "; ";

    /** Pause between two polling passes over the still-pending jobs. */
    private static final long POLL_INTERVAL_MS = 50L;

    @Override
    public String getName() {
        return "somePing";
    }

    /**
     * Submits one "ping" job per address in the payload and waits for all of
     * them.
     *
     * @return a successful result carrying the joined ping results; a failed
     *         result if the payload is not a JSON array or if the thread was
     *         interrupted while waiting (in which case the results gathered so
     *         far are still included)
     */
    @Override
    public GearmanJobResult executeFunction() {
        String payload = ByteUtils.fromUTF8Bytes((byte[]) this.data);
        // JSONValue.parse yields null for unparsable input and other types for
        // non-array JSON, so check before casting.
        Object parsed = JSONValue.parse(payload);
        if (!(parsed instanceof JSONArray)) {
            String message = "somePing expects a JSON array of addresses, got: " + payload;
            logger.error(message);
            return buildResult(false, "", message);
        }

        final GearmanJobServerConnection connection =
                new GearmanNIOJobServerConnection(JOB_SERVER_HOST, JOB_SERVER_PORT);
        GearmanClient client = new GearmanClientImpl();
        try {
            client.addJobServer(connection);
            List<GearmanJob> pending = submitPings(client, (JSONArray) parsed);
            List<String> results = new ArrayList<String>();
            boolean completed = collectResults(client, pending, results);
            String problem = completed ? "" : "Interrupted before all ping jobs completed";
            return buildResult(completed, join(results), problem);
        } finally {
            // Release the client created for this invocation.
            client.shutdown();
        }
    }

    /** Submits one "ping" job per non-null array element and returns the submitted jobs. */
    private List<GearmanJob> submitPings(GearmanClient client, JSONArray targets) {
        List<GearmanJob> jobs = new ArrayList<GearmanJob>();
        int count = 1;
        for (Object target : targets) {
            if (target == null) {
                logger.warn("Skipping null entry in somePing payload");
                continue;
            }
            String ip = target.toString();
            logger.info("Ping " + ip);
            GearmanJob job = GearmanJobImpl.createJob("ping", ByteUtils.toUTF8Bytes(ip), "ping" + count);
            client.submit(job);
            jobs.add(job);
            count++;
        }
        return jobs;
    }

    /**
     * Polls the pending jobs until all are finished, appending each job's
     * result to {@code results} as it completes.
     *
     * @return {@code true} if every job finished, {@code false} if the thread
     *         was interrupted first
     */
    private boolean collectResults(GearmanClient client, List<GearmanJob> pending, List<String> results) {
        while (!pending.isEmpty()) {
            Iterator<GearmanJob> it = pending.iterator();
            while (it.hasNext()) {
                GearmanJob job = it.next();
                try {
                    GearmanJobStatus status = client.getJobStatus(job);
                    // A job counts as finished once the server no longer knows
                    // it and the local job object reports completion.
                    if (!status.isKnown() && job.isDone()) {
                        results.add(ByteUtils.fromUTF8Bytes(job.get().getResults()));
                        it.remove();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while collecting ping results");
                    return false;
                } catch (Exception e) {
                    // The job stays pending and is polled again on the next pass.
                    logger.error("Failed to poll ping job", e);
                }
            }
            if (!pending.isEmpty()) {
                try {
                    // Back off briefly instead of hammering the job server.
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while waiting for ping jobs");
                    return false;
                }
            }
        }
        return true;
    }

    /** Joins the parts with {@link #RESULT_SEPARATOR}. */
    private static String join(List<String> parts) {
        StringBuilder joined = new StringBuilder();
        for (int i = 0; i < parts.size(); i++) {
            if (i > 0) {
                joined.append(RESULT_SEPARATOR);
            }
            joined.append(parts.get(i));
        }
        return joined.toString();
    }

    /** Builds the job result for this invocation with UTF-8 encoded result and exception text. */
    private GearmanJobResult buildResult(boolean succeeded, String results, String exceptions) {
        byte[] warnings = new byte[0];
        int numerator = 0;
        int denominator = 0;
        return new GearmanJobResultImpl(this.jobHandle, succeeded,
                ByteUtils.toUTF8Bytes(results), warnings,
                ByteUtils.toUTF8Bytes(exceptions), numerator, denominator);
    }
}