From d75d920a687ff3aa03cd1ccf51c1b23ec17b5b1c Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Fri, 2 Jun 2017 07:09:35 +0200 Subject: [PATCH] Asyncore updates - clean object tracking dictionaries in the cleaner thread - clean up close / handle_close - add locking to tracking dictionaries --- src/class_singleCleaner.py | 4 ++++ src/network/advanceddispatcher.py | 2 +- src/network/bmproto.py | 8 ++++---- src/network/connectionpool.py | 16 +++++++++------- src/network/objectracker.py | 25 ++++++++++++++++++------- src/network/tcp.py | 10 ++++------ 6 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 19c3ca52..e6d98989 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -119,6 +119,10 @@ class singleCleaner(threading.Thread, StoppableThread): if thread.isAlive() and hasattr(thread, 'downloadQueue'): thread.downloadQueue.clear() + # inv/object tracking + for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): + connection.clean() + # TODO: cleanup pending upload / download if state.shutdown == 0: diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index 5f8a94aa..89a7423d 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -106,7 +106,7 @@ class AdvancedDispatcher(asyncore.dispatcher): def state_close(self): pass - def close(self): + def handle_close(self): self.read_buf = b"" self.write_buf = b"" self.state = "close" diff --git a/src/network/bmproto.py b/src/network/bmproto.py index f59be314..8c727f00 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -80,7 +80,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.bm_proto_reset() self.set_state("bm_header", length=1, expectBytes=protocol.Header.size) logger.debug("Bad magic") - self.close() + self.handle_close("Bad magic") return False if self.payloadLength > BMProto.maxMessageSize: self.invalid = True @@ -127,7 +127,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): else: #print "Skipping command %s due to invalid data" % (self.command) logger.debug("Closing due to invalid command %s", self.command) - self.close() + self.handle_close("Invalid command %s" % (self.command)) return False if retval: self.set_state("bm_header", length=self.payloadLength, expectBytes=protocol.Header.size) @@ -445,7 +445,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): payload += struct.pack('>H', peer.port) # remote port return protocol.CreatePacket('addr', payload) - def close(self, reason=None): + def handle_close(self, reason=None): self.set_state("close") if reason is None: #logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, ''.join(traceback.format_stack())) @@ -453,4 +453,4 @@ class BMProto(AdvancedDispatcher, ObjectTracker): #traceback.print_stack() else: logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason) - AdvancedDispatcher.close(self) + AdvancedDispatcher.handle_close(self) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 517b3c98..6aa6e49b 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -31,7 +31,6 @@ class BMConnectionPool(object): self.streams = [] self.lastSpawned = 0 self.spawnWait = 0.3 - self.bootstrapped = False def handleReceivedObject(self, streamNumber, hashid, connection = None): @@ -41,12 +40,15 @@ class BMConnectionPool(object): if not i.fullyEstablished: continue try: - del i.objectsNewToMe[hashid] + with i.objectsNewToMeLock: + del i.objectsNewToMe[hashid] except KeyError: - i.objectsNewToThem[hashid] = True + with i.objectsNewToThemLock: + i.objectsNewToThem[hashid] = True if i == connection: try: - del i.objectsNewToThem[hashid] + with i.objectsNewToThemLock: + del i.objectsNewToThem[hashid] except KeyError: pass @@ -171,11 +173,11 @@ class BMConnectionPool(object): logger.info('Starting UDP socket(s).') if len(self.listeningSockets) > 0 and not acceptConnections: for i in self.listeningSockets: - i.close() + i.handle_close() logger.info('Stopped listening for incoming connections.') if len(self.udpSockets) > 0 and not acceptConnections: for i in self.udpSockets: - i.close() + i.handle_close() logger.info('Stopped udp sockets.') # while len(asyncore.socket_map) > 0 and state.shutdown == 0: @@ -194,7 +196,7 @@ class BMConnectionPool(object): if i.fullyEstablished: i.writeQueue.put(protocol.CreatePacket('ping')) else: - i.close("Timeout (%is)" % (time.time() - i.lastTx)) + i.handle_close("Timeout (%is)" % (time.time() - i.lastTx)) for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values(): if not (i.accepting or i.connecting or i.connected): reaper.append(i) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 246916b9..d073d78a 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -1,5 +1,6 @@ from Queue import Queue import time +from threading import RLock from inventory import Inventory from network.downloadqueue import DownloadQueue @@ -29,11 +30,14 @@ class ObjectTracker(object): def __init__(self): self.objectsNewToMe = {} + self.objectsNewToMeLock = RLock() self.objectsNewToThem = {} + self.objectsNewToThemLock = RLock() self.downloadPending = 0 self.downloadQueue = Queue() self.initInvBloom() self.initAddrBloom() + self.lastCleaned = time.time() def initInvBloom(self): if haveBloom: @@ -48,15 +52,20 @@ class ObjectTracker(object): error_rate=ObjectTracker.invErrorRate) def clean(self): - if self.lastcleaned < time.time() - BMQueues.invCleanPeriod: + if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod: if haveBloom: if PendingDownloadQueue().size() == 0: self.initInvBloom() self.initAddrBloom() - else: - # release memory - self.objectsNewToMe = self.objectsNewToMe.copy() - self.objectsNewToThem = self.objectsNewToThem.copy() + else: + # release memory + with self.objectsNewToMeLock: + tmp = self.objectsNewToMe.copy() + self.objectsNewToMe = tmp + with self.objectsNewToThemLock: + tmp = self.objectsNewToThem.copy() + self.objectsNewToThem = tmp + self.lastCleaned = time.time() def hasObj(self, hashid): if haveBloom: @@ -69,11 +78,13 @@ class ObjectTracker(object): self.invBloom.add(hashId) elif hashId in Inventory(): try: - del self.objectsNewToThem[hashId] + with self.objectsNewToThemLock: + del self.objectsNewToThem[hashId] except KeyError: pass else: - self.objectsNewToMe[hashId] = True + with self.objectsNewToMeLock: + self.objectsNewToMe[hashId] = True # self.DownloadQueue.put(hashId) def hasAddr(self, addr): diff --git a/src/network/tcp.py b/src/network/tcp.py index f6d0c7ac..d3b2f862 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -149,10 +149,10 @@ class TCPConnection(BMProto, TLSDispatcher): def handle_connect_event(self): try: - asyncore.dispatcher.handle_connect_event(self) + AdvancedDispatcher.handle_connect_event(self) except socket.error as e: if e.errno in asyncore._DISCONNECTED: - self.close("Connection failed") + logger.debug("%s:%i: Connection failed: %s" % (self.destination.host, self.destination.port, str(e))) return self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False)) #print "%s:%i: Sending version" % (self.destination.host, self.destination.port) @@ -162,15 +162,13 @@ class TCPConnection(BMProto, TLSDispatcher): try: TLSDispatcher.handle_read(self) except socket.error as e: - #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) - self.close() + logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e))) def handle_write(self): try: TLSDispatcher.handle_write(self) except socket.error as e: - #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e)) - self.close() + logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e))) class Socks5BMConnection(Socks5Connection, TCPConnection):