package ControllerWorker; import java.io.IOException; import java.lang.Thread; import java.util.Date; import java.util.HashMap; import java.util.StringTokenizer; import java.util.Vector; import org.gearman.client.GearmanClient; import org.gearman.client.GearmanClientImpl; import org.gearman.client.GearmanJob; import org.gearman.client.GearmanJobImpl; import org.gearman.client.GearmanJobResult; import org.gearman.client.GearmanJobStatus; import org.gearman.common.GearmanJobServerConnection; import org.gearman.common.GearmanNIOJobServerConnection; import org.gearman.util.ByteUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.apache.log4j.Logger; public class Shutdown extends Thread { static final Logger logger = Logger.getLogger(Shutdown.class); private String eventName; private Vector clients; private final int updateRate; private long waitTime; // private Vector psWhitelist; private Vector psBlacklist; private final GearmanJobServerConnection gearmanConnection; private GearmanClient gearmanClient; private Vector jobs; private HashMap pingShutdownTime; private Boolean finished; private Boolean error; public Shutdown(String eventName, Vector clients, int updateRate, long waitTime, Vector psWhitelist, Vector psBlacklist, String gearmanServerAddress, int gearmanServerPort) { this.eventName = eventName; this.clients = clients; this.updateRate = updateRate; // updates per second this.waitTime = waitTime * 1000; // this.psWhitelist = psWhitelist; this.psBlacklist = psBlacklist; gearmanConnection = new GearmanNIOJobServerConnection( gearmanServerAddress, gearmanServerPort); gearmanClient = new GearmanClientImpl(); gearmanClient.addJobServer(gearmanConnection); jobs = new Vector(); pingShutdownTime = new HashMap(); finished = false; error = false; } public void run() { workerLoop(); } private void workerLoop() { long beginTime; long timeTaken; long timeLeft; final long updatePeriod = 1000000000L / updateRate; // nanoseconds; Boolean run = true; while (run) { try { beginTime = System.nanoTime(); run = update(); timeTaken = System.nanoTime() - beginTime; timeLeft = (updatePeriod - timeTaken) / 1000000; if (timeLeft < 10) timeLeft = 10; sleep(timeLeft); } catch (Exception e) { logger.error(e.toString()); } } finished = true; if (error) { logger.error("Shutdown of " + eventName + " failed"); } else { logger.info("Shutdown of " + eventName + " finished"); } } /* * -------------- shutdown logik -------------- */ private Boolean update() throws Exception { for (Client client : clients) { switch (client.getState()) { case CLIENT_UNKNOWN: ping(client); break; case CHECK_PING_PROGRESS: checkPingProgress(client); break; case CLIENT_IS_ALIVE: who(client); break; case CHECK_WHO_PROGRESS: checkWhoProgress(client); break; case SHUTDOWN_CLIENT: doShutdown(client); break; case CHECK_SHUTDOWN_PROGRESS: checkShutdownProgress(client); break; case SHUTDOWN_COMMAND_SENT: Date date = new Date(); Long timestamp = date.getTime(); pingShutdownTime.put(client.getId(), timestamp); pingShutdown(client); break; case PING_SHUTDOWN_AGAIN: pingShutdown(client); break; case CHECK_PING_SHUTDOWN_PROGRESS: checkPingShutdwonProgress(client); break; case USER_IS_LOGGED_IN: ps(client); break; case CHECK_PS_PROGRESS: checkPsProgress(client); break; case USER_IS_WORKING: logger.error(client.getIp() + " User has been working"); client.setError("The user has been working."); client.setState(ClientState.SHUTDOWN_ERROR); break; case SHUTDOWN_ERROR: if (!client.isFinished()) { logger.error(client.getIp() + " Shutdown failed"); // errorState client.finish(); error = true; } break; case SHUTDOWN_SUCCESS: if (!client.isFinished()) { logger.info(client.getIp() + " Shutdown finished"); // successState client.finish(); } break; } } boolean allFinished = false; for (Client client : clients) { if (client.isFinished()) { allFinished = true; } else { allFinished = false; break; } } if (allFinished) { return false; } else { return true; } } /* * ------------------------- function declarations ------------------------- */ private void ping(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_PROGRESS); addClientJob(client.getId(), ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PINGJOB); GearmanJob pingJob = job.getGearmanJob(); if (pingJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(pingJob); if (!jobStatus.isKnown() && pingJob.isDone()) { GearmanJobResult pingJobRes = pingJob.get(); String result = ByteUtils .fromUTF8Bytes(pingJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " alive"); client.setState(ClientState.CLIENT_IS_ALIVE); // check // Users removeClientJob(job); } else if (alive.equals("false")) { logger.info(client.getIp() + " not alive"); // not alive, go in successState client.setState(ClientState.SHUTDOWN_SUCCESS); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot send the ping message."); client .setError("Sending the ping message has been failed."); // sending the ping message has been failed client.setState(ClientState.SHUTDOWN_ERROR); removeClientJob(job); } } } } } private void who(Client client) { GearmanJob job = GearmanJobImpl.createJob("who", client.getIp() .getBytes(), "who" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_WHO_PROGRESS); addClientJob(client.getId(), ClientJob.WHOJOB, job); logger.info("who " + client.getIp()); } private void checkWhoProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.WHOJOB); GearmanJob whoJob = job.getGearmanJob(); if (whoJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(whoJob); if (!jobStatus.isKnown() && whoJob.isDone()) { GearmanJobResult whoJobRes = whoJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String rawoutput = resultObj.get("rawoutput") .toString(); StringTokenizer str = new StringTokenizer(rawoutput, " "); String user = ""; if (str.hasMoreTokens()) { user = str.nextToken(); } if (user.isEmpty()) { logger.info(client.getIp() + " no user is logged in -CHECK PS-"); // no user is logged in // ----- // didnĀ“t work in test-pool, check ps client.setState(ClientState.USER_IS_LOGGED_IN); removeClientJob(job); } else { logger .info(client.getIp() + " a user is logged in"); // a user is logged in client.setState(ClientState.USER_IS_LOGGED_IN); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot check if a user is logged in."); client .setError("The check if a user is logged in has been failed."); /* * cannot check if a user is logged in, go in errorState */ client.setState(ClientState.SHUTDOWN_ERROR); removeClientJob(job); } } } } } private void doShutdown(Client client) { GearmanJob job = GearmanJobImpl.createJob("doShutdown", client.getIp() .getBytes(), "doShutdown" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_SHUTDOWN_PROGRESS); addClientJob(client.getId(), ClientJob.SHUTDOWNJOB, job); logger.info("doShutdown " + client.getIp()); } private void checkShutdownProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.SHUTDOWNJOB); GearmanJob doShutdownJob = job.getGearmanJob(); if (doShutdownJob != null) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(doShutdownJob); if (!jobStatus.isKnown() && doShutdownJob.isDone()) { GearmanJobResult wolJobRes = doShutdownJob.get(); String result = ByteUtils.fromUTF8Bytes(wolJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { logger.info(client.getIp() + " Shutdown command sent"); client.setState(ClientState.SHUTDOWN_COMMAND_SENT); removeClientJob(job); } else { logger.error(client.getIp() + " Cannot send shutdown command"); client .setError("Sending the shutdown command has been failed."); // cannot send shutdown command, go in / errorState client.setState(ClientState.SHUTDOWN_ERROR); removeClientJob(job); } } } } } private void pingShutdown(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_SHUTDOWN_PROGRESS); addClientJob(client.getId(), ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingShutdwonProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PINGJOB); GearmanJob pingJobShutdown = job.getGearmanJob(); if (pingJobShutdown != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until shutdown Long expectedTimestamp = pingShutdownTime.get(client.getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobShutdown); if (!jobStatus.isKnown() && pingJobShutdown.isDone()) { GearmanJobResult pingJobRes = pingJobShutdown.get(); String result = ByteUtils.fromUTF8Bytes(pingJobRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("false")) { logger.info(client.getIp() + " is not alive anymore"); client.setState(ClientState.SHUTDOWN_SUCCESS); removeClientJob(job); } else if (alive.equals("true")) { logger .info(client.getIp() + " is still alive after shutdown command"); client .setState(ClientState.PING_SHUTDOWN_AGAIN); } } else { logger .error(client.getIp() + " Cannot send the ping after shutdown message."); client .setError("Sending the ping after shutdown message has been failed."); /* * sending the ping after shutdown message has been * failed */ client.setState(ClientState.SHUTDOWN_ERROR); removeClientJob(job); } } } } else { logger.error(client.getIp() + " is alive after shutdown"); client.setError("Client is still alive after shutdown."); // still alive, go in errorState client.setState(ClientState.SHUTDOWN_ERROR); removeClientJob(job); } } } private void ps(Client client) { GearmanJob job = GearmanJobImpl.createJob("ps", client.getIp() .getBytes(), "ps" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PS_PROGRESS); addClientJob(client.getId(), ClientJob.PSJOB, job); logger.info("ps " + client.getIp()); } private void checkPsProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PSJOB); GearmanJob psJob = job.getGearmanJob(); if (psJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(psJob); if (!jobStatus.isKnown() && psJob.isDone()) { GearmanJobResult whoJobRes = psJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { JSONArray ps = (JSONArray) resultObj.get("ps"); // boolean whitelistFound = false; boolean blacklistFound = false; for (String blackEntry : psBlacklist) { if (ps.toString().contains(blackEntry)) { blacklistFound = true; } } /* * for (String whiteEntry : psWhitelist) { if * (ps.toString().contains(whiteEntry)) { whitelistFound * = true; } } */ if (blacklistFound) { /* * if (whitelistFound) { logger.info(client.getIp() * + " is not working"); // is not working * status.put(client.getId(), 4); * removeClientJob(job); * * } else { */ client.setState(ClientState.USER_IS_WORKING); removeClientJob(job); // } } else { logger.info(client.getIp() + " is not working"); // is not working client.setState(ClientState.SHUTDOWN_CLIENT); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot check if user is working."); client .setError("The check if a user is working has been failed."); /* * cannot check if user is working, go in errorState */ client.setState(ClientState.SHUTDOWN_ERROR); removeClientJob(job); } } } } } private ClientJob getClientJob(int clientID, int jobType) { for (ClientJob job : jobs) { if (job.getJobType() == jobType && job.getClientID() == clientID) { return job; } } return null; } private void addClientJob(int clientID, int jobType, GearmanJob gearmanJob) { jobs.add(new ClientJob(clientID, jobType, gearmanJob)); } private void removeClientJob(ClientJob clientJob) { jobs.remove(clientJob); } public Boolean isFinished() { return finished && !error; } public Boolean isFinishedWithErrors() { return finished && error; } public Vector getClients() { return clients; } public String getEventName() { return eventName; } }