summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--For Weekly Test/Advance/number.py17
-rwxr-xr-xFor Weekly Test/ControllerClass.py267
-rwxr-xr-xFor Weekly Test/tricode/ControllerClass.py77
3 files changed, 302 insertions, 59 deletions
diff --git a/For Weekly Test/Advance/number.py b/For Weekly Test/Advance/number.py
new file mode 100644
index 0000000..64d4748
--- /dev/null
+++ b/For Weekly Test/Advance/number.py
@@ -0,0 +1,17 @@
+readNum = "+CNUM ,:4915128040906:,129"
+
+firstQuote = readNum.find(':')
+cutString = readNum [firstQuote:]
+print cutString
+secondQuote = cutString.find(',')
+number = cutString[1:secondQuote-1]
+
+print number
+
+print number[0:1]
+
+if number.find('+') == 0:
+ number = '0'+number[3:len(number)]
+elif int(number[0:1]) == 4:
+ number = '0'+number[2:len(number)]
+print number
diff --git a/For Weekly Test/ControllerClass.py b/For Weekly Test/ControllerClass.py
new file mode 100755
index 0000000..85f29ad
--- /dev/null
+++ b/For Weekly Test/ControllerClass.py
@@ -0,0 +1,267 @@
+import sys
+import os
+import subprocess
+import SSHTunnelBoxClass
+import ClientClass
+import random
+
+import LogFileClass
+logger = LogFileClass.Logging('TestProcessLog.log')
+
+from time import sleep
+
+
+class doTheTest:
+
+ def __init__(self, callFrom, callPortName, accCaller, callTo, destPortName, destNo, accDest):
+
+ self.callFrom = callFrom
+ self.dest = callTo
+ self.destNo = destNo
+ self.accDest = accDest
+ self.accCaller = accCaller
+ self.callPortName = callPortName
+ self.destPortName = destPortName
+ self.portCaller = None
+ self.portDest = None
+ self.resultCaller = None
+ self.resultDest = None
+ self.testResult = None
+
+ def FuncTest(self):
+
+ logger.logEvent('')
+
+ self.initCaller()
+
+ if self.callFrom =="GSMRZ3" or self.callFrom =="GSMRZ2": # wait until ssh connection establish
+ sleep(6)
+ else:
+ sleep(2)
+ self.callerGreeting()
+
+ if self.connected == 'OK':
+
+ self.caller.sendData('CALLER|'+self.destNo)
+ callerHandler = self.caller.receiveData(20)
+
+ if callerHandler == "CALLER READY":
+ logger.logEvent('Caller handler : Ready')
+ self.initReceiver()
+ if self.dest =="GSMRZ3" or self.dest =="GSMRZ2": # wait until ssh connection establish
+ sleep(6)
+ else:
+ sleep(2)
+ self.receiverGreeting()
+
+ if self.connected == 'OK':
+
+ self.receiver.sendData('RECEIVER')
+ destHandler = self.receiver.receiveData(20)
+
+ if destHandler == 'RECEIVER READY':
+ logger.logEvent('Receiver handler : Ready')
+ self.startCall()
+ self.waitingFeedback()
+
+ elif destHandler == 'DEVICE NOT WORK':
+ self.testResult == 802
+ logger.logEvent('802 General Device Error: Destination device no respond timeout')
+ self.initTerminate()
+ else:
+ self.testResult = 604
+ logger.logEvent('604 General Handler Error: Destination handler no respond timeout')
+ self.initTerminate()
+
+ elif self.connected == 'DEVICE NOT WORK':
+ self.testResult == 802
+ logger.logEvent('802 General Device Error: Destination device no respond timeout')
+ self.initTerminate()
+ else:
+ logger.logEvent('998 General Handler Error: Could not connect Destination handler')
+ self.testResult = 998
+ self.caller.sendData('TERMINATE CONNECTION')
+ self.caller.closeConnection()
+ self.initCancelTest()
+ else:
+ self.testResult = 605
+ logger.logEvent('605 General Handler Error: caller handler no respond timeout')
+
+ self.caller.sendData('TERMINATE CONNECTION')
+ self.caller.closeConnection()
+ self.initCancelTest()
+
+ elif self.connected == 'DEVICE NOT WORK':
+ self.testResult = 801
+ self.caller.sendData('TERMINATE CONNECTION')
+ self.caller.closeConnection()
+ logger.logEvent('802 General Device Error: Caller device no respond timeout')
+ self.initCancelTest()
+ else:
+ self.testResult = 999
+ logger.logEvent('999 General Handler Error: Could not connect to Caller handler')
+
+
+ def initCancelTest(self):
+ #close SSH connection when using gsmBox and destination doesnt respond. to make sure SSH connection are terminate
+ if self.callFrom[0:5] == 'GSMRZ':
+ if self.callFrom != 'GSMRZ1':
+ # close SSH tunneling
+ self.boxCaller.killTunneling()
+
+ # waiting results state
+ def waitingFeedback(self):
+ logger.logEvent('Waiting Feedback')
+ self.resultDest = self.receiver.receiveData(20)
+ self.resultCaller = self.caller.receiveData(20)
+ #print 'result '+self.resultCaller+'--'+self.resultDest
+ if self.resultCaller == 'DEVICE NOT WORK':
+ logger.logEvent('Caller device not work')
+ self.testResult = 801
+ self.initTerminate()
+
+ elif self.resultCaller == 'CALL OK' and self.resultDest =='CALL OK':
+ logger.logEvent('Test Succeed')
+ self.testResult = 200
+ self.initTerminate()
+ else:
+ if self.dest == 'GSMExt.Eplus':
+ if self.resultCaller == 'CALL OK' and self.resultDest == 'TIME OUT':
+ logger.logEvent('Test Failed - Eplus No credit on Eplus')
+ self.testResult = 402
+ self.initTerminate()
+ else:
+ logger.logEvent('Test Failed')
+ self.testResult = 486
+ self.initTerminate()
+
+ else:
+ if self.resultCaller <> 'CALL OK' or self.resultDest <> 'CALL OK':
+ logger.logEvent('Test Failed')
+ self.testResult = 486
+ self.initTerminate()
+
+ #send start call message to caller
+ def startCall(self):
+ logger.logEvent('Start Call')
+ self.receiver.sendData('RECEIVE START')
+ self.caller.sendData('CALL START')
+
+ def initAccount(self, account):
+
+ accConf = account
+ self.username = accConf[0:accConf.find(':')]
+
+ line = accConf[accConf.find(':')+1:]
+ self.password = line[0:line.find(':')]
+
+ newLine = line[line.find(':')+1:]
+ self.server = newLine[0:newLine.find(':')]
+
+ # define the caller configuration such as port name and port caller.
+ def initCaller(self):
+ logger.logEvent('init Caller')
+ logger.logEvent(self.callFrom)
+ self.portCaller = random.randint(30000,60000)
+
+ if self.callFrom[0:4] == 'GSMR':
+ if self.callFrom =="GSMRZ1":
+ self.initGSM(self.portCaller, self.callPortName, self.callFrom)
+ else:
+ self.initAccount(self.accCaller)
+ #open SSH tunneling
+ self.boxCaller = SSHTunnelBoxClass.SSHTunneling(self.portCaller, 50008, self.server, self.username, self.password)
+ self.boxCaller.startTunneling()
+
+
+ elif self.callFrom[0:4] == 'GSME':
+ self.initGSM(self.portCaller, self.callPortName, self.callFrom)
+
+ else:
+ subprocess.Popen(args=["gnome-terminal", '--command=python SIPHandler.py '+self.accCaller+ ' ' +str(self.portCaller)])
+
+ # define the destination configuration such as port name and port caller.
+ def initReceiver(self):
+ logger.logEvent('init Receiver')
+ logger.logEvent(self.dest)
+ self.portDest = random.randint(30000,60000)
+
+ if self.dest[0:4] == 'GSMR':
+ if self.dest =="GSMRZ1":
+ self.initGSM(self.portDest, self.destPortName, self.dest)
+ else:
+ self.initAccount(self.accDest)
+ #open SSH tunneling
+ self.boxDest = SSHTunnelBoxClass.SSHTunneling(self.portDest, 50008, self.server, self.username, self.password)
+ self.boxDest.startTunneling()
+
+ elif self.dest[0:4] == 'GSME':
+ self.initGSM(self.portDest, self.destPortName, self.dest)
+
+ else:
+ self.portDest = 50100
+ subprocess.Popen(args=['gnome-terminal', '--command=python SIPHandler.py '+self.accDest+ ' ' +str(self.portDest)])
+
+ # send terminate message to Handlers
+ def initTerminate(self):
+ self.caller.sendData('TERMINATE CONNECTION')
+ self.receiver.sendData('TERMINATE CONNECTION')
+ if self.callFrom[0:5] == 'GSMRZ':
+ if self.callFrom != 'GSMRZ1':
+ # close SSH tunneling
+ self.boxCaller.killTunneling()
+ if self.dest[0:5] == 'GSMRZ':
+ if self.dest != 'GSMRZ1':
+ # close SSH tunneling
+ self.boxDest.killTunneling()
+ self.receiver.closeConnection()
+ self.caller.closeConnection()
+
+ def callerGreeting(self): # send greeting message to the caller handler
+ self.connected = None
+ #open connection to the Handler
+ self.caller = ClientClass.Connection('localhost',self.portCaller)
+ self.caller.connect()
+
+ if self.caller.connected == 1:
+ logger.logEvent('Connected to Caller Handler')
+ self.caller.sendData('HELLO HANDLER')
+ message = self.caller.receiveData(20)
+ if message == 'HELLO CONTROLLER':
+ logger.logEvent('Caller Handler respond')
+ self.connected = 'OK'
+ else:
+ logger.logEvent('Connect to Caller but device doesnt work')
+ self.connected = 'DEVICE NOT WORK'
+ else:
+ logger.logEvent('Cannt connect to Caller')
+ self.connected = 'NOT OK'
+
+ def receiverGreeting(self): # send greeting message to the destination handler
+ self.connected = None
+ #open connection to the Handler
+ self.receiver = ClientClass.Connection('localhost', self.portDest)
+ self.receiver.connect()
+
+ if self.receiver.connected == 1:
+ logger.logEvent('Connected to Receiver Handler')
+ self.receiver.sendData('HELLO HANDLER')
+ message = self.receiver.receiveData(20)
+ if message == 'HELLO CONTROLLER':
+ logger.logEvent('Receiver Handler respond')
+ self.connected = 'OK'
+
+ else:
+ logger.logEvent('connect to Receiver but device doesnt work')
+ self.connected = 'DEVICE NOT WORK'
+ else:
+ logger.logEvent('Cannt connect to Receiver')
+ self.connected = 'NOT OK'
+
+ def initGSM(self, portCommunication, portDevice, handler):
+ #open GSM Handler
+ subprocess.Popen(args=["gnome-terminal", '--command=python GSMHandler.py '+str(portCommunication)+ ' ' +str(portDevice)+' '+str(handler)])
+
+
+
+
diff --git a/For Weekly Test/tricode/ControllerClass.py b/For Weekly Test/tricode/ControllerClass.py
index b376c46..6e1de89 100755
--- a/For Weekly Test/tricode/ControllerClass.py
+++ b/For Weekly Test/tricode/ControllerClass.py
@@ -35,7 +35,7 @@ class doTheTest:
self.initCaller()
if self.callFrom =="GSMRZ3" or self.callFrom =="GSMRZ2": # wait until ssh connection establish
- sleep(6)
+ sleep(5)
else:
sleep(2)
self.callerGreeting()
@@ -43,14 +43,14 @@ class doTheTest:
if self.connected == 'OK':
self.caller.sendData('CALLER|'+self.destNo)
- callerHandler = self.caller.receiveData(20)
+ callerHandler = self.caller.receiveData(10)
if callerHandler == "CALLER READY":
logger.logEvent('Caller handler : Ready')
self.initReceiver()
if self.dest =="GSMRZ3" or self.dest =="GSMRZ2": # wait until ssh connection establish
- sleep(6)
+ sleep(5)
else:
sleep(2)
self.receiverGreeting()
@@ -58,7 +58,7 @@ class doTheTest:
if self.connected == 'OK':
self.receiver.sendData('RECEIVER')
- destHandler = self.receiver.receiveData(20)
+ destHandler = self.receiver.receiveData(10)
if destHandler == 'RECEIVER READY':
logger.logEvent('Receiver handler : Ready')
@@ -66,19 +66,12 @@ class doTheTest:
self.startCall()
self.waitingFeedback()
- elif destHandler == 'DEVICE NOT WORK':
- self.testResult == 802
- logger.logEvent('802 General Device Error: Destination device no respond timeout')
- self.initTerminate()
else:
self.testResult = 604
logger.logEvent('604 General Handler Error: Destination handler no respond timeout')
- self.initTerminate()
-
- elif self.connected == 'DEVICE NOT WORK':
- self.testResult == 802
- logger.logEvent('802 General Device Error: Destination device no respond timeout')
- self.initTerminate()
+ self.caller.sendData('TERMINATE CONNECTION')
+ self.receiver.closeConnection()
+
else:
logger.logEvent('998 General Handler Error: Could not connect Destination handler')
self.testResult = 998
@@ -92,13 +85,6 @@ class doTheTest:
self.caller.sendData('TERMINATE CONNECTION')
self.caller.closeConnection()
self.initCancelTest()
-
- elif self.connected == 'DEVICE NOT WORK':
- self.testResult = 801
- self.caller.sendData('TERMINATE CONNECTION')
- self.caller.closeConnection()
- logger.logEvent('802 General Device Error: Caller device no respond timeout')
- self.initCancelTest()
else:
self.testResult = 999
logger.logEvent('999 General Handler Error: Could not connect to Caller handler')
@@ -114,37 +100,19 @@ class doTheTest:
# waiting results state
def waitingFeedback(self):
logger.logEvent('Waiting Feedback')
- self.resultDest = self.receiver.receiveData(20)
- self.resultCaller = self.caller.receiveData(20)
- #print 'result '+self.resultCaller+'--'+self.resultDest 'DEVICE NOT WORK'
- if self.resultCaller == 'DEVICE NOT WORK':
- logger.logEvent('Caller device not work')
- self.testResult = 801
- self.initTerminate()
+ self.resultDest = self.receiver.receiveData(15)
+ self.resultCaller = self.caller.receiveData(15)
+ #print 'result '+self.resultCaller+'--'+self.resultDest
+ if self.resultCaller <> 'CALL OK' or self.resultDest <> 'CALL OK':
- elif self.resultCaller == 'CALL OK' and self.resultDest =='CALL OK':
+ logger.logEvent('Test Failed')
+ self.testResult = 486
+ self.initTerminate()
+
+ else:
logger.logEvent('Test Succeed')
self.testResult = 200
self.initTerminate()
- else:
- if self.dest == 'GSMExt.Eplus':
- if self.resultCaller == 'CALL OK' and self.resultDest <> 'CALL OK':
- logger.logEvent('Test Failed - Eplus No credit on Eplus')
- self.testResult = 402
- self.initTerminate()
- else:
- logger.logEvent('Test Failed')
- self.testResult = 486
- self.initTerminate()
-
- else:
- if self.resultCaller <> 'CALL OK' or self.resultDest <> 'CALL OK':
-
- logger.logEvent('Test Failed')
- self.testResult = 486
- self.initTerminate()
-
-
#send start call message to caller
def startCall(self):
@@ -231,13 +199,9 @@ class doTheTest:
if self.caller.connected == 1:
logger.logEvent('Connected to Caller Handler')
self.caller.sendData('HELLO HANDLER')
- message = self.caller.receiveData(20)
- if message == 'HELLO CONTROLLER':
+ if self.caller.receiveData(30) == 'HELLO CONTROLLER':
logger.logEvent('Caller Handler respond')
self.connected = 'OK'
- else:
- logger.logEvent('Connect to Caller but device doesnt work')
- self.connected = 'DEVICE NOT WORK'
else:
logger.logEvent('Cannt connect to Caller')
self.connected = 'NOT OK'
@@ -251,14 +215,9 @@ class doTheTest:
if self.receiver.connected == 1:
logger.logEvent('Connected to Receiver Handler')
self.receiver.sendData('HELLO HANDLER')
- message = self.receiver.receiveData(20)
- if message == 'HELLO CONTROLLER':
+ if self.receiver.receiveData(30) == 'HELLO CONTROLLER':
logger.logEvent('Receiver Handler respond')
self.connected = 'OK'
-
- else:
- logger.logEvent('connect to Receiver but device doesnt work')
- self.connected = 'DEVICE NOT WORK'
else:
logger.logEvent('Cannt connect to Receiver')
self.connected = 'NOT OK'