Bug fixes in new peer handling.

This commit is contained in:
Gergö Barany 2013-08-01 12:32:07 +02:00
parent 401c95cdb6
commit 6b01e8aa33
2 changed files with 33 additions and 39 deletions

View File

@ -42,7 +42,7 @@ class receiveDataThread(threading.Thread):
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections): selfInitiatedConnections):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, port) self.peer = shared.Peer(HOST, port)
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {} self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
@ -194,7 +194,7 @@ class receiveDataThread(threading.Thread):
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway. objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0: if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
with shared.printLock: with shared.printLock:
print '(concerning', self.peer + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
try: try:
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
@ -204,7 +204,7 @@ class receiveDataThread(threading.Thread):
break break
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0: if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
with shared.printLock: with shared.printLock:
print '(concerning', self.peer + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
try: try:
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
@ -1677,11 +1677,11 @@ class receiveDataThread(threading.Thread):
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream] = {} shared.knownNodes[recaddrStream] = {}
shared.knownNodesLock.release() shared.knownNodesLock.release()
if hostFromAddrMessage not in shared.knownNodes[recaddrStream]: peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
shared.knownNodesLock.release() shared.knownNodesLock.release()
needToWriteKnownNodesToDisk = True needToWriteKnownNodesToDisk = True
hostDetails = ( hostDetails = (
@ -1690,15 +1690,12 @@ class receiveDataThread(threading.Thread):
listOfAddressDetailsToBroadcastToPeers.append( listOfAddressDetailsToBroadcastToPeers.append(
hostDetails) hostDetails)
else: else:
PORT, timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
hostFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
PORT, timeSomeoneElseReceivedMessageFromThisNode)
shared.knownNodesLock.release() shared.knownNodesLock.release()
if PORT != recaddrPort:
print 'Strange occurance: The port specified in an addr message', str(recaddrPort), 'does not match the port', str(PORT), 'that this program (or some other peer) used to connect to it', str(hostFromAddrMessage), '. Perhaps they changed their port or are using a strange NAT configuration.'
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers. if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
output = open(shared.appdata + 'knownnodes.dat', 'wb') output = open(shared.appdata + 'knownnodes.dat', 'wb')
@ -1784,15 +1781,15 @@ class receiveDataThread(threading.Thread):
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream] = {} shared.knownNodes[recaddrStream] = {}
shared.knownNodesLock.release() shared.knownNodesLock.release()
if hostFromAddrMessage not in shared.knownNodes[recaddrStream]: peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
newPeer = shared.Peer(hostFromAddrMessage, recaddrPort) shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
shared.knownNodes[recaddrStream][newPeer] = (
timeSomeoneElseReceivedMessageFromThisNode) timeSomeoneElseReceivedMessageFromThisNode)
shared.knownNodesLock.release() shared.knownNodesLock.release()
with shared.printLock: with shared.printLock:
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
needToWriteKnownNodesToDisk = True needToWriteKnownNodesToDisk = True
hostDetails = ( hostDetails = (
@ -1801,15 +1798,12 @@ class receiveDataThread(threading.Thread):
listOfAddressDetailsToBroadcastToPeers.append( listOfAddressDetailsToBroadcastToPeers.append(
hostDetails) hostDetails)
else: else:
PORT, timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
hostFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
PORT, timeSomeoneElseReceivedMessageFromThisNode)
shared.knownNodesLock.release() shared.knownNodesLock.release()
if PORT != recaddrPort:
print 'Strange occurance: The port specified in an addr message', str(recaddrPort), 'does not match the port', str(PORT), 'that this program (or some other peer) used to connect to it', str(hostFromAddrMessage), '. Perhaps they changed their port or are using a strange NAT configuration.'
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers. if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
output = open(shared.appdata + 'knownnodes.dat', 'wb') output = open(shared.appdata + 'knownnodes.dat', 'wb')
@ -1866,29 +1860,29 @@ class receiveDataThread(threading.Thread):
if len(shared.knownNodes[self.streamNumber]) > 0: if len(shared.knownNodes[self.streamNumber]) > 0:
for i in range(500): for i in range(500):
random.seed() random.seed()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
if helper_generic.isHostInPrivateIPRange(HOST): if helper_generic.isHostInPrivateIPRange(peer.host):
continue continue
addrsInMyStream[HOST] = shared.knownNodes[ addrsInMyStream[peer] = shared.knownNodes[
self.streamNumber][HOST] self.streamNumber][peer]
if len(shared.knownNodes[self.streamNumber * 2]) > 0: if len(shared.knownNodes[self.streamNumber * 2]) > 0:
for i in range(250): for i in range(250):
random.seed() random.seed()
HOST, = random.sample(shared.knownNodes[ peer, = random.sample(shared.knownNodes[
self.streamNumber * 2], 1) self.streamNumber * 2], 1)
if helper_generic.isHostInPrivateIPRange(HOST): if helper_generic.isHostInPrivateIPRange(peer.host):
continue continue
addrsInChildStreamLeft[HOST] = shared.knownNodes[ addrsInChildStreamLeft[peer] = shared.knownNodes[
self.streamNumber * 2][HOST] self.streamNumber * 2][peer]
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0: if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
for i in range(250): for i in range(250):
random.seed() random.seed()
HOST, = random.sample(shared.knownNodes[ peer, = random.sample(shared.knownNodes[
(self.streamNumber * 2) + 1], 1) (self.streamNumber * 2) + 1], 1)
if helper_generic.isHostInPrivateIPRange(HOST): if helper_generic.isHostInPrivateIPRange(peer.host):
continue continue
addrsInChildStreamRight[HOST] = shared.knownNodes[ addrsInChildStreamRight[peer] = shared.knownNodes[
(self.streamNumber * 2) + 1][HOST] (self.streamNumber * 2) + 1][peer]
shared.knownNodesLock.release() shared.knownNodesLock.release()
numberOfAddressesInAddrMessage = 0 numberOfAddressesInAddrMessage = 0
payload = '' payload = ''
@ -1905,8 +1899,8 @@ class receiveDataThread(threading.Thread):
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(HOST) socket.inet_aton(HOST)
payload += pack('>H', PORT) # remote port payload += pack('>H', PORT) # remote port
for HOST, value in addrsInChildStreamLeft.items(): for (HOST, PORT), value in addrsInChildStreamLeft.items():
PORT, timeLastReceivedMessageFromThisNode = value timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack( payload += pack(
@ -1917,8 +1911,8 @@ class receiveDataThread(threading.Thread):
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(HOST) socket.inet_aton(HOST)
payload += pack('>H', PORT) # remote port payload += pack('>H', PORT) # remote port
for HOST, value in addrsInChildStreamRight.items(): for (HOST, PORT), value in addrsInChildStreamRight.items():
PORT, timeLastReceivedMessageFromThisNode = value timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack( payload += pack(

View File

@ -31,7 +31,7 @@ class sendDataThread(threading.Thread):
streamNumber, streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware): someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, PORT) self.peer = shared.Peer(HOST, PORT)
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.remoteProtocolVersion = - \ self.remoteProtocolVersion = - \
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue. 1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue.