summaryrefslogtreecommitdiffstats
path: root/Under-Testing/Server-Code-New/gsmselftest.py
diff options
context:
space:
mode:
Diffstat (limited to 'Under-Testing/Server-Code-New/gsmselftest.py')
-rw-r--r--Under-Testing/Server-Code-New/gsmselftest.py795
1 files changed, 0 insertions, 795 deletions
diff --git a/Under-Testing/Server-Code-New/gsmselftest.py b/Under-Testing/Server-Code-New/gsmselftest.py
deleted file mode 100644
index 082b19f..0000000
--- a/Under-Testing/Server-Code-New/gsmselftest.py
+++ /dev/null
@@ -1,795 +0,0 @@
-#! /usr/bin/env python
-from serial import * #serial port library
-import sys
-import ControllerClass
-import DbClass
-import PingClass
-import truthtableClass
-import initTestClass
-import usbDetectClass
-import WebsiteCommClass
-import signal
-import random
-from time import sleep
-import setproctitle
-
-setproctitle.setproctitle('Controller Software')
-
-class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
-
- def disable(self):
- self.HEADER = ''
- self.OKBLUE = ''
- self.OKGREEN = ''
- self.WARNING = ''
- self.FAIL = ''
- self.ENDC = ''
-class TimeoutException(Exception):
- pass
-
-global resultsList
-resultsList = list()
-global taskNo
-dbStatus = None
-global caller
-global callee
-
-def timeout_handler(signum, frame):
- raise TimeoutException()
-
-def allPing(): #ping all existing devices
-
- global sipGate
- global sipServer
- global unisip
- global gsmBox1
- global gsmBox2
-
- serverAdd = db.deviceAddress(str('landline'))
- server = PingClass.Ping(str(serverAdd[4]))
- sipGate = server.ping(1)
-
- serverAdd = db.deviceAddress(str('sip'))
- server = PingClass.Ping(str(serverAdd[4]))
- sipServer = server.ping(1)
-
- serverAdd = db.deviceAddress(str('unisip'))
- server = PingClass.Ping(str(serverAdd[4]))
- unisip = server.ping(1)
-
- serverAdd = db.deviceAddress(str('GSMRZ2'))
- server = PingClass.Ping(str(serverAdd[4]))
- gsmBox1 = server.ping(1)
-
- serverAdd = db.deviceAddress(str('GSMRZ3'))
- server = PingClass.Ping(str(serverAdd[4]))
- gsmBox2 = server.ping(1)
-
-def initDB(): # function for connection database
- global dbStatus
- global db
- if dbStatus != 1:
- db = DbClass.DBMySQLConnection()
- db.connectDB()
- dbStatus = db.connectDB()
- if dbStatus == 1:
- db.lockMutex(600)
-
-def initNagiosString(x):
-
- initResult = truthtableClass.trueTable(x)
- initResult.initNagiosResult()
-
- if int(initResult.FAILED) != 0:
- print "GSM CRITICAL - Number of test "+str(initResult.testMount)+'; Failure '+str(initResult.FAILED)+'; Unknown '+str(initResult.handlerError)
- elif int(initResult.handlerError) != 0:
- print "GSM WARNING - Number of test "+str(initResult.testMount)+'; Failure '+str(initResult.FAILED)+'; Unknown '+str(initResult.handlerError)
- elif int(initResult.FAILED) == 0 and int(initResult.handlerError) == 0:
- print "GSM OK - Number of test "+str(initResult.testMount)+'; Failure '+str(initResult.FAILED)+'; Unknown '+str(initResult.handlerError)
- else:
-
- print "unknown error"
-
-def initTrueTable(x):
-
- initResult = truthtableClass.trueTable(x)
- initResult.initTrueTable()
-
- print '\n'
- openBSC = None
- asterikServer = None
- for x in initResult.nanoBts:
-
- name = x[0]
- if x[1] == True:
- openBSC = True
- asterikServer = True
- print bcolors.OKGREEN +name+ ' Working'+ bcolors.ENDC
-
- else:
- if int(x[1]) == 486:
- print bcolors.FAIL+name+ ' not Working' + bcolors.ENDC
- elif int(x[1]) == 200:
- print bcolors.OKGREEN +name+ ' Working'
- asterikServer = True
- elif int(x[1]) == 402:
- print bcolors.WARNING +name+ ' not Working need top up the credit'
- asterikServer = True
- elif int(x[1]) == 998 or int(x[1]) == 999:
- print bcolors.FAIL+name+ ' not Working, handler error'+ bcolors.ENDC
- elif int(x[1]) == 801 or int(x[1]) == 802:
- print bcolors.FAIL+name+ ' Device Error, Check the device'+ bcolors.ENDC
- else:
- print bcolors.OKGREEN +name+ ' not Working'+ bcolors.ENDC
-
- print ''
- if openBSC != None:
- if openBSC == True:
- print bcolors.OKGREEN +'openBSC working'+ bcolors.ENDC
- else:
- print bcolors.FAIL+'openBSC doesnt work'+ bcolors.ENDC
- print ''
-
- if initResult.asteriskServer == True or asterikServer == True:
- print bcolors.OKGREEN +'asterik server is working'+ bcolors.ENDC
- print ''
-
- if initResult.outGoingRZ == True:
- print bcolors.OKGREEN +'Outgoing call from RZ is working'+ bcolors.ENDC
- elif initResult.outGoingRZ == False:
- print bcolors.FAIL+'Outgoing call from RZ is not working'+ bcolors.ENDC
-
- if initResult.incomingRZ == True:
- print bcolors.OKGREEN +'incoming call from outside RZ to GSM RZ is working'+ bcolors.ENDC
- elif initResult.incomingRZ == False:
- print bcolors.FAIL+'incoming call from outside RZ to GSM RZ is not working'+ bcolors.ENDC
- print '\n'
-
-def errorCodes(callFrom, callTo, result):
- message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(result) + '|' + str(db.errorCode(result))
-
- sendResultWebsite(message)
-
- # function to search in the list
-def isThere(keyword,lists):
- x = 0
- for item in lists:
-
- if item == keyword:
- return 1
- else:
- x = x+1
-
-def testDest(callFrom, callTo, tried):
-
- makeTest.initTest(callFrom,callTo)
- db.insertTask(taskNum,callFrom,callTo)
- smartResultList.append([callFrom,callTo, makeTest.result,tried])
- taskID = db.maxTaskID()
- db.addResult(taskID, makeTest.result)
- if WebStatus == True:
- errorCodes(callFrom, callTo, makeTest.result)
-
-def smartTest():
- global smartResultList
- smartResultList = list()
- deviceLists = db.deviceList()
- gsmList = list()
- gsmRZList = list()
- sipList = list()
- destList = list()
- rem = list()
- item = list()
-
- cpgsmRZList = list()
-
- for lists in deviceLists: #define category of the device
- device = lists[0]
- if device[0:5] == 'GSMRZ':
- gsmRZList.append(device)
- cpgsmRZList.append(device)
- elif device[0:5] == 'GSMEx':
- gsmList.append(device)
- else:
- sipList.append(device)
-
- if device[0:5] == 'GSMRZ' or device[0:5] == 'GSMEx' or device == 'sip':
- destList.append(device)
-
- #first test from university telphone network to random GSM RZ avaliable
- i = random.randint(0, len(gsmRZList)-1)
- callTo = gsmRZList[i]
- callFrom = 'unisip'
- testDest(callFrom, callTo, 1)
- gsmRZList.remove(callTo)
- destList.remove(callTo)
-
-
- for callFrom in gsmRZList:
- i = random.randint(0, len(destList)-1) #Check whether the caller and dest are same
- callTo = destList[i]
- if callFrom == callTo: #Check whether the caller and dest are same
- if i == 0:
- i = i+1 # if it in the first list, change to be the second list else, just back on step.
- else:
- i = i-1
- callTo = destList[i]
-
- destList.remove(callTo)
- destList.remove(callFrom)
- gsmRZList.remove(callFrom)
- testDest(callFrom, callTo, 1)
-
-
- # test incoming call from outside rz network to gsm rz
- i = random.randint(0, len(gsmRZList)-1) #
- callTo = gsmRZList[i]
- callFrom = 'landline'
-
- if isThere(callTo,destList) == 1: # Checking whether caller at gsmrz list in the destination list, if yes delete it.
- destList.remove(callTo)
- testDest(callFrom, callTo, 1)
-
- for callTo in destList:
- callFrom = 'sip'
- if callFrom != callTo:
- testDest(callFrom, callTo, 1)
-
- #checking unsuccess call, to make sure that destination are really unreachable
- for dest in smartResultList:
- #check unsuccess call and did the test have already tried, 2 means has been check
- if int(dest[2]) == 486 or int(dest[2]) == 999 or int(dest[2]) == 998 or int(dest[2]) == 801 or int(dest[2]) == 802:
-
- if int(dest[3]) != 2 and dest[1] != 'sip':
- testDestination = True
- testFromRZ = False
- testCaller = True
- # make sure that destination have not tested by another part and give success result.
- for test in smartResultList:
- if test[1] == dest[1] or test[0] == dest[1]:
- if int(test[2]) == 200:
- testDestination = False
- if test[1] == dest[0] or test[0] == dest[0]:
- if int(test[2]) == 200:
- testCaller = False
- #if destination have not tested by other part. try to test from RZ GSM
- if int(test[2]) == 200 and testFromRZ != True:
- for caller in cpgsmRZList:
- if caller == test[0] or caller == test[1]:
- callFrom = caller
- testFromRZ = True
-
- if testDestination == True:
- if testFromRZ != True:
- callFrom = 'sip'
- callTo = dest[1]
- testDest(callFrom, callTo, 2)
- rem.append(dest)
-
- #check unsuccess call because caller handler having problem
- #destination handler having problem, we should make test also to the caller
- if int(dest[2]) == 998 or int(dest[2]) == 802 or int(dest[2]) == 486:
- if testCaller == True:
- if testFromRZ != True:
- callFrom = 'sip'
- callTo = dest[0]
- testDest(callFrom, callTo, 2)
- rem.append(dest)
-
-
- caller = dest[0] # to test nanobts if the test come from RZ GSM but fehler
- if caller[0:5] == 'GSMRZ' and int(dest[3]) != 2 and dest[1] != 'sip':
- repeatTest = True
- for test in smartResultList:
- if test[1] == caller or test[0] == caller:
- repeatTest = False
- if int(dest[2]) == 486 or int(dest[2]) == 402:
- if repeatTest == True:
- callFrom = 'sip'
- testDest(callFrom, dest[0], 2)
- rem.append(dest)
-
- # test to make sure nanoBTS working or not. sice probably that nanotbts seems error but actually not.
- for RZ in cpgsmRZList:
- repeat = False
- for gsmrzResult in smartResultList:
- if gsmrzResult[0] == RZ or gsmrzResult[1] == RZ:
- if int(gsmrzResult[2]) == 486 or int(gsmrzResult[2]) == 801 or int(gsmrzResult[2]) == 802:
- repeat = True
- From = gsmrzResult[0]
- To = gsmrzResult[1]
- result = gsmrzResult[2]
- if int(gsmrzResult[2]) == 200:
- repeat = False
- if gsmrzResult[1] == RZ and int(gsmrzResult[2]) == 998:
- try:
- cpgsmRZList.remove(RZ)
- except ValueError:
- message = 'Error'
- if gsmrzResult[0] == RZ and int(gsmrzResult[2]) == 999:
- try:
- cpgsmRZList.remove(RZ)
- except ValueError:
- message = 'Error'
-
- if len(cpgsmRZList) > 1:
- if repeat == True:
- i = random.randint(0, len(cpgsmRZList)-1) #
- if i == 0:
- x = i+1
- else:
- x = i-1
- testDest(cpgsmRZList[x], cpgsmRZList[i], 2)
- item = '['+str(From)+','+str(callTo)+','+str(result)+','+str(1)+']'
- rem.append(item)
-
- for remov in rem:
- for x in smartResultList:
- if x == remov:
- try:
- smartResultList.remove(x)
- except ValueError:
- message = 'Error'
- return smartResultList
-
-def doSmartTest(status):
- global taskNum, printMessage
- global WebStatus
- initDB()
- taskNum = db.maxTaskNo()
- global makeTest
- if status == True:
- WebStatus = True
-
- else:
- WebStatus = False
-
- makeTest = initTestClass.initTesting()
- result = smartTest()
- if status == 'NAGIOS':
- initNagiosString(result)
-
- elif status == False:
- initTrueTable(result)
-
- if status == True:
- sendFinishMessage()
-
-
-def doSipTest():
- destList = ['landline', 'unisip']
- doTest = initTestClass.initTesting()
- for callTo in destList:
-
- callFrom = 'sip'
- doTest.initTest(callFrom,callTo)
- resultsList.append([callFrom, callTo, doTest.result])
- initTrueTable(resultsList)
-def doIncomingTest(): #incoming call to RZ network
-
- destList = ['GSMRZ1','unisip', 'GSMRZ2','GSMRZ3']
- doTest = initTestClass.initTesting()
- for callTo in destList:
- callFrom = 'landline'
- doTest.initTest(callFrom,callTo)
- resultsList.append([callFrom, callTo, doTest.result])
- initTrueTable(resultsList)
-
-def doGsmrzTest():
-
- destList = ['GSMRZ1','GSMRZ2', 'GSMRZ3']
- callList = ['sip']
- doTest = initTestClass.initTesting()
-
- for callFrom in callList:
- for callTo in destList:
- doTest.initTest(callFrom,callTo)
- resultsList.append([callFrom, callTo, doTest.result])
- initTrueTable(resultsList)
-
-def doGsmExtTest():
-
- destList = ['GSMExt.O2', 'GSMExt.Voda', 'GSMExt.Eplus', 'GSMExt.Tm']
- callList = ['sip']
-
- doTest = initTestClass.initTesting()
-
- for callFrom in callList:
- for callTo in destList:
- doTest.initTest(callFrom,callTo)
- resultsList.append([callFrom, callTo, doTest.result])
- initTrueTable(resultsList)
-
-def doAllTest():
-
- doSipTest()
- doIncomingTest()
- doGsmrzTest()
- doGsmExtTest()
-
-
-def sendResultWebsite(message):
- if server.sendData(message+ chr(10)) == 1:
- print 'data sent successfully'
- test = server.receiveData(5)
- if test == 'TIMEOUT':
- closeFunction(db,server)
- sys.exit(2)
- if test == 'CONTINUE':
- print 'continue'
-
-def sendFinishMessage():
- if server.connected == 1:
- server.sendData('TEST DONE\n')
- test = server.receiveData(5)
- if test == 'TIMEOUT':
- closeFunction(db,server)
- if test == 'DISCONNECT':
- close = server.closeConnection()
- if close == 1:
- print 'Closed connection successfully'
-
- print 'release mutex says ', db.releaseMutex()
-
-def withDB(x):
-
- initDB()
-
- if x == False:
- deviceLists = db.deviceList()
- callerFound = False
- calleeFound = False
- for device in deviceLists:
- if caller == device[0]:
- callerFound = True
- break
-
- for device in deviceLists:
- if callee == device[0]:
- calleeFound = True
- break
- if callerFound != True or calleeFound != True:
- if callerFound != True:
- print 'No device with name', caller
- db.closeDBConn()
- sys.exit(1)
- if calleeFound != True:
- print 'No device with name', callee
- db.closeDBConn()
- sys.exit(1)
- if callerFound == True and calleeFound == True:
- taskNumber = db.maxTaskNo()
- db.insertTaskIn2(caller, callee, taskNumber)
- resultsList = list()
- if dbStatus == 1: # Checking connection to database
- if db.anyTasksToDo() == 1: # Checking task on the table
-
- allPing()
- i=0
- makeTest = initTestClass.initTesting()
-
- for item in db.tasksList:
-
- taskID = item[0]
- taskNo = item[1]
- callFrom = item[2]
- callTo = item[3]
-
- if i == 0:
- db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2)
- print '\n'
- print 'Task ID :', taskID
- print 'Calling From :', callFrom
- print 'To :', callTo
-
-
- makeTest.initTest(callFrom,callTo)
-
- db.addResult(taskID, makeTest.result)
-
- resultsList.append([callFrom, callTo, makeTest.result])
-
- db.errorCode(makeTest.result)
-
- if int(makeTest.result) == 200:
- print bcolors.OKGREEN +'Result : ' +str(makeTest.result)+ ' ' +db.errCode + bcolors.ENDC
- else:
- print bcolors.FAIL+'Result : ' +str(makeTest.result)+ ' ' +db.errCode + bcolors.ENDC
-
- if x == True: # if x = True means that this function call by website
-
- message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(makeTest.result) + '|' + str(db.errCode)
- sendResultWebsite(message) # send result to website
-
- db.deleteTempTask(taskID)
- i = i+1
-
- db.cleanTasksList()
-
- if x == True:
- sendFinishMessage() #send finish message to website and close the connection
-
- print '\n'
-
- initTrueTable(resultsList) # fetch result list and make adjustment about the result
- db.closeDBConn()
- else:
- print bcolors.FAIL+"--- No job at all ---" + bcolors.ENDC
- db.closeDBConn()
- else:
- print bcolors.FAIL+'Cant connect to database'+ bcolors.ENDC
- sys.exit(1)
-
-def findPort(portName): # take information in existing usb port
- global connect
- global prefix
- global num
- global IMEI
- global portClass
- sleep(0.5)
- portLog = os.popen('dmesg | grep \'pl2303 converter now attached to '+portName+'\'').read()
- sleep(0.5)
- if portLog != '':
- connect = 1
- portClass = usbDetectClass.serialPort(portName)
- portClass.findNumber()
- portClass.findIMEI()
- IMEI = portClass.IMEI
- num = portClass.number
- number = portClass.number
- prefix = number[0:4]
- else:
- connect = 0
-
-def initDevice(deviceName):
- print 'Device Name :',deviceName
- print ' Device IMEI : ',
- imei = sys.stdin.readline().rstrip("\r\n")
- print 'Phone number : ',
- number = sys.stdin.readline().rstrip("\r\n")
- print 'Port Name : /dev/',
- portName = sys.stdin.readline().rstrip("\r\n")
- print ''
-
- if imei == '' or portName == '':
- print bcolors.FAIL+' == cant save device configuration, please fill IMEI / port name of the device =='+ bcolors.ENDC
- else:
- findPort(portName)
- if connect == 1:
- if str(IMEI) != str(imei) and str(num) != str(number):
- print bcolors.FAIL+'== error, device not found =='+ bcolors.ENDC
- elif str(IMEI) == str(imei):
- portClass.initUpdate(deviceName, portName, number)
- print bcolors.OKGREEN +'== Device succeced added =='+ bcolors.ENDC
- elif str(num) == str(number) and str(IMEI) != str(imei):
- portClass.initUpdate(deviceName, portName, number)
- print bcolors.WARNING+'== Device succeced added, but have different IMEI =='+ bcolors.ENDC
- else:
- print bcolors.FAIL+'== error, no device connected =='+ bcolors.ENDC
-
-def autoUpdateDevice(status):
- initDB()
- GSMListPrefix = list()
- GSMListPrefix = db.GSMPrefix()
- i = 0
- x = 0
- count = 0
- while i !=10:
- portName ='ttyUSB'+str(i) #checking usb connection
-
- i=i+1
- old_handler = signal.signal(signal.SIGALRM, timeout_handler)
- signal.alarm(15)
- try:
- findPort(portName)
- signal.signal(signal.SIGALRM, old_handler)
- signal.alarm(0)
- if connect == 1:
- for listNum in GSMListPrefix:
- if prefix == listNum[1]:
- print 'Device Name :',listNum[0]
- print 'IMEI :',IMEI
- print 'Phone Number :',num
- print 'Port Name : /dev/'+portName
- x=x+1
- newPortName = '/dev/'+portName
- portClass.initUpdate(listNum[0], newPortName, num)
- print '\n'
- except TimeoutException:
- signal.signal(signal.SIGALRM, old_handler)
- signal.alarm(0)
- count = count + 1
- message = "Timeout"
- if status == True:
- sendFinishMessage()
- else:
-
- print '== FINISH =='
- print 'Found '+str(x)+' devices'
-
-def updateDevice(): #update port name list of device on DB
- quit = False
- while quit != True:
- print ''
- print "Mobile device configuration"
- print "Menu: a = automatic device configuration, m = Manual configuration, q = quit : ",
- input = sys.stdin.readline().rstrip("\r\n")
- print ''
- if input == 'm': # manual configuration
-
- while True:
- print ''
- print "Mobile device name: "
- print " 1. GSM O2"
- print " 2. GSM Vodafone"
- print " 3. GSM Eplus"
- print " 4. GSM T-Mobile"
- print " 5. GSM RZ 1"
- print " 6. Back to menu"
- print ""
- print "your choise : ",
- input = sys.stdin.readline().rstrip("\r\n")
-
- if input == '6':
- break
- elif input == '1':
- initDevice('GSMExt.O2')
- elif input == '2':
- initDevice('GSMExt.Voda')
- elif input == '3':
- initDevice('GSMExt.Eplus')
- elif input == '4':
- initDevice('GSMExt.Tm')
- elif input == '5':
- initDevice('GSMRZ1')
- else:
- print 'please choose between 1-6'
-
- if input == 'a': #automatic configuration
- autoUpdateDevice(False)
-
- if input == "q":
- break
- sys.exit()
-
-def closeFunction(dbConn,serverSocket):
- print 'Release the mutex: ' + str(dbConn.releaseMutex())
- print 'Close the DB Connection: ' + str(dbConn.closeDBConn())
- del dbConn
- del serverSocket
- sys.exit()
-
-if len(sys.argv) > 1:
-
- command = sys.argv[1]
-
- if command == '--all':
- resultsList = list()
- doAllTest()
-
- elif command == '--sip':
- resultsList = list()
- doSipTest()
-
- elif command == '--gsmrz':
- resultsList = list()
- doGsmrzTest()
-
- elif command == '--gsmext':
- resultsList = list()
- doGsmExtTest()
-
- elif command == '--incoming':
- resultsList = list()
- doIncomingTest()
-
- elif command == '--smart':
- initDB()
- resultsList = list()
- allPing()
- taskNo = db.maxTaskNo()
- db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2)
- doSmartTest(False)
-
- elif command == '--nagios':
- initDB()
- resultsList = list()
- allPing()
- taskNo = db.maxTaskNo()
- db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2)
- doSmartTest('NAGIOS')
-
- elif command == '--devconf':
- updateDevice()
-
- elif command == '--db':
- if len(sys.argv) > 2:
- try:
- caller = sys.argv[2]
- callee = sys.argv[3]
-
- except ValueError:
- print "Error given caller and destination. Type '--help' for more information."
- else:
- print "Error given caller and destination. Type '--help' for more information."
- sys.exit()
- resultsList = list()
- withDB(False)
-
- elif command == '--help':
- file = open('help.txt', 'r')
- print file.read()
-
- else:
- print "command not found, Type '--help' for more information."
- print '\n'
-else:
- global server
- global tried
-
-
- initDB() # should put db condition
- server = WebsiteCommClass.ServerHandlerSoftware(34500) #define the port
- tried = server.openSocket(10)
-
- if tried == 'TIMEOUT':
- closeFunction(db,server)
-
- test = server.receiveData(10)
- if test == 'TIMEOUT':
- closeFunction(db,server)
-
- if test == 'START TEST':
- server.sendData('CONFIRM\n')
- print 'TEST STARTED'
- withDB(True)
-
- if test == 'SMART TEST':
- allPing()
- taskNo = db.maxTaskNo()
- db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2)
- server.sendData('CONFIRM\n')
- print 'SMART TEST STARTED'
- resultsList = list()
- doSmartTest(True)
-
- if test == 'UPDATE DEVICE':
- server.sendData('CONFIRM\n')
- print 'UPDATE DEVICE STARTED'
- autoUpdateDevice(True)
- else:
- sys.exit('WE DIDN\'T RECEIVE THE CONFIRMATION')
-
- db.closeDBConn()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-