Minor refactoring
This commit is contained in:
parent
0833938ea3
commit
3527983fa6
|
@ -151,9 +151,8 @@ class receiveDataThread(threading.Thread):
|
|||
# just received valid data from it. So update the knownNodes list so
|
||||
# that other peers can be made aware of its existance.
|
||||
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
||||
shared.knownNodesLock.release()
|
||||
with shared.knownNodesLock:
|
||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
||||
|
||||
#Strip the nulls
|
||||
command = command.rstrip('\x00')
|
||||
|
@ -488,35 +487,33 @@ class receiveDataThread(threading.Thread):
|
|||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload)))
|
||||
|
||||
|
||||
def _checkIPv4Address(self, host, hostFromAddrMessage):
|
||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
||||
def _checkIPv4Address(self, host, hostStandardFormat):
|
||||
# print 'hostStandardFormat', hostStandardFormat
|
||||
if host[0] == '\x7F':
|
||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
||||
print 'Ignoring IP address in loopback range:', hostStandardFormat
|
||||
return False
|
||||
if host[0] == '\x0A':
|
||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||
return False
|
||||
if host[0:2] == '\xC0\xA8':
|
||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||
return False
|
||||
return True
|
||||
|
||||
def _checkIPv6Address(self, host, hostFromAddrMessage):
|
||||
def _checkIPv6Address(self, host, hostStandardFormat):
|
||||
if host == ('\x00' * 15) + '\x01':
|
||||
print 'Ignoring loopback address:', hostFromAddrMessage
|
||||
print 'Ignoring loopback address:', hostStandardFormat
|
||||
return False
|
||||
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
|
||||
print 'Ignoring local address:', hostFromAddrMessage
|
||||
print 'Ignoring local address:', hostStandardFormat
|
||||
return False
|
||||
if (ord(host[0]) & 0xfe) == 0xfc:
|
||||
print 'Ignoring unique local address:', hostFromAddrMessage
|
||||
print 'Ignoring unique local address:', hostStandardFormat
|
||||
return False
|
||||
return True
|
||||
|
||||
# We have received an addr message.
|
||||
def recaddr(self, data):
|
||||
#listOfAddressDetailsToBroadcastToPeers = []
|
||||
numberOfAddressesIncluded = 0
|
||||
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
||||
data[:10])
|
||||
|
||||
|
@ -531,91 +528,56 @@ class receiveDataThread(threading.Thread):
|
|||
return
|
||||
|
||||
for i in range(0, numberOfAddressesIncluded):
|
||||
try:
|
||||
fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrHost). Message: %s\n' % str(err))
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
|
||||
try:
|
||||
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]
|
||||
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||
if recaddrStream == 0:
|
||||
continue
|
||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
||||
continue
|
||||
try:
|
||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
|
||||
try:
|
||||
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
||||
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
||||
# recaddrPort, ', i', i
|
||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
||||
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
||||
if fullHost[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||
ipv4Host = fullHost[12:]
|
||||
hostFromAddrMessage = socket.inet_ntop(socket.AF_INET, ipv4Host)
|
||||
if not self._checkIPv4Address(ipv4Host, hostFromAddrMessage):
|
||||
hostStandardFormat = socket.inet_ntop(socket.AF_INET, ipv4Host)
|
||||
if not self._checkIPv4Address(ipv4Host, hostStandardFormat):
|
||||
continue
|
||||
else:
|
||||
hostFromAddrMessage = socket.inet_ntop(socket.AF_INET6, fullHost)
|
||||
if hostFromAddrMessage == "":
|
||||
hostStandardFormat = socket.inet_ntop(socket.AF_INET6, fullHost)
|
||||
if hostStandardFormat == "":
|
||||
# This can happen on Windows systems which are not 64-bit compatible
|
||||
# so let us drop the IPv6 address.
|
||||
continue
|
||||
if not self._checkIPv6Address(fullHost, hostFromAddrMessage):
|
||||
if not self._checkIPv6Address(fullHost, hostStandardFormat):
|
||||
continue
|
||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
||||
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream] = {}
|
||||
shared.knownNodesLock.release()
|
||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
||||
with shared.knownNodesLock:
|
||||
shared.knownNodes[recaddrStream] = {}
|
||||
peerFromAddrMessage = shared.Peer(hostStandardFormat, recaddrPort)
|
||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
||||
timeSomeoneElseReceivedMessageFromThisNode)
|
||||
shared.knownNodesLock.release()
|
||||
with shared.knownNodesLock:
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
with shared.printLock:
|
||||
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||
|
||||
shared.needToWriteKnownNodesToDisk = True
|
||||
hostDetails = (
|
||||
timeSomeoneElseReceivedMessageFromThisNode,
|
||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
||||
#listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
|
||||
recaddrStream, recaddrServices, hostStandardFormat, recaddrPort)
|
||||
shared.broadcastToSendDataQueues((
|
||||
self.streamNumber, 'advertisepeer', hostDetails))
|
||||
else:
|
||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
shared.knownNodesLock.release()
|
||||
peerFromAddrMessage]
|
||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())+900): # 900 seconds for wiggle-room in case other nodes' clocks aren't quite right.
|
||||
with shared.knownNodesLock:
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
|
||||
#if listOfAddressDetailsToBroadcastToPeers != []:
|
||||
# self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
||||
with shared.printLock:
|
||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||
|
||||
|
@ -633,31 +595,30 @@ class receiveDataThread(threading.Thread):
|
|||
# We are going to share a maximum number of 1000 addrs with our peer.
|
||||
# 500 from this stream, 250 from the left child stream, and 250 from
|
||||
# the right child stream.
|
||||
shared.knownNodesLock.acquire()
|
||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||
for i in range(500):
|
||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||
if isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
addrsInMyStream[peer] = shared.knownNodes[
|
||||
self.streamNumber][peer]
|
||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||
for i in range(250):
|
||||
peer, = random.sample(shared.knownNodes[
|
||||
self.streamNumber * 2], 1)
|
||||
if isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
||||
self.streamNumber * 2][peer]
|
||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||
for i in range(250):
|
||||
peer, = random.sample(shared.knownNodes[
|
||||
(self.streamNumber * 2) + 1], 1)
|
||||
if isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
addrsInChildStreamRight[peer] = shared.knownNodes[
|
||||
(self.streamNumber * 2) + 1][peer]
|
||||
shared.knownNodesLock.release()
|
||||
with shared.knownNodesLock:
|
||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||
for i in range(500):
|
||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||
if isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
addrsInMyStream[peer] = shared.knownNodes[
|
||||
self.streamNumber][peer]
|
||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||
for i in range(250):
|
||||
peer, = random.sample(shared.knownNodes[
|
||||
self.streamNumber * 2], 1)
|
||||
if isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
||||
self.streamNumber * 2][peer]
|
||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||
for i in range(250):
|
||||
peer, = random.sample(shared.knownNodes[
|
||||
(self.streamNumber * 2) + 1], 1)
|
||||
if isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
addrsInChildStreamRight[peer] = shared.knownNodes[
|
||||
(self.streamNumber * 2) + 1][peer]
|
||||
numberOfAddressesInAddrMessage = 0
|
||||
payload = ''
|
||||
# print 'addrsInMyStream.items()', addrsInMyStream.items()
|
||||
|
@ -771,10 +732,9 @@ class receiveDataThread(threading.Thread):
|
|||
# in this version message. Let us inform the sendDataThread.
|
||||
self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion))
|
||||
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||
shared.needToWriteKnownNodesToDisk = True
|
||||
shared.knownNodesLock.release()
|
||||
with shared.knownNodesLock:
|
||||
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||
shared.needToWriteKnownNodesToDisk = True
|
||||
|
||||
self.sendverack()
|
||||
if self.initiatedConnection == False:
|
||||
|
|
Reference in New Issue
Block a user