Remove obsolete code

- PendingDownload and PendingUpload in inventory.py are obsolete
- the replacement functions only provide lengths and are in
  network/stats.py
This commit is contained in:
Peter Šurda 2018-03-13 07:32:23 +01:00
parent 7938eab454
commit 1e4400a207
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 22 additions and 194 deletions

View File

@ -47,9 +47,7 @@ from account import (
GatewayAccount, MailchuckAccount, AccountColor) GatewayAccount, MailchuckAccount, AccountColor)
import dialogs import dialogs
from helper_generic import powQueueSize from helper_generic import powQueueSize
from inventory import ( from network.stats import pendingDownload, pendingUpload
PendingDownloadQueue, PendingUpload,
PendingUploadDeadlineException)
from uisignaler import UISignaler from uisignaler import UISignaler
import knownnodes import knownnodes
import paths import paths
@ -2701,10 +2699,10 @@ class MyForm(settingsmixin.SMainWindow):
waitForSync = False waitForSync = False
# C PoW currently doesn't support interrupting and OpenCL is untested # C PoW currently doesn't support interrupting and OpenCL is untested
if getPowType() == "python" and (powQueueSize() > 0 or PendingUpload().len() > 0): if getPowType() == "python" and (powQueueSize() > 0 or pendingUpload() > 0):
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"),
_translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " + _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " +
_translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, PendingUpload().len()) + "\n\n" + _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, pendingUpload()) + "\n\n" +
_translate("MainWindow", "Wait until these tasks finish?"), _translate("MainWindow", "Wait until these tasks finish?"),
QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel)
if reply == QtGui.QMessageBox.No: if reply == QtGui.QMessageBox.No:
@ -2712,16 +2710,14 @@ class MyForm(settingsmixin.SMainWindow):
elif reply == QtGui.QMessageBox.Cancel: elif reply == QtGui.QMessageBox.Cancel:
return return
if PendingDownloadQueue.totalSize() > 0: if pendingDownload() > 0:
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"), reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"),
_translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize()), _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, pendingDownload()),
QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel)
if reply == QtGui.QMessageBox.Yes: if reply == QtGui.QMessageBox.Yes:
waitForSync = True waitForSync = True
elif reply == QtGui.QMessageBox.Cancel: elif reply == QtGui.QMessageBox.Cancel:
return return
else:
PendingDownloadQueue.stop()
if shared.statusIconColor == 'red' and not BMConfigParser().safeGetBoolean( if shared.statusIconColor == 'red' and not BMConfigParser().safeGetBoolean(
'bitmessagesettings', 'dontconnect'): 'bitmessagesettings', 'dontconnect'):
@ -2752,7 +2748,7 @@ class MyForm(settingsmixin.SMainWindow):
if waitForSync: if waitForSync:
self.updateStatusBar(_translate( self.updateStatusBar(_translate(
"MainWindow", "Waiting for finishing synchronisation...")) "MainWindow", "Waiting for finishing synchronisation..."))
while PendingDownloadQueue.totalSize() > 0: while pendingDownload() > 0:
time.sleep(0.5) time.sleep(0.5)
QtCore.QCoreApplication.processEvents( QtCore.QCoreApplication.processEvents(
QtCore.QEventLoop.AllEvents, 1000 QtCore.QEventLoop.AllEvents, 1000
@ -2794,19 +2790,18 @@ class MyForm(settingsmixin.SMainWindow):
# check if upload (of objects created locally) pending # check if upload (of objects created locally) pending
self.updateStatusBar(_translate( self.updateStatusBar(_translate(
"MainWindow", "Waiting for objects to be sent... %1%").arg(50)) "MainWindow", "Waiting for objects to be sent... %1%").arg(50))
try: maxPendingUpload = max(1, pendingUpload())
while PendingUpload().progress() < 1:
while pendingUpload() > 1:
self.updateStatusBar(_translate( self.updateStatusBar(_translate(
"MainWindow", "MainWindow",
"Waiting for objects to be sent... %1%" "Waiting for objects to be sent... %1%"
).arg(int(50 + 20 * PendingUpload().progress())) ).arg(int(50 + 20 * (pendingUpload()/maxPendingUpload)))
) )
time.sleep(0.5) time.sleep(0.5)
QtCore.QCoreApplication.processEvents( QtCore.QCoreApplication.processEvents(
QtCore.QEventLoop.AllEvents, 1000 QtCore.QEventLoop.AllEvents, 1000
) )
except PendingUploadDeadlineException:
pass
QtCore.QCoreApplication.processEvents( QtCore.QCoreApplication.processEvents(
QtCore.QEventLoop.AllEvents, 1000 QtCore.QEventLoop.AllEvents, 1000

View File

@ -3,7 +3,7 @@ import time
import shared import shared
from tr import _translate from tr import _translate
from inventory import Inventory, PendingDownloadQueue, PendingUpload from inventory import Inventory
import knownnodes import knownnodes
import l10n import l10n
import network.stats import network.stats

View File

@ -19,7 +19,7 @@ import helper_inbox
from helper_generic import addDataPadding from helper_generic import addDataPadding
import helper_msgcoding import helper_msgcoding
from helper_threading import * from helper_threading import *
from inventory import Inventory, PendingUpload from inventory import Inventory
import l10n import l10n
import protocol import protocol
import queues import queues
@ -199,7 +199,6 @@ class singleWorker(threading.Thread, StoppableThread):
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime,'') objectType, streamNumber, payload, embeddedTime,'')
PendingUpload().add(inventoryHash)
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
@ -289,7 +288,6 @@ class singleWorker(threading.Thread, StoppableThread):
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime,'') objectType, streamNumber, payload, embeddedTime,'')
PendingUpload().add(inventoryHash)
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
@ -379,7 +377,6 @@ class singleWorker(threading.Thread, StoppableThread):
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:])
PendingUpload().add(inventoryHash)
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
@ -510,7 +507,6 @@ class singleWorker(threading.Thread, StoppableThread):
objectType = 3 objectType = 3
Inventory()[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, tag) objectType, streamNumber, payload, embeddedTime, tag)
PendingUpload().add(inventoryHash)
logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash))
@ -834,7 +830,6 @@ class singleWorker(threading.Thread, StoppableThread):
objectType = 2 objectType = 2
Inventory()[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, embeddedTime, '') objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
PendingUpload().add(inventoryHash)
if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): if BMConfigParser().has_section(toaddress) or not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp())))) queues.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp()))))
else: else:
@ -941,7 +936,6 @@ class singleWorker(threading.Thread, StoppableThread):
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime, '')
PendingUpload().add(inventoryHash)
logger.info('sending inv (for the getpubkey message)') logger.info('sending inv (for the getpubkey message)')
queues.invQueue.put((streamNumber, inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash))

View File

@ -32,164 +32,3 @@ class Inventory():
raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr)) raise AttributeError("%s instance has no attribute '%s'" %(self.__class__.__name__, attr))
else: else:
return realRet return realRet
class PendingDownloadQueue(Queue.Queue):
# keep a track of objects that have been advertised to us but we haven't downloaded them yet
maxWait = 300
def __init__(self, maxsize=0):
Queue.Queue.__init__(self, maxsize)
self.stopped = False
self.pending = {}
self.lock = RLock()
def task_done(self, hashId):
Queue.Queue.task_done(self)
try:
with self.lock:
del self.pending[hashId]
except KeyError:
pass
def get(self, block=True, timeout=None):
retval = Queue.Queue.get(self, block, timeout)
# no exception was raised
if not self.stopped:
with self.lock:
self.pending[retval] = time.time()
return retval
def clear(self):
with self.lock:
newPending = {}
for hashId in self.pending:
if self.pending[hashId] + PendingDownloadQueue.maxWait > time.time():
newPending[hashId] = self.pending[hashId]
self.pending = newPending
@staticmethod
def totalSize():
size = 0
for thread in threadingEnumerate():
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
size += thread.downloadQueue.qsize() + len(thread.downloadQueue.pending)
return size
@staticmethod
def stop():
for thread in threadingEnumerate():
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
thread.downloadQueue.stopped = True
with thread.downloadQueue.lock:
thread.downloadQueue.pending = {}
class PendingUploadDeadlineException(Exception):
pass
@Singleton
class PendingUpload(object):
# keep a track of objects that we have created but haven't distributed yet
def __init__(self):
super(self.__class__, self).__init__()
self.lock = RLock()
self.hashes = {}
# end by this time in any case
self.deadline = 0
self.maxLen = 0
# during shutdown, wait up to 20 seconds to finish uploading
self.shutdownWait = 20
# forget tracking objects after 60 seconds
self.objectWait = 60
# wait 10 seconds between clears
self.clearDelay = 10
self.lastCleared = time.time()
def add(self, objectHash = None):
with self.lock:
# add a new object into existing thread lists
if objectHash:
if objectHash not in self.hashes:
self.hashes[objectHash] = {'created': time.time(), 'sendCount': 0, 'peers': []}
for thread in threadingEnumerate():
if thread.isAlive() and hasattr(thread, 'peer') and \
thread.peer not in self.hashes[objectHash]['peers']:
self.hashes[objectHash]['peers'].append(thread.peer)
# add all objects into the current thread
else:
for objectHash in self.hashes:
if current_thread().peer not in self.hashes[objectHash]['peers']:
self.hashes[objectHash]['peers'].append(current_thread().peer)
def len(self):
self.clearHashes()
with self.lock:
return sum(1
for x in self.hashes if (self.hashes[x]['created'] + self.objectWait < time.time() or
self.hashes[x]['sendCount'] == 0))
def _progress(self):
with self.lock:
return float(sum(len(self.hashes[x]['peers'])
for x in self.hashes if (self.hashes[x]['created'] + self.objectWait < time.time()) or
self.hashes[x]['sendCount'] == 0))
def progress(self, raiseDeadline=True):
if self.maxLen < self._progress():
self.maxLen = self._progress()
if self.deadline < time.time():
if self.deadline > 0 and raiseDeadline:
raise PendingUploadDeadlineException
self.deadline = time.time() + 20
try:
return 1.0 - self._progress() / self.maxLen
except ZeroDivisionError:
return 1.0
def clearHashes(self, objectHash=None):
if objectHash is None:
if self.lastCleared > time.time() - self.clearDelay:
return
objects = self.hashes.keys()
else:
objects = objectHash,
with self.lock:
for i in objects:
try:
if self.hashes[i]['sendCount'] > 0 and (
len(self.hashes[i]['peers']) == 0 or
self.hashes[i]['created'] + self.objectWait < time.time()):
del self.hashes[i]
except KeyError:
pass
self.lastCleared = time.time()
def delete(self, objectHash=None):
if not hasattr(current_thread(), 'peer'):
return
if objectHash is None:
return
with self.lock:
try:
if objectHash in self.hashes and current_thread().peer in self.hashes[objectHash]['peers']:
self.hashes[objectHash]['sendCount'] += 1
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except KeyError:
pass
self.clearHashes(objectHash)
def stop(self):
with self.lock:
self.hashes = {}
def threadEnd(self):
with self.lock:
for objectHash in self.hashes:
try:
if current_thread().peer in self.hashes[objectHash]['peers']:
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except KeyError:
pass
self.clearHashes()

View File

@ -4,6 +4,7 @@ from threading import RLock
from inventory import Inventory from inventory import Inventory
import network.connectionpool import network.connectionpool
from network.dandelion import Dandelion from network.dandelion import Dandelion
from network.stats import pendingDownload
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
from state import missingObjects from state import missingObjects
@ -54,8 +55,7 @@ class ObjectTracker(object):
def clean(self): def clean(self):
if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
if haveBloom: if haveBloom:
# FIXME if pendingDownload() == 0:
if PendingDownloadQueue().size() == 0:
self.initInvBloom() self.initInvBloom()
self.initAddrBloom() self.initAddrBloom()
else: else: