import sys
import subprocess
import signal
import os
import errno
import ControllerClass
import DbClass
import PingClass
import random
from time import sleep


class doTest:
    """Run call tests between the devices stored in the test database.

    A device record, as returned by ``self.db.deviceAddress(name)``, is
    used positionally by this class:

    * ``record[0]`` -- passed to the controller as the port name
    * ``record[1]`` -- passed to the controller as the destination number
    * ``record[2]``, ``record[3]``, ``record[4]`` -- joined with ``':'``
      into the account string handed to the controller
    * ``record[4]`` -- additionally pinged before a call is attempted

    Result codes produced by this class itself (stored in ``self.result``):

    * ``333`` -- the database connection could not be opened
    * ``500`` -- the destination's or the caller's server did not answer
    * ``100`` -- the caller or destination account record was rejected by
      :meth:`initaccount`

    Any other result is ``str(testResult)`` as reported by
    ``ControllerClass.doTheTest``.
    """

    # Substrings looked for in the ``ps -A`` listing by killProc().
    HANDLER_PROCESS_NAMES = ('GSM Handler', 'SIP Handler')

    # Handler names whose account check inspects record fields 1..4;
    # every other handler is checked on fields 0..1 only.
    SIP_HANDLERS = ('sip', 'unisip', 'landline')

    def pings(self, IP):
        """Ping ``IP`` and store the outcome in ``self.serverStatus``.

        ``self.serverStatus`` is reset to ``None`` first, so a failure while
        pinging never leaves a stale status from an earlier call behind.
        Callers treat a status of ``0`` as "not reachable".
        """
        self.serverStatus = None
        self.serverStatus = PingClass.Ping(IP).ping(2)

    def initDB(self):
        """Open the database connection.

        Sets ``self.db`` to the connection object and ``self.dbStatus`` to
        whatever ``connectDB()`` returns; callers treat ``0`` as failure.
        """
        # NOTE(review): credentials are hard-coded here; consider moving
        # them to configuration.
        self.db = DbClass.DBMySQLConnection(
            'root', 'randompasswordSQL', 'localhost', 'gsmselftesting')
        # Connect exactly once. The previous code called connectDB() twice
        # and threw the first result away.
        self.dbStatus = self.db.connectDB()

    def initaccount(self, account, handler):
        """Check whether ``account`` carries any account information.

        Sets ``self.status`` to ``1`` when at least one of the inspected
        fields is not the empty string, otherwise to ``0``.  For the
        handlers in ``SIP_HANDLERS`` fields 1..4 are inspected, for all
        other handlers fields 0..1.
        """
        # NOTE(review): a single non-empty field is enough to pass.  The
        # original comment ("is there enough information about the
        # account") suggests *all* fields may have been intended; the
        # existing behaviour is kept until that is confirmed.
        if handler in self.SIP_HANDLERS:
            positions = (1, 2, 3, 4)
        else:
            positions = (0, 1)
        # The generator stops at the first non-empty field, exactly like
        # the chained ``or`` it replaces.
        if any(account[pos] != '' for pos in positions):
            self.status = 1
        else:
            self.status = 0

    def killProc(self):
        """Force-kill every running GSM/SIP handler process.

        Acts as a safety net for handlers that did not receive (or did not
        act on) the terminate message from the controller.  Every line of
        ``ps -A`` that contains one of ``HANDLER_PROCESS_NAMES`` is taken
        to describe a handler; its first column is used as the PID.
        """
        listing = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE)
        out, _ = listing.communicate()
        if isinstance(out, bytes):
            # communicate() yields bytes; decode so the substring test
            # below compares text with text.
            out = out.decode('utf-8', 'replace')

        for line in out.splitlines():
            if not any(name in line for name in self.HANDLER_PROCESS_NAMES):
                continue
            pid = int(line.split(None, 1)[0])
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError as err:
                # The handler may have exited between ``ps`` and the kill;
                # that is the desired end state, so only other errors
                # (e.g. missing permission) are propagated.
                if err.errno != errno.ESRCH:
                    raise

    def _serversReachable(self, destServer, callerServer):
        """Ping the destination's server, then the caller's.

        Returns ``False`` as soon as one ping reports status ``0``; the
        caller's server is not pinged when the destination's is down.
        """
        for server in (destServer, callerServer):
            self.pings(server)
            if self.serverStatus == 0:
                return False
        return True

    def _accountsUsable(self, caller, callFrom, dest, callTo):
        """Return True when initaccount() accepts caller and destination.

        The destination is only checked when the caller passed.
        """
        self.initaccount(caller, callFrom)
        if self.status != 1:
            return False
        self.initaccount(dest, callTo)
        return self.status == 1

    def _accountString(self, record):
        """Build the ``field2:field3:field4:`` string for the controller."""
        return ':'.join((record[2], record[3], record[4])) + ':'

    def _runCall(self, callFrom, caller, callTo, dest):
        """Run one call through the controller and return its result as str."""
        makeTest = ControllerClass.doTheTest(
            callFrom, caller[0], self._accountString(caller),
            callTo, dest[0], dest[1], self._accountString(dest))
        makeTest.FuncTest()
        # Single-argument form: prints the same text whether ``print`` is
        # a statement or a function.
        print('%s %s %s' % (callFrom, callTo, makeTest.testResult))
        return str(makeTest.testResult)

    def initTest(self, callFrom, callTo):
        """Test one call from device ``callFrom`` to device ``callTo``.

        Both names are looked up in the database.  The result is stored in
        ``self.result`` and returned; see the class docstring for the
        result codes.

        The database connection opened for the test is always closed
        again, including when a server is unreachable or an error is
        raised part-way through.
        """
        self.initDB()
        if self.dbStatus == 0:
            self.result = 333
            return self.result

        serversUp = False
        try:
            dest = self.db.deviceAddress(str(callTo))
            caller = self.db.deviceAddress(str(callFrom))

            serversUp = self._serversReachable(dest[4], caller[4])
            if serversUp:
                if self._accountsUsable(caller, callFrom, dest, callTo):
                    self.result = self._runCall(callFrom, caller, callTo, dest)
                else:
                    self.result = 100
                # Wait, then force-kill any handler that is still running.
                sleep(3)
                self.killProc()
            else:
                self.result = 500
        finally:
            self.db.closeDBConn()

        if serversUp:
            sleep(1)
        return self.result

    def isThere(self, keyword, lists):
        """Return ``1`` if ``keyword`` equals an item of ``lists``, else ``0``."""
        if keyword in lists:
            return 1
        return 0

    def smartTest(self):
        """Test all GSM devices with a reduced number of calls.

        Devices are read from ``self.db.deviceList()`` and grouped by the
        first five characters of their name (``row[0]``): ``GSMRZ``
        devices may place and receive calls, ``GSMEx`` devices only
        receive them, all other devices are ignored.  A device counts as
        covered once it took part in a call and is then dropped from the
        destination pool.

        Steps:

        1. ``unisip`` calls one random RZ device.
        2. Every second remaining RZ device calls a random device from the
           destination pool (never itself).
        3. ``landline`` calls one random RZ device that did not place a
           call in step 2.
        4. ``sip`` calls every device still in the destination pool.
        5. First attempts that ended with 486 or 999 are retried from
           ``sip`` and replaced by the retry results.

        Steps that have no device to work with are skipped.

        Returns ``self.smartResultList``, a list of
        ``[caller, destination, result, attempt]`` entries where
        ``attempt`` is 1 for a first attempt and 2 for a retry.
        """
        self.initDB()
        self.smartResultList = []
        try:
            deviceLists = self.db.deviceList()
        finally:
            self.db.closeDBConn()

        gsmRZList = []
        destList = []
        for row in deviceLists:
            device = row[0]
            prefix = device[0:5]
            if prefix == 'GSMRZ':
                gsmRZList.append(device)
                destList.append(device)
            elif prefix == 'GSMEx':
                destList.append(device)

        # Step 1: university telephone network -> one random RZ device.
        if gsmRZList:
            callTo = random.choice(gsmRZList)
            self.initTest('unisip', callTo)
            gsmRZList.remove(callTo)
            destList.remove(callTo)
            self.smartResultList.append(['unisip', callTo, self.result, 1])

        # Step 2: RZ devices call into the destination pool.
        # NOTE(review): only every second RZ device places a call; the
        # others are kept as landline targets for step 3.  This split
        # reproduces what the earlier implementation did (it removed
        # callers from the list while iterating over it) -- confirm that
        # it is intended.
        callers = gsmRZList[0::2]
        gsmRZList = gsmRZList[1::2]
        for callFrom in callers:
            candidates = [dev for dev in destList if dev != callFrom]
            if not candidates:
                # Nobody left to call; the device stays in the pool and is
                # covered by step 4 if it is still there.
                continue
            callTo = random.choice(candidates)
            self.initTest(callFrom, callTo)
            destList.remove(callTo)
            # The caller may already have been removed as the destination
            # of an earlier call.
            if callFrom in destList:
                destList.remove(callFrom)
            self.smartResultList.append([callFrom, callTo, self.result, 1])

        # Step 3: incoming call from outside the RZ network.
        if gsmRZList:
            callTo = random.choice(gsmRZList)
            self.initTest('landline', callTo)
            if self.isThere(callTo, destList) == 1:
                destList.remove(callTo)
            self.smartResultList.append(['landline', callTo, self.result, 1])

        # Step 4: everything not covered yet is called from sip.
        for callTo in destList:
            self.initTest('sip', callTo)
            self.smartResultList.append(['sip', callTo, self.result, 1])

        # Step 5: retry suspicious first attempts.  Iterate over a
        # snapshot because retry results are appended while looping; the
        # snapshot holds first attempts only.
        superseded = []
        for entry in list(self.smartResultList):
            caller, callee = entry[0], entry[1]
            code = int(entry[2])

            if code == 486:
                self.initTest('sip', callee)
                self.smartResultList.append([caller, callee, self.result, 2])
                superseded.append(entry)

            if code == 999:
                self.initTest('sip', callee)
                self.smartResultList.append(['sip', callee, self.result, 2])
                superseded.append(entry)

            # A failed call placed by an RZ device: call the *caller* from
            # sip as well to see whether that side is the faulty one.
            # NOTE(review): the entry is recorded under the original
            # caller/destination pair although the call goes sip -> caller.
            if caller[0:5] == 'GSMRZ' and code == 486:
                self.initTest('sip', caller)
                self.smartResultList.append([caller, callee, self.result, 2])
                superseded.append(entry)

        # Drop the first attempts that were replaced by a retry.
        self.smartResultList = [
            entry for entry in self.smartResultList
            if entry not in superseded]

        return self.smartResultList