From a9b15f83ba8b4ce9b446eb9b3de2a4570b245031 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Fri, 6 Sep 2013 18:55:12 -0400 Subject: [PATCH 1/5] initial testing inv refactorization --- src/bitmessagemain.py | 11 +++++++++-- src/class_receiveDataThread.py | 18 +++++++----------- src/class_sendDataThread.py | 25 +++++++++++++++++-------- src/class_singleWorker.py | 15 ++++++++++----- src/shared.py | 1 + 5 files changed, 44 insertions(+), 26 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index a5a4b320..9cbe4471 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -46,6 +46,11 @@ if sys.platform == 'darwin': def connectToStream(streamNumber): selfInitiatedConnections[streamNumber] = {} + shared.inventorySets[streamNumber] = set() + queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber) + for row in queryData: + shared.inventorySets[streamNumber].add(row[0]) + if sys.platform[0:3] == 'win': maximumNumberOfHalfOpenConnections = 9 else: @@ -705,10 +710,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType = 'msg' shared.inventory[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, int(time.time())) + shared.inventorySets[toStreamNumber].add(inventoryHash) with shared.printLock: print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - toStreamNumber, 'sendinv', inventoryHash)) + toStreamNumber, 'advertiseobject', inventoryHash)) elif method == 'disseminatePubkey': # The device issuing this command to PyBitmessage supplies a pubkey object to be # disseminated to the rest of the Bitmessage network. PyBitmessage accepts this @@ -741,10 +747,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, pubkeyStreamNumber, payload, int(time.time())) + shared.inventorySets[pubkeyStreamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) elif method == 'getMessageDataByDestinationHash': # Method will eventually be used by a particular Android app to # select relevant messages. Do not yet add this to the api diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 914d3f84..a8b38bc4 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -392,6 +392,7 @@ class receiveDataThread(threading.Thread): objectType = 'broadcast' shared.inventory[self.inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(self.inventoryHash) shared.inventoryLock.release() self.broadcastinv(self.inventoryHash) shared.numberOfBroadcastsProcessed += 1 @@ -755,6 +756,7 @@ class receiveDataThread(threading.Thread): objectType = 'msg' shared.inventory[self.inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(self.inventoryHash) shared.inventoryLock.release() self.broadcastinv(self.inventoryHash) shared.numberOfMessagesProcessed += 1 @@ -1153,6 +1155,7 @@ class receiveDataThread(threading.Thread): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(inventoryHash) shared.inventoryLock.release() self.broadcastinv(inventoryHash) shared.numberOfPubkeysProcessed += 1 @@ -1348,6 +1351,7 @@ class receiveDataThread(threading.Thread): objectType = 'getpubkey' shared.inventory[inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(inventoryHash) shared.inventoryLock.release() # This getpubkey request is valid so far. Forward to peers. self.broadcastinv(inventoryHash) @@ -1442,18 +1446,10 @@ class receiveDataThread(threading.Thread): # 'set' of objects we are aware of and a set of objects in this inv # message so that we can diff one from the other cheaply. startTime = time.time() - currentInventoryList = set() - queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', - self.streamNumber) - for row in queryData: - currentInventoryList.add(row[0]) - with shared.inventoryLock: - for objectHash, value in shared.inventory.items(): - currentInventoryList.add(objectHash) advertisedSet = set() for i in range(numberOfItemsInInv): advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) - objectsNewToMe = advertisedSet - currentInventoryList + objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber] logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) for item in objectsNewToMe: if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation @@ -1552,12 +1548,12 @@ class receiveDataThread(threading.Thread): print 'sock.sendall error:', err - # Send an inv message with just one hash to all of our peers + # Advertise this object to all of our peers def broadcastinv(self, hash): with shared.printLock: print 'broadcasting inv with hash:', hash.encode('hex') - shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash)) + shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash)) # We have received an addr message. def recaddr(self, data): diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 867d1d70..0c8bb580 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -8,7 +8,8 @@ import random import sys import socket -#import bitmessagemain +from class_objectHashHolder import * +from addresses import * # Every connection to a peer has a sendDataThread (and also a # receiveDataThread). @@ -22,6 +23,9 @@ class sendDataThread(threading.Thread): print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues) self.data = '' + self.objectHashHolderInstance = objectHashHolder(self.mailbox) + self.objectHashHolderInstance.start() + def setup( self, @@ -118,17 +122,20 @@ class sendDataThread(threading.Thread): shared.sendDataQueues.remove(self.mailbox) print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer break + elif command == 'advertiseobject': + self.objectHashHolderInstance.holdHash(data) elif command == 'sendinv': - if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - payload = '\x01' + data + payload = '' + for hash in data: + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: + payload += hash + if payload != '': + print 'within sendinv, payload contains', len(payload)/32, 'hashes.' + payload = encodeVarint(len(payload)/32) + payload headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' headerData += pack('>L', len(payload)) headerData += hashlib.sha512(payload).digest()[:4] - # To prevent some network analysis, 'leak' the data out - # to our peer after waiting a random amount of time - random.seed() - time.sleep(random.randrange(0, 10)) try: self.sock.sendall(headerData + payload) self.lastTimeISentData = int(time.time()) @@ -142,6 +149,8 @@ class sendDataThread(threading.Thread): shared.sendDataQueues.remove(self.mailbox) print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer break + else: + print '(within sendinv) payload was empty. Not sending anything' #testing. elif command == 'pong': self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): @@ -167,4 +176,4 @@ class sendDataThread(threading.Thread): with shared.printLock: print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream - + self.objectHashHolderInstance.close() diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index d3c0c784..d8990ede 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -151,12 +151,13 @@ class singleWorker(threading.Thread): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime) + shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) shared.UISignalQueue.put(('updateStatusBar', '')) shared.config.set( myAddress, 'lastpubkeysendtime', str(int(time.time()))) @@ -224,12 +225,13 @@ class singleWorker(threading.Thread): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime) + shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) shared.UISignalQueue.put(('updateStatusBar', '')) # If this is a chan address then we won't send out the pubkey over the # network but rather will only store it in our pubkeys table so that @@ -327,10 +329,11 @@ class singleWorker(threading.Thread): objectType = 'broadcast' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, int(time.time())) + shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) @@ -650,6 +653,7 @@ class singleWorker(threading.Thread): objectType = 'msg' shared.inventory[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, int(time.time())) + shared.inventorySets[toStreamNumber].add(inventoryHash) if shared.safeConfigGetBoolean(toaddress, 'chan'): shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) @@ -659,7 +663,7 @@ class singleWorker(threading.Thread): strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) # Update the status of the message in the 'sent' table to have a # 'msgsent' status or 'msgsentnoackexpected' status. @@ -706,9 +710,10 @@ class singleWorker(threading.Thread): objectType = 'getpubkey' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, int(time.time())) + shared.inventorySets[streamNumber].add(inventoryHash) print 'sending inv (for the getpubkey message)' shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) sqlExecute( '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''', diff --git a/src/shared.py b/src/shared.py index e7718959..9339987a 100644 --- a/src/shared.py +++ b/src/shared.py @@ -68,6 +68,7 @@ numberOfBroadcastsProcessed = 0 numberOfPubkeysProcessed = 0 numberOfInventoryLookupsPerformed = 0 daemon = False +inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work. From 2725281a6db147a627c74a98a36bb58e5803f1ea Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Fri, 6 Sep 2013 18:58:56 -0400 Subject: [PATCH 2/5] initial testing inv refactorization --- src/class_objectHashHolder.py | 39 +++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 src/class_objectHashHolder.py diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py new file mode 100644 index 00000000..cc636a8d --- /dev/null +++ b/src/class_objectHashHolder.py @@ -0,0 +1,39 @@ +# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used +# by each sendDataThread. It uses it whenever a sendDataThread needs to +# advertise an object to peers. Instead of sending it out immediately, it must +# wait a random number of seconds for each connection so that different peers +# get different objects at different times. Thus an attacker who is +# connecting to many network nodes who receives a message first from Alice +# cannot be sure if Alice is the node who originated the message. + +import random +import time +import threading + +class objectHashHolder(threading.Thread): + def __init__(self, sendDataThreadMailbox): + threading.Thread.__init__(self) + self.shutdown = False + self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread. + self.collectionOfLists = {} + for i in range(10): + self.collectionOfLists[i] = [] + + def run(self): + print 'objectHashHolder running.' + iterator = 0 + while not self.shutdown: + if len(self.collectionOfLists[iterator]) > 0: + print 'objectHashHolder is submitting', len(self.collectionOfLists[iterator]), 'items to the queue.' + self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfLists[iterator])) + self.collectionOfLists[iterator] = [] + iterator += 1 + iterator %= 10 + time.sleep(1) + print 'objectHashHolder shutting down.' + + def holdHash(self,hash): + self.collectionOfLists[random.randrange(0, 10)].append(hash) + + def close(self): + self.shutdown = True \ No newline at end of file From 831edf0d248ea050b59565550f9df69fc65c8fe0 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Fri, 6 Sep 2013 21:47:54 -0400 Subject: [PATCH 3/5] completed inv refactorization --- src/class_objectHashHolder.py | 3 --- src/class_sendDataThread.py | 4 ---- src/class_singleCleaner.py | 9 +++++++++ src/shared.py | 2 +- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index cc636a8d..9c392765 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -20,17 +20,14 @@ class objectHashHolder(threading.Thread): self.collectionOfLists[i] = [] def run(self): - print 'objectHashHolder running.' iterator = 0 while not self.shutdown: if len(self.collectionOfLists[iterator]) > 0: - print 'objectHashHolder is submitting', len(self.collectionOfLists[iterator]), 'items to the queue.' self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfLists[iterator])) self.collectionOfLists[iterator] = [] iterator += 1 iterator %= 10 time.sleep(1) - print 'objectHashHolder shutting down.' def holdHash(self,hash): self.collectionOfLists[random.randrange(0, 10)].append(hash) diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 0c8bb580..b939b06b 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -108,7 +108,6 @@ class sendDataThread(threading.Thread): # to our peer after waiting a random amount of time # unless we have a long list of messages in our queue # to send. - random.seed() time.sleep(random.randrange(0, 10)) self.sock.sendall(data) self.lastTimeISentData = int(time.time()) @@ -130,7 +129,6 @@ class sendDataThread(threading.Thread): if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: payload += hash if payload != '': - print 'within sendinv, payload contains', len(payload)/32, 'hashes.' payload = encodeVarint(len(payload)/32) + payload headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' @@ -149,8 +147,6 @@ class sendDataThread(threading.Thread): shared.sendDataQueues.remove(self.mailbox) print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer break - else: - print '(within sendinv) payload was empty. Not sending anything' #testing. elif command == 'pong': self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 3cc80868..07d56424 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -7,6 +7,7 @@ from helper_sql import * '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. It cleans these data structures in memory: inventory (moves data to the on-disk sql database) + inventorySets (clears then reloads data out of sql database) It cleans these tables on the disk: inventory (clears data more than 2 days and 12 hours old) @@ -109,4 +110,12 @@ class singleCleaner(threading.Thread): shared.workerQueue.put(('sendmessage', '')) shared.UISignalQueue.put(( 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) + + # Let's also clear and reload shared.inventorySets to keep it from + # taking up an unnecessary amount of memory. + for streamNumber in shared.inventorySets: + shared.inventorySets[streamNumber] = set() + queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber) + for row in queryData: + shared.inventorySets[streamNumber].add(row[0]) time.sleep(300) diff --git a/src/shared.py b/src/shared.py index 9339987a..0c0173a3 100644 --- a/src/shared.py +++ b/src/shared.py @@ -304,7 +304,7 @@ def doCleanShutdown(): def broadcastToSendDataQueues(data): # logger.debug('running broadcastToSendDataQueues') for q in sendDataQueues: - q.put((data)) + q.put(data) def flushInventory(): #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. From f0bf3aad482b53b752212cabc20514529d1f70ff Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Sat, 7 Sep 2013 18:23:20 -0400 Subject: [PATCH 4/5] use locks when accessing dictionary inventory --- src/class_objectHashHolder.py | 2 +- src/class_receiveDataThread.py | 14 +++++++++----- src/class_singleCleaner.py | 32 +++++++++++++++++++------------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index 9c392765..90df7fd9 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -1,5 +1,5 @@ # objectHashHolder is a timer-driven thread. One objectHashHolder thread is used -# by each sendDataThread. It uses it whenever a sendDataThread needs to +# by each sendDataThread. The sendDataThread uses it whenever it needs to # advertise an object to peers. Instead of sending it out immediately, it must # wait a random number of seconds for each connection so that different peers # get different objects at different times. Thus an attacker who is diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index a8b38bc4..e751cef0 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -298,11 +298,12 @@ class receiveDataThread(threading.Thread): bigInvList[hash] = 0 # We also have messages in our inventory in memory (which is a python # dictionary). Let's fetch those too. - for hash, storedValue in shared.inventory.items(): - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - objectType, streamNumber, payload, receivedTime = storedValue - if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers: - bigInvList[hash] = 0 + with shared.inventoryLock: + for hash, storedValue in shared.inventory.items(): + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: + objectType, streamNumber, payload, receivedTime = storedValue + if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers: + bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' # Now let us start appending all of these hashes together. They will be @@ -1496,11 +1497,14 @@ class receiveDataThread(threading.Thread): print 'received getdata request for item:', hash.encode('hex') shared.numberOfInventoryLookupsPerformed += 1 + shared.inventoryLock.acquire() if hash in shared.inventory: objectType, streamNumber, payload, receivedTime = shared.inventory[ hash] + shared.inventoryLock.release() self.sendData(objectType, payload) else: + shared.inventoryLock.release() queryreturn = sqlQuery( '''select objecttype, payload from inventory where hash=?''', hash) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 07d56424..653a2461 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -32,19 +32,20 @@ class singleCleaner(threading.Thread): shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - with SqlBulkExecute() as sql: - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, receivedTime = storedValue - if int(time.time()) - 3600 > receivedTime: - sql.execute( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash, - objectType, - streamNumber, - payload, - receivedTime, - '') - del shared.inventory[hash] + with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. + with SqlBulkExecute() as sql: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if int(time.time()) - 3600 > receivedTime: + sql.execute( + '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', + hash, + objectType, + streamNumber, + payload, + receivedTime, + '') + del shared.inventory[hash] shared.UISignalQueue.put(('updateStatusBar', '')) shared.broadcastToSendDataQueues(( 0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. @@ -118,4 +119,9 @@ class singleCleaner(threading.Thread): queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber) for row in queryData: shared.inventorySets[streamNumber].add(row[0]) + with shared.inventoryLock: + for hash, storedValue in shared.inventory.items(): + objectType, streamNumber, payload, receivedTime = storedValue + if streamNumber in shared.inventorySets: + shared.inventorySets[streamNumber].add(hash) time.sleep(300) From 90e60d814552c3b26e97a3df8e309dc339210a9a Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Mon, 9 Sep 2013 19:26:32 -0400 Subject: [PATCH 5/5] delay addr messages random number of seconds --- src/class_objectHashHolder.py | 23 ++- src/class_receiveDataThread.py | 320 +++++++++++---------------------- src/class_sendDataThread.py | 29 ++- src/class_singleCleaner.py | 20 +++ src/shared.py | 1 + 5 files changed, 160 insertions(+), 233 deletions(-) diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index 90df7fd9..c91b1c23 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -1,6 +1,7 @@ # objectHashHolder is a timer-driven thread. One objectHashHolder thread is used # by each sendDataThread. The sendDataThread uses it whenever it needs to -# advertise an object to peers. Instead of sending it out immediately, it must +# advertise an object to peers in an inv message, or advertise a peer to other +# peers in an addr message. Instead of sending them out immediately, it must # wait a random number of seconds for each connection so that different peers # get different objects at different times. Thus an attacker who is # connecting to many network nodes who receives a message first from Alice @@ -15,22 +16,30 @@ class objectHashHolder(threading.Thread): threading.Thread.__init__(self) self.shutdown = False self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread. - self.collectionOfLists = {} + self.collectionOfHashLists = {} + self.collectionOfPeerLists = {} for i in range(10): - self.collectionOfLists[i] = [] + self.collectionOfHashLists[i] = [] + self.collectionOfPeerLists[i] = [] def run(self): iterator = 0 while not self.shutdown: - if len(self.collectionOfLists[iterator]) > 0: - self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfLists[iterator])) - self.collectionOfLists[iterator] = [] + if len(self.collectionOfHashLists[iterator]) > 0: + self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfHashLists[iterator])) + self.collectionOfHashLists[iterator] = [] + if len(self.collectionOfPeerLists[iterator]) > 0: + self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator])) + self.collectionOfPeerLists[iterator] = [] iterator += 1 iterator %= 10 time.sleep(1) def holdHash(self,hash): - self.collectionOfLists[random.randrange(0, 10)].append(hash) + self.collectionOfHashLists[random.randrange(0, 10)].append(hash) + + def holdPeer(self,peerDetails): + self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails) def close(self): self.shutdown = True \ No newline at end of file diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index e751cef0..643185fa 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -5,7 +5,6 @@ import threading import shared import hashlib import socket -import pickle import random from struct import unpack, pack import sys @@ -91,7 +90,6 @@ class receiveDataThread(threading.Thread): del self.selfInitiatedConnections[self.streamNumber][self] with shared.printLock: print 'removed self (a receiveDataThread) from selfInitiatedConnections' - except: pass shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) @@ -175,7 +173,6 @@ class receiveDataThread(threading.Thread): if self.data == '': while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: shared.numberOfInventoryLookupsPerformed += 1 - random.seed() objectHash, = random.sample( self.objectsThatWeHaveYetToGetFromThisPeer, 1) if objectHash in shared.inventory: @@ -264,16 +261,18 @@ class receiveDataThread(threading.Thread): self.sock.settimeout( 600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately. shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) - remoteNodeSeenTime = shared.knownNodes[ - self.streamNumber][self.peer] with shared.printLock: print 'Connection fully established with', self.peer print 'The size of the connectedHostsList is now', len(shared.connectedHostsList) print 'The length of sendDataQueues is now:', len(shared.sendDataQueues) print 'broadcasting addr from within connectionFullyEstablished function.' - self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host, - self.peer.port)]) # This lets all of our peers know about this new node. + #self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host, + # self.remoteNodeIncomingPort)]) # This lets all of our peers know about this new node. + dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort) + shared.broadcastToSendDataQueues(( + self.streamNumber, 'advertisepeer', dataToSend)) + self.sendaddr() # This is one large addr message to this one peer. if not self.initiatedConnection and len(shared.connectedHostsList) > 200: with shared.printLock: @@ -1561,7 +1560,7 @@ class receiveDataThread(threading.Thread): # We have received an addr message. def recaddr(self, data): - listOfAddressDetailsToBroadcastToPeers = [] + #listOfAddressDetailsToBroadcastToPeers = [] numberOfAddressesIncluded = 0 numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint( data[:10]) @@ -1570,227 +1569,113 @@ class receiveDataThread(threading.Thread): with shared.printLock: print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' + if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: + return + if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded): + print 'addr message does not contain the correct amount of data. Ignoring.' + return - if self.remoteProtocolVersion == 1: - if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: - return - if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded): - print 'addr message does not contain the correct amount of data. Ignoring.' - return - - needToWriteKnownNodesToDisk = False - for i in range(0, numberOfAddressesIncluded): - try: - if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - with shared.printLock: - print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)]) - - continue - except Exception as err: + for i in range(0, numberOfAddressesIncluded): + try: + if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) + print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)]) - break # giving up on unpacking any more. We should still be connected however. - - try: - recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + ( - 34 * i):8 + lengthOfNumberOfAddresses + (34 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - - break # giving up on unpacking any more. We should still be connected however. - if recaddrStream == 0: continue - if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business. - continue - try: - recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + ( - 34 * i):16 + lengthOfNumberOfAddresses + (34 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) + except Exception as err: + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) - break # giving up on unpacking any more. We should still be connected however. + break # giving up on unpacking any more. We should still be connected however. - try: - recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + ( - 34 * i):34 + lengthOfNumberOfAddresses + (34 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) + try: + recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + ( + 38 * i):12 + lengthOfNumberOfAddresses + (38 * i)]) + except Exception as err: + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - break # giving up on unpacking any more. We should still be connected however. - # print 'Within recaddr(): IP', recaddrIP, ', Port', - # recaddrPort, ', i', i - hostFromAddrMessage = socket.inet_ntoa(data[ - 28 + lengthOfNumberOfAddresses + (34 * i):32 + lengthOfNumberOfAddresses + (34 * i)]) - # print 'hostFromAddrMessage', hostFromAddrMessage - if data[28 + lengthOfNumberOfAddresses + (34 * i)] == '\x7F': - print 'Ignoring IP address in loopback range:', hostFromAddrMessage - continue - if helper_generic.isHostInPrivateIPRange(hostFromAddrMessage): - print 'Ignoring IP address in private range:', hostFromAddrMessage - continue - timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I', data[lengthOfNumberOfAddresses + ( - 34 * i):4 + lengthOfNumberOfAddresses + (34 * i)]) # This is the 'time' value in the received addr message. - if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream] = {} - shared.knownNodesLock.release() - peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort) - if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: - if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode - shared.knownNodesLock.release() - needToWriteKnownNodesToDisk = True - hostDetails = ( - timeSomeoneElseReceivedMessageFromThisNode, - recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) - listOfAddressDetailsToBroadcastToPeers.append( - hostDetails) - else: - timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ - peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. - if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode - shared.knownNodesLock.release() - if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers. + break # giving up on unpacking any more. We should still be connected however. + if recaddrStream == 0: + continue + if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business. + continue + try: + recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + ( + 38 * i):20 + lengthOfNumberOfAddresses + (38 * i)]) + except Exception as err: + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) + + break # giving up on unpacking any more. We should still be connected however. + + try: + recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + ( + 38 * i):38 + lengthOfNumberOfAddresses + (38 * i)]) + except Exception as err: + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) + + break # giving up on unpacking any more. We should still be connected however. + # print 'Within recaddr(): IP', recaddrIP, ', Port', + # recaddrPort, ', i', i + hostFromAddrMessage = socket.inet_ntoa(data[ + 32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]) + # print 'hostFromAddrMessage', hostFromAddrMessage + if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F': + print 'Ignoring IP address in loopback range:', hostFromAddrMessage + continue + if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A': + print 'Ignoring IP address in private range:', hostFromAddrMessage + continue + if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8': + print 'Ignoring IP address in private range:', hostFromAddrMessage + continue + timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + ( + 38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit. + if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. shared.knownNodesLock.acquire() - output = open(shared.appdata + 'knownnodes.dat', 'wb') - pickle.dump(shared.knownNodes, output) - output.close() + shared.knownNodes[recaddrStream] = {} shared.knownNodesLock.release() - self.broadcastaddr( - listOfAddressDetailsToBroadcastToPeers) # no longer broadcast - with shared.printLock: - print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' - - elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times. - if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: - return - if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded): - print 'addr message does not contain the correct amount of data. Ignoring.' - return - - needToWriteKnownNodesToDisk = False - for i in range(0, numberOfAddressesIncluded): - try: - if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - with shared.printLock: - print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)]) - - continue - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) - - break # giving up on unpacking any more. We should still be connected however. - - try: - recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + ( - 38 * i):12 + lengthOfNumberOfAddresses + (38 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - - break # giving up on unpacking any more. We should still be connected however. - if recaddrStream == 0: - continue - if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business. - continue - try: - recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + ( - 38 * i):20 + lengthOfNumberOfAddresses + (38 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) - - break # giving up on unpacking any more. We should still be connected however. - - try: - recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + ( - 38 * i):38 + lengthOfNumberOfAddresses + (38 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) - - break # giving up on unpacking any more. We should still be connected however. - # print 'Within recaddr(): IP', recaddrIP, ', Port', - # recaddrPort, ', i', i - hostFromAddrMessage = socket.inet_ntoa(data[ - 32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]) - # print 'hostFromAddrMessage', hostFromAddrMessage - if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F': - print 'Ignoring IP address in loopback range:', hostFromAddrMessage - continue - if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A': - print 'Ignoring IP address in private range:', hostFromAddrMessage - continue - if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8': - print 'Ignoring IP address in private range:', hostFromAddrMessage - continue - timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + ( - 38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit. - if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. + peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort) + if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: + if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream] = {} + shared.knownNodes[recaddrStream][peerFromAddrMessage] = ( + timeSomeoneElseReceivedMessageFromThisNode) shared.knownNodesLock.release() - peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort) - if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: - if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][peerFromAddrMessage] = ( - timeSomeoneElseReceivedMessageFromThisNode) - shared.knownNodesLock.release() - with shared.printLock: - print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream + with shared.printLock: + print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream - needToWriteKnownNodesToDisk = True - hostDetails = ( - timeSomeoneElseReceivedMessageFromThisNode, - recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) - listOfAddressDetailsToBroadcastToPeers.append( - hostDetails) - else: - timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ - peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. - if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode - shared.knownNodesLock.release() - if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers. - shared.knownNodesLock.acquire() - output = open(shared.appdata + 'knownnodes.dat', 'wb') - try: - pickle.dump(shared.knownNodes, output) - output.close() - except Exception as err: - if "Errno 28" in str(err): - logger.fatal('(while receiveDataThread needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ') - shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True))) - if shared.daemon: - os._exit(0) - shared.knownNodesLock.release() - self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) - with shared.printLock: - print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' + shared.needToWriteKnownNodesToDisk = True + hostDetails = ( + timeSomeoneElseReceivedMessageFromThisNode, + recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) + #listOfAddressDetailsToBroadcastToPeers.append(hostDetails) + shared.broadcastToSendDataQueues(( + self.streamNumber, 'advertisepeer', hostDetails)) + else: + timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ + peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. + if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): + shared.knownNodesLock.acquire() + shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode + shared.knownNodesLock.release() + + #if listOfAddressDetailsToBroadcastToPeers != []: + # self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) + with shared.printLock: + print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' # Function runs when we want to broadcast an addr message to all of our # peers. Runs when we learn of nodes that we didn't previously know about # and want to share them with our peers. - def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers): + """def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers): numberOfAddressesInAddrMessage = len( listOfAddressDetailsToBroadcastToPeers) payload = '' @@ -1816,7 +1701,7 @@ class receiveDataThread(threading.Thread): print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' shared.broadcastToSendDataQueues(( - self.streamNumber, 'sendaddr', datatosend)) + self.streamNumber, 'sendaddr', datatosend))""" # Send a big addr message to our peer def sendaddr(self): @@ -1831,7 +1716,6 @@ class receiveDataThread(threading.Thread): shared.knownNodesLock.acquire() if len(shared.knownNodes[self.streamNumber]) > 0: for i in range(500): - random.seed() peer, = random.sample(shared.knownNodes[self.streamNumber], 1) if helper_generic.isHostInPrivateIPRange(peer.host): continue @@ -1839,7 +1723,6 @@ class receiveDataThread(threading.Thread): self.streamNumber][peer] if len(shared.knownNodes[self.streamNumber * 2]) > 0: for i in range(250): - random.seed() peer, = random.sample(shared.knownNodes[ self.streamNumber * 2], 1) if helper_generic.isHostInPrivateIPRange(peer.host): @@ -1848,7 +1731,6 @@ class receiveDataThread(threading.Thread): self.streamNumber * 2][peer] if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0: for i in range(250): - random.seed() peer, = random.sample(shared.knownNodes[ (self.streamNumber * 2) + 1], 1) if helper_generic.isHostInPrivateIPRange(peer.host): @@ -1967,10 +1849,8 @@ class receiveDataThread(threading.Thread): self.peer, self.remoteProtocolVersion))) shared.knownNodesLock.acquire() - shared.knownNodes[self.streamNumber][self.peer] = int(time.time()) - output = open(shared.appdata + 'knownnodes.dat', 'wb') - pickle.dump(shared.knownNodes, output) - output.close() + shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time()) + shared.needToWriteKnownNodesToDisk = True shared.knownNodesLock.release() self.sendverack() diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index b939b06b..240f9c64 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -102,14 +102,31 @@ class sendDataThread(threading.Thread): print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion self.remoteProtocolVersion = specifiedRemoteProtocolVersion + elif command == 'advertisepeer': + self.objectHashHolderInstance.holdPeer(data) elif command == 'sendaddr': + numberOfAddressesInAddrMessage = len( + data) + payload = '' + for hostDetails in data: + timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails + payload += pack( + '>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time + payload += pack('>I', streamNumber) + payload += pack( + '>q', services) # service bit flags offered by this node + payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ + socket.inet_aton(host) + payload += pack('>H', port) + + payload = encodeVarint(numberOfAddressesInAddrMessage) + payload + datatosend = '\xE9\xBE\xB4\xD9addr\x00\x00\x00\x00\x00\x00\x00\x00' + datatosend = datatosend + pack('>L', len(payload)) # payload length + datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] + datatosend = datatosend + payload + try: - # To prevent some network analysis, 'leak' the data out - # to our peer after waiting a random amount of time - # unless we have a long list of messages in our queue - # to send. - time.sleep(random.randrange(0, 10)) - self.sock.sendall(data) + self.sock.sendall(datatosend) self.lastTimeISentData = int(time.time()) except: print 'sendaddr: self.sock.sendall failed' diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 653a2461..44cb893c 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -2,7 +2,11 @@ import threading import shared import time import sys +import pickle + +import tr#anslate from helper_sql import * +from debug import logger '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. It cleans these data structures in memory: @@ -124,4 +128,20 @@ class singleCleaner(threading.Thread): objectType, streamNumber, payload, receivedTime = storedValue if streamNumber in shared.inventorySets: shared.inventorySets[streamNumber].add(hash) + + # Let us write out the knowNodes to disk if there is anything new to write out. + if shared.needToWriteKnownNodesToDisk: + shared.knownNodesLock.acquire() + output = open(shared.appdata + 'knownnodes.dat', 'wb') + try: + pickle.dump(shared.knownNodes, output) + output.close() + except Exception as err: + if "Errno 28" in str(err): + logger.fatal('(while receiveDataThread shared.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ') + shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True))) + if shared.daemon: + os._exit(0) + shared.knownNodesLock.release() + shared.needToWriteKnownNodesToDisk = False time.sleep(300) diff --git a/src/shared.py b/src/shared.py index 0c0173a3..22a6d890 100644 --- a/src/shared.py +++ b/src/shared.py @@ -69,6 +69,7 @@ numberOfPubkeysProcessed = 0 numberOfInventoryLookupsPerformed = 0 daemon = False inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. +needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually. #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.