package ControllerWorker; import java.io.IOException; import java.lang.Thread; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.StringTokenizer; import java.util.Vector; import org.gearman.client.GearmanClient; import org.gearman.client.GearmanClientImpl; import org.gearman.client.GearmanJob; import org.gearman.client.GearmanJobImpl; import org.gearman.client.GearmanJobResult; import org.gearman.client.GearmanJobStatus; import org.gearman.common.GearmanJobServerConnection; import org.gearman.common.GearmanNIOJobServerConnection; import org.gearman.util.ByteUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.apache.log4j.Logger; public class Shutdown extends Thread { private static final Logger logger = ControllerWorkerMain.getLogger(); private String eventName; private Boolean force; private Vector clients; private final int updateRate; private long waitTime; private long scsavTime; private Vector psWhitelist; private Vector psBlacklist; private final GearmanJobServerConnection gearmanConnection; private GearmanClient gearmanClient; private Boolean finished; private Boolean error; public Shutdown(String eventName, Boolean force, Vector clients, int updateRate, long waitTime, long scsavTime, Vector psWhitelist, Vector psBlacklist, String gearmanServerAddress, int gearmanServerPort) { this.eventName = eventName; this.force = force; this.clients = clients; this.updateRate = updateRate; // updates per second this.waitTime = waitTime * 1000; this.scsavTime = scsavTime * 1000; this.psWhitelist = psWhitelist; this.psBlacklist = psBlacklist; gearmanConnection = new GearmanNIOJobServerConnection( gearmanServerAddress, gearmanServerPort); gearmanClient = new GearmanClientImpl(); gearmanClient.addJobServer(gearmanConnection); finished = false; error = false; } public void run() { workerLoop(); } private void workerLoop() { long beginTime; long timeTaken; long timeLeft; final long updatePeriod = 1000000000L / updateRate; // nanoseconds; Boolean run = true; while (run) { try { beginTime = System.nanoTime(); run = update(); timeTaken = System.nanoTime() - beginTime; timeLeft = (updatePeriod - timeTaken) / 1000000; if (timeLeft < 10) timeLeft = 10; sleep(timeLeft); } catch (Exception e) { logger.error(e.toString()); } } finished = true; LinkedHashMap jsonData = new LinkedHashMap(); jsonData.put("eventName", eventName); jsonData.put("type", "finishShutdownState"); if (error) { logger.error("Shutdown of " + eventName + " failed"); jsonData.put("error", true); } else { logger.info("Shutdown of " + eventName + " finished"); jsonData.put("error", false); } String dataString = JSONValue.toJSONString(jsonData); GearmanJob job = GearmanJobImpl.createJob("status", dataString .getBytes(), "status" + eventName); gearmanClient.submit(job); } /* * -------------- shutdown logik -------------- */ private Boolean update() throws Exception { for (Client client : clients) { switch (client.getState()) { case CLIENT_UNKNOWN: ping(client); break; case CHECK_PING_PROGRESS: checkPingProgress(client); break; case CLIENT_IS_ALIVE: who(client); break; case CHECK_WHO_PROGRESS: checkWhoProgress(client); break; case SHUTDOWN_CLIENT: doShutdown(client); break; case CHECK_SHUTDOWN_PROGRESS: checkShutdownProgress(client); break; case SHUTDOWN_COMMAND_SENT: Date date = new Date(); Long timestamp = date.getTime(); client.addPingTime(ClientPingTime.SHUTDOWN, timestamp); pingShutdown(client); break; case PING_SHUTDOWN_AGAIN: pingShutdown(client); break; case CHECK_PING_SHUTDOWN_PROGRESS: checkPingShutdwonProgress(client); break; case USER_IS_LOGGED_IN: ps(client); break; case CHECK_PS_PROGRESS: checkPsProgress(client); break; case USERPROCESSES_ARE_RUNNING: if (force) { logger.info(client.getIp() + " force is enabled"); // is not working client.setState(ClientState.SHUTDOWN_CLIENT, gearmanClient); } else { ls(client); } break; case CHECK_LS_PROGRESS: checkLsProgress(client); break; case USER_IS_WORKING: logger.error(client.getIp() + " User has been working"); client.setError("The user has been working."); client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); break; case SHUTDOWN_ERROR: if (!client.isFinished()) { logger.error(client.getIp() + " Shutdown failed"); // errorState client.finish(); error = true; } break; case SHUTDOWN_SUCCESS: if (!client.isFinished()) { logger.info(client.getIp() + " Shutdown finished"); // successState client.finish(); } break; } } boolean allFinished = false; for (Client client : clients) { if (client.isFinished()) { allFinished = true; } else { allFinished = false; break; } } if (allFinished) { return false; } else { return true; } } /* * ------------------------- function declarations ------------------------- */ private void ping(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_PROGRESS, gearmanClient); client.addJob(ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingProgress(Client client) throws Exception, IOException { GearmanJob pingJob = client.getJob(ClientJob.PINGJOB); if (pingJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(pingJob); if (!jobStatus.isKnown() && pingJob.isDone()) { GearmanJobResult pingJobRes = pingJob.get(); String result = ByteUtils .fromUTF8Bytes(pingJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " alive"); client.setState(ClientState.CLIENT_IS_ALIVE, gearmanClient); // check // Users client.removeJob(pingJob); } else if (alive.equals("false")) { logger.info(client.getIp() + " not alive"); // not alive, go in successState client.setState(ClientState.SHUTDOWN_SUCCESS, gearmanClient); client.removeJob(pingJob); } } else { logger.error(client.getIp() + " Cannot send the ping message."); client .setError("Sending the ping message has been failed."); // sending the ping message has been failed client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); client.removeJob(pingJob); } } } } } private void who(Client client) { GearmanJob job = GearmanJobImpl.createJob("who", client.getIp() .getBytes(), "who" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_WHO_PROGRESS, gearmanClient); client.addJob(ClientJob.WHOJOB, job); logger.info("who " + client.getIp()); } private void checkWhoProgress(Client client) throws Exception, IOException { GearmanJob whoJob = client.getJob(ClientJob.WHOJOB); if (whoJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(whoJob); if (!jobStatus.isKnown() && whoJob.isDone()) { GearmanJobResult whoJobRes = whoJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String rawoutput = resultObj.get("rawoutput") .toString(); StringTokenizer str = new StringTokenizer(rawoutput, " "); String user = ""; if (str.hasMoreTokens()) { user = str.nextToken(); } if (user.isEmpty()) { logger.info(client.getIp() + " no user is logged in -CHECK PS-"); // no user is logged in // ----- // didnĀ“t work in test-pool, check ps client.setState(ClientState.USER_IS_LOGGED_IN, gearmanClient); client.removeJob(whoJob); } else { logger .info(client.getIp() + " a user is logged in"); // a user is logged in client.setState(ClientState.USER_IS_LOGGED_IN, gearmanClient); client.removeJob(whoJob); } } else { logger.error(client.getIp() + " Cannot check if a user is logged in."); client .setError("The check if a user is logged in has been failed."); /* * cannot check if a user is logged in, go in errorState */ client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); client.removeJob(whoJob); } } } } } private void doShutdown(Client client) { GearmanJob job = GearmanJobImpl.createJob("doShutdown", client.getIp() .getBytes(), "doShutdown" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_SHUTDOWN_PROGRESS, gearmanClient); client.addJob(ClientJob.SHUTDOWNJOB, job); logger.info("doShutdown " + client.getIp()); } private void checkShutdownProgress(Client client) throws Exception, IOException { GearmanJob doShutdownJob = client.getJob(ClientJob.SHUTDOWNJOB); if (doShutdownJob != null) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(doShutdownJob); if (!jobStatus.isKnown() && doShutdownJob.isDone()) { GearmanJobResult wolJobRes = doShutdownJob.get(); String result = ByteUtils.fromUTF8Bytes(wolJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { logger.info(client.getIp() + " Shutdown command sent"); client.setState(ClientState.SHUTDOWN_COMMAND_SENT, gearmanClient); client.removeJob(doShutdownJob); } else { logger.error(client.getIp() + " Cannot send shutdown command"); client .setError("Sending the shutdown command has been failed."); // cannot send shutdown command, go in / errorState client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); client.removeJob(doShutdownJob); } } } } } private void pingShutdown(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client .setState(ClientState.CHECK_PING_SHUTDOWN_PROGRESS, gearmanClient); client.addJob(ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingShutdwonProgress(Client client) throws Exception, IOException { GearmanJob pingJobShutdown = client.getJob(ClientJob.PINGJOB); if (pingJobShutdown != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until shutdown Long expectedTimestamp = client .getPingTime(ClientPingTime.SHUTDOWN) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobShutdown); if (!jobStatus.isKnown() && pingJobShutdown.isDone()) { GearmanJobResult pingJobRes = pingJobShutdown.get(); String result = ByteUtils.fromUTF8Bytes(pingJobRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("false")) { logger.info(client.getIp() + " is not alive anymore"); client.setState(ClientState.SHUTDOWN_SUCCESS, gearmanClient); client.removeJob(pingJobShutdown); } else if (alive.equals("true")) { logger .info(client.getIp() + " is still alive after shutdown command"); client.setState( ClientState.PING_SHUTDOWN_AGAIN, gearmanClient); client.removeJob(pingJobShutdown); } } else { logger .error(client.getIp() + " Cannot send the ping after shutdown message."); client .setError("Sending the ping after shutdown message has been failed."); /* * sending the ping after shutdown message has been * failed */ client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); client.removeJob(pingJobShutdown); } } } } else { logger.error(client.getIp() + " is alive after shutdown"); client.setError("Client is still alive after shutdown."); // still alive, go in errorState client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); client.removeJob(pingJobShutdown); } } } private void ps(Client client) { GearmanJob job = GearmanJobImpl.createJob("ps", client.getIp() .getBytes(), "ps" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PS_PROGRESS, gearmanClient); client.addJob(ClientJob.PSJOB, job); logger.info("ps " + client.getIp()); } private void checkPsProgress(Client client) throws Exception, IOException { GearmanJob psJob = client.getJob(ClientJob.PSJOB); if (psJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(psJob); if (!jobStatus.isKnown() && psJob.isDone()) { GearmanJobResult whoJobRes = psJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { JSONArray ps = (JSONArray) resultObj.get("ps"); HashMap> psMap = new HashMap>(); for (Object obj : ps) { HashMap psEntry = new HashMap(); JSONObject psLine = (JSONObject) obj; String cmd = psLine.get("cmd").toString(); int pid = Integer.parseInt(psLine.get("pid") .toString()); int ppid = Integer.parseInt(psLine.get("ppid") .toString()); psEntry.put("pid", pid); psEntry.put("ppid", ppid); psMap.put(cmd, psEntry); } boolean whitelistFound = false; boolean blacklistFound = false; for (String blackEntry : psBlacklist) { if (psMap.containsKey(blackEntry)) { blacklistFound = true; break; } } for (String whiteEntry : psWhitelist) { if (psMap.containsKey(whiteEntry)) { HashMap psEntry = psMap .get(whiteEntry); if (whiteEntry.equals("gnome-screensav")) { if (psEntry.get("ppid") != 1) { client.setScsavPID(psEntry.get("pid")); whitelistFound = true; break; } } else { client.setScsavPID(psEntry.get("pid")); whitelistFound = true; break; } } } if (blacklistFound) { if (whitelistFound) { logger.info(client.getIp() + " processes are running"); client.setState( ClientState.USERPROCESSES_ARE_RUNNING, gearmanClient); } else { logger.info(client.getIp() + " is working"); // client.setState(ClientState.USERPROCESSES_ARE_RUNNING, client.setState(ClientState.USER_IS_WORKING, gearmanClient); } client.removeJob(psJob); } else { logger.info(client.getIp() + " is not working"); // is not working client.setState(ClientState.SHUTDOWN_CLIENT, gearmanClient); client.removeJob(psJob); } } else { logger.error(client.getIp() + " Cannot check if user is working."); client .setError("The check if a user is working has been failed."); /* * cannot check if user is working, go in errorState */ client.setState(ClientState.SHUTDOWN_ERROR, gearmanClient); client.removeJob(psJob); } } else { logger.error(client.getIp() + " Cannot check if user is working."); client .setError("The check if a user is working has been failed."); /* * cannot check if user is working, go in errorState */ client.setState(ClientState.BOOT_ERROR, gearmanClient); client.removeJob(psJob); } } } } // screensaver check private void ls(Client client) { GearmanJob job = GearmanJobImpl.createJob("ls", client.getIp() .getBytes(), "ls" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_LS_PROGRESS, gearmanClient); client.addJob(ClientJob.LSJOB, job); logger.info("ls " + client.getIp()); } private void checkLsProgress(Client client) throws Exception, IOException { GearmanJob lsJob = client.getJob(ClientJob.LSJOB); if (lsJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(lsJob); if (!jobStatus.isKnown() && lsJob.isDone()) { GearmanJobResult psJobRes = lsJob.get(); String result = ByteUtils.fromUTF8Bytes(psJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { JSONArray ls = (JSONArray) resultObj.get("ls"); HashMap lsMap = new HashMap(); for (Object obj : ls) { JSONObject lsLine = (JSONObject) obj; String name = lsLine.get("name").toString(); SimpleDateFormat df = new SimpleDateFormat( "y-M-d H:m"); Date date = df.parse(lsLine.get("date").toString() + " " + lsLine.get("time").toString()); long timestamp = date.getTime(); lsMap.put(name, timestamp); } String scsavPID = Integer .toString(client.getScsavPID()); long scsavTimestamp = lsMap.get(scsavPID); Date date = new Date(); long currentTimestamp = date.getTime(); long beetweenTimestamp = currentTimestamp - scsavTimestamp; if (beetweenTimestamp < scsavTime) { logger .info(client.getIp() + " screensaver has been running shorter than " + scsavTime / (1000 * 60) + " minutes, user is working."); client.setState(ClientState.USER_IS_WORKING, gearmanClient); client.removeJob(lsJob); } else { logger .info(client.getIp() + " screensaver has been running longer than " + scsavTime / (1000 * 60) + " minutes, user is not working."); client.setState(ClientState.RESTART_CLIENT, gearmanClient); client.removeJob(lsJob); } } else { logger.error(client.getIp() + " Cannot check 'ls -al' of /proc/"); client .setError("The check for screensaver has been failed."); // cannot check ls, go in errorState client.setState(ClientState.BOOT_ERROR, gearmanClient); client.removeJob(lsJob); } } else { logger.error(client.getIp() + " Cannot check 'ls -al' of /proc/"); client .setError("The check for screensaver has been failed."); // cannot check ls, go in errorState client.setState(ClientState.BOOT_ERROR, gearmanClient); client.removeJob(lsJob); } } } } // ------------------ public Boolean isFinished() { return finished && !error; } public Boolean isFinishedWithErrors() { return finished && error; } public Vector getClients() { return clients; } public String getEventName() { return eventName; } }