diff --git a/src/network/addrthread.py b/src/network/addrthread.py index a6c401ab..e0b31b6e 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -21,8 +21,12 @@ class AddrThread(threading.Thread, StoppableThread): try: data = addrQueue.get(False) chunk.append((data[0], data[1])) + if len(data) > 2: + source = BMConnectionPool().getConnectionByAddr(data[2]) except Queue.Empty: break + except KeyError: + continue #finish diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 837c4ce8..78149726 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -290,7 +290,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): Inventory()[self.object.inventoryHash] = ( self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag) - invQueue.put((self.object.streamNumber, self.object.inventoryHash, self)) + invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination)) return True def _decode_addr(self): @@ -317,7 +317,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): "rating": 0, "self": False, } - addrQueue.put((stream, peer, self)) + addrQueue.put((stream, peer, self.destination)) return True def bm_command_portcheck(self): diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 4c2b4c6c..ba5481da 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -54,6 +54,17 @@ class BMConnectionPool(object): def connectToStream(self, streamNumber): self.streams.append(streamNumber) + def getConnectionByAddr(self, addr): + if addr in self.inboundConnections: + return self.inboundConnections[addr] + if addr.host in self.inboundConnections: + return self.inboundConnections[addr.host] + if addr in self.outboundConnections: + return self.outboundConnections[addr] + if addr in self.udpSockets: + return self.udpSockets[addr] + raise KeyError + def addConnection(self, connection): if isinstance(connection, network.udp.UDPSocket): return diff --git a/src/network/invthread.py b/src/network/invthread.py index e5ab890a..0cc689b4 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -23,10 +23,14 @@ class InvThread(threading.Thread, StoppableThread): if len(data) == 2: BMConnectionPool().handleReceivedObject(data[0], data[1]) else: - BMConnectionPool().handleReceivedObject(data[0], data[1], data[2]) + source = BMConnectionPool().getConnectionByAddr(data[2]) + BMConnectionPool().handleReceivedObject(data[0], data[1], source) chunk.append((data[0], data[1])) except Queue.Empty: break + # connection not found, handle it as if generated locally + except KeyError: + BMConnectionPool().handleReceivedObject(data[0], data[1]) if chunk: for connection in BMConnectionPool().inboundConnections.values() + \ diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 6a2ebb65..b6810a3c 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -39,21 +39,8 @@ class ReceiveQueueThread(threading.Thread, StoppableThread): # or the connection is to be aborted try: - BMConnectionPool().inboundConnections[dest].process() - except KeyError: - try: - BMConnectionPool().inboundConnections[dest.host].process() - except KeyError: - pass - except AttributeError: - pass - - try: - BMConnectionPool().outboundConnections[dest].process() - except (KeyError, AttributeError): - pass - - try: - BMConnectionPool().udpSockets[dest].process() + BMConnectionPool().getConnectionByAddr(dest).process() + # KeyError = connection object not found + # AttributeError = state isn't implemented except (KeyError, AttributeError): pass