package ControllerWorker; import java.io.IOException; import java.lang.Thread; import java.util.Date; import java.util.HashMap; import java.util.StringTokenizer; import java.util.Vector; import org.gearman.client.GearmanClient; import org.gearman.client.GearmanClientImpl; import org.gearman.client.GearmanJob; import org.gearman.client.GearmanJobImpl; import org.gearman.client.GearmanJobResult; import org.gearman.client.GearmanJobStatus; import org.gearman.common.GearmanJobServerConnection; import org.gearman.common.GearmanNIOJobServerConnection; import org.gearman.util.ByteUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.apache.log4j.Logger; public class Boot extends Thread { static final Logger logger = Logger.getLogger(Boot.class); private String eventName; private Vector clients; private String bootOS; private final int updateRate; private long waitTime; //private Vector psWhitelist; private Vector psBlacklist; private final GearmanJobServerConnection gearmanConnection; private GearmanClient gearmanClient; private HashMap pingJobs; private HashMap wolJobs; private HashMap pingWoLJobs; private HashMap osJobs; private HashMap whoJobs; private HashMap psJobs; private HashMap restartJobs; private HashMap pingRestartShutdownJobs; private HashMap pingRestartBootJobs; private HashMap pingWolTime; private HashMap pingRestartShutdownTime; private HashMap pingRestartBootTime; private Boolean finished; private Boolean error; public Boot(String eventName, Vector clients, String bootOS, int updateRate, long waitTime, Vector psWhitelist, Vector psBlacklist, String gearmanServerAddress, int gearmanServerPort) { this.eventName = eventName; this.clients = clients; this.bootOS = bootOS; this.waitTime = waitTime * 1000; this.updateRate = updateRate; // updates per second //this.psWhitelist = psWhitelist; this.psBlacklist = psBlacklist; gearmanConnection = new GearmanNIOJobServerConnection( gearmanServerAddress, gearmanServerPort); gearmanClient = new GearmanClientImpl(); gearmanClient.addJobServer(this.gearmanConnection); pingJobs = new HashMap(); pingWoLJobs = new HashMap(); wolJobs = new HashMap(); osJobs = new HashMap(); whoJobs = new HashMap(); psJobs = new HashMap(); restartJobs = new HashMap(); pingRestartShutdownJobs = new HashMap(); pingRestartBootJobs = new HashMap(); pingWolTime = new HashMap(); pingRestartShutdownTime = new HashMap(); pingRestartBootTime = new HashMap(); finished = false; error = false; } public void run() { workerLoop(); } private void workerLoop() { long beginTime; long timeTaken; long timeLeft; long updatePeriod = 1000000000L / updateRate; // nanoseconds Boolean run = true; while (run) { try { beginTime = System.nanoTime(); run = update(); timeTaken = System.nanoTime() - beginTime; timeLeft = (updatePeriod - timeTaken) / 1000000; if (timeLeft < 10) timeLeft = 10; Thread.sleep(timeLeft); } catch (Exception e) { logger.error(e.toString()); } } finished = true; if (error) { logger.error("Booting of " + eventName + " failed"); } else { logger.info("Booting of " + eventName + " finished"); } } /* * -------------- * boot logik * -------------- */ private Boolean update() throws Exception { Date date; long timestamp; for (Client client : clients) { switch (client.getState()) { case CLIENT_UNKNOWN: ping(client); break; case CHECK_PING_PROGRESS: checkPingProgress(client); break; case CLIENT_IS_ALIVE: checkOS(client); break; case CLIENT_NOT_ALIVE: wakeOnLan(client); break; case CHECK_WAKE_ON_LAN_PROGRESS: checkWakeOnLanProgress(client); break; case MAGIC_PACKET_SENT: date = new Date(); timestamp = date.getTime(); pingWolTime.put(client.getId(), timestamp); pingWakeOnLan(client); break; case PING_WOL_AGAIN: pingWakeOnLan(client); break; case CHECK_PING_WOL_PROGRESS: checkPingWolProgress(client); break; case CHECK_CHECKOS_PROGRESS: checkCheckosProgress(client); break; case WRONG_OS: who(client); break; case CHECK_WHO_PROGRESS: checkWhoProgress(client); break; case A_USER_IS_LOGGED_IN: ps(client); break; case CHECK_PS_PROGRESS: checkPsProgress(client); break; case RESTART_CLIENT: restart(client); break; case CHECK_RESTART_PROGRESS: checkRestartProgress(client); break; case RESTART_COMMAND_SENT: date = new Date(); timestamp = date.getTime(); pingRestartShutdownTime.put(client.getId(), timestamp); pingRestartShutdown(client); break; case PING_RESTART_SHUTDOWN_AGAIN: pingRestartShutdown(client); break; case CHECK_PING_RESTART_SHUTDOWN_PROGRESS: checkPingRestartShutdownProgress(client); break; case USER_IS_WORKING: logger.error(client.getIp() + " User has been working"); client.setError("The user has been working."); // user has been working, go in errorState client.setState(ClientState.BOOT_ERROR); break; case CLIENT_IS_DOWN: date = new Date(); timestamp = date.getTime(); pingRestartBootTime.put(client.getId(), timestamp); pingRestartBoot(client); break; case PING_RESTART_BOOT_AGAIN: pingRestartBoot(client); break; case CHECK_PING_RESTART_BOOT_PROGRESS: checkPingRestartBootProgress(client); break; case BOOT_ERROR: if (!client.isFinished()) { logger.error(client.getIp() + " Booting failed"); // errorState client.finish(); error = true; } break; case BOOT_SUCCESS: if (!client.isFinished()) { logger.info(client.getIp() + " Booting finished"); // successState client.finish(); } break; } } boolean allFinished = false; for (Client client : clients) { if (client.isFinished()) { allFinished = true; } else { allFinished = false; break; } } if (allFinished) { return false; } else { return true; } } /* * ------------------------- * function declarations * ------------------------- */ private void ping(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_PROGRESS); pingJobs.put(client.getId(), job); logger.info("ping " + client.getIp()); } private void checkPingProgress(Client client) throws Exception, IOException { GearmanJob pingJob = pingJobs.get(client.getId()); if (pingJob != null) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJob); if (!jobStatus.isKnown() && pingJob.isDone()) { GearmanJobResult pingJobRes = pingJob.get(); String result = ByteUtils.fromUTF8Bytes(pingJobRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive") .toString(); if (alive.equals("true")) { logger.info(client.getIp() + " alive"); client.setState(ClientState.CLIENT_IS_ALIVE); pingJobs.remove(client.getId()); } else if (alive.equals("false")) { logger.info(client.getIp() + " not alive"); client.setState(ClientState.CLIENT_NOT_ALIVE); pingJobs.remove(client.getId()); } } else { logger.error(client.getIp() + " Cannot send the ping message."); client.setError("Sending the ping message has been failed."); client.setState(ClientState.BOOT_ERROR); pingJobs.remove(client.getId()); } } } } } private void pingWakeOnLan(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_WOL_PROGRESS); pingWoLJobs.put(client.getId(), job); logger.info("ping " + client.getIp()); } private void checkPingWolProgress(Client client) throws Exception, IOException { GearmanJob pingJobWoL = pingWoLJobs.get(client.getId()); if (pingJobWoL != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until WoL - Failed Long expectedTimestamp = pingWolTime.get(client.getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobWoL); if (!jobStatus.isKnown() && pingJobWoL.isDone()) { GearmanJobResult pingJobRes = pingJobWoL.get(); String result = ByteUtils.fromUTF8Bytes(pingJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " is alive after WoL"); // alive, go in successState client.setState(ClientState.BOOT_SUCCESS); pingWoLJobs.remove(client.getId()); } else if (alive.equals("false")) { logger.info("ping again " + client.getIp()); client.setState(ClientState.PING_WOL_AGAIN); pingWoLJobs.remove(client.getId()); } } else { logger.error(client.getIp() + " Cannot send the ping after wake on LAN message."); client.setError("Sending the ping after wake on LAN message has been failed."); /* * sending the ping after wake on LAN * message has been failed, go to errorState */ client.setState(ClientState.BOOT_ERROR); pingWoLJobs.remove(client.getId()); } } } } else { logger.error(client.getIp() + " is not alive after WoL"); client.setError("The wake on LAN has been failed."); // not alive, go in errorState client.setState(ClientState.BOOT_ERROR); pingWoLJobs.remove(client.getId()); } } } private void pingRestartShutdown(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_RESTART_SHUTDOWN_PROGRESS); pingRestartShutdownJobs.put(client.getId(), job); logger.info("ping " + client.getIp()); } private void checkPingRestartShutdownProgress(Client client) throws Exception, IOException { GearmanJob pingJobRestartShutdown = pingRestartShutdownJobs.get(client.getId()); if (pingJobRestartShutdown != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until Restart - Failed Long expectedTimestamp = pingRestartShutdownTime.get(client.getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(pingJobRestartShutdown); if (!jobStatus.isKnown() && pingJobRestartShutdown.isDone()) { GearmanJobResult pingJobRestartRes = pingJobRestartShutdown.get(); String result = ByteUtils.fromUTF8Bytes(pingJobRestartRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " is still alive"); // still alive, ping again client.setState(ClientState.PING_RESTART_SHUTDOWN_AGAIN); pingRestartShutdownJobs.remove(client.getId()); } else if (alive.equals("false")) { logger.info(client.getIp() + " is down"); // not alive, ping again client.setState(ClientState.CLIENT_IS_DOWN); pingRestartShutdownJobs.remove(client.getId()); } } else { logger.error(client.getIp() + " Cannot send the ping after restart message."); client.setError("Sending the ping after restart message has been failed."); /* * sending the ping after restart message * has been failed */ client.setState(ClientState.BOOT_ERROR); pingRestartShutdownJobs.remove(client.getId()); } } } } else { logger.error(client.getIp() + " shutdown failed"); client.setError("The shutdown has been failed."); // still alive, go in errorState client.setState(ClientState.BOOT_ERROR); pingRestartShutdownJobs.remove(client.getId()); } } } private void pingRestartBoot(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_RESTART_BOOT_PROGRESS); pingRestartBootJobs.put(client.getId(), job); logger.info("ping " + client.getIp()); } private void checkPingRestartBootProgress(Client client) throws Exception, IOException { GearmanJob pingJobRestartBoot = pingRestartBootJobs.get(client.getId()); if (pingJobRestartBoot != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until Restart - Failed Long expectedTimestamp = pingRestartBootTime.get(client .getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobRestartBoot); if (!jobStatus.isKnown() && pingJobRestartBoot.isDone()) { GearmanJobResult pingJobRestartRes = pingJobRestartBoot .get(); String result = ByteUtils .fromUTF8Bytes(pingJobRestartRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive") .toString(); if (alive.equals("true")) { logger.info(client.getIp() + " is alive after restart"); // alive, go to success state client.setState(ClientState.BOOT_SUCCESS); pingRestartBootJobs.remove(client .getId()); } else if (alive.equals("false")) { logger.info("ping again " + client.getIp()); // not alive, ping again client.setState(ClientState.PING_RESTART_BOOT_AGAIN); pingRestartBootJobs.remove(client .getId()); } } else { logger.error(client.getIp() + " Cannot send the ping after shutdown message."); client .setError("Sending the ping after shutdown message has been failed."); /* * sending the ping after shutdown message * has been failed */ client.setState(ClientState.BOOT_ERROR); } } } } else { logger.error(client.getIp() + " is not alive after reboot"); client.setError("The reboot has been failed."); // not alive, go in errorState client.setState(ClientState.BOOT_ERROR); pingRestartBootJobs.remove(client.getId()); } } } private void wakeOnLan(Client client) { GearmanJob job = GearmanJobImpl.createJob("wol", client.getMac() .getBytes(), "wol" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_WAKE_ON_LAN_PROGRESS); wolJobs.put(client.getId(), job); logger.info("wake on lan " + client.getMac()); } private void checkWakeOnLanProgress(Client client) throws Exception, IOException { GearmanJob wolJob = wolJobs.get(client.getId()); if (wolJob != null) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(wolJob); if (!jobStatus.isKnown() && wolJob.isDone()) { GearmanJobResult wolJobRes = wolJob.get(); String result = ByteUtils.fromUTF8Bytes(wolJobRes .getResults()); if (result.equals("Magic packet send.")) { logger.info(client.getMac() + " Magic packet sent."); client.setState(ClientState.MAGIC_PACKET_SENT); wolJobs.remove(client.getId()); } else { logger.error(client.getIp() + " Cannot send magic packet."); client.setError("Sending the magic packet has been failed."); // cannot send magic packet, go in errorState client.setState(ClientState.BOOT_ERROR); wolJobs.remove(client.getId()); } } } } private void checkOS(Client client) { GearmanJob job = GearmanJobImpl.createJob("os", client.getIp() .getBytes(), "os" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_CHECKOS_PROGRESS); osJobs.put(client.getId(), job); logger.info("check OS " + client.getIp()); } private void checkCheckosProgress(Client client) throws Exception, IOException { GearmanJob osJob = osJobs.get(client.getId()); if (osJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(osJob); if (!jobStatus.isKnown() && osJob.isDone()) { GearmanJobResult osJobRes = osJob.get(); String result = ByteUtils.fromUTF8Bytes(osJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { /* * String release = * resultObj.get("Release").toString(); String * distriputorID = * resultObj.get("Distributor ID").toString(); */ String description = resultObj.get( "Description").toString(); if (description.equals(bootOS)) { logger.info(client.getIp() + " right OS"); // right os, go to successState client.setState(ClientState.BOOT_SUCCESS); osJobs.remove(client.getId()); } else { logger.info(client.getIp() + " wrong OS"); client.setState(ClientState.WRONG_OS); osJobs.remove(client.getId()); } } else { logger.error(client.getIp() + " Cannot check os"); client.setError("The check for correct operating system has been failed."); // cannot check os, go in errorState client.setState(ClientState.BOOT_ERROR); osJobs.remove(client.getId()); } } } } } private void who(Client client) { GearmanJob job = GearmanJobImpl.createJob("who", client.getIp() .getBytes(), "who" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_WHO_PROGRESS); whoJobs.put(client.getId(), job); logger.info("who " + client.getIp()); } private void checkWhoProgress(Client client) throws Exception, IOException { GearmanJob whoJob = whoJobs.get(client.getId()); if (whoJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(whoJob); if (!jobStatus.isKnown() && whoJob.isDone()) { GearmanJobResult whoJobRes = whoJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String rawoutput = resultObj.get("rawoutput").toString(); StringTokenizer str = new StringTokenizer( rawoutput, " "); String user = ""; if (str.hasMoreTokens()) { user = str.nextToken(); } if (user.isEmpty()) { logger.info(client.getIp() + " no user is logged in -CHECK PS-"); // no user is logged in, doing restart //----- // didnĀ“t work in test-pool, check ps client.setState(ClientState.A_USER_IS_LOGGED_IN); whoJobs.remove(client.getId()); } else { logger.info(client.getIp() + " a user is logged in"); client.setState(ClientState.A_USER_IS_LOGGED_IN); whoJobs.remove(client.getId()); } } else { logger.error(client.getIp() + " Cannot check if a user is logged in."); client.setError("The check if a user is logged in has been failed."); /* * cannot check if a user is logged in, go in * errorState */ //client client.setState(ClientState.BOOT_ERROR); whoJobs.remove(client.getId()); } } } } } private void ps(Client client) { GearmanJob job = GearmanJobImpl.createJob("ps", client.getIp() .getBytes(), "ps" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PS_PROGRESS); psJobs.put(client.getId(), job); logger.info("ps " + client.getIp()); } private void checkPsProgress(Client client) throws Exception, IOException { GearmanJob psJob = psJobs.get(client.getId()); if (psJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(psJob); if (!jobStatus.isKnown() && psJob.isDone()) { GearmanJobResult whoJobRes = psJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { JSONArray ps = (JSONArray) resultObj.get("ps"); // boolean whitelistFound = false; boolean blacklistFound = false; for (String blackEntry : psBlacklist) { if (ps.toString().contains(blackEntry)) { blacklistFound = true; } } /* * for (String whiteEntry : psWhitelist) { if * (ps.toString().contains(whiteEntry)) { * whitelistFound = true; } } */ if (blacklistFound) { /* * if (whitelistFound) { * logger.info(client.getIp() + * " is not working"); // is not working * status.put(client.getId(), 13); * psJobs.remove(client.getId()); } else { */ logger.info(client.getIp() + " is working"); client.setState(ClientState.USER_IS_WORKING); psJobs.remove(client.getId()); // } } else { // user is not working, doing restart client.setState(ClientState.RESTART_CLIENT); psJobs.remove(client.getId()); } } else { logger.error(client.getIp() + " Cannot check if user is working."); client.setError("The check if a user is working has been failed."); /* * cannot check if user is working, go in * errorState */ client.setState(ClientState.BOOT_ERROR); psJobs.remove(client.getId()); } } } } } private void restart(Client client) { GearmanJob job = GearmanJobImpl.createJob("restart", client.getIp() .getBytes(), "restart" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_RESTART_PROGRESS); restartJobs.put(client.getId(), job); logger.info("restart " + client.getIp()); } private void checkRestartProgress(Client client) throws Exception, IOException { GearmanJob restartJob = restartJobs.get(client.getId()); if (restartJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(restartJob); if (!jobStatus.isKnown() && restartJob.isDone()) { GearmanJobResult wolJobRes = restartJob.get(); String result = ByteUtils.fromUTF8Bytes(wolJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { logger.info(client.getIp() + " Restart command sent"); client.setState(ClientState.RESTART_COMMAND_SENT); restartJobs.remove(client.getId()); } else { logger.error(client.getIp() + " Cannot send restart command"); client.setError("Sending the restart command has been failed."); // cannot send restart command, go in errorState client.setState(ClientState.BOOT_ERROR); restartJobs.remove(client.getId()); } } } } } public Boolean isFinished() { return finished && !error; } public Boolean isFinishedWithErrors() { return finished && error; } public Vector getClients() { return clients; } public String getEventName() { return eventName; } }