"""Driver for call tests between the telephony devices of the GSM self-test bench.

Device account details are read from a MySQL database (via ``DbClass``), the
servers involved are pinged (via ``PingClass``) and the actual call test is
delegated to ``ControllerClass.doTheTest``.
"""
import errno
import os
import random
import signal
import subprocess
import sys
from time import sleep

import ControllerClass
import DbClass
import PingClass


class doTest:
    """Run single call tests (``initTest``) or a whole series (``smartTest``).

    Attributes set by the methods:
        serverStatus    -- outcome of the last ping; 0 is treated as "unreachable"
        db, dbStatus    -- database connection and the status ``connectDB`` returned;
                           0 is treated as "not connected"
        status          -- 1 if the last checked account had usable details, else 0
        result          -- outcome of the last ``initTest`` call
        smartResultList -- ``[callFrom, callTo, result]`` entries of the last ``smartTest``
    """

    # Server that is pinged for each named handler in ``ping``.
    _HANDLER_HOSTS = {
        'landline': 'sipgate.de',
        'sip': '132.230.4.8',
        'unisip': '132.230.252.228',
        'GSMRZ3': 'localhost',
        'GSMRZ2': '10.4.58.241',
    }

    # Handlers whose accounts are checked on fields 1-4 instead of 0-1.
    _SIP_LIKE_HANDLERS = ('sip', 'unisip', 'landline')

    # Process names of the call handlers that ``killProc`` terminates.
    _HANDLER_PROCESS_NAMES = ('GSM Handler', 'SIP Handler')

    def ping(self, handler):
        """Ping the server belonging to ``handler`` and store the outcome.

        The outcome of ``PingClass.Ping(host).ping(2)`` is stored in
        ``self.serverStatus``.  A handler without a known server is not
        pinged; ``self.serverStatus`` is set to 1 for it.
        """
        host = self._HANDLER_HOSTS.get(handler)
        if host is None:
            self.serverStatus = 1
        else:
            # NOTE(review): the meaning of the argument 2 is defined by
            # PingClass (presumably the number of attempts) -- confirm.
            self.serverStatus = PingClass.Ping(host).ping(2)

    def pings(self, IP):
        """Ping the address ``IP`` and store the outcome in ``self.serverStatus``."""
        self.serverStatus = PingClass.Ping(IP).ping(2)

    def initDB(self):
        """Open the database connection.

        Stores the connection object in ``self.db`` and the value returned
        by ``connectDB()`` in ``self.dbStatus``.
        """
        # NOTE(review): the credentials are hard-coded here; consider moving
        # them into configuration.
        self.db = DbClass.DBMySQLConnection('root', 'randompasswordSQL', 'localhost', 'gsmselftesting')
        # Connect exactly once and keep the status of that very attempt.
        self.dbStatus = self.db.connectDB()

    def initaccount(self, account, handler):
        """Check whether ``account`` carries account details for ``handler``.

        Sets ``self.status`` to 1 if at least one of the relevant fields is a
        non-empty string, otherwise to 0.  For 'sip', 'unisip' and 'landline'
        the relevant fields are ``account[1]`` .. ``account[4]``; for every
        other handler they are ``account[0]`` and ``account[1]``.
        """
        # NOTE(review): a single non-empty field is enough to pass.  If every
        # field is actually required, this has to become all(...) -- confirm
        # against the database contents before changing it.
        if handler in self._SIP_LIKE_HANDLERS:
            relevant = (1, 2, 3, 4)
        else:
            relevant = (0, 1)
        self.status = 1 if any(account[i] != '' for i in relevant) else 0

    def killProc(self):
        """Kill every running 'GSM Handler' and 'SIP Handler' process.

        This makes sure the handlers are terminated even if they had a
        problem receiving the terminate message from the controller.
        """
        # universal_newlines=True yields text output on Python 2 and 3 alike.
        proc = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE,
                                universal_newlines=True)
        out, _ = proc.communicate()
        for line in out.splitlines():
            if not any(name in line for name in self._HANDLER_PROCESS_NAMES):
                continue
            # The first whitespace-separated column of the listing is the PID.
            pid = int(line.split(None, 1)[0])
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError as exc:
                # The process may have exited between the listing and the
                # kill; that is exactly the state we want, so ignore it.
                if exc.errno != errno.ESRCH:
                    raise

    @staticmethod
    def _accountString(account):
        """Return fields 2, 3 and 4 of ``account`` as 'f2:f3:f4:'."""
        return ':'.join((account[2], account[3], account[4])) + ':'

    @staticmethod
    def _discard(items, item):
        """Remove ``item`` from the list ``items`` if it is present."""
        if item in items:
            items.remove(item)

    def _runTest(self, callFrom, callTo):
        """Run one call test on an open database connection; return its result.

        Returns 500 if the destination's or the caller's server does not
        answer the ping, 100 if one of the two accounts has no usable
        details, otherwise ``str()`` of the controller's ``testResult``.
        """
        # Fetch the account details of both devices from the database.
        dest = self.db.deviceAddress(str(callTo))
        caller = self.db.deviceAddress(str(callFrom))

        # Field 4 holds the address to ping: destination first, then caller.
        for account in (dest, caller):
            self.pings(account[4])
            if self.serverStatus == 0:
                return 500

        # NOTE(review): the handlers are killed once both pings succeeded,
        # whatever the outcome afterwards -- confirm this is the intended
        # scope of the cleanup.
        try:
            for account, handler in ((caller, callFrom), (dest, callTo)):
                self.initaccount(account, handler)
                if self.status != 1:
                    return 100

            makeTest = ControllerClass.doTheTest(
                callFrom, caller[0], self._accountString(caller),
                callTo, dest[0], dest[1], self._accountString(dest))
            makeTest.FuncTest()
            return str(makeTest.testResult)
        finally:
            self.killProc()

    def initTest(self, callFrom, callTo):
        """Run a call test from device ``callFrom`` to device ``callTo``.

        The outcome is stored in ``self.result`` and returned:
            333 -- the database connection could not be established
            500 -- the destination's or the caller's server did not answer the ping
            100 -- the caller's or the destination's account details are missing
            otherwise ``str()`` of the ``testResult`` reported by the controller
        """
        self.initDB()
        if self.dbStatus == 0:
            self.result = 333
            return self.result

        try:
            self.result = self._runTest(callFrom, callTo)
        finally:
            # Close the connection on every path, including the ping failures.
            self.db.closeDBConn()
        return self.result

    @staticmethod
    def isThere(keyword, lists):
        """Return 1 if ``keyword`` equals an item of ``lists``, otherwise None."""
        for item in lists:
            if item == keyword:
                return 1
        return None

    def smartTest(self):
        """Run a series of call tests over the devices listed in the database.

        Sequence:
            1. 'unisip' calls a randomly chosen GSM RZ device.  Only if that
               test reports 200 are the following steps executed.
            2. Every other remaining GSM RZ device calls a randomly chosen
               GSM destination different from itself.
            3. 'landline' calls a randomly chosen GSM RZ device that has not
               placed a call in step 2 (skipped if there is none).
            4. 'sip' calls every GSM destination not used up in steps 1 and 2.

        Returns ``self.smartResultList``, a list of
        ``[callFrom, callTo, result]`` entries in the order the tests ran.
        The list is empty if the database is unreachable or no GSM RZ device
        is listed.
        """
        self.initDB()
        self.smartResultList = list()
        if self.dbStatus == 0:
            return self.smartResultList
        try:
            deviceLists = self.db.deviceList()
        finally:
            self.db.closeDBConn()

        # Sort the devices into categories by the prefix of their name.
        gsmRZList = list()
        destList = list()
        for row in deviceLists:
            device = row[0]
            prefix = device[0:5]
            if prefix == 'GSMRZ':
                gsmRZList.append(device)
            if prefix == 'GSMRZ' or prefix == 'GSMEx':
                destList.append(device)

        if not gsmRZList:
            return self.smartResultList

        # First test: from the university telephone network to a random
        # available GSM RZ device.
        callTo = random.choice(gsmRZList)
        self.initTest('unisip', callTo)
        gsmRZList.remove(callTo)
        destList.remove(callTo)
        self.smartResultList.append(['unisip', callTo, self.result])

        # initTest reports the controller's result as a string, so compare
        # as strings.
        # NOTE(review): every follow-up test below is gated on this first
        # result -- confirm the landline and SIP tests belong under the gate.
        if str(self.result) != '200':
            return self.smartResultList

        # Every other remaining GSM RZ device places a call; the devices in
        # between stay in the pool so that the landline test has a target.
        # NOTE(review): confirm that this split of the pool is intended.
        for callFrom in gsmRZList[::2]:
            candidates = [device for device in destList if device != callFrom]
            if candidates:
                callTo = random.choice(candidates)
                self.initTest(callFrom, callTo)
                self.smartResultList.append([callFrom, callTo, self.result])
                self._discard(destList, callTo)
            self._discard(destList, callFrom)
            gsmRZList.remove(callFrom)

        # The landline target stays in destList, so the SIP test calls it too.
        if gsmRZList:
            callTo = random.choice(gsmRZList)
            self.initTest('landline', callTo)
            self.smartResultList.append(['landline', callTo, self.result])

        for callTo in destList:
            self.initTest('sip', callTo)
            self.smartResultList.append(['sip', callTo, self.result])

        return self.smartResultList