From 5ae1327edcea61b9d532f8866b2166f3c3844925 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 19 Jan 2017 19:48:12 +0100 Subject: [PATCH] Download/upload shutdown fixes - Missing renamed to PendingDownload - PendingDownload now only retries 3 times rather than 6 to dowload an object - Added PendingUpload, replacing invQueueSize - PendingUpload has both the "len" method (number of objects not uploaded) as well as "progress" method, which is a float from 0 (nothing done) to 1 (all uploaded) which considers not only objects but also how many nodes they are uploaded to - PendingUpload tracks when the object is successfully uploaded to the remote node instead of just adding an arbitrary time after they have been send the corresponding "inv" - Network status tab's "Objects to be synced" shows the sum of PendingUpload and PendingDownload sizes --- src/bitmessageqt/__init__.py | 33 +++++------- src/bitmessageqt/networkstatus.py | 4 +- src/class_receiveDataThread.py | 17 +++--- src/class_sendDataThread.py | 6 +++ src/class_singleWorker.py | 8 ++- src/helper_generic.py | 10 ---- src/inventory.py | 90 +++++++++++++++++++++++++++++-- 7 files changed, 125 insertions(+), 43 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index b5f036a7..adcae53c 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -76,8 +76,8 @@ from account import * from dialogs import AddAddressDialog from class_objectHashHolder import objectHashHolder from class_singleWorker import singleWorker -from helper_generic import powQueueSize, invQueueSize -from inventory import Missing +from helper_generic import powQueueSize +from inventory import PendingDownload, PendingUpload, PendingUploadDeadlineException import paths from proofofwork import getPowType import protocol @@ -2708,10 +2708,10 @@ class MyForm(settingsmixin.SMainWindow): waitForSync = False # C PoW currently doesn't support interrupting and OpenCL is untested - if getPowType() == "python" and (powQueueSize() > 0 or invQueueSize() > 0): + if getPowType() == "python" and (powQueueSize() > 0 or PendingUpload().len() > 0): reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " + - _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, invQueueSize()) + "\n\n" + + _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, PendingUpload().len()) + "\n\n" + _translate("MainWindow", "Wait until these tasks finish?"), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) if reply == QtGui.QMessageBox.No: @@ -2719,16 +2719,16 @@ class MyForm(settingsmixin.SMainWindow): elif reply == QtGui.QMessage.Cancel: return - if Missing().len() > 0: + if PendingDownload().len() > 0: reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"), - _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, Missing().len()), + _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len()), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) if reply == QtGui.QMessageBox.Yes: waitForSync = True elif reply == QtGui.QMessageBox.Cancel: return else: - Missing().stop() + PendingDownload().stop() if shared.statusIconColor == 'red': reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Not connected"), @@ -2756,7 +2756,7 @@ class MyForm(settingsmixin.SMainWindow): if waitForSync: self.statusBar().showMessage(_translate( "MainWindow", "Waiting for finishing synchronisation...")) - while Missing().len() > 0: + while PendingDownload().len() > 0: time.sleep(0.5) QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) @@ -2781,22 +2781,17 @@ class MyForm(settingsmixin.SMainWindow): time.sleep(0.5) # a bit of time so that the hashHolder is populated QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - # check if objectHashHolder empty + # check if upload (of objects created locally) pending self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50))) - maxWaitingObjects = 0 - curWaitingObjects = invQueueSize() - while curWaitingObjects > 0: - curWaitingObjects = invQueueSize() - if curWaitingObjects > maxWaitingObjects: - maxWaitingObjects = curWaitingObjects - if curWaitingObjects > 0: - self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50 + 20 * (maxWaitingObjects - curWaitingObjects) / maxWaitingObjects))) + try: + while PendingUpload().progress() < 1: + self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(int(50 + 20 * PendingUpload().progress())))) time.sleep(0.5) QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) + except PendingUploadDeadlineException: + pass QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - if maxWorkerQueue > 0 or maxWaitingObjects > 0: - time.sleep(10) # a bit of time so that the other nodes retrieve the objects QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) # save state and geometry self and all widgets diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 3eeae392..7506b652 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -2,7 +2,7 @@ from PyQt4 import QtCore, QtGui import time import shared from tr import _translate -from inventory import Inventory, Missing +from inventory import Inventory, PendingDownload, PendingUpload import l10n from retranslateui import RetranslateMixin from uisignaler import UISignaler @@ -45,7 +45,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): return "%4.0f kB" % num def updateNumberOfObjectsToBeSynced(self): - self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, Missing().len())) + self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len() + PendingUpload().len())) def updateNumberOfMessagesProcessed(self): self.updateNumberOfObjectsToBeSynced() diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index a569e2d8..70418f8d 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -31,7 +31,7 @@ from helper_sql import sqlQuery from debug import logger import paths import protocol -from inventory import Inventory, Missing +from inventory import Inventory, PendingDownload, PendingUpload import state import throttle import tr @@ -129,7 +129,7 @@ class receiveDataThread(threading.Thread): except Exception as err: logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err)) - Missing().threadEnd() + PendingDownload().threadEnd() shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) self.checkTimeOffsetNotification() logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) @@ -219,7 +219,7 @@ class receiveDataThread(threading.Thread): if self.data == '': # if there are no more messages try: - self.sendgetdata(Missing().pull(100)) + self.sendgetdata(PendingDownload().pull(100)) except Queue.Full: pass self.processData() @@ -292,6 +292,9 @@ class receiveDataThread(threading.Thread): if self.initiatedConnection: state.networkProtocolAvailability[protocol.networkType(self.peer.host)] = True + # we need to send our own objects to this node + PendingUpload().add() + # Let all of our peers know about this new node. dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort) protocol.broadcastToSendDataQueues(( @@ -409,7 +412,7 @@ class receiveDataThread(threading.Thread): objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) for item in objectsNewToMe: - Missing().add(item) + PendingDownload().add(item) self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein # Send a getdata message to our peer to request the object with the given @@ -439,15 +442,15 @@ class receiveDataThread(threading.Thread): self.antiIntersectionDelay() else: if hash in Inventory(): - self.sendObject(Inventory()[hash].payload) + self.sendObject(hash, Inventory()[hash].payload) else: self.antiIntersectionDelay() logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) # Our peer has requested (in a getdata message) that we send an object. - def sendObject(self, payload): + def sendObject(self, hash, payload): logger.debug('sending an object.') - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('object',payload))) + self.sendDataThreadQueue.put((0, 'sendRawData', (hash, protocol.CreatePacket('object',payload)))) def _checkIPAddress(self, host): if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 2ea03e22..79510f5a 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -12,6 +12,7 @@ from helper_generic import addDataPadding from class_objectHashHolder import * from addresses import * from debug import logger +from inventory import PendingUpload import protocol import state import throttle @@ -169,8 +170,12 @@ class sendDataThread(threading.Thread): logger.error('send pong failed') break elif command == 'sendRawData': + hash = None + if type(data) in [list, tuple]: + hash, data = data try: self.sendBytes(data) + PendingUpload().delete(hash) except: logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.', exc_info=True) break @@ -188,5 +193,6 @@ class sendDataThread(threading.Thread): except: pass state.sendDataQueues.remove(self.sendDataThreadQueue) + PendingUpload().threadEnd() logger.info('sendDataThread ending. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(state.sendDataQueues))) self.objectHashHolderInstance.close() diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index e124c7f5..ac36e0b7 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -18,7 +18,7 @@ import helper_inbox from helper_generic import addDataPadding import helper_msgcoding from helper_threading import * -from inventory import Inventory +from inventory import Inventory, PendingUpload import l10n import protocol import state @@ -176,6 +176,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') + PendingUpload().add(inventoryHash) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -266,6 +267,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') + PendingUpload().add(inventoryHash) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -356,6 +358,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) + PendingUpload().add(inventoryHash) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -487,6 +490,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 3 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, tag) + PendingUpload().add(inventoryHash) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) protocol.broadcastToSendDataQueues(( streamNumber, 'advertiseobject', inventoryHash)) @@ -813,6 +817,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 2 Inventory()[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, embeddedTime, '') + PendingUpload().add(inventoryHash) if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp())))) else: @@ -922,6 +927,7 @@ class singleWorker(threading.Thread, StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') + PendingUpload().add(inventoryHash) logger.info('sending inv (for the getpubkey message)') protocol.broadcastToSendDataQueues(( streamNumber, 'advertiseobject', inventoryHash)) diff --git a/src/helper_generic.py b/src/helper_generic.py index 9e2eb9ba..d710f429 100644 --- a/src/helper_generic.py +++ b/src/helper_generic.py @@ -19,16 +19,6 @@ def powQueueSize(): pass return curWorkerQueue -def invQueueSize(): - curInvQueue = 0 - for thread in enumerate(): - try: - if thread.name == "objectHashHolder": - curInvQueue += thread.hashCount() - except: - pass - return curInvQueue - def convertIntToString(n): a = __builtins__.hex(n) if a[-1:] == 'L': diff --git a/src/inventory.py b/src/inventory.py index 1036d07e..dfe35b8b 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -1,5 +1,6 @@ import collections -from threading import current_thread, RLock +import pprint +from threading import current_thread, enumerate as threadingEnumerate, RLock import time from helper_sql import * @@ -37,7 +38,7 @@ class Inventory(collections.MutableMapping): value = self.InventoryItem(*value) self._inventory[hash] = value self._streams[value.stream].add(hash) - Missing().delete(hash) + PendingDownload().delete(hash) def __delitem__(self, hash): raise NotImplementedError @@ -85,7 +86,8 @@ class Inventory(collections.MutableMapping): @Singleton -class Missing(object): +class PendingDownload(object): +# keep a track of objects that have been advertised to us but we haven't downloaded them yet def __init__(self): super(self.__class__, self).__init__() self.lock = RLock() @@ -94,7 +96,7 @@ class Missing(object): # don't request the same object more frequently than this self.frequency = 60 # after requesting and not receiving an object more than this times, consider it expired - self.maxRequestCount = 6 + self.maxRequestCount = 3 self.pending = {} def add(self, objectHash): @@ -207,3 +209,83 @@ class Missing(object): del self.pending[current_thread().peer] except KeyError: pass + + +class PendingUploadDeadlineException(Exception): + pass + + +@Singleton +class PendingUpload(object): +# keep a track of objects that we have created but haven't distributed yet + def __init__(self): + super(self.__class__, self).__init__() + self.lock = RLock() + self.hashes = {} + # end by this time in any case + self.deadline = 0 + self.maxLen = 0 + + def add(self, objectHash = None): + with self.lock: + # add a new object into existing thread lists + if objectHash: + if objectHash not in self.hashes: + self.hashes[objectHash] = [] + for thread in threadingEnumerate(): + if thread.isAlive() and hasattr(thread, 'peer') and \ + thread.peer not in self.hashes[objectHash]: + self.hashes[objectHash].append(thread.peer) + # add all objects into the current thread + else: + for hash in self.hashes: + if current_thread().peer not in self.hashes[hash]: + self.hashes[hash].append(current_thread().peer) + + + def len(self): + with self.lock: + return sum(len(self.hashes[x]) > 0 for x in self.hashes) + + def _progress(self): + with self.lock: + return float(sum(len(self.hashes[x]) for x in self.hashes)) + + def progress(self, throwDeadline=True): + if self.maxLen < self._progress(): + self.maxLen = self._progress() + if self.deadline < time.time(): + if self.deadline > 0 and throwDeadline: + raise PendingUploadDeadlineException + self.deadline = time.time() + 20 + try: + return 1.0 - self._progress() / self.maxLen + except ZeroDivisionError: + return 1.0 + + def delete(self, objectHash): + if not hasattr(current_thread(), 'peer'): + return + with self.lock: + if objectHash in self.hashes and current_thread().peer in self.hashes[objectHash]: + self.hashes[objectHash].remove(current_thread().peer) + if len(self.hashes[objectHash]) == 0: + del self.hashes[objectHash] + + def stop(self): + with self.lock: + self.hashes = {} + + def threadEnd(self): + while True: + try: + with self.lock: + for objectHash in self.hashes: + if current_thread().peer in self.hashes[objectHash]: + self.hashes[objectHash].remove(current_thread().peer) + if len(self.hashes[objectHash]) == 0: + del self.hashes[objectHash] + except (KeyError, RuntimeError): + pass + else: + break