diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 622c71ca..304068fb 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -9,6 +9,7 @@ import threading import shared import hashlib import os +import Queue import select import socket import random @@ -217,15 +218,10 @@ class receiveDataThread(threading.Thread): self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message if self.data == '': # if there are no more messages - for objectHash in Missing().pull(100): - if self.sendDataThreadQueue.full(): - break - if objectHash in Inventory(): - logger.debug('Inventory already has object listed in inv message.') - Missing().delete(objectHash) - else: - # We don't have the object in our inventory. Let's request it. - self.sendgetdata(objectHash) + try: + self.sendgetdata(Missing().pull(100)) + except Queue.full: + pass self.processData() @@ -418,10 +414,10 @@ class receiveDataThread(threading.Thread): # Send a getdata message to our peer to request the object with the given # hash - def sendgetdata(self, hash): - logger.debug('sending getdata to retrieve object with hash: ' + hexlify(hash)) - payload = '\x01' + hash - self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('getdata', payload))) + def sendgetdata(self, hashes): + logger.debug('sending getdata to retrieve %i objects', len(hashes)) + payload = encodeVarint(len(hashes)) + ''.join(hashes) + self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('getdata', payload)), False) # We have received a getdata request from our peer diff --git a/src/inventory.py b/src/inventory.py index 8a747d4b..e10c822e 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -37,7 +37,7 @@ class Inventory(collections.MutableMapping): value = self.InventoryItem(*value) self._inventory[hash] = value self._streams[value.stream].add(hash) - Missing().delete(hash, True) + Missing().delete(hash) def __delitem__(self, hash): raise NotImplementedError @@ -126,10 +126,8 @@ class Missing(object): return except ValueError: pass - if len(self.hashes[objectHash]['peers']) == 0 and self.hashes[objectHash]['requested'] < time.time() - self.frequency: - self.delete(objectHash) - else: - self.hashes[objectHash]['requested'] = time.time() + if len(self.hashes[objectHash]['peers']) == 0: + del self.hashes[objectHash] def pull(self, count=1): if count < 1: @@ -139,41 +137,40 @@ class Missing(object): return objectHashes try: for objectHash in self.hashes.keys(): - if len(objectHashes) >= count: - break - if current_thread().peer not in self.pending: - self.addPending() - if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \ - self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ - len(self.pending[current_thread().peer]['objects']) >= count: - break - # requested too long ago or not at all from any thread - if self.hashes[objectHash]['requested'] < time.time() - self.frequency: - # ready requested from this thread but haven't received yet - if objectHash in self.pending[current_thread().peer]['objects']: - # if still sending or receiving, request next - if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \ - self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency: + with self.lock: + if len(objectHashes) >= count: + break + if current_thread().peer not in self.pending: + self.addPending() + if (self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency or \ + self.pending[current_thread().peer]['received'] >= time.time() - self.frequency) and \ + len(self.pending[current_thread().peer]['objects']) >= count: + break + # requested too long ago or not at all from any thread + if self.hashes[objectHash]['requested'] < time.time() - self.frequency: + # ready requested from this thread but haven't received yet + if objectHash in self.pending[current_thread().peer]['objects']: + # if still sending or receiving, request next + if self.pending[current_thread().peer]['received'] >= time.time() - self.frequency or \ + self.pending[current_thread().peer]['requested'] >= time.time() - self.frequency: + continue + # haven't requested or received anything recently, re-request (i.e. continue) + # the current node doesn't have the object + elif current_thread().peer not in self.hashes[objectHash]['peers']: continue - # haven't requested or received anything recently, re-request (i.e. continue) - # the current node doesn't have the object - elif current_thread().peer not in self.hashes[objectHash]['peers']: - continue - # already requested too many times, remove all signs of this object - if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount: - with self.lock: + # already requested too many times, remove all signs of this object + if self.hashes[objectHash]['requestedCount'] >= self.maxRequestCount: del self.hashes[objectHash] for thread in self.pending.keys(): if objectHash in self.pending[thread]['objects']: self.pending[thread]['objects'].remove(objectHash) - continue - # all ok, request - objectHashes.append(objectHash) - self.hashes[objectHash]['requested'] = time.time() - with self.lock: + continue + # all ok, request + objectHashes.append(objectHash) + self.hashes[objectHash]['requested'] = time.time() self.hashes[objectHash]['requestedCount'] += 1 - self.pending[current_thread().peer]['requested'] = time.time() - self.addPending(objectHash) + self.pending[current_thread().peer]['requested'] = time.time() + self.addPending(objectHash) except (RuntimeError, KeyError, ValueError): # the for cycle sometimes breaks if you remove elements pass @@ -183,10 +180,18 @@ class Missing(object): with self.lock: if objectHash in self.hashes: del self.hashes[objectHash] - if objectHash in self.pending[current_thread().peer]['objects']: - self.pending[current_thread().peer]['objects'].remove(objectHash) - if justReceived: - self.pending[current_thread().peer]['received'] = time.time() + self.pending[current_thread().peer]['received'] = time.time() + while True: + try: + for thread in self.pending.keys(): + with self.lock: + if objectHash in self.pending[thread]['objects']: + self.pending[thread]['objects'].remove(objectHash) + except (KeyError, RuntimeError): + pass + else: + break + def stop(self): with self.lock: @@ -194,10 +199,13 @@ class Missing(object): self.pending = {} def threadEnd(self): - with self.lock: - for objectHash in self.hashes: - self.removeObjectFromCurrentThread(objectHash) + while True: try: - del self.pending[current_thread().peer] - except KeyError: + for objectHash in self.hashes: + self.removeObjectFromCurrentThread(objectHash) + with self.lock: + del self.pending[current_thread().peer] + except (KeyError, RuntimeError): pass + else: + break