From f079ff5b99034bf6db8ed709d9bd859d73da4d4d Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 15 Jan 2017 19:21:24 +0100 Subject: [PATCH] Refactor objects to be downloaded - moved logic into a Missing singleton - shouldn't try to download duplicates anymore, only requests a hash once every 5 minutes and not from the same host - removed obsoleted variables - the "Objects to be synced" in the Network tab should now be correct - removed some checks which aren't necessary anymore in my opinion - fix missing self in Throttle (thanks landscape.io) --- src/bitmessageqt/__init__.py | 9 ++-- src/bitmessageqt/networkstatus.py | 4 +- src/class_receiveDataThread.py | 86 ++++++------------------------- src/inventory.py | 85 ++++++++++++++++++++++++------ src/shared.py | 8 ++- src/throttle.py | 4 +- 6 files changed, 99 insertions(+), 97 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 5f1a65e6..4cdcae30 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -77,6 +77,7 @@ from dialogs import AddAddressDialog from class_objectHashHolder import objectHashHolder from class_singleWorker import singleWorker from helper_generic import powQueueSize, invQueueSize +from inventory import Missing import paths from proofofwork import getPowType import protocol @@ -2712,11 +2713,9 @@ class MyForm(settingsmixin.SMainWindow): elif reply == QtGui.QMessage.Cancel: return - toBeDownloaded = sum(shared.numberOfObjectsThatWeHaveYetToGetPerPeer.itervalues()) - - if toBeDownloaded > 0: + if Missing().len() > 0: reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"), - _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, toBeDownloaded), + _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, Missing().len()), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) if reply == QtGui.QMessageBox.Yes: waitForSync = True @@ -2747,7 +2746,7 @@ class MyForm(settingsmixin.SMainWindow): if waitForSync: self.statusBar().showMessage(_translate( "MainWindow", "Waiting for finishing synchronisation...")) - while sum(shared.numberOfObjectsThatWeHaveYetToGetPerPeer.itervalues()) > 0: + while Missing().len() > 0: time.sleep(0.5) QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 2fe493ac..3eeae392 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -2,7 +2,7 @@ from PyQt4 import QtCore, QtGui import time import shared from tr import _translate -from inventory import Inventory +from inventory import Inventory, Missing import l10n from retranslateui import RetranslateMixin from uisignaler import UISignaler @@ -45,7 +45,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): return "%4.0f kB" % num def updateNumberOfObjectsToBeSynced(self): - self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, sum(shared.numberOfObjectsThatWeHaveYetToGetPerPeer.itervalues()))) + self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, Missing().len())) def updateNumberOfMessagesProcessed(self): self.updateNumberOfObjectsToBeSynced() diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 1abb8e74..40add534 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -30,7 +30,7 @@ from helper_sql import sqlQuery from debug import logger import paths import protocol -from inventory import Inventory +from inventory import Inventory, Missing import state import throttle import tr @@ -62,7 +62,6 @@ class receiveDataThread(threading.Thread): self.peer = state.Peer(HOST, port) self.name = "receiveData-" + self.peer.host.replace(":", ".") # ":" log parser field separator self.streamNumber = streamNumber - self.objectsThatWeHaveYetToGetFromThisPeer = {} self.selfInitiatedConnections = selfInitiatedConnections self.sendDataThreadQueue = sendDataThreadQueue # used to send commands and data to the sendDataThread self.hostIdent = self.peer.port if ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname') and protocol.checkSocksIP(self.peer.host) else self.peer.host @@ -129,11 +128,7 @@ class receiveDataThread(threading.Thread): except Exception as err: logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err)) - try: - del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ - self.peer] - except: - pass + Missing().threadEnd() shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.checkTimeOffsetNotification() logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) @@ -222,37 +217,16 @@ class receiveDataThread(threading.Thread): self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message if self.data == '': # if there are no more messages - while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0 and not self.sendDataThreadQueue.full(): - objectHash, = random.sample( - self.objectsThatWeHaveYetToGetFromThisPeer, 1) + while Missing().len() > 0 and not self.sendDataThreadQueue.full(): + objectHash = Missing().pull() + if objectHash is None: + break if objectHash in Inventory(): logger.debug('Inventory already has object listed in inv message.') - del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash] + Missing().delete(objectHash) else: # We don't have the object in our inventory. Let's request it. self.sendgetdata(objectHash) - del self.objectsThatWeHaveYetToGetFromThisPeer[ - objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway. - if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: - logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0') - try: - del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ - self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. - except: - pass - if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: - # We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore. - logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0') - try: - del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ - self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. - except: - pass - if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: - logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer))) - - shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len( - self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. self.processData() @@ -425,13 +399,6 @@ class receiveDataThread(threading.Thread): # We have received an inv message def recinv(self, data): - totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = 0 # this counts duplicates separately because they take up memory - if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0: - for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items(): - totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value - logger.debug('number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: ' + str(len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer)) + "\n" + \ - 'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers)) - numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10]) if numberOfItemsInInv > 50000: sys.stderr.write('Too many items in inv message!') @@ -439,35 +406,16 @@ class receiveDataThread(threading.Thread): if len(data) < lengthOfVarint + (numberOfItemsInInv * 32): logger.info('inv message doesn\'t contain enough data. Ignoring.') return - if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object. - if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and state.trustedPeer == None: # inv flooding attack mitigation - logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.') - return - self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ - data[lengthOfVarint:32 + lengthOfVarint]] = 0 - if data[lengthOfVarint:32 + lengthOfVarint] in Inventory(): - logger.debug('Inventory has inventory item already.') - else: - self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) - else: - # There are many items listed in this inv message. Let us create a - # 'set' of objects we are aware of and a set of objects in this inv - # message so that we can diff one from the other cheaply. - startTime = time.time() - advertisedSet = set() - for i in range(numberOfItemsInInv): - advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) - objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber) - logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) - for item in objectsNewToMe: - if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and state.trustedPeer == None: # inv flooding attack mitigation - logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)), ' from this node in particular. Ignoring the rest of this inv message.') - break - self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein - self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers. - if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: - shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ - self.peer] = len(self.objectsThatWeHaveYetToGetFromThisPeer) + + startTime = time.time() + advertisedSet = set() + for i in range(numberOfItemsInInv): + advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) + objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber) + logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) + for item in objectsNewToMe: + Missing().add(item) + self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein # Send a getdata message to our peer to request the object with the given # hash diff --git a/src/inventory.py b/src/inventory.py index 83a9ce82..3e00822a 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -1,13 +1,11 @@ import collections -from threading import RLock +import random +from threading import current_thread, RLock import time from helper_sql import * from singleton import Singleton -inventoryLock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) -InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag') - @Singleton class Inventory(collections.MutableMapping): @@ -16,26 +14,28 @@ class Inventory(collections.MutableMapping): self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet). self.numberOfInventoryLookupsPerformed = 0 self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. + self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) + self.InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag') def __contains__(self, hash): - with inventoryLock: + with self.lock: self.numberOfInventoryLookupsPerformed += 1 if hash in self._inventory: return True return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash)) def __getitem__(self, hash): - with inventoryLock: + with self.lock: if hash in self._inventory: return self._inventory[hash] rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash) if not rows: raise KeyError(hash) - return InventoryItem(*rows[0]) + return self.InventoryItem(*rows[0]) def __setitem__(self, hash, value): - with inventoryLock: - value = InventoryItem(*value) + with self.lock: + value = self.InventoryItem(*value) self._inventory[hash] = value self._streams[value.stream].add(hash) @@ -43,42 +43,93 @@ class Inventory(collections.MutableMapping): raise NotImplementedError def __iter__(self): - with inventoryLock: + with self.lock: hashes = self._inventory.keys()[:] hashes += (hash for hash, in sqlQuery('SELECT hash FROM inventory')) return hashes.__iter__() def __len__(self): - with inventoryLock: + with self.lock: return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0] def by_type_and_tag(self, type, tag): - with inventoryLock: + with self.lock: values = [value for value in self._inventory.values() if value.type == type and value.tag == tag] - values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag)) + values += (self.InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag)) return values def hashes_by_stream(self, stream): - with inventoryLock: + with self.lock: return self._streams[stream] def unexpired_hashes_by_stream(self, stream): - with inventoryLock: + with self.lock: t = int(time.time()) hashes = [hash for hash, value in self._inventory.items() if value.stream == stream and value.expires > t] hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t)) return hashes def flush(self): - with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. + with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. with SqlBulkExecute() as sql: for hash, value in self._inventory.items(): sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value) self._inventory.clear() def clean(self): - with inventoryLock: + with self.lock: sqlExecute('DELETE FROM inventory WHERE expirestime