From 8a7755d6f96e599fe75ac8e5e1c6994138f4b99e Mon Sep 17 00:00:00 2001 From: gsmselftest Date: Wed, 16 Nov 2011 18:06:02 +0100 Subject: under Testing folder --- Under-Testing/Server-Code-New/ControllerClass.py | 352 +++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 Under-Testing/Server-Code-New/ControllerClass.py (limited to 'Under-Testing/Server-Code-New/ControllerClass.py') diff --git a/Under-Testing/Server-Code-New/ControllerClass.py b/Under-Testing/Server-Code-New/ControllerClass.py new file mode 100644 index 0000000..df89ac5 --- /dev/null +++ b/Under-Testing/Server-Code-New/ControllerClass.py @@ -0,0 +1,352 @@ +import sys +import os +import subprocess +import SSHTunnelBoxClass +import ClientClass +import random +import csv + +import LogFileClass +logger = LogFileClass.Logging('TestProcessLog.log') + +from time import sleep + + +class controller: + + def __init__(self, callFrom, callPortName, accCaller, callTo, destPortName, destNo, accDest): + + self.callFrom = callFrom + self.dest = callTo + self.destNo = destNo + self.accDest = accDest + self.accCaller = accCaller + self.callPortName = callPortName + self.destPortName = destPortName + self.portCaller = None + self.portDest = None + self.resultCaller = None + self.resultDest = None + self.testResult = None + + def FuncTest(self): + + logger.logEvent(' -- -X- --') + + self.initCaller() + + if self.callFrom =="GSMRZ3" or self.callFrom =="GSMRZ2": # wait until ssh connection establish + if self.continues == 1: + #putting 6 seconds sleep to prevent unsuccess login using SSH since we have problem with duration time loging into beagle board + sleep(2) + self.callerGreeting() + else: + self.connected = 'NOT OK' + else: + #waiting 2 seconds if doesnt use ssh connection until handler ready + sleep(5) + self.callerGreeting() + + if self.connected == 'OK': + + self.caller.sendData('CALLER|'+self.destNo) + callerHandler = self.caller.receiveData(25) + #waiting ready message from caller + + if callerHandler == "CALLER READY": + logger.logEvent('Caller handler : Ready') + self.initReceiver() + if self.dest =="GSMRZ3" or self.dest =="GSMRZ2": # wait until ssh connection establish + if self.continues == 1: + #putting 6 seconds sleep to prevent unsuccess login using SSH since we have problem with duration time loging into beagle board + sleep(2) + self.receiverGreeting() + else: + self.connected = 'NOT OK' + else: + #waiting 2 seconds if doesnt use ssh connection until handler ready + sleep(5) + self.receiverGreeting() + + if self.connected == 'OK': + + self.receiver.sendData('RECEIVER') + destHandler = self.receiver.receiveData(25) + #waiting ready message from destination + + if destHandler == 'RECEIVER READY': + logger.logEvent('Receiver handler : Ready') + self.startCall() + self.waitingFeedback() + + #Device destination having error after handler ready to receive call. + elif destHandler == 'DEVICE NOT READY': + self.testResult == 802 + logger.logEvent('802 General Device Error: Destination device no respond timeout') + self.initTerminate() + else: + self.testResult = 604 + logger.logEvent('604 General Handler Error: Destination handler no respond timeout') + self.initTerminate() + + #can connect to handler but device destination not ready to do the test. + elif self.connected == 'DEVICE NOT READY': + self.testResult = 802 + logger.logEvent('802 General Device Error: Destination device no respond timeout') + self.initTerminate() + else: + logger.logEvent('998 General Handler Error: Could not connect Destination handler') + self.testResult = 998 + self.caller.sendData('TERMINATE CONNECTION') + self.caller.closeConnection() + self.initCancelTest() + else: + self.testResult = 605 + logger.logEvent('605 General Handler Error: caller handler no respond timeout') + + self.caller.sendData('TERMINATE CONNECTION') + self.caller.closeConnection() + self.initCancelTest() + + #can connect to handler but device caller not ready to do the test. + elif self.connected == 'DEVICE NOT READY': + self.testResult = 801 + self.caller.sendData('TERMINATE CONNECTION') + self.caller.closeConnection() + logger.logEvent('802 General Device Error: Caller device no respond timeout') + self.initCancelTest() + else: + self.testResult = 999 + logger.logEvent('999 General Handler Error: Could not connect to Caller handler') + + def writeToFile(self, AccountInfo): + try: + with open('handler.txt', 'w') as F: + writer = csv.writer(F) + writer.writerow([AccountInfo]) + except ValueError: + print "can't write to file" + + def initCancelTest(self): + #close SSH connection when using gsmBox and destination doesnt respond. to make sure SSH connection are terminate + logger.logEvent('init Cancel test') + if self.callFrom[0:5] == 'GSMRZ': + if self.callFrom != 'GSMRZ1': + # close SSH tunneling + self.boxCaller.killTunneling() + + # waiting results state + def waitingFeedback(self): + logger.logEvent('Waiting Feedback') + self.resultDest = self.receiver.receiveData(20) + self.resultCaller = self.caller.receiveData(20) + #print 'result '+self.resultCaller+'--'+self.resultDest + if self.resultCaller == 'DEVICE NOT READY': + logger.logEvent('Caller DEVICE NOT READY') + self.testResult = 801 + self.initTerminate() + + elif self.dest == 'DEVICE NOT READY': + logger.logEvent('Caller DEVICE NOT READY') + self.testResult = 802 + self.initTerminate() + + elif self.resultCaller == 'CALL OK' and self.resultDest =='CALL OK': + logger.logEvent('Test Succeed') + self.testResult = 200 + self.initTerminate() + else: + #build specially only for Eplus card. since they use prepaid card. + if self.dest == 'GSMExt.Eplus': + if self.resultCaller == 'CALL OK' and self.resultDest <> 'TIME OUT': + logger.logEvent('Test Failed - Eplus No credit on Eplus') + self.testResult = 402 + self.initTerminate() + else: + logger.logEvent('Test Failed') + self.testResult = 486 + self.initTerminate() + + else: + #one or both of the handler send un success test. assign failed to this test + if self.resultCaller <> 'CALL OK' or self.resultDest <> 'CALL OK': + logger.logEvent('Test Failed') + self.testResult = 486 + self.initTerminate() + + #send start call message to caller + def startCall(self): + logger.logEvent('Start Call') + self.receiver.sendData('RECEIVE START') + self.caller.sendData('CALL START') + + def initAccount(self, account, handler, PortName, portCom): + + accConf = account + self.username = accConf[0:accConf.find(':')] + + line = accConf[accConf.find(':')+1:] + self.password = line[0:line.find(':')] + + newLine = line[line.find(':')+1:] + self.server = newLine[0:newLine.find(':')] + + textFile = 'Account:'+str(self.username)+':'+str(self.password)+':'+str(self.server)+':'+str(handler)+':'+str(PortName)+':'+str(portCom) + self.writeToFile(textFile) + + # define the caller configuration such as port name and port caller. + def initCaller(self): + logger.logEvent('init Caller') + logger.logEvent(self.callFrom) + self.portCaller = random.randint(30000,60000) + + if self.callFrom[0:4] == 'GSMR': + if self.callFrom =="GSMRZ1": + self.initAccount(self.accCaller,self.callFrom, self.callPortName,self.portCaller) + self.initGSM(self.portCaller, self.callPortName, self.callFrom) + else: + self.initAccount(self.accCaller, self.callFrom, self.callPortName,self.portCaller) + #open SSH tunneling + self.boxCaller = SSHTunnelBoxClass.SSHTunneling(self.portCaller, 50008, self.server, self.username, self.password) + status = self.boxCaller.startTunneling() + + #check whether the SSH tunneling succes or not, 0 is failed! + if status!= 0: + self.continues = 1 + else: + self.continues = 0 + elif self.callFrom[0:4] == 'GSME': + self.initAccount(self.accCaller,self.callFrom, self.callPortName,self.portCaller) + self.initGSM(self.portCaller, self.callPortName, self.callFrom) + + else: + #open the SIP handler + self.initAccount(self.accCaller,self.callFrom, self.callPortName,self.portCaller) + script = 'SIPHandler.py' + subprocess.Popen(['python',script], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + #subprocess.Popen(args=["gnome-terminal", '--command=python SIPHandler.py '+self.accCaller+ ' ' +str(self.portCaller)]) + + # define the destination configuration such as port name and port caller. + def initReceiver(self): + logger.logEvent('init Receiver') + logger.logEvent(self.dest) + self.portDest = random.randint(30000,60000) + + if self.dest[0:4] == 'GSMR': + if self.dest =="GSMRZ1": + self.initAccount(self.accDest, self.dest, self.destPortName,self.portDest) + self.initGSM(self.portDest, self.destPortName, self.dest) + else: + self.initAccount(self.accDest, self.dest, self.destPortName,self.portDest) + #open SSH tunneling + self.boxDest = SSHTunnelBoxClass.SSHTunneling(self.portDest, 50008, self.server, self.username, self.password) + status = self.boxDest.startTunneling() + + #check whether the SSH tunneling succes or not, 0 is failed! + if status!= 0: + self.continues = 1 + else: + self.continues = 0 + + elif self.dest[0:4] == 'GSME': + self.initAccount(self.accDest, self.dest, self.destPortName,self.portDest) + self.initGSM(self.portDest, self.destPortName, self.dest) + + else: + #self.portDest = 50100 + self.initAccount(self.accDest, self.dest, self.destPortName,self.portDest) + script = 'SIPHandler.py' + subprocess.Popen(['python',script], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + #subprocess.Popen(args=['gnome-terminal', '--command=python SIPHandler.py '+self.accDest+ ' ' +str(self.portDest)]) + + # send terminate message to Handlers + def initTerminate(self): + logger.logEvent('TERMINATE CONNECTION') + self.caller.sendData('TERMINATE CONNECTION') + self.receiver.sendData('TERMINATE CONNECTION') + + if self.callFrom[0:5] == 'GSMRZ': + if self.callFrom != 'GSMRZ1': + # close SSH tunneling + self.boxCaller.killTunneling() + if self.dest[0:5] == 'GSMRZ': + if self.dest != 'GSMRZ1': + # close SSH tunneling + self.boxDest.killTunneling() + + #close port communication + self.receiver.closeConnection() + self.caller.closeConnection() + + def callerGreeting(self): # send greeting message to the caller handler + logger.logEvent('Caller Greeting') + self.connected = None + #open connection to the Handler + self.caller = ClientClass.Connection('localhost',self.portCaller) + self.caller.connect() + + if self.caller.connected == 1: + #connection establish and send hallo message to handler + logger.logEvent('Connected to Caller Handler') + self.caller.sendData('HELLO HANDLER') + #waiting handler respond + message = self.caller.receiveData(20) + + if message == 'HELLO CONTROLLER': + # caller handler send hello message + logger.logEvent('Caller Handler respond') + self.connected = 'OK' + elif message == 'DEVICE NOT READY': + # the handler found the device not ready to making test. + logger.logEvent('Connect to Caller but device doesnt work') + self.connected = 'DEVICE NOT READY' + else: + logger.logEvent('Cannt connect to Caller') + self.connected = 'NOT OK' + else: + #can't connect to caller handler + logger.logEvent('Cannt connect to Caller') + self.connected = 'NOT OK' + + def receiverGreeting(self): # send greeting message to the destination handler + logger.logEvent('Receiver Greeting') + self.connected = None + + #open connection to the Handler + self.receiver = ClientClass.Connection('localhost', self.portDest) + self.receiver.connect() + + if self.receiver.connected == 1: + #connection establish and send hallo message to handler + logger.logEvent('Connected to Receiver Handler') + self.receiver.sendData('HELLO HANDLER') + #waiting handler respond + message = self.receiver.receiveData(20) + + # destination handler send hello message + if message == 'HELLO CONTROLLER': + logger.logEvent('Receiver Handler respond') + self.connected = 'OK' + + # the handler found the device not ready to making test. + elif message == 'DEVICE NOT READY': + logger.logEvent('Connect to Caller but device doesnt work') + self.connected = 'DEVICE NOT READY' + else: + logger.logEvent('receiver handler not respond') + self.connected = 'NOT OK' + else: + #can't connect to destintaion handler + logger.logEvent('Cannt connect to Receiver') + self.connected = 'NOT OK' + + def initGSM(self, portCommunication, portDevice, handler): + #open GSM Handler + logger.logEvent('Init GSM') + script = 'GSMHandler.py' + subprocess.Popen(['python',script], shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + #subprocess.Popen(args=["gnome-terminal", '--command=python GSMHandler.py '+str(portCommunication)+ ' ' +str(portDevice)+' '+str(handler)]) + + + + -- cgit v1.2.3-55-g7522