summaryrefslogblamecommitdiffstats
path: root/gearman/controllerWorker/ControllerWorker/Shutdown.java
blob: b0c869e8f97a7728ca9867c98f7dcd471f47e432 (plain) (tree)
1
2
3
4
5
6



                           
                      
                                 











                                                        
                                 


                                  

                               
                                      
                                                                              
 
                                 
                                       
                                     
                              
                                              
                                           
 

                                                                   

                                 
 
                                                                                 


                                                                                

                                           
                                                                   
                                                
                                                  
                                               

                                                                         

                                                              

                                 






                                   



                                                                                   








                                                                                
                                                
                                               
                                                           

                         

                                
                                                                             
                        
                                                                              
                 
         
 
          
                                                       
           
 
                                                   
                                               
                                                    
 
                                            
                                             
 
                                      
 
                                                 
                                                          
 
                                      
 
                                             
                                            
 
                                      
 
                                                
                                                         
 
                                      
 
                                             
                                                   
 
                                      
 
                                                     
                                                              


                                      
                                                   

                                                                
                                                                                       
                                                     


                                      
                                                 



                                                     
                                                          
                                                                  
 
                                      
 
                                               
                                           


                                      
                                               
                                                        


                                      
                                             
                                                                                        
                                                                              
                                                                            

                                      
                                            
                                                           
                                                                                                        
                                                        

                                                     
 

                                      
                                              
                                                           
                                                                                                           
                                                        
                                 
 
                                      
 
                         


                                            

                                                  
                                                   

                                                    
                                      









                                     
          
                                                                                    
           
 
                                          

                                                                                
                                          
                                                                 
                                                      
                                                      
         
 
                                                                                     
                                                                      





                                                                                         

                                                                                        










                                                                                                     
                                                                                  


                                                                                                   
                                                                                                      
                                                                                  

                                                 



                                                                                                                       
                                                                                           
                                                                                            
                                                                          



                                         
         
 
                                         

                                                                               
                                          
                                                                
                                                     
                                                     
         
 
                                                                                    
                                                                    
 









                                                                                                    



                                                                                                    



                                                                               
 
                                                                     

                                                                                                              
                                                                               
                                                                

                                                                                                       
                                                                                 
                                                        


                                                                                                                  

                                                                                                       
                                                                                 








                                                                                                                               
                                                                                            
                                                                         




                                         
 
                                                
                                                                                      

                                                            
                                          
                                                                     
                                                          
                                                            
         
 

                                                                           
                                                                                
 











                                                                                                       
                                                                                
                                                

                                                                                                   

                                                                                                                           
                                                                                                   
                                                                                            
                                                                                


                                         
                 
         
 
                                                  

                                                                                
                                          
                                                                          
                                                      
                                                      
         
 

                                                                               
                                                                              
 



                                                                      

                                                                             





                                                                                            

                                                                                          
                                                                

                                                                                             


                                                                                                         

                                                                                                           
                                                                                                              
                                                                                                  
                                                                                          




                                                                                                                                            

                                                         


                                                                                                                                           


                                                                                                                                              

                                                                                                           
                                                           
                                                                                                    
                                                                                          






                                                                                          
                                                                            
                                                                  
                         
                 
         
 
                                        

                                                                              
                                          
                                                               
                                                    
                                                    
         
 
                                                                                   
                                                                  






















                                                                                                    

                                                                                                        



                                                                     

                                                                                                           
                                                                                         
                                                                                       



                                                                                                     
                                                                                




                                                                                                        
                                                                                








                                                                                                                             
                                                                                            
                                                                        




                                         
 
                                     
                                          




                                               
 
                                            

                               



                                      
 
 
package ControllerWorker;

import java.io.IOException;
import java.lang.Thread;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.Vector;

import org.gearman.client.GearmanClient;
import org.gearman.client.GearmanClientImpl;
import org.gearman.client.GearmanJob;
import org.gearman.client.GearmanJobImpl;
import org.gearman.client.GearmanJobResult;
import org.gearman.client.GearmanJobStatus;
import org.gearman.common.GearmanJobServerConnection;
import org.gearman.common.GearmanNIOJobServerConnection;
import org.gearman.util.ByteUtils;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import org.apache.log4j.Logger;

public class Shutdown extends Thread {
	private static final Logger logger = ControllerWorkerMain.getLogger();

	private String eventName;
	private Vector<Client> clients;
	private final int updateRate;
	private long waitTime;
	// private Vector<String> psWhitelist;
	private Vector<String> psBlacklist;

	private final GearmanJobServerConnection gearmanConnection;
	private GearmanClient gearmanClient;
	private Boolean finished;
	private Boolean error;

	public Shutdown(String eventName, Vector<Client> clients, int updateRate,
			long waitTime, Vector<String> psWhitelist,
			Vector<String> psBlacklist, String gearmanServerAddress,
			int gearmanServerPort) {
		this.eventName = eventName;
		this.clients = clients;
		this.updateRate = updateRate; // updates per second
		this.waitTime = waitTime * 1000;
		// this.psWhitelist = psWhitelist;
		this.psBlacklist = psBlacklist;
		gearmanConnection = new GearmanNIOJobServerConnection(
				gearmanServerAddress, gearmanServerPort);
		gearmanClient = new GearmanClientImpl();
		gearmanClient.addJobServer(gearmanConnection);
		finished = false;
		error = false;
	}

	public void run() {
		workerLoop();
	}

	private void workerLoop() {
		long beginTime;
		long timeTaken;
		long timeLeft;
		final long updatePeriod = 1000000000L / updateRate; // nanoseconds;
		Boolean run = true;
		while (run) {
			try {
				beginTime = System.nanoTime();
				run = update();
				timeTaken = System.nanoTime() - beginTime;
				timeLeft = (updatePeriod - timeTaken) / 1000000;
				if (timeLeft < 10)
					timeLeft = 10;
				sleep(timeLeft);
			} catch (Exception e) {
				logger.error(e.toString());
			}
		}
		finished = true;
		if (error) {
			logger.error("Shutdown of " + eventName + " failed");
		} else {
			logger.info("Shutdown of " + eventName + " finished");
		}
	}

	/*
	 * -------------- shutdown logik --------------
	 */

	private Boolean update() throws Exception {
		for (Client client : clients) {
			switch (client.getState()) {

			case CLIENT_UNKNOWN:
				ping(client);

				break;

			case CHECK_PING_PROGRESS:
				checkPingProgress(client);

				break;

			case CLIENT_IS_ALIVE:
				who(client);

				break;

			case CHECK_WHO_PROGRESS:
				checkWhoProgress(client);

				break;

			case SHUTDOWN_CLIENT:
				doShutdown(client);

				break;

			case CHECK_SHUTDOWN_PROGRESS:
				checkShutdownProgress(client);

				break;

			case SHUTDOWN_COMMAND_SENT:
				Date date = new Date();
				Long timestamp = date.getTime();
				client.addPingTime(ClientPingTime.SHUTDOWN, timestamp);
				pingShutdown(client);

				break;

			case PING_SHUTDOWN_AGAIN:
				pingShutdown(client);

				break;

			case CHECK_PING_SHUTDOWN_PROGRESS:
				checkPingShutdwonProgress(client);

				break;

			case USER_IS_LOGGED_IN:
				ps(client);

				break;

			case CHECK_PS_PROGRESS:
				checkPsProgress(client);

				break;

			case USER_IS_WORKING:
				logger.error(client.getIp() + " User has been working");
				client.setError("The user has been working.");
				client.setState(ClientState.SHUTDOWN_ERROR);
				break;

			case SHUTDOWN_ERROR:
				if (!client.isFinished()) {
					logger.error(client.getIp() + " Shutdown failed"); // errorState
					client.finish();
					error = true;
				}

				break;

			case SHUTDOWN_SUCCESS:
				if (!client.isFinished()) {
					logger.info(client.getIp() + " Shutdown finished"); // successState
					client.finish();
				}

				break;

			}
		}

		boolean allFinished = false;
		for (Client client : clients) {
			if (client.isFinished()) {
				allFinished = true;
			} else {
				allFinished = false;
				break;
			}
		}

		if (allFinished) {
			return false;
		} else {
			return true;
		}
	}

	/*
	 * ------------------------- function declarations -------------------------
	 */

	private void ping(Client client) {
		GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp()
				.getBytes(), "ping" + client.getId());
		gearmanClient.submit(job);
		client.setState(ClientState.CHECK_PING_PROGRESS);
		client.addJob(ClientJob.PINGJOB, job);
		logger.info("ping " + client.getIp());
	}

	private void checkPingProgress(Client client) throws Exception, IOException {
		GearmanJob pingJob = client.getJob(ClientJob.PINGJOB);

		if (pingJob != null) {
			GearmanJobStatus jobStatus = gearmanClient.getJobStatus(pingJob);

			if (!jobStatus.isKnown() && pingJob.isDone()) {
				GearmanJobResult pingJobRes = pingJob.get();
				String result = ByteUtils
						.fromUTF8Bytes(pingJobRes.getResults());

				if (!result.isEmpty()) {
					JSONObject resultObj = (JSONObject) JSONValue.parse(result);
					if (!resultObj.containsKey("err")) {
						String alive = resultObj.get("alive").toString();

						if (alive.equals("true")) {
							logger.info(client.getIp() + " alive");
							client.setState(ClientState.CLIENT_IS_ALIVE);
							// check
							// Users
							client.removeJob(pingJob);
						} else if (alive.equals("false")) {
							logger.info(client.getIp() + " not alive");
							// not alive, go in successState
							client.setState(ClientState.SHUTDOWN_SUCCESS);
							client.removeJob(pingJob);
						}
					} else {
						logger.error(client.getIp()
								+ " Cannot send the ping message.");
						client
								.setError("Sending the ping message has been failed.");
						// sending the ping message has been failed
						client.setState(ClientState.SHUTDOWN_ERROR);
						client.removeJob(pingJob);
					}
				}
			}
		}
	}

	private void who(Client client) {
		GearmanJob job = GearmanJobImpl.createJob("who", client.getIp()
				.getBytes(), "who" + client.getId());
		gearmanClient.submit(job);
		client.setState(ClientState.CHECK_WHO_PROGRESS);
		client.addJob(ClientJob.WHOJOB, job);
		logger.info("who " + client.getIp());
	}

	private void checkWhoProgress(Client client) throws Exception, IOException {
		GearmanJob whoJob = client.getJob(ClientJob.WHOJOB);

		if (whoJob != null) {
			GearmanJobStatus jobStatus = gearmanClient.getJobStatus(whoJob);

			if (!jobStatus.isKnown() && whoJob.isDone()) {
				GearmanJobResult whoJobRes = whoJob.get();
				String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults());

				if (!result.isEmpty()) {
					JSONObject resultObj = (JSONObject) JSONValue.parse(result);
					if (!resultObj.containsKey("err")) {
						String rawoutput = resultObj.get("rawoutput")
								.toString();
						StringTokenizer str = new StringTokenizer(rawoutput,
								" ");
						String user = "";
						if (str.hasMoreTokens()) {
							user = str.nextToken();
						}

						if (user.isEmpty()) {
							logger.info(client.getIp()
									+ " no user is logged in -CHECK PS-");
							// no user is logged in
							// -----
							// didn´t work in test-pool, check ps
							client.setState(ClientState.USER_IS_LOGGED_IN);
							client.removeJob(whoJob);
						} else {
							logger
									.info(client.getIp()
											+ " a user is logged in");
							// a user is logged in
							client.setState(ClientState.USER_IS_LOGGED_IN);
							client.removeJob(whoJob);
						}
					} else {
						logger.error(client.getIp()
								+ " Cannot check if a user is logged in.");
						client
								.setError("The check if a user is logged in has been failed.");
						/*
						 * cannot check if a user is logged in, go in errorState
						 */
						client.setState(ClientState.SHUTDOWN_ERROR);
						client.removeJob(whoJob);
					}
				}
			}
		}
	}

	private void doShutdown(Client client) {
		GearmanJob job = GearmanJobImpl.createJob("doShutdown", client.getIp()

		.getBytes(), "doShutdown" + client.getId());
		gearmanClient.submit(job);
		client.setState(ClientState.CHECK_SHUTDOWN_PROGRESS);
		client.addJob(ClientJob.SHUTDOWNJOB, job);
		logger.info("doShutdown " + client.getIp());
	}

	private void checkShutdownProgress(Client client) throws Exception,
			IOException {
		GearmanJob doShutdownJob = client.getJob(ClientJob.SHUTDOWNJOB);

		if (doShutdownJob != null) {
			GearmanJobStatus jobStatus = gearmanClient
					.getJobStatus(doShutdownJob);

			if (!jobStatus.isKnown() && doShutdownJob.isDone()) {
				GearmanJobResult wolJobRes = doShutdownJob.get();
				String result = ByteUtils.fromUTF8Bytes(wolJobRes.getResults());
				if (!result.isEmpty()) {
					JSONObject resultObj = (JSONObject) JSONValue.parse(result);
					if (!resultObj.containsKey("err")) {
						logger.info(client.getIp() + " Shutdown command sent");
						client.setState(ClientState.SHUTDOWN_COMMAND_SENT);
						client.removeJob(doShutdownJob);
					} else {
						logger.error(client.getIp()
								+ " Cannot send shutdown command");
						client
								.setError("Sending the shutdown command has been failed.");
						// cannot send shutdown command, go in / errorState
						client.setState(ClientState.SHUTDOWN_ERROR);
						client.removeJob(doShutdownJob);
					}
				}
			}
		}
	}

	private void pingShutdown(Client client) {
		GearmanJob job = GearmanJobImpl.createJob("ping", client.getIp()
				.getBytes(), "ping" + client.getId());
		gearmanClient.submit(job);
		client.setState(ClientState.CHECK_PING_SHUTDOWN_PROGRESS);
		client.addJob(ClientJob.PINGJOB, job);
		logger.info("ping " + client.getIp());
	}

	private void checkPingShutdwonProgress(Client client) throws Exception,
			IOException {
		GearmanJob pingJobShutdown = client.getJob(ClientJob.PINGJOB);

		if (pingJobShutdown != null) {
			Date currentDate = new Date();
			Long currentTimestamp = currentDate.getTime();
			// wait 2 min until shutdown
			Long expectedTimestamp = client
					.getPingTime(ClientPingTime.SHUTDOWN)
					+ waitTime;
			if (expectedTimestamp >= currentTimestamp) {
				GearmanJobStatus jobStatus = gearmanClient
						.getJobStatus(pingJobShutdown);
				if (!jobStatus.isKnown() && pingJobShutdown.isDone()) {
					GearmanJobResult pingJobRes = pingJobShutdown.get();
					String result = ByteUtils.fromUTF8Bytes(pingJobRes
							.getResults());
					if (!result.isEmpty()) {
						JSONObject resultObj = (JSONObject) JSONValue
								.parse(result);
						if (!resultObj.containsKey("err")) {
							String alive = resultObj.get("alive").toString();
							if (alive.equals("false")) {
								logger.info(client.getIp()
										+ " is not alive anymore");
								client.setState(ClientState.SHUTDOWN_SUCCESS);
								client.removeJob(pingJobShutdown);
							} else if (alive.equals("true")) {
								logger
										.info(client.getIp()
												+ " is still alive after shutdown command");
								client
										.setState(ClientState.PING_SHUTDOWN_AGAIN);
							}
						} else {
							logger
									.error(client.getIp()
											+ " Cannot send the ping after shutdown message.");
							client
									.setError("Sending the ping after shutdown message has been failed.");
							/*
							 * sending the ping after shutdown message has been
							 * failed
							 */
							client.setState(ClientState.SHUTDOWN_ERROR);
							client.removeJob(pingJobShutdown);
						}
					}
				}
			} else {
				logger.error(client.getIp() + " is alive after shutdown");
				client.setError("Client is still alive after shutdown.");
				// still alive, go in errorState
				client.setState(ClientState.SHUTDOWN_ERROR);
				client.removeJob(pingJobShutdown);
			}
		}
	}

	private void ps(Client client) {
		GearmanJob job = GearmanJobImpl.createJob("ps", client.getIp()
				.getBytes(), "ps" + client.getId());
		gearmanClient.submit(job);
		client.setState(ClientState.CHECK_PS_PROGRESS);
		client.addJob(ClientJob.PSJOB, job);
		logger.info("ps " + client.getIp());
	}

	private void checkPsProgress(Client client) throws Exception, IOException {
		GearmanJob psJob = client.getJob(ClientJob.PSJOB);

		if (psJob != null) {
			GearmanJobStatus jobStatus = gearmanClient.getJobStatus(psJob);

			if (!jobStatus.isKnown() && psJob.isDone()) {
				GearmanJobResult whoJobRes = psJob.get();
				String result = ByteUtils.fromUTF8Bytes(whoJobRes.getResults());

				if (!result.isEmpty()) {
					JSONObject resultObj = (JSONObject) JSONValue.parse(result);
					if (!resultObj.containsKey("err")) {
						JSONArray ps = (JSONArray) resultObj.get("ps");
						// boolean whitelistFound = false;
						boolean blacklistFound = false;

						for (String blackEntry : psBlacklist) {
							if (ps.toString().contains(blackEntry)) {
								blacklistFound = true;
							}
						}

						/*
						 * for (String whiteEntry : psWhitelist) { if
						 * (ps.toString().contains(whiteEntry)) { whitelistFound
						 * = true; } }
						 */

						if (blacklistFound) {
							/*
							 * if (whitelistFound) { logger.info(client.getIp()
							 * + " is not working"); // is not working
							 * status.put(client.getId(), 4);
							 * client.removeClientJob(job);
							 * 
							 * } else {
							 */
							client.setState(ClientState.USER_IS_WORKING);
							client.removeJob(psJob);
							// }
						} else {
							logger.info(client.getIp() + " is not working");
							// is not working
							client.setState(ClientState.SHUTDOWN_CLIENT);
							client.removeJob(psJob);
						}
					} else {
						logger.error(client.getIp()
								+ " Cannot check if user is working.");
						client
								.setError("The check if a user is working has been failed.");
						/*
						 * cannot check if user is working, go in errorState
						 */
						client.setState(ClientState.SHUTDOWN_ERROR);
						client.removeJob(psJob);
					}
				}
			}
		}
	}

	public Boolean isFinished() {
		return finished && !error;
	}

	public Boolean isFinishedWithErrors() {
		return finished && error;
	}

	public Vector<Client> getClients() {
		return clients;
	}

	public String getEventName() {
		return eventName;
	}

}