summaryrefslogtreecommitdiffstats
path: root/Code/Server-Code/initTestClass.py
diff options
context:
space:
mode:
Diffstat (limited to 'Code/Server-Code/initTestClass.py')
-rwxr-xr-xCode/Server-Code/initTestClass.py287
1 files changed, 287 insertions, 0 deletions
diff --git a/Code/Server-Code/initTestClass.py b/Code/Server-Code/initTestClass.py
new file mode 100755
index 0000000..8b854a7
--- /dev/null
+++ b/Code/Server-Code/initTestClass.py
@@ -0,0 +1,287 @@
+import sys
+import subprocess, signal
+import os
+import ControllerClass
+import DbClass
+import PingClass
+import random
+from time import sleep
+
+class initTesting:
+
+ def __init__(self):
+ self.messageList = list()
+
+ def pings(self,IP):
+
+ server = PingClass.Ping(IP)
+ self.serverStatus = server.ping(2)
+ return self.serverStatus
+
+ def initDB(self):
+
+ self.db = DbClass.DBMySQLConnection('root', 'randompasswordSQL', 'localhost', 'gsmselftesting')
+ self.db.connectDB()
+ self.dbStatus = self.db.connectDB()
+
+ def initaccount(self,account,handler):
+ if handler == 'sip' or handler == 'unisip' or handler == 'landline':
+ if account[1] != '' or account[2] != '' or account[3] != '' or account[4] != '': # checking available sip account, is there enough information about the account such as username, password,server
+ self.status = 1
+ else:
+ self.status = 0
+ else:
+ if account[0] != '' or account[1] != '':
+ self.status = 1
+ else:
+ self.status = 0
+
+
+#kill process to make sure, that the handler is Terminate incase handler having problem receiving
+# terminate message from controller
+
+ def killProc(self):
+ # define process name of the Handler
+ procNameDest = 'GSM Handler'
+ procNameCall = 'SIP Handler'
+
+ p = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE)
+ out, err = p.communicate()
+
+ #search process name and kill it.
+ for line in out.splitlines():
+ if procNameDest in line:
+ pid = int(line.split(None, 1)[0])
+ os.kill(pid, signal.SIGKILL)
+ for line in out.splitlines():
+ if procNameCall in line:
+ pid = int(line.split(None, 1)[0])
+ os.kill(pid, signal.SIGKILL)
+
+
+ def initTest(self, callFrom, callTo):
+
+ self.initDB() # open database connection
+
+ if self.dbStatus != 0: # if connection to db establish, do the test
+
+ #fetch device account detail from database
+ dest = self.db.deviceAddress(str(callTo))
+ caller = self.db.deviceAddress(str(callFrom))
+
+ if self.pings(caller[4]) <> 0:
+
+ if self.pings(dest[4]) <> 0:
+
+ self.initaccount(caller,callFrom)
+ if self.status == 1:
+ self.initaccount(dest,callTo)
+ if self.status == 1:
+ callPortName = caller[0]
+ accCaller = caller[2]+':'+caller[3]+':'+caller[4]+':'
+
+ destPortName = dest[0]
+ destNo = dest[1]
+ accDest = dest[2]+':'+dest[3]+':'+dest[4]+':'
+
+ makeTest = ControllerClass.controller(callFrom, callPortName, accCaller, callTo, destPortName, destNo, accDest)
+ makeTest.FuncTest()
+ self.result = str(makeTest.testResult)
+
+ #print callFrom, callTo, makeTest.testResult
+ else:
+ self.result = 100
+ else:
+ self.result = 100
+ sleep(2)
+ self.killProc() # kill all the handler
+ self.db.closeDBConn() #close db connection
+ sleep(1)
+ else:
+ self.result = 501
+ else:
+ self.result = 500
+
+ else:
+ self.result = 333
+ self.db.closeDBConn()
+ return self.result
+
+ def errorCodes(self, callFrom, callTo, result):
+ self.initDB()
+ message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(result) + '|' + str(self.db.errorCode(result))
+ self.messageList.append(message)
+ self.db.closeDBConn()
+
+ # function to search in the list
+ def isThere(self, keyword,lists):
+ x = 0
+ for item in lists:
+
+ if item == keyword:
+ return 1
+ else:
+ x = x+1
+
+ def testDestination(self, callFrom, callTo, tried):
+ self.initTest(callFrom,callTo)
+ self.smartResultList.append([callFrom,callTo, self.result,tried])
+ self.errorCodes(callFrom, callTo, self.result)
+
+ def smartTest(self):
+ self.initDB()
+ self.smartResultList = list()
+ deviceLists = self.db.deviceList()
+ gsmList = list()
+ gsmRZList = list()
+ sipList = list()
+ destList = list()
+ rem = list()
+ item = list()
+
+ cpgsmRZList = list()
+ self.db.closeDBConn()
+
+ for lists in deviceLists: #define category of the device
+ device = lists[0]
+ if device[0:5] == 'GSMRZ':
+ gsmRZList.append(device)
+ cpgsmRZList.append(device)
+ elif device[0:5] == 'GSMEx':
+ gsmList.append(device)
+ else:
+ sipList.append(device)
+
+ if device[0:5] == 'GSMRZ' or device[0:5] == 'GSMEx' or device == 'sip':
+ destList.append(device)
+
+ #first test from university telphone network to random GSM RZ avaliable
+ i = random.randint(0, len(gsmRZList)-1)
+ callTo = gsmRZList[i]
+ callFrom = 'unisip'
+ self.testDestination(callFrom, callTo, 1)
+ gsmRZList.remove(callTo)
+ destList.remove(callTo)
+
+
+
+ for callFrom in gsmRZList:
+ i = random.randint(0, len(destList)-1) #Check whether the caller and dest are same
+ callTo = destList[i]
+ if callFrom == callTo: #Check whether the caller and dest are same
+ if i == 0:
+ i = i+1 # if it in the first list, change to be the second list else, just back on step.
+ else:
+ i = i-1
+ callTo = destList[i]
+
+ destList.remove(callTo)
+ destList.remove(callFrom)
+ gsmRZList.remove(callFrom)
+ self.testDestination(callFrom, callTo, 1)
+
+
+ # test incoming call from outside rz network to gsm rz
+ i = random.randint(0, len(gsmRZList)-1) #
+ callTo = gsmRZList[i]
+ callFrom = 'landline'
+
+ if self.isThere(callTo,destList) == 1: # Checking whether caller at gsmrz list in the destination list, if yes delete it.
+ destList.remove(callTo)
+ self.testDestination(callFrom, callTo, 1)
+
+ for callTo in destList:
+ callFrom = 'sip'
+ if callFrom != callTo:
+ self.testDestination(callFrom, callTo, 1)
+
+ #checking unsuccess call, to make sure that destination are really unreachable
+ for dest in self.smartResultList:
+ #check unsuccess call and did the test have already tried, 2 means has been check
+ if int(dest[2]) == 486 or int(dest[2]) == 999 or int(dest[2]) == 998:
+ if int(dest[3]) != 2 and dest[1] != 'sip':
+ testDestination = True
+ founds = False
+ testCaller = True
+ # make sure that destination have not tested by another part and give success result.
+ for test in self.smartResultList:
+ if test[1] == dest[1] or test[0] == dest[1]:
+ if int(test[2]) == 200:
+ testDestination = False
+ if test[1] == dest[0] or test[0] == dest[0]:
+ if int(test[2]) == 200:
+ testCaller = False
+ #if destination have not tested by other part. try to test from RZ GSM
+ if int(test[2]) == 200:
+ for caller in cpgsmRZList:
+ if caller == test[0] or caller == test[1]:
+ callFrom = caller
+ founds = True
+
+ if dest[0] != 'sip':
+ founds = False
+
+ if testDestination == True:
+ if founds != True:
+ callFrom = 'sip'
+ callTo = dest[1]
+ self.testDestination(callFrom, callTo, 2)
+ rem.append(dest)
+
+ #check unsuccess call because caller handler having problem
+ #destination handler having problem, we should make test also to the caller
+ if int(dest[2]) == 998:
+ if testCaller == True:
+ if founds != True:
+ callFrom = 'sip'
+ callTo = dest[0]
+ self.testDestination(callFrom, callTo, 2)
+ rem.append(dest)
+
+
+ caller = dest[0] # to test nanobts if the test come from RZ GSM but fehler
+ if caller[0:5] == 'GSMRZ' and int(dest[3]) != 2 and dest[1] != 'sip':
+ if int(dest[2]) == 486:
+ callFrom = 'sip'
+ self.testDestination(callFrom, dest[0], 2)
+ rem.append(dest)
+
+ # test to make sure nanoBTS working or not. sice probably that nanotbts seems error but actually not.
+ for RZ in cpgsmRZList:
+ repeat = False
+ for gsmrzResult in self.smartResultList:
+
+ if gsmrzResult[0] == RZ or gsmrzResult[1] == RZ:
+ if int(gsmrzResult[2]) == 486:
+ repeat = True
+ From = gsmrzResult[0]
+ To = gsmrzResult[1]
+ result = gsmrzResult[2]
+ if int(gsmrzResult[2]) == 200:
+ repeat = False
+ if gsmrzResult[1] == RZ and int(gsmrzResult[2]) == 998:
+ cpgsmRZList.remove(RZ)
+ if gsmrzResult[0] == RZ and int(gsmrzResult[2]) == 999:
+ cpgsmRZList.remove(RZ)
+
+ if len(cpgsmRZList) > 1:
+ if repeat == True:
+ i = random.randint(0, len(cpgsmRZList)-1) #
+ if i == 0:
+ x = i+1
+ else:
+ x = i-1
+ self.testDestination(cpgsmRZList[x], cpgsmRZList[i], 2)
+ item = '['+str(From)+','+str(callTo)+','+str(result)+','+str(1)+']'
+ rem.append(item)
+
+ for remov in rem:
+ for x in self.smartResultList:
+ if x == remov:
+ self.smartResultList.remove(x)
+
+ return self.smartResultList
+
+
+
+