package ControllerWorker; import java.io.IOException; import java.lang.Thread; import java.util.Date; import java.util.HashMap; import java.util.StringTokenizer; import java.util.Vector; import org.gearman.client.GearmanClient; import org.gearman.client.GearmanClientImpl; import org.gearman.client.GearmanJob; import org.gearman.client.GearmanJobImpl; import org.gearman.client.GearmanJobResult; import org.gearman.client.GearmanJobStatus; import org.gearman.common.GearmanJobServerConnection; import org.gearman.common.GearmanNIOJobServerConnection; import org.gearman.util.ByteUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; import org.apache.log4j.Logger; public class Boot extends Thread { static final Logger logger = Logger.getLogger(Boot.class); private String eventName; private Vector clients; private String bootOS; private final int updateRate; private long waitTime; // private Vector psWhitelist; private Vector psBlacklist; private final GearmanJobServerConnection gearmanConnection; private GearmanClient gearmanClient; private Vector jobs; private HashMap pingWolTime; private HashMap pingRestartShutdownTime; private HashMap pingRestartBootTime; private Boolean finished; private Boolean error; public Boot(String eventName, Vector clients, String bootOS, int updateRate, long waitTime, Vector psWhitelist, Vector psBlacklist, String gearmanServerAddress, int gearmanServerPort) { this.eventName = eventName; this.clients = clients; this.bootOS = bootOS; this.waitTime = waitTime * 1000; this.updateRate = updateRate; // updates per second // this.psWhitelist = psWhitelist; this.psBlacklist = psBlacklist; gearmanConnection = new GearmanNIOJobServerConnection( gearmanServerAddress, gearmanServerPort); gearmanClient = new GearmanClientImpl(); gearmanClient.addJobServer(this.gearmanConnection); jobs = new Vector(); pingWolTime = new HashMap(); pingRestartShutdownTime = new HashMap(); pingRestartBootTime = new HashMap(); finished = false; error = false; } public void run() { workerLoop(); } private void workerLoop() { long beginTime; long timeTaken; long timeLeft; long updatePeriod = 1000000000L / updateRate; // nanoseconds Boolean run = true; while (run) { try { beginTime = System.nanoTime(); run = update(); timeTaken = System.nanoTime() - beginTime; timeLeft = (updatePeriod - timeTaken) / 1000000; if (timeLeft < 10) timeLeft = 10; sleep(timeLeft); } catch (Exception e) { logger.error(e.toString()); } } finished = true; if (error) { logger.error("Booting of " + eventName + " failed"); } else { logger.info("Booting of " + eventName + " finished"); } } /* * -------------- boot logik -------------- */ private Boolean update() throws Exception { Date date; long timestamp; for (Client client : clients) { switch (client.getState()) { case CLIENT_UNKNOWN: ping(client); break; case CHECK_PING_PROGRESS: checkPingProgress(client); break; case CLIENT_IS_ALIVE: checkOS(client); break; case CLIENT_NOT_ALIVE: wakeOnLan(client); break; case CHECK_WAKE_ON_LAN_PROGRESS: checkWakeOnLanProgress(client); break; case MAGIC_PACKET_SENT: date = new Date(); timestamp = date.getTime(); pingWolTime.put(client.getId(), timestamp); pingWakeOnLan(client); break; case PING_WOL_AGAIN: pingWakeOnLan(client); break; case CHECK_PING_WOL_PROGRESS: checkPingWolProgress(client); break; case CHECK_CHECKOS_PROGRESS: checkCheckosProgress(client); break; case WRONG_OS: who(client); break; case CHECK_WHO_PROGRESS: checkWhoProgress(client); break; case A_USER_IS_LOGGED_IN: ps(client); break; case CHECK_PS_PROGRESS: checkPsProgress(client); break; case RESTART_CLIENT: restart(client); break; case CHECK_RESTART_PROGRESS: checkRestartProgress(client); break; case RESTART_COMMAND_SENT: date = new Date(); timestamp = date.getTime(); pingRestartShutdownTime.put(client.getId(), timestamp); pingRestartShutdown(client); break; case PING_RESTART_SHUTDOWN_AGAIN: pingRestartShutdown(client); break; case CHECK_PING_RESTART_SHUTDOWN_PROGRESS: checkPingRestartShutdownProgress(client); break; case USER_IS_WORKING: logger.error(client.getIp() + " User has been working"); client.setError("The user has been working."); // user has been working, go in errorState client.setState(ClientState.BOOT_ERROR); break; case CLIENT_IS_DOWN: date = new Date(); timestamp = date.getTime(); pingRestartBootTime.put(client.getId(), timestamp); pingRestartBoot(client); break; case PING_RESTART_BOOT_AGAIN: pingRestartBoot(client); break; case CHECK_PING_RESTART_BOOT_PROGRESS: checkPingRestartBootProgress(client); break; case BOOT_ERROR: if (!client.isFinished()) { logger.error(client.getIp() + " Booting failed"); // errorState client.finish(); error = true; } break; case BOOT_SUCCESS: if (!client.isFinished()) { logger.info(client.getIp() + " Booting finished"); // successState client.finish(); } break; } } boolean allFinished = false; for (Client client : clients) { if (client.isFinished()) { allFinished = true; } else { allFinished = false; break; } } if (allFinished) { return false; } else { return true; } } /* * ------------------------- function declarations ------------------------- */ private void ping(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_PROGRESS); addClientJob(client.getId(), ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PINGJOB); GearmanJob pingJob = job.getGearmanJob(); if (pingJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(pingJob); if (!jobStatus.isKnown() && pingJob.isDone()) { GearmanJobResult pingJobRes = pingJob.get(); String result = ByteUtils .fromUTF8Bytes(pingJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " alive"); client.setState(ClientState.CLIENT_IS_ALIVE); removeClientJob(job); } else if (alive.equals("false")) { logger.info(client.getIp() + " not alive"); client.setState(ClientState.CLIENT_NOT_ALIVE); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot send the ping message."); client .setError("Sending the ping message has been failed."); client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } } private void pingWakeOnLan(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_WOL_PROGRESS); addClientJob(client.getId(), ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingWolProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PINGJOB); GearmanJob pingJobWoL = job.getGearmanJob(); if (pingJobWoL != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until WoL - Failed Long expectedTimestamp = pingWolTime.get(client.getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobWoL); if (!jobStatus.isKnown() && pingJobWoL.isDone()) { GearmanJobResult pingJobRes = pingJobWoL.get(); String result = ByteUtils.fromUTF8Bytes(pingJobRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " is alive after WoL"); // alive, go in successState client.setState(ClientState.BOOT_SUCCESS); removeClientJob(job); } else if (alive.equals("false")) { logger.info("ping again " + client.getIp()); client.setState(ClientState.PING_WOL_AGAIN); removeClientJob(job); } } else { logger .error(client.getIp() + " Cannot send the ping after wake on LAN message."); client .setError("Sending the ping after wake on LAN message has been failed."); /* * sending the ping after wake on LAN message has * been failed, go to errorState */ client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } else { logger.error(client.getIp() + " is not alive after WoL"); client.setError("The wake on LAN has been failed."); // not alive, go in errorState client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } private void pingRestartShutdown(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_RESTART_SHUTDOWN_PROGRESS); addClientJob(client.getId(), ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingRestartShutdownProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PINGJOB); GearmanJob pingJobRestartShutdown = job.getGearmanJob(); if (pingJobRestartShutdown != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until Restart - Failed Long expectedTimestamp = pingRestartShutdownTime .get(client.getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobRestartShutdown); if (!jobStatus.isKnown() && pingJobRestartShutdown.isDone()) { GearmanJobResult pingJobRestartRes = pingJobRestartShutdown .get(); String result = ByteUtils.fromUTF8Bytes(pingJobRestartRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " is still alive"); // still alive, ping again client .setState(ClientState.PING_RESTART_SHUTDOWN_AGAIN); removeClientJob(job); } else if (alive.equals("false")) { logger.info(client.getIp() + " is down"); // not alive, ping again client.setState(ClientState.CLIENT_IS_DOWN); removeClientJob(job); } } else { logger .error(client.getIp() + " Cannot send the ping after restart message."); client .setError("Sending the ping after restart message has been failed."); /* * sending the ping after restart message has been * failed */ client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } else { logger.error(client.getIp() + " shutdown failed"); client.setError("The shutdown has been failed."); // still alive, go in errorState client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } private void pingRestartBoot(Client client) { GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp() .getBytes(), "ping" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PING_RESTART_BOOT_PROGRESS); addClientJob(client.getId(), ClientJob.PINGJOB, job); logger.info("ping " + client.getIp()); } private void checkPingRestartBootProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PINGJOB); GearmanJob pingJobRestartBoot = job.getGearmanJob(); if (pingJobRestartBoot != null) { Date currentDate = new Date(); Long currentTimestamp = currentDate.getTime(); // wait 2 min until Restart - Failed Long expectedTimestamp = pingRestartBootTime.get(client.getId()) + waitTime; if (expectedTimestamp >= currentTimestamp) { GearmanJobStatus jobStatus = gearmanClient .getJobStatus(pingJobRestartBoot); if (!jobStatus.isKnown() && pingJobRestartBoot.isDone()) { GearmanJobResult pingJobRestartRes = pingJobRestartBoot .get(); String result = ByteUtils.fromUTF8Bytes(pingJobRestartRes .getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue .parse(result); if (!resultObj.containsKey("err")) { String alive = resultObj.get("alive").toString(); if (alive.equals("true")) { logger.info(client.getIp() + " is alive after restart"); // alive, go to success state client.setState(ClientState.BOOT_SUCCESS); removeClientJob(job); } else if (alive.equals("false")) { logger.info("ping again " + client.getIp()); // not alive, ping again client .setState(ClientState.PING_RESTART_BOOT_AGAIN); removeClientJob(job); } } else { logger .error(client.getIp() + " Cannot send the ping after shutdown message."); client .setError("Sending the ping after shutdown message has been failed."); /* * sending the ping after shutdown message has been * failed */ client.setState(ClientState.BOOT_ERROR); } } } } else { logger.error(client.getIp() + " is not alive after reboot"); client.setError("The reboot has been failed."); // not alive, go in errorState client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } private void wakeOnLan(Client client) { GearmanJob job = GearmanJobImpl.createJob("wol", client.getMac() .getBytes(), "wol" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_WAKE_ON_LAN_PROGRESS); addClientJob(client.getId(), ClientJob.WOLJOB, job); logger.info("wake on lan " + client.getMac()); } private void checkWakeOnLanProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.WOLJOB); GearmanJob wolJob = job.getGearmanJob(); if (wolJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(wolJob); if (!jobStatus.isKnown() && wolJob.isDone()) { GearmanJobResult wolJobRes = wolJob.get(); String result = ByteUtils.fromUTF8Bytes(wolJobRes.getResults()); if (result.equals("Magic packet send.")) { logger.info(client.getMac() + " Magic packet sent."); client.setState(ClientState.MAGIC_PACKET_SENT); removeClientJob(job); } else { logger.error(client.getIp() + " Cannot send magic packet."); client .setError("Sending the magic packet has been failed."); // cannot send magic packet, go in errorState client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } private void checkOS(Client client) { GearmanJob job = GearmanJobImpl.createJob("os", client.getIp() .getBytes(), "os" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_CHECKOS_PROGRESS); addClientJob(client.getId(), ClientJob.OSJOB, job); logger.info("check OS " + client.getIp()); } private void checkCheckosProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.OSJOB); GearmanJob osJob = job.getGearmanJob(); if (osJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(osJob); if (!jobStatus.isKnown() && osJob.isDone()) { GearmanJobResult osJobRes = osJob.get(); String result = ByteUtils.fromUTF8Bytes(osJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { /* * String release = resultObj.get("Release").toString(); * String distriputorID = * resultObj.get("Distributor ID").toString(); */ String description = resultObj.get("Description") .toString(); if (description.equals(bootOS)) { logger.info(client.getIp() + " right OS"); // right os, go to successState client.setState(ClientState.BOOT_SUCCESS); removeClientJob(job); } else { logger.info(client.getIp() + " wrong OS"); client.setState(ClientState.WRONG_OS); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot check os"); client .setError("The check for correct operating system has been failed."); // cannot check os, go in errorState client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } } private void who(Client client) { GearmanJob job = GearmanJobImpl.createJob("who", client.getIp() .getBytes(), "who" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_WHO_PROGRESS); addClientJob(client.getId(), ClientJob.WHOJOB, job); logger.info("who " + client.getIp()); } private void checkWhoProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.WHOJOB); GearmanJob whoJob = job.getGearmanJob(); if (whoJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(whoJob); if (!jobStatus.isKnown() && whoJob.isDone()) { GearmanJobResult whoJobRes = whoJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { String rawoutput = resultObj.get("rawoutput") .toString(); StringTokenizer str = new StringTokenizer(rawoutput, " "); String user = ""; if (str.hasMoreTokens()) { user = str.nextToken(); } if (user.isEmpty()) { logger.info(client.getIp() + " no user is logged in -CHECK PS-"); // no user is logged in, doing restart // ----- // didnĀ“t work in test-pool, check ps client.setState(ClientState.A_USER_IS_LOGGED_IN); removeClientJob(job); } else { logger .info(client.getIp() + " a user is logged in"); client.setState(ClientState.A_USER_IS_LOGGED_IN); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot check if a user is logged in."); client .setError("The check if a user is logged in has been failed."); /* * cannot check if a user is logged in, go in errorState */ // client client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } } private void ps(Client client) { GearmanJob job = GearmanJobImpl.createJob("ps", client.getIp() .getBytes(), "ps" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_PS_PROGRESS); addClientJob(client.getId(), ClientJob.PSJOB, job); logger.info("ps " + client.getIp()); } private void checkPsProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.PSJOB); GearmanJob psJob = job.getGearmanJob(); if (psJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(psJob); if (!jobStatus.isKnown() && psJob.isDone()) { GearmanJobResult whoJobRes = psJob.get(); String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { JSONArray ps = (JSONArray) resultObj.get("ps"); // boolean whitelistFound = false; boolean blacklistFound = false; for (String blackEntry : psBlacklist) { if (ps.toString().contains(blackEntry)) { blacklistFound = true; } } /* * for (String whiteEntry : psWhitelist) { if * (ps.toString().contains(whiteEntry)) { whitelistFound * = true; } } */ if (blacklistFound) { /* * if (whitelistFound) { logger.info(client.getIp() * + " is not working"); // is not working * status.put(client.getId(), 13); * removeClientJob(job); } else { */ logger.info(client.getIp() + " is working"); client.setState(ClientState.USER_IS_WORKING); removeClientJob(job); // } } else { // user is not working, doing restart client.setState(ClientState.RESTART_CLIENT); removeClientJob(job); } } else { logger.error(client.getIp() + " Cannot check if user is working."); client .setError("The check if a user is working has been failed."); /* * cannot check if user is working, go in errorState */ client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } } private void restart(Client client) { GearmanJob job = GearmanJobImpl.createJob("restart", client.getIp() .getBytes(), "restart" + client.getId()); gearmanClient.submit(job); client.setState(ClientState.CHECK_RESTART_PROGRESS); addClientJob(client.getId(), ClientJob.RESTARTJOB, job); logger.info("restart " + client.getIp()); } private void checkRestartProgress(Client client) throws Exception, IOException { ClientJob job = getClientJob(client.getId(), ClientJob.RESTARTJOB); GearmanJob restartJob = job.getGearmanJob(); if (restartJob != null) { GearmanJobStatus jobStatus = gearmanClient.getJobStatus(restartJob); if (!jobStatus.isKnown() && restartJob.isDone()) { GearmanJobResult wolJobRes = restartJob.get(); String result = ByteUtils.fromUTF8Bytes(wolJobRes.getResults()); if (!result.isEmpty()) { JSONObject resultObj = (JSONObject) JSONValue.parse(result); if (!resultObj.containsKey("err")) { logger.info(client.getIp() + " Restart command sent"); client.setState(ClientState.RESTART_COMMAND_SENT); removeClientJob(job); } else { logger.error(client.getIp() + " Cannot send restart command"); client .setError("Sending the restart command has been failed."); // cannot send restart command, go in errorState client.setState(ClientState.BOOT_ERROR); removeClientJob(job); } } } } } private ClientJob getClientJob(int clientID, int jobType) { for (ClientJob job : jobs) { if (job.getJobType() == jobType && job.getClientID() == clientID) { return job; } } return null; } private void addClientJob(int clientID, int jobType, GearmanJob gearmanJob) { jobs.add(new ClientJob(clientID, jobType, gearmanJob)); } private void removeClientJob(ClientJob clientJob) { jobs.remove(clientJob); } public Boolean isFinished() { return finished && !error; } public Boolean isFinishedWithErrors() { return finished && error; } public Vector getClients() { return clients; } public String getEventName() { return eventName; } }