diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 6806c432..b3e39001 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -47,9 +47,7 @@ from account import ( GatewayAccount, MailchuckAccount, AccountColor) import dialogs from helper_generic import powQueueSize -from inventory import ( - PendingDownloadQueue, PendingUpload, - PendingUploadDeadlineException) +from network.stats import pendingDownload, pendingUpload from uisignaler import UISignaler import knownnodes import paths @@ -2701,10 +2699,10 @@ class MyForm(settingsmixin.SMainWindow): waitForSync = False # C PoW currently doesn't support interrupting and OpenCL is untested - if getPowType() == "python" and (powQueueSize() > 0 or PendingUpload().len() > 0): + if getPowType() == "python" and (powQueueSize() > 0 or pendingUpload() > 0): reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " + - _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, PendingUpload().len()) + "\n\n" + + _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, pendingUpload()) + "\n\n" + _translate("MainWindow", "Wait until these tasks finish?"), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) if reply == QtGui.QMessageBox.No: @@ -2712,16 +2710,14 @@ class MyForm(settingsmixin.SMainWindow): elif reply == QtGui.QMessageBox.Cancel: return - if PendingDownloadQueue.totalSize() > 0: + if pendingDownload() > 0: reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"), - _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize()), + _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, pendingDownload()), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) if reply == QtGui.QMessageBox.Yes: waitForSync = True elif reply == QtGui.QMessageBox.Cancel: return - else: - PendingDownloadQueue.stop() if shared.statusIconColor == 'red' and not BMConfigParser().safeGetBoolean( 'bitmessagesettings', 'dontconnect'): @@ -2752,7 +2748,7 @@ class MyForm(settingsmixin.SMainWindow): if waitForSync: self.updateStatusBar(_translate( "MainWindow", "Waiting for finishing synchronisation...")) - while PendingDownloadQueue.totalSize() > 0: + while pendingDownload() > 0: time.sleep(0.5) QtCore.QCoreApplication.processEvents( QtCore.QEventLoop.AllEvents, 1000 @@ -2794,19 +2790,18 @@ class MyForm(settingsmixin.SMainWindow): # check if upload (of objects created locally) pending self.updateStatusBar(_translate( "MainWindow", "Waiting for objects to be sent... %1%").arg(50)) - try: - while PendingUpload().progress() < 1: - self.updateStatusBar(_translate( - "MainWindow", - "Waiting for objects to be sent... %1%" - ).arg(int(50 + 20 * PendingUpload().progress())) - ) - time.sleep(0.5) - QtCore.QCoreApplication.processEvents( - QtCore.QEventLoop.AllEvents, 1000 - ) - except PendingUploadDeadlineException: - pass + maxPendingUpload = max(1, pendingUpload()) + + while pendingUpload() > 1: + self.updateStatusBar(_translate( + "MainWindow", + "Waiting for objects to be sent... %1%" + ).arg(int(50 + 20 * (pendingUpload()/maxPendingUpload))) + ) + time.sleep(0.5) + QtCore.QCoreApplication.processEvents( + QtCore.QEventLoop.AllEvents, 1000 + ) QtCore.QCoreApplication.processEvents( QtCore.QEventLoop.AllEvents, 1000 diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 06b1e0ce..3691d5b3 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -3,7 +3,7 @@ import time import shared from tr import _translate -from inventory import Inventory, PendingDownloadQueue, PendingUpload +from inventory import Inventory import knownnodes import l10n import network.stats diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 322bb20e..90db23c3 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -19,7 +19,7 @@ import helper_inbox from helper_generic import addDataPadding import helper_msgcoding from helper_threading import * -from inventory import Inventory, PendingUpload +from inventory import Inventory import l10n import protocol import queues @@ -199,7 +199,6 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') - PendingUpload().add(inventoryHash) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -289,7 +288,6 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') - PendingUpload().add(inventoryHash) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -379,7 +377,6 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) - PendingUpload().add(inventoryHash) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -510,7 +507,6 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 3 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, tag) - PendingUpload().add(inventoryHash) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash)) @@ -834,7 +830,6 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 2 Inventory()[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, embeddedTime, '') - PendingUpload().add(inventoryHash) if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp())))) else: @@ -941,7 +936,6 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') - PendingUpload().add(inventoryHash) logger.info('sending inv (for the getpubkey message)') queues.invQueue.put((streamNumber, inventoryHash)) diff --git a/src/inventory.py b/src/inventory.py index 8177cbbb..54677731 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -32,164 +32,3 @@ class Inventory(): raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr)) else: return realRet - - -class PendingDownloadQueue(Queue.Queue): -# keep a track of objects that have been advertised to us but we haven't downloaded them yet - maxWait = 300 - - def __init__(self, maxsize=0): - Queue.Queue.__init__(self, maxsize) - self.stopped = False - self.pending = {} - self.lock = RLock() - - def task_done(self, hashId): - Queue.Queue.task_done(self) - try: - with self.lock: - del self.pending[hashId] - except KeyError: - pass - - def get(self, block=True, timeout=None): - retval = Queue.Queue.get(self, block, timeout) - # no exception was raised - if not self.stopped: - with self.lock: - self.pending[retval] = time.time() - return retval - - def clear(self): - with self.lock: - newPending = {} - for hashId in self.pending: - if self.pending[hashId] + PendingDownloadQueue.maxWait > time.time(): - newPending[hashId] = self.pending[hashId] - self.pending = newPending - - @staticmethod - def totalSize(): - size = 0 - for thread in threadingEnumerate(): - if thread.isAlive() and hasattr(thread, 'downloadQueue'): - size += thread.downloadQueue.qsize() + len(thread.downloadQueue.pending) - return size - - @staticmethod - def stop(): - for thread in threadingEnumerate(): - if thread.isAlive() and hasattr(thread, 'downloadQueue'): - thread.downloadQueue.stopped = True - with thread.downloadQueue.lock: - thread.downloadQueue.pending = {} - - -class PendingUploadDeadlineException(Exception): - pass - - -@Singleton -class PendingUpload(object): -# keep a track of objects that we have created but haven't distributed yet - def __init__(self): - super(self.__class__, self).__init__() - self.lock = RLock() - self.hashes = {} - # end by this time in any case - self.deadline = 0 - self.maxLen = 0 - # during shutdown, wait up to 20 seconds to finish uploading - self.shutdownWait = 20 - # forget tracking objects after 60 seconds - self.objectWait = 60 - # wait 10 seconds between clears - self.clearDelay = 10 - self.lastCleared = time.time() - - def add(self, objectHash = None): - with self.lock: - # add a new object into existing thread lists - if objectHash: - if objectHash not in self.hashes: - self.hashes[objectHash] = {'created': time.time(), 'sendCount': 0, 'peers': []} - for thread in threadingEnumerate(): - if thread.isAlive() and hasattr(thread, 'peer') and \ - thread.peer not in self.hashes[objectHash]['peers']: - self.hashes[objectHash]['peers'].append(thread.peer) - # add all objects into the current thread - else: - for objectHash in self.hashes: - if current_thread().peer not in self.hashes[objectHash]['peers']: - self.hashes[objectHash]['peers'].append(current_thread().peer) - - def len(self): - self.clearHashes() - with self.lock: - return sum(1 - for x in self.hashes if (self.hashes[x]['created'] + self.objectWait < time.time() or - self.hashes[x]['sendCount'] == 0)) - - def _progress(self): - with self.lock: - return float(sum(len(self.hashes[x]['peers']) - for x in self.hashes if (self.hashes[x]['created'] + self.objectWait < time.time()) or - self.hashes[x]['sendCount'] == 0)) - - def progress(self, raiseDeadline=True): - if self.maxLen < self._progress(): - self.maxLen = self._progress() - if self.deadline < time.time(): - if self.deadline > 0 and raiseDeadline: - raise PendingUploadDeadlineException - self.deadline = time.time() + 20 - try: - return 1.0 - self._progress() / self.maxLen - except ZeroDivisionError: - return 1.0 - - def clearHashes(self, objectHash=None): - if objectHash is None: - if self.lastCleared > time.time() - self.clearDelay: - return - objects = self.hashes.keys() - else: - objects = objectHash, - with self.lock: - for i in objects: - try: - if self.hashes[i]['sendCount'] > 0 and ( - len(self.hashes[i]['peers']) == 0 or - self.hashes[i]['created'] + self.objectWait < time.time()): - del self.hashes[i] - except KeyError: - pass - self.lastCleared = time.time() - - def delete(self, objectHash=None): - if not hasattr(current_thread(), 'peer'): - return - if objectHash is None: - return - with self.lock: - try: - if objectHash in self.hashes and current_thread().peer in self.hashes[objectHash]['peers']: - self.hashes[objectHash]['sendCount'] += 1 - self.hashes[objectHash]['peers'].remove(current_thread().peer) - except KeyError: - pass - self.clearHashes(objectHash) - - def stop(self): - with self.lock: - self.hashes = {} - - def threadEnd(self): - with self.lock: - for objectHash in self.hashes: - try: - if current_thread().peer in self.hashes[objectHash]['peers']: - self.hashes[objectHash]['peers'].remove(current_thread().peer) - except KeyError: - pass - self.clearHashes() diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 66b0685b..4bb0c191 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -4,6 +4,7 @@ from threading import RLock from inventory import Inventory import network.connectionpool from network.dandelion import Dandelion +from network.stats import pendingDownload from randomtrackingdict import RandomTrackingDict from state import missingObjects @@ -54,8 +55,7 @@ class ObjectTracker(object): def clean(self): if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: if haveBloom: - # FIXME - if PendingDownloadQueue().size() == 0: + if pendingDownload() == 0: self.initInvBloom() self.initAddrBloom() else: