Use different data structure to maintain the number of connections shown on the Network Status tab

This commit is contained in:
Jonathan Warren 2013-05-03 12:05:57 -04:00
parent b8f44aadb4
commit 73ec3e6293
3 changed files with 52 additions and 49 deletions

View File

@ -66,7 +66,7 @@ class outgoingSynSender(threading.Thread):
random.seed()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
alreadyAttemptedConnectionsListLock.acquire()
while HOST in alreadyAttemptedConnectionsList or HOST in connectedHostsList:
while HOST in alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
alreadyAttemptedConnectionsListLock.release()
#print 'choosing new sample'
random.seed()
@ -206,8 +206,8 @@ class singleListener(threading.Thread):
time.sleep(10)
a,(HOST,PORT) = sock.accept()
#Users are finding that if they run more than one node in the same network (thus with the same public IP), they can not connect with the second node. This is because this section of code won't accept the connection from the same IP. This problem will go away when the Bitmessage network grows beyond being tiny but in the mean time I'll comment out this code section.
"""while HOST in connectedHostsList:
print 'incoming connection is from a host in connectedHostsList (we are already connected to it). Ignoring it.'
"""while HOST in shared.connectedHostsList:
print 'incoming connection is from a host in shared.connectedHostsList (we are already connected to it). Ignoring it.'
a.close()
a,(HOST,PORT) = sock.accept()"""
rd = receiveDataThread()
@ -242,7 +242,7 @@ class receiveDataThread(threading.Thread):
self.payloadLength = 0 #This is the protocol payload length thus it doesn't include the 24 byte message header
self.receivedgetbiginv = False #Gets set to true once we receive a getbiginv message from our peer. An abusive peer might request it too much so we use this variable to check whether they have already asked for a big inv message.
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
connectedHostsList[self.HOST] = 0 #The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
shared.connectedHostsList[self.HOST] = 0 #The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
self.connectionIsOrWasFullyEstablished = False #set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
if self.streamNumber == -1: #This was an incoming connection. Send out a version message if we accept the other node's version message.
self.initiatedConnection = False
@ -254,7 +254,7 @@ class receiveDataThread(threading.Thread):
def run(self):
shared.printLock.acquire()
print 'ID of the receiveDataThread is', str(id(self))+'. The size of the connectedHostsList is now', len(connectedHostsList)
print 'ID of the receiveDataThread is', str(id(self))+'. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
shared.printLock.release()
while True:
try:
@ -294,21 +294,13 @@ class receiveDataThread(threading.Thread):
except:
pass
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
if self.connectionIsOrWasFullyEstablished: #We don't want to decrement the number of connections and show the result if we never incremented it in the first place (which we only do if the connection is fully established- meaning that both nodes accepted each other's version packets.)
connectionsCountLock.acquire()
connectionsCount[self.streamNumber] -= 1
#self.emit(SIGNAL("updateNetworkStatusTab(PyQt_PyObject,PyQt_PyObject)"),self.streamNumber,connectionsCount[self.streamNumber])
shared.UISignalQueue.put(('updateNetworkStatusTab',(self.streamNumber,connectionsCount[self.streamNumber])))
shared.printLock.acquire()
print 'Updating network status tab with current connections count:', connectionsCount[self.streamNumber]
shared.printLock.release()
connectionsCountLock.release()
try:
del connectedHostsList[self.HOST]
del shared.connectedHostsList[self.HOST]
except Exception, err:
print 'Could not delete', self.HOST, 'from connectedHostsList.', err
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
shared.UISignalQueue.put(('updateNetworkStatusTab','no data'))
shared.printLock.acquire()
print 'The size of the connectedHostsList is now:', len(connectedHostsList)
print 'The size of the shared.connectedHostsList is now:', len(shared.connectedHostsList)
shared.printLock.release()
def processData(self):
@ -434,23 +426,17 @@ class receiveDataThread(threading.Thread):
if not self.initiatedConnection:
#self.emit(SIGNAL("setStatusIcon(PyQt_PyObject)"),'green')
shared.UISignalQueue.put(('setStatusIcon','green'))
#Update the 'Network Status' tab
connectionsCountLock.acquire()
connectionsCount[self.streamNumber] += 1
#self.emit(SIGNAL("updateNetworkStatusTab(PyQt_PyObject,PyQt_PyObject)"),self.streamNumber,connectionsCount[self.streamNumber])
shared.UISignalQueue.put(('updateNetworkStatusTab',(self.streamNumber,connectionsCount[self.streamNumber])))
connectionsCountLock.release()
shared.UISignalQueue.put(('updateNetworkStatusTab','no data'))
remoteNodeIncomingPort, remoteNodeSeenTime = shared.knownNodes[self.streamNumber][self.HOST]
shared.printLock.acquire()
print 'Connection fully established with', self.HOST, remoteNodeIncomingPort
print 'ConnectionsCount now:', connectionsCount[self.streamNumber]
print 'The size of the connectedHostsList is now', len(connectedHostsList)
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
print 'broadcasting addr from within connectionFullyEstablished function.'
shared.printLock.release()
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST, remoteNodeIncomingPort)]) #This lets all of our peers know about this new node.
self.sendaddr() #This is one large addr message to this one peer.
if not self.initiatedConnection and connectionsCount[self.streamNumber] > 150:
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
shared.printLock.acquire()
print 'We are connected to too many people. Closing connection.'
shared.printLock.release()
@ -1568,28 +1554,36 @@ class receiveDataThread(threading.Thread):
#Our peer has requested (in a getdata message) that we send an object.
def sendData(self,objectType,payload):
if objectType == 'pubkey':
shared.printLock.acquire()
print 'sending pubkey'
shared.printLock.release()
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
headerData += 'pubkey\x00\x00\x00\x00\x00\x00'
headerData += pack('>L',len(payload)) #payload length.
headerData += hashlib.sha512(payload).digest()[:4]
self.sock.sendall(headerData + payload)
elif objectType == 'getpubkey' or objectType == 'pubkeyrequest':
shared.printLock.acquire()
print 'sending getpubkey'
shared.printLock.release()
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
headerData += 'getpubkey\x00\x00\x00'
headerData += pack('>L',len(payload)) #payload length.
headerData += hashlib.sha512(payload).digest()[:4]
self.sock.sendall(headerData + payload)
elif objectType == 'msg':
shared.printLock.acquire()
print 'sending msg'
shared.printLock.release()
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
headerData += pack('>L',len(payload)) #payload length.
headerData += hashlib.sha512(payload).digest()[:4]
self.sock.sendall(headerData + payload)
elif objectType == 'broadcast':
shared.printLock.acquire()
print 'sending broadcast'
shared.printLock.release()
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
headerData += 'broadcast\x00\x00\x00'
headerData += pack('>L',len(payload)) #payload length.
@ -1936,6 +1930,7 @@ class receiveDataThread(threading.Thread):
print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber,'.'
shared.printLock.release()
return
shared.connectedHostsList[self.HOST] = 1 #We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
#If this was an incoming connection, then the sendData thread doesn't know the stream. We have to set it.
if not self.initiatedConnection:
shared.broadcastToSendDataQueues((0,'setStreamNumber',(self.HOST,self.streamNumber)))
@ -2196,15 +2191,10 @@ def signal_handler(signal, frame):
def connectToStream(streamNumber):
#self.listOfOutgoingSynSenderThreads = [] #if we don't maintain this list, the threads will get garbage-collected.
connectionsCount[streamNumber] = 0
selfInitiatedConnections[streamNumber] = {}
for i in range(32):
a = outgoingSynSender()
#self.listOfOutgoingSynSenderThreads.append(a)
#QtCore.QObject.connect(a, QtCore.SIGNAL("passObjectThrough(PyQt_PyObject)"), self.connectObjectToSignals)
#QtCore.QObject.connect(a, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)
a.setup(streamNumber)
a.start()
@ -3686,11 +3676,8 @@ class singleAPI(threading.Thread):
selfInitiatedConnections = {} #This is a list of current connections (the thread pointers at least)
alreadyAttemptedConnectionsList = {} #This is a list of nodes to which we have already attempted a connection
ackdataForWhichImWatching = {}
connectionsCount = {} #Used for the 'network status' tab.
connectionsCountLock = threading.Lock()
alreadyAttemptedConnectionsListLock = threading.Lock()
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack('>Q',random.randrange(1, 18446744073709551615))
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
neededPubkeys = {}
successfullyDecryptMessageTimings = [] #A list of the amounts of time it took to successfully decrypt msg messages
#apiSignalQueue = Queue.Queue() #The singleAPI thread uses this queue to pass messages to a QT thread which can emit signals to do things like display a message in the UI.

View File

@ -434,7 +434,7 @@ class MyForm(QtGui.QMainWindow):
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"), self.updateSentItemStatusByAckdata)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), self.displayNewInboxMessage)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), self.displayNewSentMessage)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("updateNetworkStatusTab(PyQt_PyObject,PyQt_PyObject)"), self.updateNetworkStatusTab)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("updateNetworkStatusTab()"), self.updateNetworkStatusTab)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("incrementNumberOfMessagesProcessed()"), self.incrementNumberOfMessagesProcessed)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("incrementNumberOfPubkeysProcessed()"), self.incrementNumberOfPubkeysProcessed)
QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL("incrementNumberOfBroadcastsProcessed()"), self.incrementNumberOfBroadcastsProcessed)
@ -550,16 +550,32 @@ class MyForm(QtGui.QMainWindow):
self.numberOfPubkeysProcessed += 1
self.ui.labelPubkeyCount.setText('Processed ' + str(self.numberOfPubkeysProcessed) + ' public keys.')
def updateNetworkStatusTab(self,streamNumber,connectionCount):
def updateNetworkStatusTab(self):
#print 'updating network status tab'
totalNumberOfConnectionsFromAllStreams = 0 #One would think we could use len(sendDataQueues) for this but the number doesn't always match: just because we have a sendDataThread running doesn't mean that the connection has been fully established (with the exchange of version messages).
foundTheRowThatNeedsUpdating = False
for currentRow in range(self.ui.tableWidgetConnectionCount.rowCount()):
streamNumberTotals = {}
for host, streamNumber in shared.connectedHostsList.items():
if not streamNumber in streamNumberTotals:
streamNumberTotals[streamNumber] = 1
else:
streamNumberTotals[streamNumber] += 1
while self.ui.tableWidgetConnectionCount.rowCount() > 0:
self.ui.tableWidgetConnectionCount.removeRow(0)
for streamNumber, connectionCount in streamNumberTotals.items():
self.ui.tableWidgetConnectionCount.insertRow(0)
newItem = QtGui.QTableWidgetItem(str(streamNumber))
newItem.setFlags( QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled )
self.ui.tableWidgetConnectionCount.setItem(0,0,newItem)
newItem = QtGui.QTableWidgetItem(str(connectionCount))
newItem.setFlags( QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled )
self.ui.tableWidgetConnectionCount.setItem(0,1,newItem)
"""for currentRow in range(self.ui.tableWidgetConnectionCount.rowCount()):
rowStreamNumber = int(self.ui.tableWidgetConnectionCount.item(currentRow,0).text())
if streamNumber == rowStreamNumber:
foundTheRowThatNeedsUpdating = True
self.ui.tableWidgetConnectionCount.item(currentRow,1).setText(str(connectionCount))
totalNumberOfConnectionsFromAllStreams += connectionCount
#totalNumberOfConnectionsFromAllStreams += connectionCount
if foundTheRowThatNeedsUpdating == False:
#Add a line to the table for this stream number and update its count with the current connection count.
self.ui.tableWidgetConnectionCount.insertRow(0)
@ -569,11 +585,11 @@ class MyForm(QtGui.QMainWindow):
newItem = QtGui.QTableWidgetItem(str(connectionCount))
newItem.setFlags( QtCore.Qt.ItemIsSelectable | QtCore.Qt.ItemIsEnabled )
self.ui.tableWidgetConnectionCount.setItem(0,1,newItem)
totalNumberOfConnectionsFromAllStreams += connectionCount
self.ui.labelTotalConnections.setText('Total Connections: ' + str(totalNumberOfConnectionsFromAllStreams))
if totalNumberOfConnectionsFromAllStreams > 0 and shared.statusIconColor == 'red': #FYI: The 'singlelistener' thread sets the icon color to green when it receives an incoming connection, meaning that the user's firewall is configured correctly.
totalNumberOfConnectionsFromAllStreams += connectionCount"""
self.ui.labelTotalConnections.setText('Total Connections: ' + str(len(shared.connectedHostsList)))
if len(shared.connectedHostsList) > 0 and shared.statusIconColor == 'red': #FYI: The 'singlelistener' thread sets the icon color to green when it receives an incoming connection, meaning that the user's firewall is configured correctly.
self.setStatusIcon('yellow')
elif totalNumberOfConnectionsFromAllStreams == 0:
elif len(shared.connectedHostsList) == 0:
self.setStatusIcon('red')
def setStatusIcon(self,color):
@ -851,13 +867,13 @@ class MyForm(QtGui.QMainWindow):
def connectObjectToSignals(self,object):
"""def connectObjectToSignals(self,object):
QtCore.QObject.connect(object, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)
QtCore.QObject.connect(object, QtCore.SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), self.displayNewInboxMessage)
QtCore.QObject.connect(object, QtCore.SIGNAL("displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), self.displayNewSentMessage)
QtCore.QObject.connect(object, QtCore.SIGNAL("updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"), self.updateSentItemStatusByHash)
QtCore.QObject.connect(object, QtCore.SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"), self.updateSentItemStatusByAckdata)
QtCore.QObject.connect(object, QtCore.SIGNAL("updateNetworkStatusTab(PyQt_PyObject,PyQt_PyObject)"), self.updateNetworkStatusTab)
QtCore.QObject.connect(object, QtCore.SIGNAL("updateNetworkStatusTab()"), self.updateNetworkStatusTab)
QtCore.QObject.connect(object, QtCore.SIGNAL("incrementNumberOfMessagesProcessed()"), self.incrementNumberOfMessagesProcessed)
QtCore.QObject.connect(object, QtCore.SIGNAL("incrementNumberOfPubkeysProcessed()"), self.incrementNumberOfPubkeysProcessed)
QtCore.QObject.connect(object, QtCore.SIGNAL("incrementNumberOfBroadcastsProcessed()"), self.incrementNumberOfBroadcastsProcessed)
@ -866,7 +882,7 @@ class MyForm(QtGui.QMainWindow):
#This function exists because of the API. The API thread starts an address generator thread and must somehow connect the address generator's signals to the QApplication thread. This function is used to connect the slots and signals.
def connectObjectToAddressGeneratorSignals(self,object):
QtCore.QObject.connect(object, SIGNAL("writeNewAddressToTable(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), self.writeNewAddressToTable)
QtCore.QObject.connect(object, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)
QtCore.QObject.connect(object, QtCore.SIGNAL("updateStatusBar(PyQt_PyObject)"), self.updateStatusBar)"""
#This function is called by the processmsg function when that function receives a message to an address that is acting as a pseudo-mailing-list. The message will be broadcast out. This function puts the message on the 'Sent' tab.
def displayNewSentMessage(self,toAddress,toLabel,fromAddress,subject,message,ackdata):
@ -1910,8 +1926,7 @@ class UISignaler(QThread):
toAddress,fromLabel,fromAddress,subject,message,ackdata = data
self.emit(SIGNAL("displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),toAddress,fromLabel,fromAddress,subject,message,ackdata)
elif command == 'updateNetworkStatusTab':
streamNumber,count = data
self.emit(SIGNAL("updateNetworkStatusTab(PyQt_PyObject,PyQt_PyObject)"),streamNumber,count)
self.emit(SIGNAL("updateNetworkStatusTab()"))
elif command == 'incrementNumberOfMessagesProcessed':
self.emit(SIGNAL("incrementNumberOfMessagesProcessed()"))
elif command == 'incrementNumberOfPubkeysProcessed':

View File

@ -26,6 +26,7 @@ inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't r
printLock = threading.Lock()
appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.