From 6b01e8aa3307783334b472b07e4aa5eb70387d45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C3=B6=20Barany?= Date: Thu, 1 Aug 2013 12:32:07 +0200 Subject: [PATCH] Bug fixes in new peer handling. --- src/class_receiveDataThread.py | 70 ++++++++++++++++------------------ src/class_sendDataThread.py | 2 +- 2 files changed, 33 insertions(+), 39 deletions(-) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 40e4a97f..5af33157 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -42,7 +42,7 @@ class receiveDataThread(threading.Thread): someObjectsOfWhichThisRemoteNodeIsAlreadyAware, selfInitiatedConnections): self.sock = sock - self.peer = shared.Peer(HOST, port) + self.peer = shared.Peer(HOST, port) self.streamNumber = streamNumber self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {} @@ -194,7 +194,7 @@ class receiveDataThread(threading.Thread): objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway. if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0: with shared.printLock: - print '(concerning', self.peer + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) + print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) try: del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ @@ -204,7 +204,7 @@ class receiveDataThread(threading.Thread): break if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0: with shared.printLock: - print '(concerning', self.peer + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) + print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) try: del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ @@ -1677,11 +1677,11 @@ class receiveDataThread(threading.Thread): shared.knownNodesLock.acquire() shared.knownNodes[recaddrStream] = {} shared.knownNodesLock.release() - if hostFromAddrMessage not in shared.knownNodes[recaddrStream]: + peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort) + if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( - recaddrPort, timeSomeoneElseReceivedMessageFromThisNode) + shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode shared.knownNodesLock.release() needToWriteKnownNodesToDisk = True hostDetails = ( @@ -1690,15 +1690,12 @@ class receiveDataThread(threading.Thread): listOfAddressDetailsToBroadcastToPeers.append( hostDetails) else: - PORT, timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ - hostFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. + timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ + peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( - PORT, timeSomeoneElseReceivedMessageFromThisNode) + shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode shared.knownNodesLock.release() - if PORT != recaddrPort: - print 'Strange occurance: The port specified in an addr message', str(recaddrPort), 'does not match the port', str(PORT), 'that this program (or some other peer) used to connect to it', str(hostFromAddrMessage), '. Perhaps they changed their port or are using a strange NAT configuration.' if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers. shared.knownNodesLock.acquire() output = open(shared.appdata + 'knownnodes.dat', 'wb') @@ -1784,15 +1781,15 @@ class receiveDataThread(threading.Thread): shared.knownNodesLock.acquire() shared.knownNodes[recaddrStream] = {} shared.knownNodesLock.release() - if hostFromAddrMessage not in shared.knownNodes[recaddrStream]: + peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort) + if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. shared.knownNodesLock.acquire() - newPeer = shared.Peer(hostFromAddrMessage, recaddrPort) - shared.knownNodes[recaddrStream][newPeer] = ( + shared.knownNodes[recaddrStream][peerFromAddrMessage] = ( timeSomeoneElseReceivedMessageFromThisNode) shared.knownNodesLock.release() with shared.printLock: - print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream + print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream needToWriteKnownNodesToDisk = True hostDetails = ( @@ -1801,15 +1798,12 @@ class receiveDataThread(threading.Thread): listOfAddressDetailsToBroadcastToPeers.append( hostDetails) else: - PORT, timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ - hostFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. + timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ + peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( - PORT, timeSomeoneElseReceivedMessageFromThisNode) + shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode shared.knownNodesLock.release() - if PORT != recaddrPort: - print 'Strange occurance: The port specified in an addr message', str(recaddrPort), 'does not match the port', str(PORT), 'that this program (or some other peer) used to connect to it', str(hostFromAddrMessage), '. Perhaps they changed their port or are using a strange NAT configuration.' if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers. shared.knownNodesLock.acquire() output = open(shared.appdata + 'knownnodes.dat', 'wb') @@ -1866,29 +1860,29 @@ class receiveDataThread(threading.Thread): if len(shared.knownNodes[self.streamNumber]) > 0: for i in range(500): random.seed() - HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) - if helper_generic.isHostInPrivateIPRange(HOST): + peer, = random.sample(shared.knownNodes[self.streamNumber], 1) + if helper_generic.isHostInPrivateIPRange(peer.host): continue - addrsInMyStream[HOST] = shared.knownNodes[ - self.streamNumber][HOST] + addrsInMyStream[peer] = shared.knownNodes[ + self.streamNumber][peer] if len(shared.knownNodes[self.streamNumber * 2]) > 0: for i in range(250): random.seed() - HOST, = random.sample(shared.knownNodes[ + peer, = random.sample(shared.knownNodes[ self.streamNumber * 2], 1) - if helper_generic.isHostInPrivateIPRange(HOST): + if helper_generic.isHostInPrivateIPRange(peer.host): continue - addrsInChildStreamLeft[HOST] = shared.knownNodes[ - self.streamNumber * 2][HOST] + addrsInChildStreamLeft[peer] = shared.knownNodes[ + self.streamNumber * 2][peer] if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0: for i in range(250): random.seed() - HOST, = random.sample(shared.knownNodes[ + peer, = random.sample(shared.knownNodes[ (self.streamNumber * 2) + 1], 1) - if helper_generic.isHostInPrivateIPRange(HOST): + if helper_generic.isHostInPrivateIPRange(peer.host): continue - addrsInChildStreamRight[HOST] = shared.knownNodes[ - (self.streamNumber * 2) + 1][HOST] + addrsInChildStreamRight[peer] = shared.knownNodes[ + (self.streamNumber * 2) + 1][peer] shared.knownNodesLock.release() numberOfAddressesInAddrMessage = 0 payload = '' @@ -1905,8 +1899,8 @@ class receiveDataThread(threading.Thread): payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ socket.inet_aton(HOST) payload += pack('>H', PORT) # remote port - for HOST, value in addrsInChildStreamLeft.items(): - PORT, timeLastReceivedMessageFromThisNode = value + for (HOST, PORT), value in addrsInChildStreamLeft.items(): + timeLastReceivedMessageFromThisNode = value if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. numberOfAddressesInAddrMessage += 1 payload += pack( @@ -1917,8 +1911,8 @@ class receiveDataThread(threading.Thread): payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ socket.inet_aton(HOST) payload += pack('>H', PORT) # remote port - for HOST, value in addrsInChildStreamRight.items(): - PORT, timeLastReceivedMessageFromThisNode = value + for (HOST, PORT), value in addrsInChildStreamRight.items(): + timeLastReceivedMessageFromThisNode = value if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. numberOfAddressesInAddrMessage += 1 payload += pack( diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index ded371a1..867d1d70 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -31,7 +31,7 @@ class sendDataThread(threading.Thread): streamNumber, someObjectsOfWhichThisRemoteNodeIsAlreadyAware): self.sock = sock - self.peer = shared.Peer(HOST, PORT) + self.peer = shared.Peer(HOST, PORT) self.streamNumber = streamNumber self.remoteProtocolVersion = - \ 1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue.