Merge pull request #360 from Atheros1/master
Support for storing multiple NATed nodes in knownnodes file.
This commit is contained in:
commit
200189d69f
|
@ -325,7 +325,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
'base64'), 'message': message.encode('base64'), 'encodingType': encodingtype, 'receivedTime': received, 'read': read}, indent=4, separators=(',', ': '))
|
'base64'), 'message': message.encode('base64'), 'encodingType': encodingtype, 'receivedTime': received, 'read': read}, indent=4, separators=(',', ': '))
|
||||||
data += ']}'
|
data += ']}'
|
||||||
return data
|
return data
|
||||||
elif method == 'getAllInboxMessageIds':
|
elif method == 'getAllInboxMessageIds' or method == 'getAllInboxMessageIDs':
|
||||||
shared.sqlLock.acquire()
|
shared.sqlLock.acquire()
|
||||||
shared.sqlSubmitQueue.put(
|
shared.sqlSubmitQueue.put(
|
||||||
'''SELECT msgid FROM inbox where folder='inbox' ORDER BY received''')
|
'''SELECT msgid FROM inbox where folder='inbox' ORDER BY received''')
|
||||||
|
@ -374,7 +374,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
data += json.dumps({'msgid':msgid.encode('hex'), 'toAddress':toAddress, 'fromAddress':fromAddress, 'subject':subject.encode('base64'), 'message':message.encode('base64'), 'encodingType':encodingtype, 'lastActionTime':lastactiontime, 'status':status, 'ackData':ackdata.encode('hex')}, indent=4, separators=(',', ': '))
|
data += json.dumps({'msgid':msgid.encode('hex'), 'toAddress':toAddress, 'fromAddress':fromAddress, 'subject':subject.encode('base64'), 'message':message.encode('base64'), 'encodingType':encodingtype, 'lastActionTime':lastactiontime, 'status':status, 'ackData':ackdata.encode('hex')}, indent=4, separators=(',', ': '))
|
||||||
data += ']}'
|
data += ']}'
|
||||||
return data
|
return data
|
||||||
elif method == 'getAllSentMessageIds':
|
elif method == 'getAllSentMessageIds' or method == 'getAllSentMessageIDs':
|
||||||
shared.sqlLock.acquire()
|
shared.sqlLock.acquire()
|
||||||
shared.sqlSubmitQueue.put('''SELECT msgid FROM sent where folder='sent' ORDER BY lastactiontime''')
|
shared.sqlSubmitQueue.put('''SELECT msgid FROM sent where folder='sent' ORDER BY lastactiontime''')
|
||||||
shared.sqlSubmitQueue.put('')
|
shared.sqlSubmitQueue.put('')
|
||||||
|
|
|
@ -32,15 +32,15 @@ class outgoingSynSender(threading.Thread):
|
||||||
break
|
break
|
||||||
random.seed()
|
random.seed()
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
shared.alreadyAttemptedConnectionsListLock.acquire()
|
shared.alreadyAttemptedConnectionsListLock.acquire()
|
||||||
while HOST in shared.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
|
while peer in shared.alreadyAttemptedConnectionsList or peer.host in shared.connectedHostsList:
|
||||||
shared.alreadyAttemptedConnectionsListLock.release()
|
shared.alreadyAttemptedConnectionsListLock.release()
|
||||||
# print 'choosing new sample'
|
# print 'choosing new sample'
|
||||||
random.seed()
|
random.seed()
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
# Clear out the shared.alreadyAttemptedConnectionsList every half
|
# Clear out the shared.alreadyAttemptedConnectionsList every half
|
||||||
|
@ -51,10 +51,10 @@ class outgoingSynSender(threading.Thread):
|
||||||
shared.alreadyAttemptedConnectionsListResetTime = int(
|
shared.alreadyAttemptedConnectionsListResetTime = int(
|
||||||
time.time())
|
time.time())
|
||||||
shared.alreadyAttemptedConnectionsListLock.acquire()
|
shared.alreadyAttemptedConnectionsListLock.acquire()
|
||||||
shared.alreadyAttemptedConnectionsList[HOST] = 0
|
shared.alreadyAttemptedConnectionsList[peer] = 0
|
||||||
shared.alreadyAttemptedConnectionsListLock.release()
|
shared.alreadyAttemptedConnectionsListLock.release()
|
||||||
PORT, timeNodeLastSeen = shared.knownNodes[
|
timeNodeLastSeen = shared.knownNodes[
|
||||||
self.streamNumber][HOST]
|
self.streamNumber][peer]
|
||||||
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
|
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
# This option apparently avoids the TIME_WAIT state so that we
|
# This option apparently avoids the TIME_WAIT state so that we
|
||||||
# can rebind faster
|
# can rebind faster
|
||||||
|
@ -62,13 +62,13 @@ class outgoingSynSender(threading.Thread):
|
||||||
sock.settimeout(20)
|
sock.settimeout(20)
|
||||||
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Trying an outgoing connection to', HOST, ':', PORT
|
print 'Trying an outgoing connection to', peer
|
||||||
|
|
||||||
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
|
print '(Using SOCKS4a) Trying an outgoing connection to', peer
|
||||||
|
|
||||||
proxytype = socks.PROXY_TYPE_SOCKS4
|
proxytype = socks.PROXY_TYPE_SOCKS4
|
||||||
sockshostname = shared.config.get(
|
sockshostname = shared.config.get(
|
||||||
|
@ -89,7 +89,7 @@ class outgoingSynSender(threading.Thread):
|
||||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
|
print '(Using SOCKS5) Trying an outgoing connection to', peer
|
||||||
|
|
||||||
proxytype = socks.PROXY_TYPE_SOCKS5
|
proxytype = socks.PROXY_TYPE_SOCKS5
|
||||||
sockshostname = shared.config.get(
|
sockshostname = shared.config.get(
|
||||||
|
@ -109,19 +109,19 @@ class outgoingSynSender(threading.Thread):
|
||||||
proxytype, sockshostname, socksport, rdns)
|
proxytype, sockshostname, socksport, rdns)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sock.connect((HOST, PORT))
|
sock.connect((peer.host, peer.port))
|
||||||
rd = receiveDataThread()
|
rd = receiveDataThread()
|
||||||
rd.daemon = True # close the main program even if there are threads left
|
rd.daemon = True # close the main program even if there are threads left
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
rd.setup(sock, HOST, PORT, self.streamNumber,
|
rd.setup(sock, peer.host, peer.port, self.streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
||||||
rd.start()
|
rd.start()
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print self, 'connected to', HOST, 'during an outgoing attempt.'
|
print self, 'connected to', peer, 'during an outgoing attempt.'
|
||||||
|
|
||||||
|
|
||||||
sd = sendDataThread()
|
sd = sendDataThread()
|
||||||
sd.setup(sock, HOST, PORT, self.streamNumber,
|
sd.setup(sock, peer.host, peer.port, self.streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
sd.start()
|
sd.start()
|
||||||
sd.sendVersionMessage()
|
sd.sendVersionMessage()
|
||||||
|
@ -129,16 +129,16 @@ class outgoingSynSender(threading.Thread):
|
||||||
except socks.GeneralProxyError as err:
|
except socks.GeneralProxyError as err:
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
print 'Could NOT connect to', peer, 'during outgoing attempt.', err
|
||||||
|
|
||||||
PORT, timeLastSeen = shared.knownNodes[
|
timeLastSeen = shared.knownNodes[
|
||||||
self.streamNumber][HOST]
|
self.streamNumber][peer]
|
||||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
|
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
del shared.knownNodes[self.streamNumber][HOST]
|
del shared.knownNodes[self.streamNumber][peer]
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
print 'deleting ', peer, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||||
|
|
||||||
except socks.Socks5AuthError as err:
|
except socks.Socks5AuthError as err:
|
||||||
shared.UISignalQueue.put((
|
shared.UISignalQueue.put((
|
||||||
|
@ -155,16 +155,16 @@ class outgoingSynSender(threading.Thread):
|
||||||
else:
|
else:
|
||||||
if shared.verbose >= 1:
|
if shared.verbose >= 1:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
print 'Could NOT connect to', peer, 'during outgoing attempt.', err
|
||||||
|
|
||||||
PORT, timeLastSeen = shared.knownNodes[
|
timeLastSeen = shared.knownNodes[
|
||||||
self.streamNumber][HOST]
|
self.streamNumber][peer]
|
||||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
del shared.knownNodes[self.streamNumber][HOST]
|
del shared.knownNodes[self.streamNumber][peer]
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
print 'deleting ', peer, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
|
|
|
@ -42,14 +42,13 @@ class receiveDataThread(threading.Thread):
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
|
||||||
selfInitiatedConnections):
|
selfInitiatedConnections):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.HOST = HOST
|
self.peer = shared.Peer(HOST, port)
|
||||||
self.PORT = port
|
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
|
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
|
||||||
self.selfInitiatedConnections = selfInitiatedConnections
|
self.selfInitiatedConnections = selfInitiatedConnections
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.HOST] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||||
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
||||||
if self.streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
if self.streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
||||||
self.initiatedConnection = False
|
self.initiatedConnection = False
|
||||||
|
@ -70,18 +69,18 @@ class receiveDataThread(threading.Thread):
|
||||||
self.data += self.sock.recv(4096)
|
self.data += self.sock.recv(4096)
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
print 'Timeout occurred waiting for data from', self.peer, '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||||
|
|
||||||
break
|
break
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err
|
print 'sock.recv error. Closing receiveData thread (HOST:', self.peer, 'ID:', str(id(self)) + ').', err
|
||||||
|
|
||||||
break
|
break
|
||||||
# print 'Received', repr(self.data)
|
# print 'Received', repr(self.data)
|
||||||
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
print 'Connection to', self.peer, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self.processData()
|
self.processData()
|
||||||
|
@ -93,16 +92,16 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
try:
|
try:
|
||||||
del shared.connectedHostsList[self.HOST]
|
del shared.connectedHostsList[self.peer.host]
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
|
print 'Could not delete', self.peer.host, 'from shared.connectedHostsList.', err
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST]
|
self.peer]
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
|
@ -137,13 +136,12 @@ class receiveDataThread(threading.Thread):
|
||||||
# that other peers can be made aware of its existance.
|
# that other peers can be made aware of its existance.
|
||||||
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[self.streamNumber][
|
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
||||||
self.HOST] = (self.PORT, int(time.time()))
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
|
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
|
||||||
remoteCommand = self.data[4:16]
|
remoteCommand = self.data[4:16]
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST
|
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.peer
|
||||||
|
|
||||||
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
||||||
self.recversion(self.data[24:self.payloadLength + 24])
|
self.recversion(self.data[24:self.payloadLength + 24])
|
||||||
|
@ -196,28 +194,28 @@ class receiveDataThread(threading.Thread):
|
||||||
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
break
|
break
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
|
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
|
|
||||||
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len(
|
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.peer] = len(
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
if len(self.ackDataThatWeHaveYetToSend) > 0:
|
if len(self.ackDataThatWeHaveYetToSend) > 0:
|
||||||
self.data = self.ackDataThatWeHaveYetToSend.pop()
|
self.data = self.ackDataThatWeHaveYetToSend.pop()
|
||||||
|
@ -262,22 +260,22 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sock.settimeout(
|
self.sock.settimeout(
|
||||||
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
remoteNodeIncomingPort, remoteNodeSeenTime = shared.knownNodes[
|
remoteNodeSeenTime = shared.knownNodes[
|
||||||
self.streamNumber][self.HOST]
|
self.streamNumber][self.peer]
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Connection fully established with', self.HOST, remoteNodeIncomingPort
|
print 'Connection fully established with', self.peer
|
||||||
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
||||||
print 'broadcasting addr from within connectionFullyEstablished function.'
|
print 'broadcasting addr from within connectionFullyEstablished function.'
|
||||||
|
|
||||||
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST,
|
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
||||||
remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
self.peer.port)]) # This lets all of our peers know about this new node.
|
||||||
self.sendaddr() # This is one large addr message to this one peer.
|
self.sendaddr() # This is one large addr message to this one peer.
|
||||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'We are connected to too many people. Closing connection.'
|
print 'We are connected to too many people. Closing connection.'
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
return
|
return
|
||||||
self.sendBigInv()
|
self.sendBigInv()
|
||||||
|
|
||||||
|
@ -1497,7 +1495,7 @@ class receiveDataThread(threading.Thread):
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
||||||
data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
|
data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
|
||||||
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST] = len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
self.peer] = len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
|
|
||||||
# Send a getdata message to our peer to request the object with the given
|
# Send a getdata message to our peer to request the object with the given
|
||||||
# hash
|
# hash
|
||||||
|
@ -1679,11 +1677,11 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream] = {}
|
shared.knownNodes[recaddrStream] = {}
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
if hostFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
||||||
|
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
needToWriteKnownNodesToDisk = True
|
needToWriteKnownNodesToDisk = True
|
||||||
hostDetails = (
|
hostDetails = (
|
||||||
|
@ -1692,15 +1690,12 @@ class receiveDataThread(threading.Thread):
|
||||||
listOfAddressDetailsToBroadcastToPeers.append(
|
listOfAddressDetailsToBroadcastToPeers.append(
|
||||||
hostDetails)
|
hostDetails)
|
||||||
else:
|
else:
|
||||||
PORT, timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||||
hostFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
PORT, timeSomeoneElseReceivedMessageFromThisNode)
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
if PORT != recaddrPort:
|
|
||||||
print 'Strange occurance: The port specified in an addr message', str(recaddrPort), 'does not match the port', str(PORT), 'that this program (or some other peer) used to connect to it', str(hostFromAddrMessage), '. Perhaps they changed their port or are using a strange NAT configuration.'
|
|
||||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||||
|
@ -1786,14 +1781,15 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream] = {}
|
shared.knownNodes[recaddrStream] = {}
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
if hostFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
||||||
|
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
||||||
recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
|
timeSomeoneElseReceivedMessageFromThisNode)
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = True
|
needToWriteKnownNodesToDisk = True
|
||||||
hostDetails = (
|
hostDetails = (
|
||||||
|
@ -1802,15 +1798,12 @@ class receiveDataThread(threading.Thread):
|
||||||
listOfAddressDetailsToBroadcastToPeers.append(
|
listOfAddressDetailsToBroadcastToPeers.append(
|
||||||
hostDetails)
|
hostDetails)
|
||||||
else:
|
else:
|
||||||
PORT, timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||||
hostFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
PORT, timeSomeoneElseReceivedMessageFromThisNode)
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
if PORT != recaddrPort:
|
|
||||||
print 'Strange occurance: The port specified in an addr message', str(recaddrPort), 'does not match the port', str(PORT), 'that this program (or some other peer) used to connect to it', str(hostFromAddrMessage), '. Perhaps they changed their port or are using a strange NAT configuration.'
|
|
||||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||||
|
@ -1867,35 +1860,35 @@ class receiveDataThread(threading.Thread):
|
||||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||||
for i in range(500):
|
for i in range(500):
|
||||||
random.seed()
|
random.seed()
|
||||||
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(HOST):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
addrsInMyStream[HOST] = shared.knownNodes[
|
addrsInMyStream[peer] = shared.knownNodes[
|
||||||
self.streamNumber][HOST]
|
self.streamNumber][peer]
|
||||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
random.seed()
|
random.seed()
|
||||||
HOST, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
self.streamNumber * 2], 1)
|
self.streamNumber * 2], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(HOST):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
addrsInChildStreamLeft[HOST] = shared.knownNodes[
|
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
||||||
self.streamNumber * 2][HOST]
|
self.streamNumber * 2][peer]
|
||||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
random.seed()
|
random.seed()
|
||||||
HOST, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
(self.streamNumber * 2) + 1], 1)
|
(self.streamNumber * 2) + 1], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(HOST):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
addrsInChildStreamRight[HOST] = shared.knownNodes[
|
addrsInChildStreamRight[peer] = shared.knownNodes[
|
||||||
(self.streamNumber * 2) + 1][HOST]
|
(self.streamNumber * 2) + 1][peer]
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
numberOfAddressesInAddrMessage = 0
|
numberOfAddressesInAddrMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
# print 'addrsInMyStream.items()', addrsInMyStream.items()
|
# print 'addrsInMyStream.items()', addrsInMyStream.items()
|
||||||
for HOST, value in addrsInMyStream.items():
|
for (HOST, PORT), value in addrsInMyStream.items():
|
||||||
PORT, timeLastReceivedMessageFromThisNode = value
|
timeLastReceivedMessageFromThisNode = value
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
||||||
numberOfAddressesInAddrMessage += 1
|
numberOfAddressesInAddrMessage += 1
|
||||||
payload += pack(
|
payload += pack(
|
||||||
|
@ -1906,8 +1899,8 @@ class receiveDataThread(threading.Thread):
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||||
socket.inet_aton(HOST)
|
socket.inet_aton(HOST)
|
||||||
payload += pack('>H', PORT) # remote port
|
payload += pack('>H', PORT) # remote port
|
||||||
for HOST, value in addrsInChildStreamLeft.items():
|
for (HOST, PORT), value in addrsInChildStreamLeft.items():
|
||||||
PORT, timeLastReceivedMessageFromThisNode = value
|
timeLastReceivedMessageFromThisNode = value
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
||||||
numberOfAddressesInAddrMessage += 1
|
numberOfAddressesInAddrMessage += 1
|
||||||
payload += pack(
|
payload += pack(
|
||||||
|
@ -1918,8 +1911,8 @@ class receiveDataThread(threading.Thread):
|
||||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||||
socket.inet_aton(HOST)
|
socket.inet_aton(HOST)
|
||||||
payload += pack('>H', PORT) # remote port
|
payload += pack('>H', PORT) # remote port
|
||||||
for HOST, value in addrsInChildStreamRight.items():
|
for (HOST, PORT), value in addrsInChildStreamRight.items():
|
||||||
PORT, timeLastReceivedMessageFromThisNode = value
|
timeLastReceivedMessageFromThisNode = value
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
||||||
numberOfAddressesInAddrMessage += 1
|
numberOfAddressesInAddrMessage += 1
|
||||||
payload += pack(
|
payload += pack(
|
||||||
|
@ -1956,9 +1949,9 @@ class receiveDataThread(threading.Thread):
|
||||||
elif not self.verackSent:
|
elif not self.verackSent:
|
||||||
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
||||||
if self.remoteProtocolVersion <= 1:
|
if self.remoteProtocolVersion <= 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closing connection to old protocol version 1 node: ', self.HOST
|
print 'Closing connection to old protocol version 1 node: ', self.peer
|
||||||
|
|
||||||
return
|
return
|
||||||
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
||||||
|
@ -1980,30 +1973,29 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
||||||
|
|
||||||
if self.streamNumber != 1:
|
if self.streamNumber != 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber, '.'
|
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
||||||
|
|
||||||
return
|
return
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.HOST] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
||||||
# If this was an incoming connection, then the sendData thread
|
# If this was an incoming connection, then the sendData thread
|
||||||
# doesn't know the stream. We have to set it.
|
# doesn't know the stream. We have to set it.
|
||||||
if not self.initiatedConnection:
|
if not self.initiatedConnection:
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
0, 'setStreamNumber', (self.HOST, self.streamNumber)))
|
0, 'setStreamNumber', (self.peer, self.streamNumber)))
|
||||||
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closing connection to myself: ', self.HOST
|
print 'Closing connection to myself: ', self.peer
|
||||||
|
|
||||||
return
|
return
|
||||||
shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', (
|
shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', (
|
||||||
self.HOST, self.remoteProtocolVersion)))
|
self.peer, self.remoteProtocolVersion)))
|
||||||
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[self.streamNumber][self.HOST] = (
|
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
||||||
self.remoteNodeIncomingPort, int(time.time()))
|
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||||
pickle.dump(shared.knownNodes, output)
|
pickle.dump(shared.knownNodes, output)
|
||||||
output.close()
|
output.close()
|
||||||
|
@ -2020,7 +2012,7 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(shared.assembleVersionMessage(
|
self.sock.sendall(shared.assembleVersionMessage(
|
||||||
self.HOST, self.PORT, self.streamNumber))
|
self.peer.host, self.peer.port, self.streamNumber))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
|
|
@ -31,8 +31,7 @@ class sendDataThread(threading.Thread):
|
||||||
streamNumber,
|
streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.HOST = HOST
|
self.peer = shared.Peer(HOST, PORT)
|
||||||
self.PORT = PORT
|
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.remoteProtocolVersion = - \
|
self.remoteProtocolVersion = - \
|
||||||
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue.
|
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue.
|
||||||
|
@ -45,7 +44,7 @@ class sendDataThread(threading.Thread):
|
||||||
|
|
||||||
def sendVersionMessage(self):
|
def sendVersionMessage(self):
|
||||||
datatosend = shared.assembleVersionMessage(
|
datatosend = shared.assembleVersionMessage(
|
||||||
self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
|
self.peer.host, self.peer.port, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
|
||||||
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Sending version packet: ', repr(datatosend)
|
print 'Sending version packet: ', repr(datatosend)
|
||||||
|
@ -62,15 +61,12 @@ class sendDataThread(threading.Thread):
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
deststream, command, data = self.mailbox.get()
|
deststream, command, data = self.mailbox.get()
|
||||||
# with shared.printLock:
|
|
||||||
# print 'sendDataThread, destream:', deststream, ', Command:', command, ', ID:',id(self), ', HOST:', self.HOST
|
|
||||||
#
|
|
||||||
|
|
||||||
if deststream == self.streamNumber or deststream == 0:
|
if deststream == self.streamNumber or deststream == 0:
|
||||||
if command == 'shutdown':
|
if command == 'shutdown':
|
||||||
if data == self.HOST or data == 'all':
|
if data == self.peer or data == 'all':
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sendDataThread (associated with', self.HOST, ') ID:', id(self), 'shutting down now.'
|
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
|
@ -89,15 +85,15 @@ class sendDataThread(threading.Thread):
|
||||||
# will continue on with the connection and will set the
|
# will continue on with the connection and will set the
|
||||||
# streamNumber of this send data thread here:
|
# streamNumber of this send data thread here:
|
||||||
elif command == 'setStreamNumber':
|
elif command == 'setStreamNumber':
|
||||||
hostInMessage, specifiedStreamNumber = data
|
peerInMessage, specifiedStreamNumber = data
|
||||||
if hostInMessage == self.HOST:
|
if peerInMessage == self.peer:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber
|
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber
|
||||||
|
|
||||||
self.streamNumber = specifiedStreamNumber
|
self.streamNumber = specifiedStreamNumber
|
||||||
elif command == 'setRemoteProtocolVersion':
|
elif command == 'setRemoteProtocolVersion':
|
||||||
hostInMessage, specifiedRemoteProtocolVersion = data
|
peerInMessage, specifiedRemoteProtocolVersion = data
|
||||||
if hostInMessage == self.HOST:
|
if peerInMessage == self.peer:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||||
|
|
||||||
|
@ -113,14 +109,14 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock.sendall(data)
|
self.sock.sendall(data)
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'self.sock.sendall failed'
|
print 'sendaddr: self.sock.sendall failed'
|
||||||
try:
|
try:
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.mailbox)
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.HOST
|
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
elif command == 'sendinv':
|
elif command == 'sendinv':
|
||||||
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
|
@ -137,21 +133,21 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock.sendall(headerData + payload)
|
self.sock.sendall(headerData + payload)
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'self.sock.sendall failed'
|
print 'sendinv: self.sock.sendall failed'
|
||||||
try:
|
try:
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.mailbox)
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.HOST
|
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
elif command == 'pong':
|
elif command == 'pong':
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
||||||
if self.lastTimeISentData < (int(time.time()) - 298):
|
if self.lastTimeISentData < (int(time.time()) - 298):
|
||||||
# Send out a pong message to keep the connection alive.
|
# Send out a pong message to keep the connection alive.
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Sending pong to', self.HOST, 'to keep connection alive.'
|
print 'Sending pong to', self.peer, 'to keep connection alive.'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(
|
self.sock.sendall(
|
||||||
|
@ -165,7 +161,7 @@ class sendDataThread(threading.Thread):
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.mailbox)
|
||||||
print 'sendDataThread thread', self, 'ending now. Was connected to', self.HOST
|
print 'sendDataThread thread', self, 'ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
|
|
@ -5,19 +5,20 @@ import time
|
||||||
import random
|
import random
|
||||||
import sys
|
import sys
|
||||||
from time import strftime, localtime
|
from time import strftime, localtime
|
||||||
|
import shared
|
||||||
|
|
||||||
def createDefaultKnownNodes(appdata):
|
def createDefaultKnownNodes(appdata):
|
||||||
############## Stream 1 ################
|
############## Stream 1 ################
|
||||||
stream1 = {}
|
stream1 = {}
|
||||||
|
|
||||||
stream1['85.171.174.131'] = (8444,int(time.time()))
|
stream1[shared.Peer('85.171.174.131', 8444)] = int(time.time())
|
||||||
stream1['23.28.68.159'] = (8444,int(time.time()))
|
stream1[shared.Peer('23.28.68.159', 8444)] = int(time.time())
|
||||||
stream1['66.108.210.240'] = (8080,int(time.time()))
|
stream1[shared.Peer('66.108.210.240', 8444)] = int(time.time())
|
||||||
stream1['204.236.246.212'] = (8444,int(time.time()))
|
stream1[shared.Peer('204.236.246.212', 8444)] = int(time.time())
|
||||||
stream1['78.81.56.239'] = (8444,int(time.time()))
|
stream1[shared.Peer('78.81.56.239', 8444)] = int(time.time())
|
||||||
stream1['122.60.235.157'] = (8444,int(time.time()))
|
stream1[shared.Peer('122.60.235.157', 8444)] = int(time.time())
|
||||||
stream1['204.236.246.212'] = (8444,int(time.time()))
|
stream1[shared.Peer('204.236.246.212', 8444)] = int(time.time())
|
||||||
stream1['24.98.219.109'] = (8444,int(time.time()))
|
stream1[shared.Peer('24.98.219.109', 8444)] = int(time.time())
|
||||||
|
|
||||||
|
|
||||||
############# Stream 2 #################
|
############# Stream 2 #################
|
||||||
|
@ -48,10 +49,15 @@ def readDefaultKnownNodes(appdata):
|
||||||
pickleFile = open(appdata + 'knownnodes.dat', 'rb')
|
pickleFile = open(appdata + 'knownnodes.dat', 'rb')
|
||||||
knownNodes = pickle.load(pickleFile)
|
knownNodes = pickle.load(pickleFile)
|
||||||
pickleFile.close()
|
pickleFile.close()
|
||||||
knownNodes
|
|
||||||
for stream, storedValue in knownNodes.items():
|
for stream, storedValue in knownNodes.items():
|
||||||
for host,value in storedValue.items():
|
for host,value in storedValue.items():
|
||||||
port, storedtime = storedValue[host]
|
try:
|
||||||
|
# Old knownNodes format.
|
||||||
|
port, storedtime = value
|
||||||
|
except:
|
||||||
|
# New knownNodes format.
|
||||||
|
host, port = host
|
||||||
|
storedtime = value
|
||||||
print host, '\t', port, '\t', unicode(strftime('%a, %d %b %Y %I:%M %p',localtime(storedtime)),'utf-8')
|
print host, '\t', port, '\t', unicode(strftime('%a, %d %b %Y %I:%M %p',localtime(storedtime)),'utf-8')
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -9,8 +9,20 @@ def knownNodes():
|
||||||
# We shouldn't have to use the shared.knownNodesLock because this had
|
# We shouldn't have to use the shared.knownNodesLock because this had
|
||||||
# better be the only thread accessing knownNodes right now.
|
# better be the only thread accessing knownNodes right now.
|
||||||
pickleFile = open(shared.appdata + 'knownnodes.dat', 'rb')
|
pickleFile = open(shared.appdata + 'knownnodes.dat', 'rb')
|
||||||
shared.knownNodes = pickle.load(pickleFile)
|
loadedKnownNodes = pickle.load(pickleFile)
|
||||||
pickleFile.close()
|
pickleFile.close()
|
||||||
|
# The old format of storing knownNodes was as a 'host: (port, time)'
|
||||||
|
# mapping. The new format is as 'Peer: time' pairs. If we loaded
|
||||||
|
# data in the old format, transform it to the new style.
|
||||||
|
for stream, nodes in loadedKnownNodes.items():
|
||||||
|
shared.knownNodes[stream] = {}
|
||||||
|
for node_tuple in nodes.items():
|
||||||
|
try:
|
||||||
|
host, (port, time) = node_tuple
|
||||||
|
peer = shared.Peer(host, port)
|
||||||
|
except:
|
||||||
|
peer, time = node_tuple
|
||||||
|
shared.knownNodes[stream][peer] = time
|
||||||
except:
|
except:
|
||||||
shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(shared.appdata)
|
shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(shared.appdata)
|
||||||
if shared.config.getint('bitmessagesettings', 'settingsversion') > 6:
|
if shared.config.getint('bitmessagesettings', 'settingsversion') > 6:
|
||||||
|
@ -28,13 +40,13 @@ def dns():
|
||||||
try:
|
try:
|
||||||
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
|
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
|
||||||
print 'Adding', item[4][0], 'to knownNodes based on DNS boostrap method'
|
print 'Adding', item[4][0], 'to knownNodes based on DNS boostrap method'
|
||||||
shared.knownNodes[1][item[4][0]] = (8080, int(time.time()))
|
shared.knownNodes[1][shared.Peer(item[4][0], 8080)] = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'bootstrap8080.bitmessage.org DNS bootstrapping failed.'
|
print 'bootstrap8080.bitmessage.org DNS bootstrapping failed.'
|
||||||
try:
|
try:
|
||||||
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
|
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
|
||||||
print 'Adding', item[4][0], 'to knownNodes based on DNS boostrap method'
|
print 'Adding', item[4][0], 'to knownNodes based on DNS boostrap method'
|
||||||
shared.knownNodes[1][item[4][0]] = (8444, int(time.time()))
|
shared.knownNodes[1][shared.Peer(item[4][0], 8444)] = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'bootstrap8444.bitmessage.org DNS bootstrapping failed.'
|
print 'bootstrap8444.bitmessage.org DNS bootstrapping failed.'
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -9,6 +9,7 @@ useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the
|
||||||
|
|
||||||
|
|
||||||
# Libraries.
|
# Libraries.
|
||||||
|
import collections
|
||||||
import ConfigParser
|
import ConfigParser
|
||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
|
@ -370,5 +371,7 @@ def fixSensitiveFilePermissions(filename, hasEnabledKeys):
|
||||||
logger.exception('Keyfile permissions could not be fixed.')
|
logger.exception('Keyfile permissions could not be fixed.')
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
Peer = collections.namedtuple('Peer', ['host', 'port'])
|
||||||
|
|
||||||
helper_startup.loadConfig()
|
helper_startup.loadConfig()
|
||||||
from debug import logger
|
from debug import logger
|
Reference in New Issue
Block a user