From 1af49a016592fa639a48225e085205f083b5e7dd Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sun, 19 Mar 2017 22:08:00 +0100 Subject: [PATCH] Download tracking refactoring - replace PendingDownload singleton dict with a Queue - total memory and CPU requirements should be reduced - get rid of somObjectsOfWhichThisRemoteNodeIsAlearedyAware. It has very little practicle effect and only uses memory --- src/bitmessageqt/__init__.py | 10 +- src/bitmessageqt/networkstatus.py | 5 +- src/class_outgoingSynSender.py | 5 +- src/class_receiveDataThread.py | 26 ++++-- src/class_sendDataThread.py | 9 +- src/class_singleListener.py | 5 +- src/inventory.py | 148 ++++++------------------------ src/shared.py | 6 +- 8 files changed, 60 insertions(+), 154 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index feaf1c48..e861f767 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -77,7 +77,7 @@ from class_objectHashHolder import objectHashHolder from class_singleWorker import singleWorker from dialogs import AddAddressDialog from helper_generic import powQueueSize -from inventory import PendingDownload, PendingUpload, PendingUploadDeadlineException +from inventory import PendingDownloadQueue, PendingUpload, PendingUploadDeadlineException import knownnodes import paths from proofofwork import getPowType @@ -2751,16 +2751,16 @@ class MyForm(settingsmixin.SMainWindow): elif reply == QtGui.QMessage.Cancel: return - if PendingDownload().len() > 0: + if PendingDownloadQueue.totalSize() > 0: reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"), - _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len()), + _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize()), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) if reply == QtGui.QMessageBox.Yes: waitForSync = True elif reply == QtGui.QMessageBox.Cancel: return else: - PendingDownload().stop() + PendingDownloadQueue.stop() if shared.statusIconColor == 'red': reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Not connected"), @@ -2788,7 +2788,7 @@ class MyForm(settingsmixin.SMainWindow): if waitForSync: self.statusBar().showMessage(_translate( "MainWindow", "Waiting for finishing synchronisation...")) - while PendingDownload().len() > 0: + while PendingDownloadQueue.totalSize() > 0: time.sleep(0.5) QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 7506b652..b5870a6b 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -1,8 +1,9 @@ from PyQt4 import QtCore, QtGui import time import shared + from tr import _translate -from inventory import Inventory, PendingDownload, PendingUpload +from inventory import Inventory, PendingDownloadQueue, PendingUpload import l10n from retranslateui import RetranslateMixin from uisignaler import UISignaler @@ -45,7 +46,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): return "%4.0f kB" % num def updateNumberOfObjectsToBeSynced(self): - self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len() + PendingUpload().len())) + self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize() + PendingUpload().len())) def updateNumberOfMessagesProcessed(self): self.updateNumberOfObjectsToBeSynced() diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index c4f385c3..9b3eac14 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -185,12 +185,10 @@ class outgoingSynSender(threading.Thread, StoppableThread): self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() return - someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. sd = sendDataThread(sendDataThreadQueue) - sd.setup(self.sock, peer.host, peer.port, self.streamNumber, - someObjectsOfWhichThisRemoteNodeIsAlreadyAware) + sd.setup(self.sock, peer.host, peer.port, self.streamNumber) sd.start() rd = receiveDataThread() @@ -199,7 +197,6 @@ class outgoingSynSender(threading.Thread, StoppableThread): peer.host, peer.port, self.streamNumber, - someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 0ddd5739..df7d18b5 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -32,7 +32,7 @@ import knownnodes from debug import logger import paths import protocol -from inventory import Inventory, PendingDownload, PendingUpload +from inventory import Inventory, PendingDownloadQueue, PendingUpload import queues import state import throttle @@ -56,7 +56,6 @@ class receiveDataThread(threading.Thread): HOST, port, streamNumber, - someObjectsOfWhichThisRemoteNodeIsAlreadyAware, selfInitiatedConnections, sendDataThreadQueue, objectHashHolderInstance): @@ -79,8 +78,8 @@ class receiveDataThread(threading.Thread): self.initiatedConnection = True for stream in self.streamNumber: self.selfInitiatedConnections[stream][self] = 0 - self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware self.objectHashHolderInstance = objectHashHolderInstance + self.downloadQueue = PendingDownloadQueue() self.startTime = time.time() def run(self): @@ -147,7 +146,6 @@ class receiveDataThread(threading.Thread): except Exception as err: logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err)) - PendingDownload().threadEnd() queues.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.checkTimeOffsetNotification() logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) @@ -240,10 +238,20 @@ class receiveDataThread(threading.Thread): self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message if self.data == '': # if there are no more messages + toRequest = [] try: - self.sendgetdata(PendingDownload().pull(100)) - except Queue.Full: + for i in range(self.downloadQueue.pendingSize, 100): + while True: + hashId = self.downloadQueue.get(False) + if not hashId in Inventory(): + toRequest.append(hashId) + break + # don't track download for duplicates + self.downloadQueue.task_done() + except Queue.Empty: pass + if len(toRequest) > 0: + self.sendgetdata(toRequest) self.processData() def sendpong(self, payload): @@ -407,7 +415,7 @@ class receiveDataThread(threading.Thread): bigInvList = {} for stream in self.streamNumber: for hash in Inventory().unexpired_hashes_by_stream(stream): - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash): + if not self.objectHashHolderInstance.hasHash(hash): bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' @@ -476,6 +484,7 @@ class receiveDataThread(threading.Thread): def recobject(self, data): self.messageProcessingStartTime = time.time() lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data) + self.downloadQueue.task_done() """ Sleeping will help guarantee that we can process messages faster than a @@ -509,8 +518,7 @@ class receiveDataThread(threading.Thread): objectsNewToMe -= Inventory().hashes_by_stream(stream) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) for item in objectsNewToMe: - PendingDownload().add(item) - self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein + self.downloadQueue.put(item) # Send a getdata message to our peer to request the object with the given # hash diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index ac11c6b2..792fedd0 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -39,8 +39,8 @@ class sendDataThread(threading.Thread): sock, HOST, PORT, - streamNumber, - someObjectsOfWhichThisRemoteNodeIsAlreadyAware): + streamNumber + ): self.sock = sock self.peer = state.Peer(HOST, PORT) self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator @@ -52,7 +52,6 @@ class sendDataThread(threading.Thread): 1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue. self.lastTimeISentData = int( time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive. - self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware if streamNumber == -1: # This was an incoming connection. self.initiatedConnection = False else: @@ -165,8 +164,7 @@ class sendDataThread(threading.Thread): if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node payload = '' for hash in data: - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - payload += hash + payload += hash if payload != '': payload = encodeVarint(len(payload)/32) + payload packet = protocol.CreatePacket('inv', payload) @@ -176,7 +174,6 @@ class sendDataThread(threading.Thread): logger.error('sendinv: self.sock.sendall failed') break elif command == 'pong': - self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): # Send out a pong message to keep the connection alive. logger.debug('Sending pong to ' + str(self.peer) + ' to keep connection alive.') diff --git a/src/class_singleListener.py b/src/class_singleListener.py index 5332929c..243a494a 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -146,19 +146,18 @@ class singleListener(threading.Thread, StoppableThread): else: break - someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. socketObject.settimeout(20) sd = sendDataThread(sendDataThreadQueue) sd.setup( - socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware) + socketObject, HOST, PORT, -1) sd.start() rd = receiveDataThread() rd.daemon = True # close the main program even if there are threads left rd.setup( - socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance) + socketObject, HOST, PORT, -1, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance) rd.start() logger.info('connected to ' + HOST + ' during INCOMING request.') diff --git a/src/inventory.py b/src/inventory.py index f2e9b37b..1982dacf 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -1,5 +1,6 @@ import collections from threading import current_thread, enumerate as threadingEnumerate, RLock +import Queue import time from helper_sql import * @@ -37,7 +38,6 @@ class Inventory(collections.MutableMapping): value = self.InventoryItem(*value) self._inventory[hash] = value self._streams[value.stream].add(hash) - PendingDownload().delete(hash) def __delitem__(self, hash): raise NotImplementedError @@ -84,131 +84,39 @@ class Inventory(collections.MutableMapping): self._streams[value.stream].add(objectHash) -@Singleton -class PendingDownload(object): +class PendingDownloadQueue(Queue.Queue): # keep a track of objects that have been advertised to us but we haven't downloaded them yet - def __init__(self): - super(self.__class__, self).__init__() - self.lock = RLock() - self.hashes = {} + def __init__(self, maxsize=0): + Queue.Queue.__init__(self, maxsize) self.stopped = False - # don't request the same object more frequently than this - self.frequency = 60 - # after requesting and not receiving an object more than this times, consider it expired - self.maxRequestCount = 3 - self.pending = {} + self.pendingSize = 0 - def add(self, objectHash): - if self.stopped: - return - with self.lock: - if objectHash not in self.hashes: - self.hashes[objectHash] = {'peers':[], 'requested':0, 'requestedCount':0} - self.hashes[objectHash]['peers'].append(current_thread().peer) + def task_done(self): + Queue.Queue.task_done(self) + if self.pendingSize > 0: + self.pendingSize -= 1 - def addPending(self, objectHash=None): - if self.stopped: - return - if current_thread().peer not in self.pending: - self.pending[current_thread().peer] = {'objects':[], 'requested':0, 'received':0} - if objectHash not in self.pending[current_thread().peer]['objects'] and not objectHash is None: - self.pending[current_thread().peer]['objects'].append(objectHash) - self.pending[current_thread().peer]['requested'] = time.time() + def get(self, block=True, timeout=None): + retval = Queue.Queue.get(self, block, timeout) + # no exception was raised + if not self.stopped: + self.pendingSize += 1 + return retval - def len(self): - with self.lock: - return sum(1 for x in self.hashes.values() if len(x) > 0) + @staticmethod + def totalSize(): + size = 0 + for thread in threadingEnumerate(): + if thread.isAlive() and hasattr(thread, 'downloadQueue'): + size += thread.downloadQueue.qsize() + thread.downloadQueue.pendingSize + return size - def pull(self, count=1): - if count < 1: - raise ValueError("Must be at least one") - objectHashes = [] - unreachableObjects = [] - if self.stopped: - return objectHashes - start = time.time() - try: - for objectHash in self.hashes.keys(): - with self.lock: - if len(objectHashes) >= count: - break - if current_thread().peer not in self.pending: - self.addPending() - if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \ - self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ - len(self.pending[current_thread().peer]['objects']) >= count: - break - if len(self.hashes[objectHash]['peers']) == 0: - unreachableObjects.append(objectHash) - continue - # requested too long ago or not at all from any thread - if self.hashes[objectHash]['requested'] < time.time() - self.frequency: - # ready requested from this thread but haven't received yet - if objectHash in self.pending[current_thread().peer]['objects']: - # if still sending or receiving, request next - if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \ - self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency: - continue - # haven't requested or received anything recently, re-request (i.e. continue) - # the current node doesn't have the object - elif current_thread().peer not in self.hashes[objectHash]['peers']: - continue - # already requested too many times, remove all signs of this object - if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount: - del self.hashes[objectHash] - for thread in self.pending.keys(): - if objectHash in self.pending[thread]['objects']: - self.pending[thread]['objects'].remove(objectHash) - continue - # all ok, request - objectHashes.append(objectHash) - self.hashes[objectHash]['requested'] = time.time() - self.hashes[objectHash]['requestedCount'] += 1 - self.pending[current_thread().peer]['requested'] = time.time() - self.addPending(objectHash) - except (RuntimeError, KeyError, ValueError): - # the for cycle sometimes breaks if you remove elements - pass - for objectHash in unreachableObjects: - with self.lock: - if objectHash in self.hashes: - del self.hashes[objectHash] -# logger.debug("Pull took %.3f seconds", time.time() - start) - return objectHashes - - def delete(self, objectHash): - with self.lock: - if objectHash in self.hashes: - del self.hashes[objectHash] - if hasattr(current_thread(), 'peer') and current_thread().peer in self.pending: - self.pending[current_thread().peer]['received'] = time.time() - for thread in self.pending.keys(): - with self.lock: - if thread in self.pending and objectHash in self.pending[thread]['objects']: - self.pending[thread]['objects'].remove(objectHash) - - def stop(self): - with self.lock: - self.hashes = {} - self.pending = {} - - def threadEnd(self): - while True: - try: - with self.lock: - if current_thread().peer in self.pending: - for objectHash in self.pending[current_thread().peer]['objects']: - if objectHash in self.hashes: - self.hashes[objectHash]['peers'].remove(current_thread().peer) - except (KeyError): - pass - else: - break - with self.lock: - try: - del self.pending[current_thread().peer] - except KeyError: - pass + @staticmethod + def stop(): + for thread in threadingEnumerate(): + if thread.isAlive() and hasattr(thread, 'downloadQueue'): + thread.downloadQueue.stopped = True + thread.downloadQueue.pendingSize = 0 class PendingUploadDeadlineException(Exception): diff --git a/src/shared.py b/src/shared.py index c28392f6..69794033 100644 --- a/src/shared.py +++ b/src/shared.py @@ -22,7 +22,7 @@ from bmconfigparser import BMConfigParser import highlevelcrypto #import helper_startup from helper_sql import * -from inventory import Inventory, PendingDownload +from inventory import Inventory from queues import objectProcessorQueue import protocol import state @@ -342,22 +342,18 @@ def checkAndShareObjectWithPeers(data): """ if len(data) > 2 ** 18: logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(data)) - PendingDownload().delete(calculateInventoryHash(data)) return 0 # Let us check to make sure that the proof of work is sufficient. if not protocol.isProofOfWorkSufficient(data): logger.info('Proof of work is insufficient.') - PendingDownload().delete(calculateInventoryHash(data)) return 0 endOfLifeTime, = unpack('>Q', data[8:16]) if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: # The TTL may not be larger than 28 days + 3 hours of wiggle room logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % endOfLifeTime) - PendingDownload().delete(calculateInventoryHash(data)) return 0 if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much. logger.info('This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s' % endOfLifeTime) - PendingDownload().delete(calculateInventoryHash(data)) return 0 intObjectType, = unpack('>I', data[16:20]) try: