Objects to be downloaded fixes

- tries to avoid calling senddata it it would block receiveDataThread,
  allowing fore more asynchronous operation
- request objects in chunks of 100 (CPU performance optimisation)
This commit is contained in:
Peter Šurda 2017-01-15 19:50:28 +01:00
parent f079ff5b99
commit dbe15d0b99
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
2 changed files with 27 additions and 21 deletions

View File

@ -217,16 +217,16 @@ class receiveDataThread(threading.Thread):
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message
if self.data == '': # if there are no more messages
while Missing().len() > 0 and not self.sendDataThreadQueue.full():
objectHash = Missing().pull()
if objectHash is None:
break
if objectHash in Inventory():
logger.debug('Inventory already has object listed in inv message.')
Missing().delete(objectHash)
else:
# We don't have the object in our inventory. Let's request it.
self.sendgetdata(objectHash)
if Missing().len() > 0 and self.sendDataThreadQueue.qsize() < 10:
for objectHash in Missing().pull(100):
if self.sendDataThreadQueue.full():
break
if objectHash in Inventory():
logger.debug('Inventory already has object listed in inv message.')
Missing().delete(objectHash)
else:
# We don't have the object in our inventory. Let's request it.
self.sendgetdata(objectHash)
self.processData()

View File

@ -101,23 +101,29 @@ class Missing(object):
with self.lock:
return len(self.hashes)
def pull(self):
def pull(self, count=1):
if count < 1:
raise ValueError("Must be at least one")
with self.lock:
now = time.time()
since = now - 300 # once every 5 minutes
try:
objectHash = random.choice({k:v for k, v in self.hashes.iteritems() if current_thread().peer in self.hashes[k]['peers'] and self.hashes[k]['requested'] < since}.keys())
matchingHashes = {k:v for k, v in self.hashes.iteritems() if current_thread().peer in self.hashes[k]['peers'] and self.hashes[k]['requested'] < since}
if count > len(matchingHashes):
count = len(matchingHashes)
objectHashes = random.sample(matchingHashes, count)
except (IndexError, KeyError): # list is empty
return None
try:
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except ValueError:
pass
if len(self.hashes[objectHash]['peers']) == 0:
self.delete(objectHash)
else:
self.hashes[objectHash]['requested'] = now
return objectHash
for objectHash in objectHashes:
try:
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except ValueError:
pass
if len(self.hashes[objectHash]['peers']) == 0:
self.delete(objectHash)
else:
self.hashes[objectHash]['requested'] = now
return objectHashes
def delete(self, objectHash):
with self.lock: