import errno
import os
import random
import signal
import subprocess
import sys
from time import sleep

import ControllerClass
import DbClass
import PingClass


class doTest:
    """Run call tests between the devices registered in the database.

    Attributes written by the methods below:
      serverStatus    - status returned by the last ping (see pings).
      db, dbStatus    - database handle and the status of connectDB().
      status          - 1/0 outcome of the last initaccount() check.
      result          - outcome of the last initTest() call.
      smartResultList - rows [caller, destination, result, round] collected
                        by smartTest(); round is 1 for a first attempt and
                        2 for a retest.
    """

    def pings(self, IP):
        """Ping IP and return the status reported by PingClass.

        The status is also stored in self.serverStatus; initTest() treats
        a status of 0 as "server not reachable".
        """
        server = PingClass.Ping(IP)
        self.serverStatus = server.ping(2)
        return self.serverStatus

    def initDB(self):
        """Create the database handle and store the connection status.

        Sets self.db and self.dbStatus; initTest() treats a dbStatus of 0
        as "connection failed".
        """
        # NOTE(review): credentials are hard-coded here; consider moving
        # them to configuration.
        self.db = DbClass.DBMySQLConnection('root', 'randompasswordSQL', 'localhost', 'gsmselftesting')
        # Connect exactly once and keep that call's status. A second
        # connectDB() call would presumably open another connection whose
        # predecessor is never closed.
        self.dbStatus = self.db.connectDB()

    def initaccount(self, account, handler):
        """Check whether account holds data for handler; set self.status.

        For the 'sip', 'unisip' and 'landline' handlers fields 1..4 of the
        account row are inspected, for every other handler fields 0..1.
        self.status becomes 1 when at least one inspected field is not the
        empty string, otherwise 0. Nothing is returned.
        """
        if handler in ('sip', 'unisip', 'landline'):
            fields = account[1:5]
        else:
            fields = account[0:2]
        # NOTE(review): a single non-empty field is enough to pass. The
        # original comment asked whether there is "enough information"
        # (username, password, server), which suggests all fields may have
        # been meant to be required - confirm before tightening.
        self.status = 1 if any(field != '' for field in fields) else 0

    def killProc(self):
        """Force-kill leftover 'GSM Handler' and 'SIP Handler' processes.

        This is a safety net for handlers that did not act on the
        controller's terminate message. Processes are located by scanning
        the output of ``ps -A``; GSM handlers are killed before SIP
        handlers.
        """
        # Bytes literals: ps output is bytes on Python 3 and str on
        # Python 2, and b'...' matches both.
        handlerNames = (b'GSM Handler', b'SIP Handler')
        p = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE)
        out, _ = p.communicate()
        psLines = out.splitlines()
        for name in handlerNames:
            for line in psLines:
                if name not in line:
                    continue
                # The PID is the first whitespace-separated column.
                pid = int(line.split(None, 1)[0])
                try:
                    os.kill(pid, signal.SIGKILL)
                except OSError as exc:
                    # The handler may have exited on its own between the
                    # ps snapshot and the kill; anything else is re-raised.
                    if exc.errno != errno.ESRCH:
                        raise

    def initTest(self, callFrom, callTo):
        """Run one call test from callFrom to callTo and return the result.

        The result is also stored in self.result:
          333 - the database connection could not be established
          500 - the caller's or the destination's server did not answer
                the ping
          100 - the caller or destination account failed initaccount()
          otherwise str(testResult) of ControllerClass.doTheTest

        The database connection is closed exactly once on every path,
        including when an exception propagates.
        """
        self.initDB()
        try:
            if self.dbStatus == 0:
                self.result = 333
                return self.result

            # Fetch the account rows of both devices.
            dest = self.db.deviceAddress(str(callTo))
            caller = self.db.deviceAddress(str(callFrom))

            # Field 4 is the server address. The destination is only
            # pinged when the caller's server answered.
            if self.pings(caller[4]) == 0 or self.pings(dest[4]) == 0:
                self.result = 500
                return self.result

            # The destination account is only checked when the caller
            # account passed.
            self.initaccount(caller, callFrom)
            if self.status == 1:
                self.initaccount(dest, callTo)

            if self.status == 1:
                callPortName = caller[0]
                accCaller = ':'.join(caller[2:5]) + ':'
                destPortName = dest[0]
                destNo = dest[1]
                accDest = ':'.join(dest[2:5]) + ':'
                makeTest = ControllerClass.doTheTest(
                    callFrom, callPortName, accCaller,
                    callTo, destPortName, destNo, accDest)
                makeTest.FuncTest()
                self.result = str(makeTest.testResult)
                print('%s %s %s' % (callFrom, callTo, makeTest.testResult))
            else:
                self.result = 100

            sleep(3)
            self.killProc()  # make sure no handler is left running
            sleep(1)
            return self.result
        finally:
            self.db.closeDBConn()

    def isThere(self, keyword, lists):
        """Return 1 if keyword is an element of lists, otherwise None."""
        if keyword in lists:
            return 1
        return None

    def _pickDestination(self, callFrom, destList):
        """Return a random entry of destList that is not callFrom.

        One random index is drawn. If it lands on callFrom itself the
        neighbouring entry is used instead (the second entry when the
        first was drawn, otherwise the previous one). Returns None when
        destList is empty or holds nothing but callFrom.
        """
        if not destList:
            return None
        i = random.randint(0, len(destList) - 1)
        if destList[i] == callFrom:
            if len(destList) < 2:
                return None
            i = 1 if i == 0 else i - 1
        return destList[i]

    def smartTest(self):
        """Run the test plan over all devices and return the result rows.

        Devices whose name starts with 'GSMRZ' or 'GSMEx' are call
        destinations; 'GSMRZ' devices are additionally used as callers.
        Steps:
          1. 'unisip' calls one random GSMRZ device.
          2. GSMRZ devices call random destinations.
          3. 'landline' calls one random remaining GSMRZ device.
          4. 'sip' calls every destination not called so far.
          5. Selected failed calls are retested from 'sip' (round 2).
          6. GSMRZ devices with a 486 result trigger a GSMRZ-to-GSMRZ
             retest.
        First-round rows superseded by a retest are dropped. Steps that
        have no device to work with are skipped.

        Returns self.smartResultList, a list of
        [caller, destination, result, round] rows.
        """
        self.initDB()
        self.smartResultList = []
        try:
            deviceLists = self.db.deviceList()
        finally:
            self.db.closeDBConn()

        gsmRZList = []    # GSMRZ devices still available for steps 1-3
        cpgsmRZList = []  # all GSMRZ devices, used by step 6
        destList = []     # GSM destinations not called yet
        rem = []          # first-round rows superseded by a retest

        for row in deviceLists:
            device = row[0]
            prefix = device[0:5]
            if prefix == 'GSMRZ':
                gsmRZList.append(device)
                cpgsmRZList.append(device)
            if prefix in ('GSMRZ', 'GSMEx'):
                destList.append(device)

        # Step 1: university telephone network -> random GSMRZ device.
        if gsmRZList:
            callTo = gsmRZList[random.randint(0, len(gsmRZList) - 1)]
            self.initTest('unisip', callTo)
            gsmRZList.remove(callTo)
            destList.remove(callTo)
            self.smartResultList.append(['unisip', callTo, self.result, 1])

        # Step 2: GSMRZ devices call random destinations.
        # NOTE(review): only every second remaining GSMRZ device acts as
        # caller. This keeps the selection the previous loop produced by
        # removing callers from the list it was iterating (assuming device
        # names are unique); the devices left over are what step 3 draws
        # from. Confirm this pairing is intended.
        for callFrom in gsmRZList[::2]:
            callTo = self._pickDestination(callFrom, destList)
            if callTo is None:
                continue  # no destination other than the caller is left
            self.initTest(callFrom, callTo)
            destList.remove(callTo)
            # The caller may already have been removed as an earlier
            # destination.
            if callFrom in destList:
                destList.remove(callFrom)
            gsmRZList.remove(callFrom)
            self.smartResultList.append([callFrom, callTo, self.result, 1])

        # Step 3: incoming call from outside the RZ network to a GSMRZ
        # device.
        if gsmRZList:
            callTo = gsmRZList[random.randint(0, len(gsmRZList) - 1)]
            self.initTest('landline', callTo)
            if self.isThere(callTo, destList) == 1:
                destList.remove(callTo)
            self.smartResultList.append(['landline', callTo, self.result, 1])

        # Step 4: 'sip' calls every destination that was not called yet.
        for callTo in destList:
            self.initTest('sip', callTo)
            self.smartResultList.append(['sip', callTo, self.result, 1])

        # Step 5: retest failed calls to make sure the destination is
        # really unreachable. A snapshot is iterated because retests
        # append round-2 rows, which are never retested themselves.
        for dest in list(self.smartResultList):
            code = int(dest[2])
            firstRound = int(dest[3]) != 2
            target = dest[1]

            if code == 486 and firstRound and target != 'sip' and dest[0] != 'sip':
                # NOTE(review): this condition compares the row's own code
                # (486 here) with 200, so it can never hold and this
                # retest never runs. test[2] was probably meant, but the
                # intended polarity is unclear - confirm before changing.
                if any(test[1] == target and code == 200
                       for test in self.smartResultList):
                    self.initTest('sip', target)
                    self.smartResultList.append(['sip', target, self.result, 2])
                    rem.append(dest)

            # 999: per the original comment, the caller's handler had a
            # problem.
            # NOTE(review): the original tested dest[1] != 'sip' twice;
            # the second test was possibly meant to be dest[0].
            if code == 999 and firstRound and target != 'sip':
                # NOTE(review): always true for this row (its own code is
                # 999), so every such call is retested from 'sip'.
                if any(test[1] == target and code != 200
                       for test in self.smartResultList):
                    self.initTest('sip', target)
                    self.smartResultList.append(['sip', target, self.result, 2])

            # A GSMRZ caller got 486: check the destination from 'sip' to
            # tell a caller-side problem from a destination-side one.
            caller = dest[0]
            if caller[0:5] == 'GSMRZ' and firstRound and target != 'sip' and code == 486:
                self.initTest('sip', target)
                self.smartResultList.append(['sip', target, self.result, 2])
                rem.append(dest)

        # Step 6: cross-check GSMRZ devices that showed a 486, since the
        # nanoBTS may look faulty without being so. A snapshot is iterated
        # because broken devices are removed from cpgsmRZList on the way.
        for RZ in list(cpgsmRZList):
            repeat = False
            for gsmrzResult in self.smartResultList:
                if gsmrzResult[0] != RZ and gsmrzResult[1] != RZ:
                    continue
                code = int(gsmrzResult[2])
                if code == 486:
                    repeat = True
                failedAsDest = gsmrzResult[1] == RZ and code == 998
                failedAsCaller = gsmrzResult[0] == RZ and code == 999
                # Several rows can flag the same device; remove it once.
                if (failedAsDest or failedAsCaller) and RZ in cpgsmRZList:
                    cpgsmRZList.remove(RZ)
            if repeat and len(cpgsmRZList) > 1:
                i = random.randint(0, len(cpgsmRZList) - 1)
                callTo = cpgsmRZList[i]
                # The caller is the neighbour of the drawn destination.
                callFrom = cpgsmRZList[1 if i == 0 else i - 1]
                self.initTest(callFrom, callTo)
                self.smartResultList.append([callFrom, callTo, self.result, 2])
                # NOTE(review): a string rendering of a result row used to
                # be queued for removal here; it could never equal a list
                # row, so nothing was removed and it has been dropped.

        # Drop the superseded first-round rows, keeping the same list
        # object.
        self.smartResultList[:] = [
            row for row in self.smartResultList if row not in rem]
        return self.smartResultList