PendingDownloadQueue updates

- track pending hashId more accurately
- add timeout and a cleanup so that the download queues don't
get stuck and memory is freed
- randomise download order (only works for inv commands with
more than 1 entry)
This commit is contained in:
Peter Šurda 2017-03-20 01:22:37 +01:00
parent 9a5f7442a0
commit 913b401dd0
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
3 changed files with 32 additions and 11 deletions

View File

@ -240,14 +240,14 @@ class receiveDataThread(threading.Thread):
if self.data == '': # if there are no more messages
toRequest = []
try:
for i in range(self.downloadQueue.pendingSize, 100):
for i in range(len(self.downloadQueue.pending), 100):
while True:
hashId = self.downloadQueue.get(False)
if not hashId in Inventory():
toRequest.append(hashId)
break
# don't track download for duplicates
self.downloadQueue.task_done()
self.downloadQueue.task_done(hashId)
except Queue.Empty:
pass
if len(toRequest) > 0:
@ -484,7 +484,7 @@ class receiveDataThread(threading.Thread):
def recobject(self, data):
self.messageProcessingStartTime = time.time()
lengthOfTimeWeShouldUseToProcessThisMessage = shared.checkAndShareObjectWithPeers(data)
self.downloadQueue.task_done()
self.downloadQueue.task_done(calculateInventoryHash(data))
"""
Sleeping will help guarantee that we can process messages faster than a
@ -517,7 +517,7 @@ class receiveDataThread(threading.Thread):
for stream in self.streamNumber:
objectsNewToMe -= Inventory().hashes_by_stream(stream)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe:
for item in random.sample(objectsNewToMe, len(objectsNewToMe)):
self.downloadQueue.put(item)
# Send a getdata message to our peer to request the object with the given

View File

@ -108,6 +108,11 @@ class singleCleaner(threading.Thread, StoppableThread):
os._exit(0)
shared.needToWriteKnownNodesToDisk = False
# clear download queues
for thread in threading.enumerate():
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
thread.downloadQueue.clear()
# TODO: cleanup pending upload / download
if state.shutdown == 0:

View File

@ -86,29 +86,44 @@ class Inventory(collections.MutableMapping):
class PendingDownloadQueue(Queue.Queue):
# keep a track of objects that have been advertised to us but we haven't downloaded them yet
maxWait = 300
def __init__(self, maxsize=0):
Queue.Queue.__init__(self, maxsize)
self.stopped = False
self.pendingSize = 0
self.pending = {}
self.lock = RLock()
def task_done(self):
def task_done(self, hashId):
Queue.Queue.task_done(self)
if self.pendingSize > 0:
self.pendingSize -= 1
try:
with self.lock:
del self.pending[hashId]
except KeyError:
pass
def get(self, block=True, timeout=None):
retval = Queue.Queue.get(self, block, timeout)
# no exception was raised
if not self.stopped:
self.pendingSize += 1
with self.lock:
self.pending[retval] = time.time()
return retval
def clear(self):
with self.lock:
newPending = {}
for hashId in self.pending:
if self.pending[hashId] + PendingDownloadQueue.maxWait > time.time():
newPending[hashId] = self.pending[hashId]
self.pending = newPending
@staticmethod
def totalSize():
size = 0
for thread in threadingEnumerate():
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
size += thread.downloadQueue.qsize() + thread.downloadQueue.pendingSize
size += thread.downloadQueue.qsize() + len(thread.downloadQueue.pending)
return size
@staticmethod
@ -116,7 +131,8 @@ class PendingDownloadQueue(Queue.Queue):
for thread in threadingEnumerate():
if thread.isAlive() and hasattr(thread, 'downloadQueue'):
thread.downloadQueue.stopped = True
thread.downloadQueue.pendingSize = 0
with thread.downloadQueue.lock:
thread.downloadQueue.pending = {}
class PendingUploadDeadlineException(Exception):