Download tracking refactoring

- replace PendingDownload singleton dict with a Queue
- total memory and CPU requirements should be reduced
- get rid of someObjectsOfWhichThisRemoteNodeIsAlreadyAware. It has very
little practical effect and only uses memory
This commit is contained in:
Peter Šurda 2017-03-19 22:08:00 +01:00
parent 13223887fc
commit 1af49a0165
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
8 changed files with 60 additions and 154 deletions

View File

@ -77,7 +77,7 @@ from class_objectHashHolder import objectHashHolder
from class_singleWorker import singleWorker from class_singleWorker import singleWorker
from dialogs import AddAddressDialog from dialogs import AddAddressDialog
from helper_generic import powQueueSize from helper_generic import powQueueSize
from inventory import PendingDownload, PendingUpload, PendingUploadDeadlineException from inventory import PendingDownloadQueue, PendingUpload, PendingUploadDeadlineException
import knownnodes import knownnodes
import paths import paths
from proofofwork import getPowType from proofofwork import getPowType
@ -2751,16 +2751,16 @@ class MyForm(settingsmixin.SMainWindow):
elif reply == QtGui.QMessage.Cancel: elif reply == QtGui.QMessage.Cancel:
return return
if PendingDownload().len() > 0: if PendingDownloadQueue.totalSize() > 0:
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"), reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"),
_translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len()), _translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize()),
QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel)
if reply == QtGui.QMessageBox.Yes: if reply == QtGui.QMessageBox.Yes:
waitForSync = True waitForSync = True
elif reply == QtGui.QMessageBox.Cancel: elif reply == QtGui.QMessageBox.Cancel:
return return
else: else:
PendingDownload().stop() PendingDownloadQueue.stop()
if shared.statusIconColor == 'red': if shared.statusIconColor == 'red':
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Not connected"), reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Not connected"),
@ -2788,7 +2788,7 @@ class MyForm(settingsmixin.SMainWindow):
if waitForSync: if waitForSync:
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
"MainWindow", "Waiting for finishing synchronisation...")) "MainWindow", "Waiting for finishing synchronisation..."))
while PendingDownload().len() > 0: while PendingDownloadQueue.totalSize() > 0:
time.sleep(0.5) time.sleep(0.5)
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000)

View File

@ -1,8 +1,9 @@
from PyQt4 import QtCore, QtGui from PyQt4 import QtCore, QtGui
import time import time
import shared import shared
from tr import _translate from tr import _translate
from inventory import Inventory, PendingDownload, PendingUpload from inventory import Inventory, PendingDownloadQueue, PendingUpload
import l10n import l10n
from retranslateui import RetranslateMixin from retranslateui import RetranslateMixin
from uisignaler import UISignaler from uisignaler import UISignaler
@ -45,7 +46,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
return "%4.0f kB" % num return "%4.0f kB" % num
def updateNumberOfObjectsToBeSynced(self): def updateNumberOfObjectsToBeSynced(self):
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownload().len() + PendingUpload().len())) self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, PendingDownloadQueue.totalSize() + PendingUpload().len()))
def updateNumberOfMessagesProcessed(self): def updateNumberOfMessagesProcessed(self):
self.updateNumberOfObjectsToBeSynced() self.updateNumberOfObjectsToBeSynced()

View File

@ -185,12 +185,10 @@ class outgoingSynSender(threading.Thread, StoppableThread):
self.sock.shutdown(socket.SHUT_RDWR) self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close() self.sock.close()
return return
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
sd = sendDataThread(sendDataThreadQueue) sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber, sd.setup(self.sock, peer.host, peer.port, self.streamNumber)
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start() sd.start()
rd = receiveDataThread() rd = receiveDataThread()
@ -199,7 +197,6 @@ class outgoingSynSender(threading.Thread, StoppableThread):
peer.host, peer.host,
peer.port, peer.port,
self.streamNumber, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
self.selfInitiatedConnections, self.selfInitiatedConnections,
sendDataThreadQueue, sendDataThreadQueue,
sd.objectHashHolderInstance) sd.objectHashHolderInstance)

View File

@ -32,7 +32,7 @@ import knownnodes
from debug import logger from debug import logger
import paths import paths
import protocol import protocol
from inventory import Inventory, PendingDownload, PendingUpload from inventory import Inventory, PendingDownloadQueue, PendingUpload
import queues import queues
import state import state
import throttle import throttle
@ -56,7 +56,6 @@ class receiveDataThread(threading.Thread):
HOST, HOST,
port, port,
streamNumber, streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections, selfInitiatedConnections,
sendDataThreadQueue, sendDataThreadQueue,
objectHashHolderInstance): objectHashHolderInstance):
@ -79,8 +78,8 @@ class receiveDataThread(threading.Thread):
self.initiatedConnection = True self.initiatedConnection = True
for stream in self.streamNumber: for stream in self.streamNumber:
self.selfInitiatedConnections[stream][self] = 0 self.selfInitiatedConnections[stream][self] = 0
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
self.objectHashHolderInstance = objectHashHolderInstance self.objectHashHolderInstance = objectHashHolderInstance
self.downloadQueue = PendingDownloadQueue()
self.startTime = time.time() self.startTime = time.time()
def run(self): def run(self):
@ -147,7 +146,6 @@ class receiveDataThread(threading.Thread):
except Exception as err: except Exception as err:
logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err)) logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err))
PendingDownload().threadEnd()
queues.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) queues.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.checkTimeOffsetNotification() self.checkTimeOffsetNotification()
logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList))) logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
@ -240,10 +238,20 @@ class receiveDataThread(threading.Thread):
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message
if self.data == '': # if there are no more messages if self.data == '': # if there are no more messages
toRequest = []
try: try:
self.sendgetdata(PendingDownload().pull(100)) for i in range(self.downloadQueue.pendingSize, 100):
except Queue.Full: while True:
hashId = self.downloadQueue.get(False)
if not hashId in Inventory():
toRequest.append(hashId)
break
# don't track download for duplicates
self.downloadQueue.task_done()
except Queue.Empty:
pass pass
if len(toRequest) > 0:
self.sendgetdata(toRequest)
self.processData() self.processData()
def sendpong(self, payload): def sendpong(self, payload):
@ -407,7 +415,7 @@ class receiveDataThread(threading.Thread):
bigInvList = {} bigInvList = {}
for stream in self.streamNumber: for stream in self.streamNumber:
for hash in Inventory().unexpired_hashes_by_stream(stream): for hash in Inventory().unexpired_hashes_by_stream(stream):
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash): if not self.objectHashHolderInstance.hasHash(hash):
bigInvList[hash] = 0 bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0 numberOfObjectsInInvMessage = 0
payload = '' payload = ''
@ -476,6 +484,7 @@ class receiveDataThread(threading.Thread):
def recobject(self, data): def recobject(self, data):
self.messageProcessingStartTime = time.time() self.messageProcessingStartTime = time.time()
lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data) lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data)
self.downloadQueue.task_done()
""" """
Sleeping will help guarantee that we can process messages faster than a Sleeping will help guarantee that we can process messages faster than a
@ -509,8 +518,7 @@ class receiveDataThread(threading.Thread):
objectsNewToMe -= Inventory().hashes_by_stream(stream) objectsNewToMe -= Inventory().hashes_by_stream(stream)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe: for item in objectsNewToMe:
PendingDownload().add(item) self.downloadQueue.put(item)
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
# Send a getdata message to our peer to request the object with the given # Send a getdata message to our peer to request the object with the given
# hash # hash

View File

@ -39,8 +39,8 @@ class sendDataThread(threading.Thread):
sock, sock,
HOST, HOST,
PORT, PORT,
streamNumber, streamNumber
someObjectsOfWhichThisRemoteNodeIsAlreadyAware): ):
self.sock = sock self.sock = sock
self.peer = state.Peer(HOST, PORT) self.peer = state.Peer(HOST, PORT)
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
@ -52,7 +52,6 @@ class sendDataThread(threading.Thread):
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue. 1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
self.lastTimeISentData = int( self.lastTimeISentData = int(
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive. time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
if streamNumber == -1: # This was an incoming connection. if streamNumber == -1: # This was an incoming connection.
self.initiatedConnection = False self.initiatedConnection = False
else: else:
@ -165,7 +164,6 @@ class sendDataThread(threading.Thread):
if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node
payload = '' payload = ''
for hash in data: for hash in data:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
payload += hash payload += hash
if payload != '': if payload != '':
payload = encodeVarint(len(payload)/32) + payload payload = encodeVarint(len(payload)/32) + payload
@ -176,7 +174,6 @@ class sendDataThread(threading.Thread):
logger.error('sendinv: self.sock.sendall failed') logger.error('sendinv: self.sock.sendall failed')
break break
elif command == 'pong': elif command == 'pong':
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
if self.lastTimeISentData < (int(time.time()) - 298): if self.lastTimeISentData < (int(time.time()) - 298):
# Send out a pong message to keep the connection alive. # Send out a pong message to keep the connection alive.
logger.debug('Sending pong to ' + str(self.peer) + ' to keep connection alive.') logger.debug('Sending pong to ' + str(self.peer) + ' to keep connection alive.')

View File

@ -146,19 +146,18 @@ class singleListener(threading.Thread, StoppableThread):
else: else:
break break
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
socketObject.settimeout(20) socketObject.settimeout(20)
sd = sendDataThread(sendDataThreadQueue) sd = sendDataThread(sendDataThreadQueue)
sd.setup( sd.setup(
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware) socketObject, HOST, PORT, -1)
sd.start() sd.start()
rd = receiveDataThread() rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left rd.daemon = True # close the main program even if there are threads left
rd.setup( rd.setup(
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance) socketObject, HOST, PORT, -1, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance)
rd.start() rd.start()
logger.info('connected to ' + HOST + ' during INCOMING request.') logger.info('connected to ' + HOST + ' during INCOMING request.')

View File

@ -1,5 +1,6 @@
import collections import collections
from threading import current_thread, enumerate as threadingEnumerate, RLock from threading import current_thread, enumerate as threadingEnumerate, RLock
import Queue
import time import time
from helper_sql import * from helper_sql import *
@ -37,7 +38,6 @@ class Inventory(collections.MutableMapping):
value = self.InventoryItem(*value) value = self.InventoryItem(*value)
self._inventory[hash] = value self._inventory[hash] = value
self._streams[value.stream].add(hash) self._streams[value.stream].add(hash)
PendingDownload().delete(hash)
def __delitem__(self, hash): def __delitem__(self, hash):
raise NotImplementedError raise NotImplementedError
@ -84,131 +84,39 @@ class Inventory(collections.MutableMapping):
self._streams[value.stream].add(objectHash) self._streams[value.stream].add(objectHash)
@Singleton class PendingDownloadQueue(Queue.Queue):
class PendingDownload(object):
# keep a track of objects that have been advertised to us but we haven't downloaded them yet # keep a track of objects that have been advertised to us but we haven't downloaded them yet
def __init__(self): def __init__(self, maxsize=0):
super(self.__class__, self).__init__() Queue.Queue.__init__(self, maxsize)
self.lock = RLock()
self.hashes = {}
self.stopped = False self.stopped = False
# don't request the same object more frequently than this self.pendingSize = 0
self.frequency = 60
# after requesting and not receiving an object more than this times, consider it expired
self.maxRequestCount = 3
self.pending = {}
def add(self, objectHash): def task_done(self):
if self.stopped: Queue.Queue.task_done(self)
return if self.pendingSize > 0:
with self.lock: self.pendingSize -= 1
if objectHash not in self.hashes:
self.hashes[objectHash] = {'peers':[], 'requested':0, 'requestedCount':0}
self.hashes[objectHash]['peers'].append(current_thread().peer)
def addPending(self, objectHash=None): def get(self, block=True, timeout=None):
if self.stopped: retval = Queue.Queue.get(self, block, timeout)
return # no exception was raised
if current_thread().peer not in self.pending: if not self.stopped:
self.pending[current_thread().peer] = {'objects':[], 'requested':0, 'received':0} self.pendingSize += 1
if objectHash not in self.pending[current_thread().peer]['objects'] and not objectHash is None: return retval
self.pending[current_thread().peer]['objects'].append(objectHash)
self.pending[current_thread().peer]['requested'] = time.time()
def len(self): @staticmethod
with self.lock: def totalSize():
return sum(1 for x in self.hashes.values() if len(x) > 0) size = 0
for thread in threadingEnumerate():
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
size += thread.downloadQueue.qsize() + thread.downloadQueue.pendingSize
return size
def pull(self, count=1): @staticmethod
if count < 1: def stop():
raise ValueError("Must be at least one") for thread in threadingEnumerate():
objectHashes = [] if thread.isAlive() and hasattr(thread, 'downloadQueue'):
unreachableObjects = [] thread.downloadQueue.stopped = True
if self.stopped: thread.downloadQueue.pendingSize = 0
return objectHashes
start = time.time()
try:
for objectHash in self.hashes.keys():
with self.lock:
if len(objectHashes) >= count:
break
if current_thread().peer not in self.pending:
self.addPending()
if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \
self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \
len(self.pending[current_thread().peer]['objects']) >= count:
break
if len(self.hashes[objectHash]['peers']) == 0:
unreachableObjects.append(objectHash)
continue
# requested too long ago or not at all from any thread
if self.hashes[objectHash]['requested'] < time.time() - self.frequency:
# ready requested from this thread but haven't received yet
if objectHash in self.pending[current_thread().peer]['objects']:
# if still sending or receiving, request next
if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \
self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency:
continue
# haven't requested or received anything recently, re-request (i.e. continue)
# the current node doesn't have the object
elif current_thread().peer not in self.hashes[objectHash]['peers']:
continue
# already requested too many times, remove all signs of this object
if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount:
del self.hashes[objectHash]
for thread in self.pending.keys():
if objectHash in self.pending[thread]['objects']:
self.pending[thread]['objects'].remove(objectHash)
continue
# all ok, request
objectHashes.append(objectHash)
self.hashes[objectHash]['requested'] = time.time()
self.hashes[objectHash]['requestedCount'] += 1
self.pending[current_thread().peer]['requested'] = time.time()
self.addPending(objectHash)
except (RuntimeError, KeyError, ValueError):
# the for cycle sometimes breaks if you remove elements
pass
for objectHash in unreachableObjects:
with self.lock:
if objectHash in self.hashes:
del self.hashes[objectHash]
# logger.debug("Pull took %.3f seconds", time.time() - start)
return objectHashes
def delete(self, objectHash):
with self.lock:
if objectHash in self.hashes:
del self.hashes[objectHash]
if hasattr(current_thread(), 'peer') and current_thread().peer in self.pending:
self.pending[current_thread().peer]['received'] = time.time()
for thread in self.pending.keys():
with self.lock:
if thread in self.pending and objectHash in self.pending[thread]['objects']:
self.pending[thread]['objects'].remove(objectHash)
def stop(self):
with self.lock:
self.hashes = {}
self.pending = {}
def threadEnd(self):
while True:
try:
with self.lock:
if current_thread().peer in self.pending:
for objectHash in self.pending[current_thread().peer]['objects']:
if objectHash in self.hashes:
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except (KeyError):
pass
else:
break
with self.lock:
try:
del self.pending[current_thread().peer]
except KeyError:
pass
class PendingUploadDeadlineException(Exception): class PendingUploadDeadlineException(Exception):

View File

@ -22,7 +22,7 @@ from bmconfigparser import BMConfigParser
import highlevelcrypto import highlevelcrypto
#import helper_startup #import helper_startup
from helper_sql import * from helper_sql import *
from inventory import Inventory, PendingDownload from inventory import Inventory
from queues import objectProcessorQueue from queues import objectProcessorQueue
import protocol import protocol
import state import state
@ -342,22 +342,18 @@ def checkAndShareObjectWithPeers(data):
""" """
if len(data) > 2 ** 18: if len(data) > 2 ** 18:
logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(data)) logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(data))
PendingDownload().delete(calculateInventoryHash(data))
return 0 return 0
# Let us check to make sure that the proof of work is sufficient. # Let us check to make sure that the proof of work is sufficient.
if not protocol.isProofOfWorkSufficient(data): if not protocol.isProofOfWorkSufficient(data):
logger.info('Proof of work is insufficient.') logger.info('Proof of work is insufficient.')
PendingDownload().delete(calculateInventoryHash(data))
return 0 return 0
endOfLifeTime, = unpack('>Q', data[8:16]) endOfLifeTime, = unpack('>Q', data[8:16])
if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: # The TTL may not be larger than 28 days + 3 hours of wiggle room if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: # The TTL may not be larger than 28 days + 3 hours of wiggle room
logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % endOfLifeTime) logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % endOfLifeTime)
PendingDownload().delete(calculateInventoryHash(data))
return 0 return 0
if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much. if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much.
logger.info('This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s' % endOfLifeTime) logger.info('This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s' % endOfLifeTime)
PendingDownload().delete(calculateInventoryHash(data))
return 0 return 0
intObjectType, = unpack('>I', data[16:20]) intObjectType, = unpack('>I', data[16:20])
try: try: