#! /usr/bin/env python from serial import * #serial port library import sys import ControllerClass import DbClass import PingClass import trueTableClass import initTestClass import usbDetectClass import ServerClassSoftware from time import sleep global resultsList resultsList = list() GSMListPrefix = [['GSMExt.Tm','0151'],['GSMExt.Tm','0160'],['GSMExt.Tm','0170'],['GSMExt.Tm','0171'],['GSMExt.Tm','0175'],['GSMExt.Voda','0152'],['GSMExt.Voda','0162'],['GSMExt.Voda','0172'],['GSMExt.Voda','0173'],['GSMExt.Voda','0174'],['GSMExt.Eplus','0157'],['GSMExt.Eplus','0177'],['GSMExt.Eplus','0155'],['GSMExt.Eplus','0163'],['GSMExt.Eplus','0178'],['GSMExt.O2','0159'],['GSMExt.O2','0176'],['GSMExt.O2','0179'],['GSMRZ1','0761']] def allPing(): #ping all existing devices global sipGate global sipServer global unisip global gsmBox1 global gsmBox2 server = PingClass.Ping('sipgate.de') sipGate = server.ping(2) server = PingClass.Ping('132.230.4.8') sipServer = server.ping(2) server = PingClass.Ping('132.230.252.228') unisip = server.ping(2) server = PingClass.Ping('localhost') gsmBox1 = server.ping(2) server = PingClass.Ping('10.4.58.241') gsmBox2 = server.ping(2) def initDB(): # function for connection database global dbStatus global db db = DbClass.DBMySQLConnection('root', 'randompasswordSQL', 'localhost', 'gsmselftesting') db.connectDB() dbStatus = db.connectDB() if dbStatus == 1: print 'Lock the mutex: ' + str(db.lockMutex(600)) def initTrueTable(x): initResult = trueTableClass.trueTable(x) initResult.initTrueTable() print '\n' openBSC = None for x in initResult.nanoBts: name = x[0] if x[1] == True: openBSC = True asterikServer = True print name+ ' Working' else: if int(x[1]) == 486: print name+ ' not Working' elif int(x[1]) == 200: print name+ ' Working' elif int(x[1]) == 998 or int(x[1]) == 999: print name+ ' not Working, handler error' else: print name+ ' not Working' print '' if openBSC != None: if openBSC == True: print 'openBSC working' else: print 'openBSC doesnt work' print '' if initResult.asteriskServer == True: print 'asterik server is working' print '\n' if initResult.outGoingRZ == True: print 'Outgoing call from RZ is working' elif initResult.outGoingRZ == False: print 'Outgoing call from RZ is not working' if initResult.incomingRZ == True: print 'incoming call from outside RZ to GSM RZ is working' elif initResult.incomingRZ == False: print 'incoming call from outside RZ to GSM RZ is not working' print '\n' def doSipTest(): #destList = ['gsmr1','gsmr2', 'gsmr3', 'landline', 'unisip', 'GSMExt.O2', 'GSMExt.Voda', 'GSMExt.Eplus', 'GSMExt.Tm' ] destList = ['landline', 'unisip'] print "test" doTest = initTestClass.doTest() print 'iam here' for callTo in destList: callFrom = 'sip' doTest.initTest(callFrom,callTo) resultsList.append([callFrom, callTo, doTest.result]) def doIncomingTest(): #incoming call to RZ network destList = ['GSMRZ1','unisip', 'GSMRZ2','GSMRZ3'] doTest = initTestClass.doTest() for callTo in destList: callFrom = 'landline' doTest.initTest(callFrom,callTo) resultsList.append([callFrom, callTo, doTest.result]) initTrueTable(resultsList) def doGsmrzTest(): destList = ['GSMRZ1','GSMRZ2', 'GSMRZ3'] callList = ['sip'] doTest = initTestClass.doTest() for callFrom in callList: for callTo in destList: doTest.initTest(callFrom,callTo) resultsList.append([callFrom, callTo, doTest.result]) initTrueTable(resultsList) def doGsmExtTest(): destList = ['GSMExt.O2', 'GSMExt.Voda', 'GSMExt.Eplus', 'GSMExt.Tm'] callList = ['sip'] doTest = initTestClass.doTest() for callFrom in callList: for callTo in destList: doTest.initTest(callFrom,callTo) resultsList.append([callFrom, callTo, doTest.result]) initTrueTable(resultsList) def doAllTest(): doSipTest() doIncomingTest() doGsmrzTest() doGsmExtTest() def regularTest(): regulartest = initTestClass.doTest() regulartest.smartTest() initTrueTable(regulartest.smartResultList) def sendResultWebsite(message): if server.sendData(message+ chr(10)) == 1: print 'data sent successfully' test = server.receiveData(2) if test == 'TIMEOUT': closeFunction(db,server) if test == 'CONTINUE': print 'continue test' def sendFinishMessage(): if server.connected == 1: server.sendData('TEST DONE\n') test = server.receiveData(2) if test == 'TIMEOUT': closeFunction(db,server) if test == 'DISCONNECT': close = server.closeConnection() if close == 1: print 'Closed connection successfully' print 'release mutex says ', db.releaseMutex() def withDB(x): initDB() resultsList = list() if dbStatus == 1: # Checking connection to database if db.anyTasksToDo() == 1: # Checking task on the table allPing() i=0 makeTest = initTestClass.doTest() for item in db.tasksList: taskID = item[0] taskNo = item[1] callFrom = item[2] callTo = item[3] if i == 0: db.updatePingResult(taskNo, sipServer, sipGate, unisip, gsmBox1, gsmBox2) print '\n' print 'Task ID :', taskID print 'Calling From :', callFrom print 'To :', callTo makeTest.initTest(callFrom,callTo) db.addResult(taskID, makeTest.result) resultsList.append([callFrom, callTo, makeTest.result]) db.errorCode(makeTest.result) print 'Result : ' +makeTest.result+ ' ' +db.errCode if x == True: # if x = True means that this function call by website message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(makeTest.result) + '|' + str(db.errCode) sendResultWebsite(message) # send result to website db.deleteTempTask(taskID) i = i+1 db.cleanTasksList() if x == True: sendFinishMessage() #send finish message to website and close the connection print '\n' initTrueTable(resultsList) # fetch result list and make adjustment about the result db.closeDBConn() else: print "--- No job at all ---" db.closeDBConn() else: print 'Cant connect to database' sys.exit(1) def findPort(portName): # take information in existing usb port global connect global prefix global num global IMEI global portClass sleep(0.5) portLog = os.popen('dmesg | grep \'pl2303 converter now attached to '+portName+'\'').read() sleep(0.5) if portLog != '': connect = 1 portClass = usbDetectClass.serialPort(portName) portClass.findNumber() portClass.findIMEI() IMEI = portClass.IMEI num = portClass.number number = portClass.number prefix = number[0:4] else: connect = 0 def initDevice(deviceName): print 'Device Name :',deviceName print ' Device IMEI : ', imei = sys.stdin.readline().rstrip("\r\n") print 'Phone number : ', number = sys.stdin.readline().rstrip("\r\n") print 'Port Name : /dev/', portName = sys.stdin.readline().rstrip("\r\n") print '' if imei == '' or portName == '': print ' == cant save device configuration, please fill IMEI / port name of the device ==' else: findPort(portName) if connect == 1: if str(IMEI) != str(imei) and str(num) != str(number): print '== error, device not found ==' elif str(IMEI) == str(imei): portClass.initUpdate(deviceName, portName, number) print '== Device succeced added ==' elif str(num) == str(number) and str(IMEI) != str(imei): portClass.initUpdate(deviceName, portName, number) print '== Device succeced added, but have different IMEI ==' else: print '== error, no device connected ==' def autoUpdateDevice(): i = 0 x = 0 while i !=10: portName ='ttyUSB'+str(i) #checking usb connection findPort(portName) i=i+1 if connect == 1: for listNum in GSMListPrefix: if prefix == listNum[1]: print 'Device Name :',listNum[0] print 'IMEI :',IMEI print 'Phone Number :',num print 'Port Name : /dev/'+portName x=x+1 newPortName = '/dev/'+portName portClass.initUpdate(listNum[0], newPortName, num) print '\n' print '== FINISH ==' print 'Found '+str(x)+' devices' def updateDevice(): #update port name list of device on DB quit = False while quit != True: print '' print "Mobile device configuration" print "Menu: a = automatic device configuration, m = Manual configuration, q = quit : ", input = sys.stdin.readline().rstrip("\r\n") print '' if input == 'm': # manual configuration while True: print '' print "Mobile device name: " print " 1. GSM O2" print " 2. GSM Vodafone" print " 3. GSM Eplus" print " 4. GSM T-Mobile" print " 5. GSM RZ 1" print " 6. Back to menu" print "" print "your choise : ", input = sys.stdin.readline().rstrip("\r\n") if input == '8': break elif input == '1': initDevice('GSMExt.O2') elif input == '2': initDevice('GSMExt.Voda') elif input == '3': initDevice('GSMExt.Eplus') elif input == '4': initDevice('GSMExt.Tm') elif input == '5': initDevice('GSMRZ1') else: print 'please choose between 1-6' if input == 'a': #automatic configuration autoUpdateDevice() if input == "q": break sys.exit() def closeFunction(dbConn,serverSocket): print 'Release the mutex: ' + str(dbConn.releaseMutex()) print 'Close the DB Connection: ' + str(dbConn.closeDBConn()) del dbConn del serverSocket sys.exit() if len(sys.argv) > 1: command = sys.argv[1] print ' ' if command == '--all': resultsList = list() doAllTest() elif command == '--sip': resultsList = list() doSipTest() elif command == '--gsmrz': resultsList = list() doGsmrzTest() elif command == '--gsmext': resultsList = list() doGsmExtTest() elif command == '--incoming': resultsList = list() doIncomingTest() elif command == '--smart': resultsList = list() regularTest() elif command == '--devconf': updateDevice() elif command == '--db': resultsList = list() withDB(False) elif command == '--help': file = open('help.txt', 'r') print file.read() else: print "command not found, Type '--help', '--credits' for more information." print '\n' else: global server global tried initDB() # should put db condition server = ServerClassSoftware.ServerHandlerSoftware(34500) #define the port tried = server.openSocket(3) if tried == 'TIMEOUT': closeFunction(db,server) test = server.receiveData(2) if test == 'TIMEOUT': closeFunction(db,server) if test == 'START TEST': server.sendData('CONFIRM\n') print 'TEST STARTED' withDB(True) if test == 'SMART TEST': server.sendData('CONFIRM\n') print 'SMART TEST STARTED' resultsList = list() regularTest() if test == 'UPDATE DEVICE': server.sendData('CONFIRM\n') print 'UPDATE DEVICE STARTED' autoUpdateDevice() else: sys.exit('WE DIDN\'T RECEIVE THE CONFIRMATION') del server