Smarter advertisement of object hashes and peers #494
|
@ -1,6 +1,7 @@
|
||||||
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
|
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
|
||||||
# by each sendDataThread. The sendDataThread uses it whenever it needs to
|
# by each sendDataThread. The sendDataThread uses it whenever it needs to
|
||||||
# advertise an object to peers. Instead of sending it out immediately, it must
|
# advertise an object to peers in an inv message, or advertise a peer to other
|
||||||
|
# peers in an addr message. Instead of sending them out immediately, it must
|
||||||
# wait a random number of seconds for each connection so that different peers
|
# wait a random number of seconds for each connection so that different peers
|
||||||
# get different objects at different times. Thus an attacker who is
|
# get different objects at different times. Thus an attacker who is
|
||||||
# connecting to many network nodes who receives a message first from Alice
|
# connecting to many network nodes who receives a message first from Alice
|
||||||
|
@ -15,22 +16,30 @@ class objectHashHolder(threading.Thread):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.shutdown = False
|
self.shutdown = False
|
||||||
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
|
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
|
||||||
self.collectionOfLists = {}
|
self.collectionOfHashLists = {}
|
||||||
|
self.collectionOfPeerLists = {}
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
self.collectionOfLists[i] = []
|
self.collectionOfHashLists[i] = []
|
||||||
|
self.collectionOfPeerLists[i] = []
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
iterator = 0
|
iterator = 0
|
||||||
while not self.shutdown:
|
while not self.shutdown:
|
||||||
if len(self.collectionOfLists[iterator]) > 0:
|
if len(self.collectionOfHashLists[iterator]) > 0:
|
||||||
self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfLists[iterator]))
|
self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfHashLists[iterator]))
|
||||||
self.collectionOfLists[iterator] = []
|
self.collectionOfHashLists[iterator] = []
|
||||||
|
if len(self.collectionOfPeerLists[iterator]) > 0:
|
||||||
|
self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator]))
|
||||||
|
self.collectionOfPeerLists[iterator] = []
|
||||||
iterator += 1
|
iterator += 1
|
||||||
iterator %= 10
|
iterator %= 10
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
def holdHash(self,hash):
|
def holdHash(self,hash):
|
||||||
self.collectionOfLists[random.randrange(0, 10)].append(hash)
|
self.collectionOfHashLists[random.randrange(0, 10)].append(hash)
|
||||||
|
|
||||||
|
def holdPeer(self,peerDetails):
|
||||||
|
self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.shutdown = True
|
self.shutdown = True
|
|
@ -5,7 +5,6 @@ import threading
|
||||||
import shared
|
import shared
|
||||||
import hashlib
|
import hashlib
|
||||||
import socket
|
import socket
|
||||||
import pickle
|
|
||||||
import random
|
import random
|
||||||
from struct import unpack, pack
|
from struct import unpack, pack
|
||||||
import sys
|
import sys
|
||||||
|
@ -91,7 +90,6 @@ class receiveDataThread(threading.Thread):
|
||||||
del self.selfInitiatedConnections[self.streamNumber][self]
|
del self.selfInitiatedConnections[self.streamNumber][self]
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||||
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
|
@ -175,7 +173,6 @@ class receiveDataThread(threading.Thread):
|
||||||
if self.data == '':
|
if self.data == '':
|
||||||
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||||
shared.numberOfInventoryLookupsPerformed += 1
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
random.seed()
|
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||||
if objectHash in shared.inventory:
|
if objectHash in shared.inventory:
|
||||||
|
@ -264,16 +261,18 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sock.settimeout(
|
self.sock.settimeout(
|
||||||
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
remoteNodeSeenTime = shared.knownNodes[
|
|
||||||
self.streamNumber][self.peer]
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Connection fully established with', self.peer
|
print 'Connection fully established with', self.peer
|
||||||
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
||||||
print 'broadcasting addr from within connectionFullyEstablished function.'
|
print 'broadcasting addr from within connectionFullyEstablished function.'
|
||||||
|
|
||||||
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
#self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
||||||
self.peer.port)]) # This lets all of our peers know about this new node.
|
# self.remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
||||||
|
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
|
||||||
|
shared.broadcastToSendDataQueues((
|
||||||
|
self.streamNumber, 'advertisepeer', dataToSend))
|
||||||
|
|
||||||
self.sendaddr() # This is one large addr message to this one peer.
|
self.sendaddr() # This is one large addr message to this one peer.
|
||||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
@ -1561,7 +1560,7 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
# We have received an addr message.
|
# We have received an addr message.
|
||||||
def recaddr(self, data):
|
def recaddr(self, data):
|
||||||
listOfAddressDetailsToBroadcastToPeers = []
|
#listOfAddressDetailsToBroadcastToPeers = []
|
||||||
numberOfAddressesIncluded = 0
|
numberOfAddressesIncluded = 0
|
||||||
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
||||||
data[:10])
|
data[:10])
|
||||||
|
@ -1570,227 +1569,113 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
||||||
|
|
||||||
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||||
|
return
|
||||||
|
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
||||||
|
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||||
|
return
|
||||||
|
|
||||||
if self.remoteProtocolVersion == 1:
|
for i in range(0, numberOfAddressesIncluded):
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
try:
|
||||||
return
|
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
|
|
||||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
|
||||||
return
|
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = False
|
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
|
||||||
try:
|
|
||||||
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
|
||||||
with shared.printLock:
|
|
||||||
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
|
|
||||||
|
|
||||||
continue
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
|
|
||||||
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
if recaddrStream == 0:
|
|
||||||
continue
|
continue
|
||||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
except Exception as err:
|
||||||
continue
|
with shared.printLock:
|
||||||
try:
|
sys.stderr.write(
|
||||||
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
|
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
||||||
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
|
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
||||||
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
|
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
if recaddrStream == 0:
|
||||||
# recaddrPort, ', i', i
|
continue
|
||||||
hostFromAddrMessage = socket.inet_ntoa(data[
|
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
||||||
28 + lengthOfNumberOfAddresses + (34 * i):32 + lengthOfNumberOfAddresses + (34 * i)])
|
continue
|
||||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
try:
|
||||||
if data[28 + lengthOfNumberOfAddresses + (34 * i)] == '\x7F':
|
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
continue
|
except Exception as err:
|
||||||
if helper_generic.isHostInPrivateIPRange(hostFromAddrMessage):
|
with shared.printLock:
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
sys.stderr.write(
|
||||||
continue
|
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I', data[lengthOfNumberOfAddresses + (
|
|
||||||
34 * i):4 + lengthOfNumberOfAddresses + (34 * i)]) # This is the 'time' value in the received addr message.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
|
||||||
shared.knownNodesLock.acquire()
|
try:
|
||||||
shared.knownNodes[recaddrStream] = {}
|
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
||||||
shared.knownNodesLock.release()
|
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
except Exception as err:
|
||||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
with shared.printLock:
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
sys.stderr.write(
|
||||||
shared.knownNodesLock.acquire()
|
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
|
||||||
shared.knownNodesLock.release()
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
needToWriteKnownNodesToDisk = True
|
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
||||||
hostDetails = (
|
# recaddrPort, ', i', i
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
hostFromAddrMessage = socket.inet_ntoa(data[
|
||||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
listOfAddressDetailsToBroadcastToPeers.append(
|
# print 'hostFromAddrMessage', hostFromAddrMessage
|
||||||
hostDetails)
|
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F':
|
||||||
else:
|
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
||||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
continue
|
||||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A':
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
shared.knownNodesLock.acquire()
|
continue
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8':
|
||||||
shared.knownNodesLock.release()
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
continue
|
||||||
|
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
||||||
|
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
||||||
|
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
shared.knownNodes[recaddrStream] = {}
|
||||||
pickle.dump(shared.knownNodes, output)
|
|
||||||
output.close()
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
self.broadcastaddr(
|
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
||||||
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
|
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||||
with shared.printLock:
|
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
|
||||||
|
|
||||||
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
|
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
|
||||||
return
|
|
||||||
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
|
||||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
|
||||||
return
|
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = False
|
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
|
||||||
try:
|
|
||||||
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
|
||||||
with shared.printLock:
|
|
||||||
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
|
|
||||||
continue
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
if recaddrStream == 0:
|
|
||||||
continue
|
|
||||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
|
||||||
# recaddrPort, ', i', i
|
|
||||||
hostFromAddrMessage = socket.inet_ntoa(data[
|
|
||||||
32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
|
||||||
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F':
|
|
||||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
|
||||||
continue
|
|
||||||
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A':
|
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
|
||||||
continue
|
|
||||||
if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8':
|
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
|
||||||
continue
|
|
||||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
|
||||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream] = {}
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
||||||
|
timeSomeoneElseReceivedMessageFromThisNode)
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
with shared.printLock:
|
||||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
|
||||||
shared.knownNodesLock.acquire()
|
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
|
||||||
timeSomeoneElseReceivedMessageFromThisNode)
|
|
||||||
shared.knownNodesLock.release()
|
|
||||||
with shared.printLock:
|
|
||||||
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = True
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
hostDetails = (
|
hostDetails = (
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
timeSomeoneElseReceivedMessageFromThisNode,
|
||||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
||||||
listOfAddressDetailsToBroadcastToPeers.append(
|
#listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
|
||||||
hostDetails)
|
shared.broadcastToSendDataQueues((
|
||||||
else:
|
self.streamNumber, 'advertisepeer', hostDetails))
|
||||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
else:
|
||||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||||
shared.knownNodesLock.acquire()
|
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
shared.knownNodesLock.release()
|
||||||
shared.knownNodesLock.acquire()
|
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
#if listOfAddressDetailsToBroadcastToPeers != []:
|
||||||
try:
|
# self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
||||||
pickle.dump(shared.knownNodes, output)
|
with shared.printLock:
|
||||||
output.close()
|
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
except Exception as err:
|
|
||||||
if "Errno 28" in str(err):
|
|
||||||
logger.fatal('(while receiveDataThread needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
|
|
||||||
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
|
|
||||||
if shared.daemon:
|
|
||||||
os._exit(0)
|
|
||||||
shared.knownNodesLock.release()
|
|
||||||
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
|
||||||
with shared.printLock:
|
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
|
||||||
|
|
||||||
|
|
||||||
# Function runs when we want to broadcast an addr message to all of our
|
# Function runs when we want to broadcast an addr message to all of our
|
||||||
# peers. Runs when we learn of nodes that we didn't previously know about
|
# peers. Runs when we learn of nodes that we didn't previously know about
|
||||||
# and want to share them with our peers.
|
# and want to share them with our peers.
|
||||||
def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
|
"""def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
|
||||||
numberOfAddressesInAddrMessage = len(
|
numberOfAddressesInAddrMessage = len(
|
||||||
listOfAddressDetailsToBroadcastToPeers)
|
listOfAddressDetailsToBroadcastToPeers)
|
||||||
payload = ''
|
payload = ''
|
||||||
|
@ -1816,7 +1701,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
self.streamNumber, 'sendaddr', datatosend))
|
self.streamNumber, 'sendaddr', datatosend))"""
|
||||||
|
|
||||||
# Send a big addr message to our peer
|
# Send a big addr message to our peer
|
||||||
def sendaddr(self):
|
def sendaddr(self):
|
||||||
|
@ -1831,7 +1716,6 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||||
for i in range(500):
|
for i in range(500):
|
||||||
random.seed()
|
|
||||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
|
@ -1839,7 +1723,6 @@ class receiveDataThread(threading.Thread):
|
||||||
self.streamNumber][peer]
|
self.streamNumber][peer]
|
||||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
random.seed()
|
|
||||||
peer, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
self.streamNumber * 2], 1)
|
self.streamNumber * 2], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
|
@ -1848,7 +1731,6 @@ class receiveDataThread(threading.Thread):
|
||||||
self.streamNumber * 2][peer]
|
self.streamNumber * 2][peer]
|
||||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
random.seed()
|
|
||||||
peer, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
(self.streamNumber * 2) + 1], 1)
|
(self.streamNumber * 2) + 1], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
|
@ -1967,10 +1849,8 @@ class receiveDataThread(threading.Thread):
|
||||||
self.peer, self.remoteProtocolVersion)))
|
self.peer, self.remoteProtocolVersion)))
|
||||||
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
pickle.dump(shared.knownNodes, output)
|
|
||||||
output.close()
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
|
|
||||||
self.sendverack()
|
self.sendverack()
|
||||||
|
|
|
@ -102,14 +102,31 @@ class sendDataThread(threading.Thread):
|
||||||
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||||
|
|
||||||
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||||
|
elif command == 'advertisepeer':
|
||||||
|
self.objectHashHolderInstance.holdPeer(data)
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
|
numberOfAddressesInAddrMessage = len(
|
||||||
|
data)
|
||||||
|
payload = ''
|
||||||
|
for hostDetails in data:
|
||||||
|
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||||
|
payload += pack(
|
||||||
|
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
||||||
|
payload += pack('>I', streamNumber)
|
||||||
|
payload += pack(
|
||||||
|
'>q', services) # service bit flags offered by this node
|
||||||
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||||
|
socket.inet_aton(host)
|
||||||
|
payload += pack('>H', port)
|
||||||
|
|
||||||
|
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||||
|
datatosend = '\xE9\xBE\xB4\xD9addr\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
datatosend = datatosend + pack('>L', len(payload)) # payload length
|
||||||
|
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
||||||
|
datatosend = datatosend + payload
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# To prevent some network analysis, 'leak' the data out
|
self.sock.sendall(datatosend)
|
||||||
# to our peer after waiting a random amount of time
|
|
||||||
# unless we have a long list of messages in our queue
|
|
||||||
# to send.
|
|
||||||
time.sleep(random.randrange(0, 10))
|
|
||||||
self.sock.sendall(data)
|
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'sendaddr: self.sock.sendall failed'
|
print 'sendaddr: self.sock.sendall failed'
|
||||||
|
|
|
@ -2,7 +2,11 @@ import threading
|
||||||
import shared
|
import shared
|
||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
|
import pickle
|
||||||
|
|
||||||
|
import tr#anslate
|
||||||
from helper_sql import *
|
from helper_sql import *
|
||||||
|
from debug import logger
|
||||||
|
|
||||||
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
|
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
|
||||||
It cleans these data structures in memory:
|
It cleans these data structures in memory:
|
||||||
|
@ -124,4 +128,20 @@ class singleCleaner(threading.Thread):
|
||||||
objectType, streamNumber, payload, receivedTime = storedValue
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
if streamNumber in shared.inventorySets:
|
if streamNumber in shared.inventorySets:
|
||||||
shared.inventorySets[streamNumber].add(hash)
|
shared.inventorySets[streamNumber].add(hash)
|
||||||
|
|
||||||
|
# Let us write out the knowNodes to disk if there is anything new to write out.
|
||||||
|
if shared.needToWriteKnownNodesToDisk:
|
||||||
|
shared.knownNodesLock.acquire()
|
||||||
|
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||||
|
try:
|
||||||
|
pickle.dump(shared.knownNodes, output)
|
||||||
|
output.close()
|
||||||
|
except Exception as err:
|
||||||
|
if "Errno 28" in str(err):
|
||||||
|
logger.fatal('(while receiveDataThread shared.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
|
||||||
|
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
|
||||||
|
if shared.daemon:
|
||||||
|
os._exit(0)
|
||||||
|
shared.knownNodesLock.release()
|
||||||
|
shared.needToWriteKnownNodesToDisk = False
|
||||||
time.sleep(300)
|
time.sleep(300)
|
||||||
|
|
|
@ -69,6 +69,7 @@ numberOfPubkeysProcessed = 0
|
||||||
numberOfInventoryLookupsPerformed = 0
|
numberOfInventoryLookupsPerformed = 0
|
||||||
daemon = False
|
daemon = False
|
||||||
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
||||||
|
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
||||||
|
|
||||||
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
||||||
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
||||||
|
|
Reference in New Issue
Block a user