summaryrefslogblamecommitdiffstats
path: root/For Weekly Test/tricode/initTestClass.py
blob: 15a167c5e02587cf7d02e91df6f13700f0e1f018 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11










                         


                                         



















































































































                                                                                                                                                                                                                           
                


















                                                                                       

                                              



                                        


                                                                                                                       

















                                                                                                                                

                                                                                                                               



                                                                      

                                              



                                                                                                                                         


                                                                                                                       










                                                                                      

                                                                                                                               









                                                                                                                     





                                                                                                                                               









                                                                                                                     





                                                                                                                                               




                                                                                                  





                                                                                                                                               


























                                                                                                                     

                                                                                                                                       






                                                                                                                             
                




                                           
import sys
import subprocess, signal
import os
import ControllerClass
import DbClass
import PingClass
import random
from time import sleep

class doTest:

	def __init__(self):
		self.messageList = list()

	def pings(self,IP):

		server = PingClass.Ping(IP)
		self.serverStatus = server.ping(2)
		return self.serverStatus 
	
	def initDB(self):
    
		self.db = DbClass.DBMySQLConnection('root', 'randompasswordSQL', 'localhost', 'gsmselftesting')
        	self.db.connectDB()
        	self.dbStatus = self.db.connectDB()

	def initaccount(self,account,handler): 
		if handler == 'sip' or handler == 'unisip' or handler == 'landline':
			if account[1] != '' or account[2] != '' or account[3] != '' or account[4] != '': # checking available sip account, is there enough information about the account such as username, password,server 
				self.status = 1
			else:
				self.status = 0
		else:
			if account[0] != '' or account[1] != '':
				self.status = 1
			else:
				self.status = 0


#kill process to make sure, that the handler is Terminate incase handler having problem receiving
# terminate message from controller

	def killProc(self):
		# define process name of the Handler
		procNameDest = 'GSM Handler'
		procNameCall = 'SIP Handler'

		p = subprocess.Popen(['ps', '-A'], stdout=subprocess.PIPE)
		out, err = p.communicate()

		#search process name and kill it.
		for line in out.splitlines():
			if procNameDest in line:
				pid = int(line.split(None, 1)[0])
				os.kill(pid, signal.SIGKILL)
		for line in out.splitlines():
			if procNameCall in line:
				pid = int(line.split(None, 1)[0])
				os.kill(pid, signal.SIGKILL)

	def initTest(self, callFrom, callTo):

    		self.initDB() # open database connection
    
    		if self.dbStatus != 0: # if connection to db establish, do the test

			#fetch device account detail from database
                    	dest = self.db.deviceAddress(str(callTo))
                    	caller = self.db.deviceAddress(str(callFrom))
			#self.pings(dest[4])

			if self.pings(caller[4]) <> 0:
				#self.pings(caller[4])
			
				if self.pings(dest[4]) <> 0:

					self.initaccount(caller,callFrom)
					if self.status == 1:
						self.initaccount(dest,callTo)
						if self.status == 1:
        						callPortName = caller[0]
        						accCaller = caller[2]+':'+caller[3]+':'+caller[4]+':'
        
        						destPortName = dest[0]
        						destNo = dest[1]
        						accDest = dest[2]+':'+dest[3]+':'+dest[4]+':'
        
        						makeTest = ControllerClass.doTheTest(callFrom, callPortName, accCaller, callTo, destPortName, destNo, accDest)	
        						makeTest.FuncTest()
                                			self.result = str(makeTest.testResult)
							
							print callFrom, callTo, makeTest.testResult
						else:
							self.result = 100
					else:
        					self.result = 100
					sleep(3)
        				self.killProc() # kill all the handler
					self.db.closeDBConn() #close db connection
					sleep(1)
				else:
					self.result = 500
			else:
				self.result = 500
    
    		else:
        		self.result = 333
    		self.db.closeDBConn()
    		return self.result

	# function to search in the list
	def isThere(self, keyword,lists):
		x = 0
		for item in lists:

			if item == keyword:
				return 1
			else:
				x = x+1

	def smartTest(self):
		self.initDB()
		self.smartResultList = list()
        	deviceLists = self.db.deviceList()
        	gsmList = list()
        	gsmRZList = list()
        	sipList = list()
		destList = list()
		rem = list()
		item = list()
		
		cpgsmRZList = list()
		self.db.closeDBConn()
       
        	for lists in deviceLists: #define category of the device
			device = lists[0]
            		if device[0:5] == 'GSMRZ':
                		gsmRZList.append(device)
				cpgsmRZList.append(device)
            		elif device[0:5] == 'GSMEx':
                		gsmList.append(device)
            		else:
                		sipList.append(device)

            		if device[0:5] == 'GSMRZ' or device[0:5] == 'GSMEx':
                		destList.append(device)
	
                #first test from university telphone network to random GSM RZ avaliable
		i = random.randint(0, len(gsmRZList)-1)
		callTo = gsmRZList[i]
		callFrom = 'unisip'
		self.initTest(callFrom,callTo)

		gsmRZList.remove(callTo)
		destList.remove(callTo)
		
		self.smartResultList.append([callFrom, callTo, self.result, 1])
		message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
		self.messageList.append(message)
		
		
		for callFrom in gsmRZList:
			i = random.randint(0, len(destList)-1) #Check whether the caller and dest are same
			callTo = destList[i]
			if callFrom == callTo: #Check whether the caller and dest are same
				if i == 0:
					i = i+1 # if it in the first list, change to be the second list else, just back on step.
				else:
					i = i-1
				callTo = destList[i]

			self.initTest(callFrom,callTo)
			
			destList.remove(callTo)
			destList.remove(callFrom)
			gsmRZList.remove(callFrom)
			self.smartResultList.append([callFrom, callTo, self.result,1])
			message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
			self.messageList.append(message)

		# test incoming call from outside rz network to gsm rz
		i = random.randint(0, len(gsmRZList)-1) #
		callTo = gsmRZList[i]
		callFrom = 'landline'
		self.initTest(callFrom,callTo)
		

		if self.isThere(callTo,destList) == 1: # Checking whether caller at gsmrz list in the destination list, if yes delete it.
			destList.remove(callTo)
		self.smartResultList.append([callFrom, callTo, self.result,1])
		message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
		self.messageList.append(message)
		# testing from random GSM RZ to sip
		#i = random.randint(0, len(cpgsmRZList)-1) #
		#callFrom = cpgsmRZList[i]
		#self.initTest(callFrom, 'sip')

		#self.smartResultList.append([callFrom, 'sip', self.result,1])

		for callTo in destList:
        		callFrom = 'sip'
    			self.initTest(callFrom, callTo)
			self.smartResultList.append([callFrom, callTo, self.result,1])
			message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
			self.messageList.append(message)		
		#checking unsuccess call, to make sure that destination are really unreachable
		for dest in self.smartResultList:
			#check unsuccess call and did the test have already tried, 2 means has been check
			if int(dest[2]) == 486 and int(dest[3]) != 2 and dest[1] != 'sip' and dest[0] != 'sip':
				testDestination = False
				# make sure that destination have not tested by another part and give success result.
				for test in self.smartResultList:
					if test[1] == dest[1] and int(dest[2]) == 200:
							testDestination = True
				if  testDestination == True:
					callFrom = 'sip'
					callTo	= dest[1]
					self.initTest(callFrom,callTo)
					self.smartResultList.append([callFrom,callTo, self.result,2])
					message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
					self.messageList.append(message)
					rem.append(dest)

			#check unsuccess call because caller handler having problem
			if int(dest[2]) == 999 and int(dest[3]) != 2 and dest[1] != 'sip' and dest[1] != 'sip': 
				testDestination = False
				# make sure that destination have not tested by another part and give success result.
				for test in self.smartResultList:
					if test[1] == dest[1] and int(dest[2]) != 200:
							testDestination = True
				if  testDestination == True:
					callFrom = 'sip'
					callTo	= dest[1]
					self.initTest(callFrom,callTo)
					self.smartResultList.append([callFrom,callTo, self.result,2])
					message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
					self.messageList.append(message)
					#rem.append(dest)

			caller = dest[0] # to test nanobts if the test come from RZ GSM but fehler
			if caller[0:5] == 'GSMRZ' and int(dest[3]) != 2 and dest[1] != 'sip':
				if int(dest[2]) == 486:
					callFrom = 'sip'
					callTo	= dest[1]
					self.initTest(callFrom,callTo)
					self.smartResultList.append([callFrom,callTo, self.result,2])
					message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
					self.messageList.append(message)
					rem.append(dest)

		# test to make sure nanoBTS working or not. sice probably that nanotbts seems error but actually not.
		for RZ in cpgsmRZList:
			repeat = False
			for gsmrzResult in self.smartResultList:
				
				if gsmrzResult[0] == RZ or gsmrzResult[1] == RZ:
					if int(gsmrzResult[2]) == 486:
						repeat = True
				if gsmrzResult[1] == RZ and int(gsmrzResult[2]) == 998:
					cpgsmRZList.remove(RZ)
				if gsmrzResult[0] == RZ and int(gsmrzResult[2]) == 999:
					cpgsmRZList.remove(RZ)

		if len(cpgsmRZList) > 1:
		
			if repeat == True:
				i = random.randint(0, len(cpgsmRZList)-1) #
				callTo = cpgsmRZList[i]
				if i == 0:
					x = i+1
				else:
					x = i-1
				callFrom = cpgsmRZList[x]
				self.initTest(callFrom, callTo)
				self.smartResultList.append([callFrom, callTo, self.result,2])
				message = '|' + str(callFrom) + '|' + str(callTo) + '|' + str(self.result) + '|' + str(self.db.errCode)
				self.messageList.append(message)
				item = '['+str(gsmrzResult[0])+','+str(gsmrzResult[1])+','+str(gsmrzResult[2])+','+str(1)+']'
				rem.append(item)

		for remov in rem:
			for x in self.smartResultList:
				if x == remov:
					self.smartResultList.remove(x)
		
		return self.smartResultList