Less data transferred in invThread and addrThread

This commit is contained in:
Peter Šurda 2017-07-08 18:02:47 +02:00
parent 2df9598774
commit 4fce01e34a
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 25 additions and 19 deletions

View File

@ -21,8 +21,12 @@ class AddrThread(threading.Thread, StoppableThread):
try: try:
data = addrQueue.get(False) data = addrQueue.get(False)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
if len(data) > 2:
source = BMConnectionPool().getConnectionByAddr(data[2])
except Queue.Empty: except Queue.Empty:
break break
except KeyError:
continue
#finish #finish

View File

@ -290,7 +290,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
Inventory()[self.object.inventoryHash] = ( Inventory()[self.object.inventoryHash] = (
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag) self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self)) invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination))
return True return True
def _decode_addr(self): def _decode_addr(self):
@ -317,7 +317,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
"rating": 0, "rating": 0,
"self": False, "self": False,
} }
addrQueue.put((stream, peer, self)) addrQueue.put((stream, peer, self.destination))
return True return True
def bm_command_portcheck(self): def bm_command_portcheck(self):

View File

@ -54,6 +54,17 @@ class BMConnectionPool(object):
def connectToStream(self, streamNumber): def connectToStream(self, streamNumber):
self.streams.append(streamNumber) self.streams.append(streamNumber)
def getConnectionByAddr(self, addr):
if addr in self.inboundConnections:
return self.inboundConnections[addr]
if addr.host in self.inboundConnections:
return self.inboundConnections[addr.host]
if addr in self.outboundConnections:
return self.outboundConnections[addr]
if addr in self.udpSockets:
return self.udpSockets[addr]
raise KeyError
def addConnection(self, connection): def addConnection(self, connection):
if isinstance(connection, network.udp.UDPSocket): if isinstance(connection, network.udp.UDPSocket):
return return

View File

@ -23,10 +23,14 @@ class InvThread(threading.Thread, StoppableThread):
if len(data) == 2: if len(data) == 2:
BMConnectionPool().handleReceivedObject(data[0], data[1]) BMConnectionPool().handleReceivedObject(data[0], data[1])
else: else:
BMConnectionPool().handleReceivedObject(data[0], data[1], data[2]) source = BMConnectionPool().getConnectionByAddr(data[2])
BMConnectionPool().handleReceivedObject(data[0], data[1], source)
chunk.append((data[0], data[1])) chunk.append((data[0], data[1]))
except Queue.Empty: except Queue.Empty:
break break
# connection not found, handle it as if generated locally
except KeyError:
BMConnectionPool().handleReceivedObject(data[0], data[1])
if chunk: if chunk:
for connection in BMConnectionPool().inboundConnections.values() + \ for connection in BMConnectionPool().inboundConnections.values() + \

View File

@ -39,21 +39,8 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
# or the connection is to be aborted # or the connection is to be aborted
try: try:
BMConnectionPool().inboundConnections[dest].process() BMConnectionPool().getConnectionByAddr(dest).process()
except KeyError: # KeyError = connection object not found
try: # AttributeError = state isn't implemented
BMConnectionPool().inboundConnections[dest.host].process()
except KeyError:
pass
except AttributeError:
pass
try:
BMConnectionPool().outboundConnections[dest].process()
except (KeyError, AttributeError):
pass
try:
BMConnectionPool().udpSockets[dest].process()
except (KeyError, AttributeError): except (KeyError, AttributeError):
pass pass