Downloading fixes

- able to request more objects with one command
- fixes to logic and error handling
This commit is contained in:
Peter Šurda 2017-01-16 19:36:58 +01:00
parent 9f89df6d1c
commit d652dc864d
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
2 changed files with 60 additions and 56 deletions

View File

@ -9,6 +9,7 @@ import threading
import shared import shared
import hashlib import hashlib
import os import os
import Queue
import select import select
import socket import socket
import random import random
@ -217,15 +218,10 @@ class receiveDataThread(threading.Thread):
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message
if self.data == '': # if there are no more messages if self.data == '': # if there are no more messages
for objectHash in Missing().pull(100): try:
if self.sendDataThreadQueue.full(): self.sendgetdata(Missing().pull(100))
break except Queue.full:
if objectHash in Inventory(): pass
logger.debug('Inventory already has object listed in inv message.')
Missing().delete(objectHash)
else:
# We don't have the object in our inventory. Let's request it.
self.sendgetdata(objectHash)
self.processData() self.processData()
@ -418,10 +414,10 @@ class receiveDataThread(threading.Thread):
# Send a getdata message to our peer to request the object with the given # Send a getdata message to our peer to request the object with the given
# hash # hash
def sendgetdata(self, hash): def sendgetdata(self, hashes):
logger.debug('sending getdata to retrieve object with hash: ' + hexlify(hash)) logger.debug('sending getdata to retrieve %i objects', len(hashes))
payload = '\x01' + hash payload = encodeVarint(len(hashes)) + ''.join(hashes)
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('getdata', payload))) self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('getdata', payload)), False)
# We have received a getdata request from our peer # We have received a getdata request from our peer

View File

@ -37,7 +37,7 @@ class Inventory(collections.MutableMapping):
value = self.InventoryItem(*value) value = self.InventoryItem(*value)
self._inventory[hash] = value self._inventory[hash] = value
self._streams[value.stream].add(hash) self._streams[value.stream].add(hash)
Missing().delete(hash, True) Missing().delete(hash)
def __delitem__(self, hash): def __delitem__(self, hash):
raise NotImplementedError raise NotImplementedError
@ -126,10 +126,8 @@ class Missing(object):
return return
except ValueError: except ValueError:
pass pass
if len(self.hashes[objectHash]['peers']) == 0 and self.hashes[objectHash]['requested'] < time.time() - self.frequency: if len(self.hashes[objectHash]['peers']) == 0:
self.delete(objectHash) del self.hashes[objectHash]
else:
self.hashes[objectHash]['requested'] = time.time()
def pull(self, count=1): def pull(self, count=1):
if count < 1: if count < 1:
@ -139,41 +137,40 @@ class Missing(object):
return objectHashes return objectHashes
try: try:
for objectHash in self.hashes.keys(): for objectHash in self.hashes.keys():
if len(objectHashes) >= count: with self.lock:
break if len(objectHashes) >= count:
if current_thread().peer not in self.pending: break
self.addPending() if current_thread().peer not in self.pending:
if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \ self.addPending()
self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \
len(self.pending[current_thread().peer]['objects']) >= count: self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \
break len(self.pending[current_thread().peer]['objects']) >= count:
# requested too long ago or not at all from any thread break
if self.hashes[objectHash]['requested'] < time.time() - self.frequency: # requested too long ago or not at all from any thread
# ready requested from this thread but haven't received yet if self.hashes[objectHash]['requested'] < time.time() - self.frequency:
if objectHash in self.pending[current_thread().peer]['objects']: # ready requested from this thread but haven't received yet
# if still sending or receiving, request next if objectHash in self.pending[current_thread().peer]['objects']:
if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \ # if still sending or receiving, request next
self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency: if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \
self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency:
continue
# haven't requested or received anything recently, re-request (i.e. continue)
# the current node doesn't have the object
elif current_thread().peer not in self.hashes[objectHash]['peers']:
continue continue
# haven't requested or received anything recently, re-request (i.e. continue) # already requested too many times, remove all signs of this object
# the current node doesn't have the object if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount:
elif current_thread().peer not in self.hashes[objectHash]['peers']:
continue
# already requested too many times, remove all signs of this object
if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount:
with self.lock:
del self.hashes[objectHash] del self.hashes[objectHash]
for thread in self.pending.keys(): for thread in self.pending.keys():
if objectHash in self.pending[thread]['objects']: if objectHash in self.pending[thread]['objects']:
self.pending[thread]['objects'].remove(objectHash) self.pending[thread]['objects'].remove(objectHash)
continue continue
# all ok, request # all ok, request
objectHashes.append(objectHash) objectHashes.append(objectHash)
self.hashes[objectHash]['requested'] = time.time() self.hashes[objectHash]['requested'] = time.time()
with self.lock:
self.hashes[objectHash]['requestedCount'] += 1 self.hashes[objectHash]['requestedCount'] += 1
self.pending[current_thread().peer]['requested'] = time.time() self.pending[current_thread().peer]['requested'] = time.time()
self.addPending(objectHash) self.addPending(objectHash)
except (RuntimeError, KeyError, ValueError): except (RuntimeError, KeyError, ValueError):
# the for cycle sometimes breaks if you remove elements # the for cycle sometimes breaks if you remove elements
pass pass
@ -183,10 +180,18 @@ class Missing(object):
with self.lock: with self.lock:
if objectHash in self.hashes: if objectHash in self.hashes:
del self.hashes[objectHash] del self.hashes[objectHash]
if objectHash in self.pending[current_thread().peer]['objects']: self.pending[current_thread().peer]['received'] = time.time()
self.pending[current_thread().peer]['objects'].remove(objectHash) while True:
if justReceived: try:
self.pending[current_thread().peer]['received'] = time.time() for thread in self.pending.keys():
with self.lock:
if objectHash in self.pending[thread]['objects']:
self.pending[thread]['objects'].remove(objectHash)
except (KeyError, RuntimeError):
pass
else:
break
def stop(self): def stop(self):
with self.lock: with self.lock:
@ -194,10 +199,13 @@ class Missing(object):
self.pending = {} self.pending = {}
def threadEnd(self): def threadEnd(self):
with self.lock: while True:
for objectHash in self.hashes:
self.removeObjectFromCurrentThread(objectHash)
try: try:
del self.pending[current_thread().peer] for objectHash in self.hashes:
except KeyError: self.removeObjectFromCurrentThread(objectHash)
with self.lock:
del self.pending[current_thread().peer]
except (KeyError, RuntimeError):
pass pass
else:
break