From b99193ba68f947802245a288940e54904ecbee61 Mon Sep 17 00:00:00 2001 From: Triatmoko Date: Fri, 18 Nov 2011 13:23:49 +0100 Subject: final version --- Under-Testing/Server-Code-New/gsmselftest.py | 795 --------------------------- 1 file changed, 795 deletions(-) delete mode 100644 Under-Testing/Server-Code-New/gsmselftest.py (limited to 'Under-Testing/Server-Code-New/gsmselftest.py') diff --git a/Under-Testing/Server-Code-New/gsmselftest.py b/Under-Testing/Server-Code-New/gsmselftest.py deleted file mode 100644 index 082b19f..0000000 --- a/Under-Testing/Server-Code-New/gsmselftest.py +++ /dev/null @@ -1,795 +0,0 @@ -#! /usr/bin/env python -from serial import * #serial port library -import sys -import ControllerClass -import DbClass -import PingClass -import truthtableClass -import initTestClass -import usbDetectClass -import WebsiteCommClass -import signal -import random -from time import sleep -import setproctitle - -setproctitle.setproctitle('Controller Software') - -class bcolors: - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - - def disable(self): - self.HEADER = '' - self.OKBLUE = '' - self.OKGREEN = '' - self.WARNING = '' - self.FAIL = '' - self.ENDC = '' -class TimeoutException(Exception): - pass - -global resultsList -resultsList = list() -global taskNo -dbStatus = None -global caller -global callee - -def timeout_handler(signum, frame): - raise TimeoutException() - -def allPing(): #ping all existing devices - - global sipGate - global sipServer - global unisip - global gsmBox1 - global gsmBox2 - - serverAdd = db.deviceAddress(str('landline')) - server = PingClass.Ping(str(serverAdd[4])) - sipGate = server.ping(1) - - serverAdd = db.deviceAddress(str('sip')) - server = PingClass.Ping(str(serverAdd[4])) - sipServer = server.ping(1) - - serverAdd = db.deviceAddress(str('unisip')) - server = PingClass.Ping(str(serverAdd[4])) - unisip = server.ping(1) - - serverAdd = db.deviceAddress(str('GSMRZ2')) - server = PingClass.Ping(str(serverAdd[4])) - gsmBox1 = server.ping(1) - - serverAdd = db.deviceAddress(str('GSMRZ3')) - server = PingClass.Ping(str(serverAdd[4])) - gsmBox2 = server.ping(1) - -def initDB(): # function for connection database - global dbStatus - global db - if dbStatus != 1: - db = DbClass.DBMySQLConnection() - db.connectDB() - dbStatus = db.connectDB() - if dbStatus == 1: - db.lockMutex(600) - -def initNagiosString(x): - - initResult = truthtableClass.trueTable(x) - initResult.initNagiosResult() - - if int(initResult.FAILED) != 0: - print "GSM CRITICAL - Number of test "+str(initResult.testMount)+'; Failure '+str(initResult.FAILED)+'; Unknown '+str(initResult.handlerError) - elif int(initResult.handlerError) != 0: - print "GSM WARNING - Number of test "+str(initResult.testMount)+'; Failure '+str(initResult.FAILED)+'; Unknown '+str(initResult.handlerError) - elif int(initResult.FAILED) == 0 and int(initResult.handlerError) == 0: - print "GSM OK - Number of test "+str(initResult.testMount)+'; Failure '+str(initResult.FAILED)+'; Unknown '+str(initResult.handlerError) - else: - - print "unknown error" - -def initTrueTable(x): - - initResult = truthtableClass.trueTable(x) - initResult.initTrueTable() - - print '\n' - openBSC = None - asterikServer = None - for x in initResult.nanoBts: - - name = x[0] - if x[1] == True: - openBSC = True - asterikServer = True - print bcolors.OKGREEN +name+ ' Working'+ bcolors.ENDC - - else: - if int(x[1]) == 486: - print bcolors.FAIL+name+ ' not Working' + bcolors.ENDC - elif int(x[1]) == 200: - print bcolors.OKGREEN +name+ ' Working' - asterikServer = True - elif int(x[1]) == 402: - print bcolors.WARNING +name+ ' not Working need top up the credit' - asterikServer = True - elif int(x[1]) == 998 or int(x[1]) == 999: - print bcolors.FAIL+name+ ' not Working, handler error'+ bcolors.ENDC - elif int(x[1]) == 801 or int(x[1]) == 802: - print bcolors.FAIL+name+ ' Device Error, Check the device'+ bcolors.ENDC - else: - print bcolors.OKGREEN +name+ ' not Working'+ bcolors.ENDC - - print '' - if openBSC != None: - if openBSC == True: - print bcolors.OKGREEN +'openBSC working'+ bcolors.ENDC - else: - print bcolors.FAIL+'openBSC doesnt work'+ bcolors.ENDC - print '' - - if initResult.asteriskServer == True or asterikServer == True: - print bcolors.OKGREEN +'asterik server is working'+ bcolors.ENDC - print '' - - if initResult.outGoingRZ == True: - print bcolors.OKGREEN +'Outgoing call from RZ is working'+ bcolors.ENDC - elif initResult.outGoingRZ == False: - print bcolors.FAIL+'Outgoing call from RZ is not working'+ bcolors.ENDC - - if initResult.incomingRZ == True: - print bcolors.OKGREEN +'incoming call from outside RZ to GSM RZ is working'+ bcolors.ENDC - elif initResult.incomingRZ == False: - print bcolors.FAIL+'incoming call from outside RZ to GSM RZ is not working'+ bcolors.ENDC - print '\n' - -def errorCodes(callFrom, callTo, result): - message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(result) + '|' + str(db.errorCode(result)) - - sendResultWebsite(message) - - # function to search in the list -def isThere(keyword,lists): - x = 0 - for item in lists: - - if item == keyword: - return 1 - else: - x = x+1 - -def testDest(callFrom, callTo, tried): - - makeTest.initTest(callFrom,callTo) - db.insertTask(taskNum,callFrom,callTo) - smartResultList.append([callFrom,callTo, makeTest.result,tried]) - taskID = db.maxTaskID() - db.addResult(taskID, makeTest.result) - if WebStatus == True: - errorCodes(callFrom, callTo, makeTest.result) - -def smartTest(): - global smartResultList - smartResultList = list() - deviceLists = db.deviceList() - gsmList = list() - gsmRZList = list() - sipList = list() - destList = list() - rem = list() - item = list() - - cpgsmRZList = list() - - for lists in deviceLists: #define category of the device - device = lists[0] - if device[0:5] == 'GSMRZ': - gsmRZList.append(device) - cpgsmRZList.append(device) - elif device[0:5] == 'GSMEx': - gsmList.append(device) - else: - sipList.append(device) - - if device[0:5] == 'GSMRZ' or device[0:5] == 'GSMEx' or device == 'sip': - destList.append(device) - - #first test from university telphone network to random GSM RZ avaliable - i = random.randint(0, len(gsmRZList)-1) - callTo = gsmRZList[i] - callFrom = 'unisip' - testDest(callFrom, callTo, 1) - gsmRZList.remove(callTo) - destList.remove(callTo) - - - for callFrom in gsmRZList: - i = random.randint(0, len(destList)-1) #Check whether the caller and dest are same - callTo = destList[i] - if callFrom == callTo: #Check whether the caller and dest are same - if i == 0: - i = i+1 # if it in the first list, change to be the second list else, just back on step. - else: - i = i-1 - callTo = destList[i] - - destList.remove(callTo) - destList.remove(callFrom) - gsmRZList.remove(callFrom) - testDest(callFrom, callTo, 1) - - - # test incoming call from outside rz network to gsm rz - i = random.randint(0, len(gsmRZList)-1) # - callTo = gsmRZList[i] - callFrom = 'landline' - - if isThere(callTo,destList) == 1: # Checking whether caller at gsmrz list in the destination list, if yes delete it. - destList.remove(callTo) - testDest(callFrom, callTo, 1) - - for callTo in destList: - callFrom = 'sip' - if callFrom != callTo: - testDest(callFrom, callTo, 1) - - #checking unsuccess call, to make sure that destination are really unreachable - for dest in smartResultList: - #check unsuccess call and did the test have already tried, 2 means has been check - if int(dest[2]) == 486 or int(dest[2]) == 999 or int(dest[2]) == 998 or int(dest[2]) == 801 or int(dest[2]) == 802: - - if int(dest[3]) != 2 and dest[1] != 'sip': - testDestination = True - testFromRZ = False - testCaller = True - # make sure that destination have not tested by another part and give success result. - for test in smartResultList: - if test[1] == dest[1] or test[0] == dest[1]: - if int(test[2]) == 200: - testDestination = False - if test[1] == dest[0] or test[0] == dest[0]: - if int(test[2]) == 200: - testCaller = False - #if destination have not tested by other part. try to test from RZ GSM - if int(test[2]) == 200 and testFromRZ != True: - for caller in cpgsmRZList: - if caller == test[0] or caller == test[1]: - callFrom = caller - testFromRZ = True - - if testDestination == True: - if testFromRZ != True: - callFrom = 'sip' - callTo = dest[1] - testDest(callFrom, callTo, 2) - rem.append(dest) - - #check unsuccess call because caller handler having problem - #destination handler having problem, we should make test also to the caller - if int(dest[2]) == 998 or int(dest[2]) == 802 or int(dest[2]) == 486: - if testCaller == True: - if testFromRZ != True: - callFrom = 'sip' - callTo = dest[0] - testDest(callFrom, callTo, 2) - rem.append(dest) - - - caller = dest[0] # to test nanobts if the test come from RZ GSM but fehler - if caller[0:5] == 'GSMRZ' and int(dest[3]) != 2 and dest[1] != 'sip': - repeatTest = True - for test in smartResultList: - if test[1] == caller or test[0] == caller: - repeatTest = False - if int(dest[2]) == 486 or int(dest[2]) == 402: - if repeatTest == True: - callFrom = 'sip' - testDest(callFrom, dest[0], 2) - rem.append(dest) - - # test to make sure nanoBTS working or not. sice probably that nanotbts seems error but actually not. - for RZ in cpgsmRZList: - repeat = False - for gsmrzResult in smartResultList: - if gsmrzResult[0] == RZ or gsmrzResult[1] == RZ: - if int(gsmrzResult[2]) == 486 or int(gsmrzResult[2]) == 801 or int(gsmrzResult[2]) == 802: - repeat = True - From = gsmrzResult[0] - To = gsmrzResult[1] - result = gsmrzResult[2] - if int(gsmrzResult[2]) == 200: - repeat = False - if gsmrzResult[1] == RZ and int(gsmrzResult[2]) == 998: - try: - cpgsmRZList.remove(RZ) - except ValueError: - message = 'Error' - if gsmrzResult[0] == RZ and int(gsmrzResult[2]) == 999: - try: - cpgsmRZList.remove(RZ) - except ValueError: - message = 'Error' - - if len(cpgsmRZList) > 1: - if repeat == True: - i = random.randint(0, len(cpgsmRZList)-1) # - if i == 0: - x = i+1 - else: - x = i-1 - testDest(cpgsmRZList[x], cpgsmRZList[i], 2) - item = '['+str(From)+','+str(callTo)+','+str(result)+','+str(1)+']' - rem.append(item) - - for remov in rem: - for x in smartResultList: - if x == remov: - try: - smartResultList.remove(x) - except ValueError: - message = 'Error' - return smartResultList - -def doSmartTest(status): - global taskNum, printMessage - global WebStatus - initDB() - taskNum = db.maxTaskNo() - global makeTest - if status == True: - WebStatus = True - - else: - WebStatus = False - - makeTest = initTestClass.initTesting() - result = smartTest() - if status == 'NAGIOS': - initNagiosString(result) - - elif status == False: - initTrueTable(result) - - if status == True: - sendFinishMessage() - - -def doSipTest(): - destList = ['landline', 'unisip'] - doTest = initTestClass.initTesting() - for callTo in destList: - - callFrom = 'sip' - doTest.initTest(callFrom,callTo) - resultsList.append([callFrom, callTo, doTest.result]) - initTrueTable(resultsList) -def doIncomingTest(): #incoming call to RZ network - - destList = ['GSMRZ1','unisip', 'GSMRZ2','GSMRZ3'] - doTest = initTestClass.initTesting() - for callTo in destList: - callFrom = 'landline' - doTest.initTest(callFrom,callTo) - resultsList.append([callFrom, callTo, doTest.result]) - initTrueTable(resultsList) - -def doGsmrzTest(): - - destList = ['GSMRZ1','GSMRZ2', 'GSMRZ3'] - callList = ['sip'] - doTest = initTestClass.initTesting() - - for callFrom in callList: - for callTo in destList: - doTest.initTest(callFrom,callTo) - resultsList.append([callFrom, callTo, doTest.result]) - initTrueTable(resultsList) - -def doGsmExtTest(): - - destList = ['GSMExt.O2', 'GSMExt.Voda', 'GSMExt.Eplus', 'GSMExt.Tm'] - callList = ['sip'] - - doTest = initTestClass.initTesting() - - for callFrom in callList: - for callTo in destList: - doTest.initTest(callFrom,callTo) - resultsList.append([callFrom, callTo, doTest.result]) - initTrueTable(resultsList) - -def doAllTest(): - - doSipTest() - doIncomingTest() - doGsmrzTest() - doGsmExtTest() - - -def sendResultWebsite(message): - if server.sendData(message+ chr(10)) == 1: - print 'data sent successfully' - test = server.receiveData(5) - if test == 'TIMEOUT': - closeFunction(db,server) - sys.exit(2) - if test == 'CONTINUE': - print 'continue' - -def sendFinishMessage(): - if server.connected == 1: - server.sendData('TEST DONE\n') - test = server.receiveData(5) - if test == 'TIMEOUT': - closeFunction(db,server) - if test == 'DISCONNECT': - close = server.closeConnection() - if close == 1: - print 'Closed connection successfully' - - print 'release mutex says ', db.releaseMutex() - -def withDB(x): - - initDB() - - if x == False: - deviceLists = db.deviceList() - callerFound = False - calleeFound = False - for device in deviceLists: - if caller == device[0]: - callerFound = True - break - - for device in deviceLists: - if callee == device[0]: - calleeFound = True - break - if callerFound != True or calleeFound != True: - if callerFound != True: - print 'No device with name', caller - db.closeDBConn() - sys.exit(1) - if calleeFound != True: - print 'No device with name', callee - db.closeDBConn() - sys.exit(1) - if callerFound == True and calleeFound == True: - taskNumber = db.maxTaskNo() - db.insertTaskIn2(caller, callee, taskNumber) - resultsList = list() - if dbStatus == 1: # Checking connection to database - if db.anyTasksToDo() == 1: # Checking task on the table - - allPing() - i=0 - makeTest = initTestClass.initTesting() - - for item in db.tasksList: - - taskID = item[0] - taskNo = item[1] - callFrom = item[2] - callTo = item[3] - - if i == 0: - db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2) - print '\n' - print 'Task ID :', taskID - print 'Calling From :', callFrom - print 'To :', callTo - - - makeTest.initTest(callFrom,callTo) - - db.addResult(taskID, makeTest.result) - - resultsList.append([callFrom, callTo, makeTest.result]) - - db.errorCode(makeTest.result) - - if int(makeTest.result) == 200: - print bcolors.OKGREEN +'Result : ' +str(makeTest.result)+ ' ' +db.errCode + bcolors.ENDC - else: - print bcolors.FAIL+'Result : ' +str(makeTest.result)+ ' ' +db.errCode + bcolors.ENDC - - if x == True: # if x = True means that this function call by website - - message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(makeTest.result) + '|' + str(db.errCode) - sendResultWebsite(message) # send result to website - - db.deleteTempTask(taskID) - i = i+1 - - db.cleanTasksList() - - if x == True: - sendFinishMessage() #send finish message to website and close the connection - - print '\n' - - initTrueTable(resultsList) # fetch result list and make adjustment about the result - db.closeDBConn() - else: - print bcolors.FAIL+"--- No job at all ---" + bcolors.ENDC - db.closeDBConn() - else: - print bcolors.FAIL+'Cant connect to database'+ bcolors.ENDC - sys.exit(1) - -def findPort(portName): # take information in existing usb port - global connect - global prefix - global num - global IMEI - global portClass - sleep(0.5) - portLog = os.popen('dmesg | grep \'pl2303 converter now attached to '+portName+'\'').read() - sleep(0.5) - if portLog != '': - connect = 1 - portClass = usbDetectClass.serialPort(portName) - portClass.findNumber() - portClass.findIMEI() - IMEI = portClass.IMEI - num = portClass.number - number = portClass.number - prefix = number[0:4] - else: - connect = 0 - -def initDevice(deviceName): - print 'Device Name :',deviceName - print ' Device IMEI : ', - imei = sys.stdin.readline().rstrip("\r\n") - print 'Phone number : ', - number = sys.stdin.readline().rstrip("\r\n") - print 'Port Name : /dev/', - portName = sys.stdin.readline().rstrip("\r\n") - print '' - - if imei == '' or portName == '': - print bcolors.FAIL+' == cant save device configuration, please fill IMEI / port name of the device =='+ bcolors.ENDC - else: - findPort(portName) - if connect == 1: - if str(IMEI) != str(imei) and str(num) != str(number): - print bcolors.FAIL+'== error, device not found =='+ bcolors.ENDC - elif str(IMEI) == str(imei): - portClass.initUpdate(deviceName, portName, number) - print bcolors.OKGREEN +'== Device succeced added =='+ bcolors.ENDC - elif str(num) == str(number) and str(IMEI) != str(imei): - portClass.initUpdate(deviceName, portName, number) - print bcolors.WARNING+'== Device succeced added, but have different IMEI =='+ bcolors.ENDC - else: - print bcolors.FAIL+'== error, no device connected =='+ bcolors.ENDC - -def autoUpdateDevice(status): - initDB() - GSMListPrefix = list() - GSMListPrefix = db.GSMPrefix() - i = 0 - x = 0 - count = 0 - while i !=10: - portName ='ttyUSB'+str(i) #checking usb connection - - i=i+1 - old_handler = signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(15) - try: - findPort(portName) - signal.signal(signal.SIGALRM, old_handler) - signal.alarm(0) - if connect == 1: - for listNum in GSMListPrefix: - if prefix == listNum[1]: - print 'Device Name :',listNum[0] - print 'IMEI :',IMEI - print 'Phone Number :',num - print 'Port Name : /dev/'+portName - x=x+1 - newPortName = '/dev/'+portName - portClass.initUpdate(listNum[0], newPortName, num) - print '\n' - except TimeoutException: - signal.signal(signal.SIGALRM, old_handler) - signal.alarm(0) - count = count + 1 - message = "Timeout" - if status == True: - sendFinishMessage() - else: - - print '== FINISH ==' - print 'Found '+str(x)+' devices' - -def updateDevice(): #update port name list of device on DB - quit = False - while quit != True: - print '' - print "Mobile device configuration" - print "Menu: a = automatic device configuration, m = Manual configuration, q = quit : ", - input = sys.stdin.readline().rstrip("\r\n") - print '' - if input == 'm': # manual configuration - - while True: - print '' - print "Mobile device name: " - print " 1. GSM O2" - print " 2. GSM Vodafone" - print " 3. GSM Eplus" - print " 4. GSM T-Mobile" - print " 5. GSM RZ 1" - print " 6. Back to menu" - print "" - print "your choise : ", - input = sys.stdin.readline().rstrip("\r\n") - - if input == '6': - break - elif input == '1': - initDevice('GSMExt.O2') - elif input == '2': - initDevice('GSMExt.Voda') - elif input == '3': - initDevice('GSMExt.Eplus') - elif input == '4': - initDevice('GSMExt.Tm') - elif input == '5': - initDevice('GSMRZ1') - else: - print 'please choose between 1-6' - - if input == 'a': #automatic configuration - autoUpdateDevice(False) - - if input == "q": - break - sys.exit() - -def closeFunction(dbConn,serverSocket): - print 'Release the mutex: ' + str(dbConn.releaseMutex()) - print 'Close the DB Connection: ' + str(dbConn.closeDBConn()) - del dbConn - del serverSocket - sys.exit() - -if len(sys.argv) > 1: - - command = sys.argv[1] - - if command == '--all': - resultsList = list() - doAllTest() - - elif command == '--sip': - resultsList = list() - doSipTest() - - elif command == '--gsmrz': - resultsList = list() - doGsmrzTest() - - elif command == '--gsmext': - resultsList = list() - doGsmExtTest() - - elif command == '--incoming': - resultsList = list() - doIncomingTest() - - elif command == '--smart': - initDB() - resultsList = list() - allPing() - taskNo = db.maxTaskNo() - db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2) - doSmartTest(False) - - elif command == '--nagios': - initDB() - resultsList = list() - allPing() - taskNo = db.maxTaskNo() - db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2) - doSmartTest('NAGIOS') - - elif command == '--devconf': - updateDevice() - - elif command == '--db': - if len(sys.argv) > 2: - try: - caller = sys.argv[2] - callee = sys.argv[3] - - except ValueError: - print "Error given caller and destination. Type '--help' for more information." - else: - print "Error given caller and destination. Type '--help' for more information." - sys.exit() - resultsList = list() - withDB(False) - - elif command == '--help': - file = open('help.txt', 'r') - print file.read() - - else: - print "command not found, Type '--help' for more information." - print '\n' -else: - global server - global tried - - - initDB() # should put db condition - server = WebsiteCommClass.ServerHandlerSoftware(34500) #define the port - tried = server.openSocket(10) - - if tried == 'TIMEOUT': - closeFunction(db,server) - - test = server.receiveData(10) - if test == 'TIMEOUT': - closeFunction(db,server) - - if test == 'START TEST': - server.sendData('CONFIRM\n') - print 'TEST STARTED' - withDB(True) - - if test == 'SMART TEST': - allPing() - taskNo = db.maxTaskNo() - db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2) - server.sendData('CONFIRM\n') - print 'SMART TEST STARTED' - resultsList = list() - doSmartTest(True) - - if test == 'UPDATE DEVICE': - server.sendData('CONFIRM\n') - print 'UPDATE DEVICE STARTED' - autoUpdateDevice(True) - else: - sys.exit('WE DIDN\'T RECEIVE THE CONFIRMATION') - - db.closeDBConn() - - - - - - - - - - - - - - - - - - - - - - - - - - -- cgit v1.2.3-55-g7522