From a9b15f83ba8b4ce9b446eb9b3de2a4570b245031 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Fri, 6 Sep 2013 18:55:12 -0400 Subject: [PATCH] initial testing inv refactorization --- src/bitmessagemain.py | 11 +++++++++-- src/class_receiveDataThread.py | 18 +++++++----------- src/class_sendDataThread.py | 25 +++++++++++++++++-------- src/class_singleWorker.py | 15 ++++++++++----- src/shared.py | 1 + 5 files changed, 44 insertions(+), 26 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index a5a4b320..9cbe4471 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -46,6 +46,11 @@ if sys.platform == 'darwin': def connectToStream(streamNumber): selfInitiatedConnections[streamNumber] = {} + shared.inventorySets[streamNumber] = set() + queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber) + for row in queryData: + shared.inventorySets[streamNumber].add(row[0]) + if sys.platform[0:3] == 'win': maximumNumberOfHalfOpenConnections = 9 else: @@ -705,10 +710,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType = 'msg' shared.inventory[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, int(time.time())) + shared.inventorySets[toStreamNumber].add(inventoryHash) with shared.printLock: print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - toStreamNumber, 'sendinv', inventoryHash)) + toStreamNumber, 'advertiseobject', inventoryHash)) elif method == 'disseminatePubkey': # The device issuing this command to PyBitmessage supplies a pubkey object to be # disseminated to the rest of the Bitmessage network. PyBitmessage accepts this @@ -741,10 +747,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, pubkeyStreamNumber, payload, int(time.time())) + shared.inventorySets[pubkeyStreamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) elif method == 'getMessageDataByDestinationHash': # Method will eventually be used by a particular Android app to # select relevant messages. Do not yet add this to the api diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 914d3f84..a8b38bc4 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -392,6 +392,7 @@ class receiveDataThread(threading.Thread): objectType = 'broadcast' shared.inventory[self.inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(self.inventoryHash) shared.inventoryLock.release() self.broadcastinv(self.inventoryHash) shared.numberOfBroadcastsProcessed += 1 @@ -755,6 +756,7 @@ class receiveDataThread(threading.Thread): objectType = 'msg' shared.inventory[self.inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(self.inventoryHash) shared.inventoryLock.release() self.broadcastinv(self.inventoryHash) shared.numberOfMessagesProcessed += 1 @@ -1153,6 +1155,7 @@ class receiveDataThread(threading.Thread): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(inventoryHash) shared.inventoryLock.release() self.broadcastinv(inventoryHash) shared.numberOfPubkeysProcessed += 1 @@ -1348,6 +1351,7 @@ class receiveDataThread(threading.Thread): objectType = 'getpubkey' shared.inventory[inventoryHash] = ( objectType, self.streamNumber, data, embeddedTime) + shared.inventorySets[self.streamNumber].add(inventoryHash) shared.inventoryLock.release() # This getpubkey request is valid so far. Forward to peers. self.broadcastinv(inventoryHash) @@ -1442,18 +1446,10 @@ class receiveDataThread(threading.Thread): # 'set' of objects we are aware of and a set of objects in this inv # message so that we can diff one from the other cheaply. startTime = time.time() - currentInventoryList = set() - queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', - self.streamNumber) - for row in queryData: - currentInventoryList.add(row[0]) - with shared.inventoryLock: - for objectHash, value in shared.inventory.items(): - currentInventoryList.add(objectHash) advertisedSet = set() for i in range(numberOfItemsInInv): advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) - objectsNewToMe = advertisedSet - currentInventoryList + objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber] logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) for item in objectsNewToMe: if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation @@ -1552,12 +1548,12 @@ class receiveDataThread(threading.Thread): print 'sock.sendall error:', err - # Send an inv message with just one hash to all of our peers + # Advertise this object to all of our peers def broadcastinv(self, hash): with shared.printLock: print 'broadcasting inv with hash:', hash.encode('hex') - shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash)) + shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash)) # We have received an addr message. def recaddr(self, data): diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 867d1d70..0c8bb580 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -8,7 +8,8 @@ import random import sys import socket -#import bitmessagemain +from class_objectHashHolder import * +from addresses import * # Every connection to a peer has a sendDataThread (and also a # receiveDataThread). @@ -22,6 +23,9 @@ class sendDataThread(threading.Thread): print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues) self.data = '' + self.objectHashHolderInstance = objectHashHolder(self.mailbox) + self.objectHashHolderInstance.start() + def setup( self, @@ -118,17 +122,20 @@ class sendDataThread(threading.Thread): shared.sendDataQueues.remove(self.mailbox) print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer break + elif command == 'advertiseobject': + self.objectHashHolderInstance.holdHash(data) elif command == 'sendinv': - if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - payload = '\x01' + data + payload = '' + for hash in data: + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: + payload += hash + if payload != '': + print 'within sendinv, payload contains', len(payload)/32, 'hashes.' + payload = encodeVarint(len(payload)/32) + payload headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' headerData += pack('>L', len(payload)) headerData += hashlib.sha512(payload).digest()[:4] - # To prevent some network analysis, 'leak' the data out - # to our peer after waiting a random amount of time - random.seed() - time.sleep(random.randrange(0, 10)) try: self.sock.sendall(headerData + payload) self.lastTimeISentData = int(time.time()) @@ -142,6 +149,8 @@ class sendDataThread(threading.Thread): shared.sendDataQueues.remove(self.mailbox) print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer break + else: + print '(within sendinv) payload was empty. Not sending anything' #testing. elif command == 'pong': self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): @@ -167,4 +176,4 @@ class sendDataThread(threading.Thread): with shared.printLock: print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream - + self.objectHashHolderInstance.close() diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index d3c0c784..d8990ede 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -151,12 +151,13 @@ class singleWorker(threading.Thread): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime) + shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) shared.UISignalQueue.put(('updateStatusBar', '')) shared.config.set( myAddress, 'lastpubkeysendtime', str(int(time.time()))) @@ -224,12 +225,13 @@ class singleWorker(threading.Thread): objectType = 'pubkey' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime) + shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) shared.UISignalQueue.put(('updateStatusBar', '')) # If this is a chan address then we won't send out the pubkey over the # network but rather will only store it in our pubkeys table so that @@ -327,10 +329,11 @@ class singleWorker(threading.Thread): objectType = 'broadcast' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, int(time.time())) + shared.inventorySets[streamNumber].add(inventoryHash) with shared.printLock: print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) @@ -650,6 +653,7 @@ class singleWorker(threading.Thread): objectType = 'msg' shared.inventory[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, int(time.time())) + shared.inventorySets[toStreamNumber].add(inventoryHash) if shared.safeConfigGetBoolean(toaddress, 'chan'): shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) @@ -659,7 +663,7 @@ class singleWorker(threading.Thread): strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) # Update the status of the message in the 'sent' table to have a # 'msgsent' status or 'msgsentnoackexpected' status. @@ -706,9 +710,10 @@ class singleWorker(threading.Thread): objectType = 'getpubkey' shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, int(time.time())) + shared.inventorySets[streamNumber].add(inventoryHash) print 'sending inv (for the getpubkey message)' shared.broadcastToSendDataQueues(( - streamNumber, 'sendinv', inventoryHash)) + streamNumber, 'advertiseobject', inventoryHash)) sqlExecute( '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''', diff --git a/src/shared.py b/src/shared.py index e7718959..9339987a 100644 --- a/src/shared.py +++ b/src/shared.py @@ -68,6 +68,7 @@ numberOfBroadcastsProcessed = 0 numberOfPubkeysProcessed = 0 numberOfInventoryLookupsPerformed = 0 daemon = False +inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.