From fa9811f4264a2b231744e946b3936e50edab1992 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Tue, 30 May 2017 23:53:43 +0200 Subject: [PATCH] Asyncore update - duplicate checking implemented - connection pool vs. socket closing cleanup --- src/network/bmobject.py | 5 +++++ src/network/bmproto.py | 38 ++++++++++++++++------------------- src/network/connectionpool.py | 17 +++++++++++----- src/network/tcp.py | 1 + 4 files changed, 35 insertions(+), 26 deletions(-) diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 2c3fb59c..e16a6937 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -3,6 +3,7 @@ import time from addresses import calculateInventoryHash from debug import logger +from inventory import Inventory import protocol import state @@ -64,6 +65,10 @@ class BMObject(object): logger.debug('The streamNumber %s isn\'t one we are interested in.' % self.streamNumber) raise BMObjectUnwantedStreamError() + def checkAlreadyHave(self): + if self.inventoryHash in Inventory(): + raise BMObjectAlreadyHaveError() + def checkMessage(self): return diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 87d24bba..ffd79056 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -280,27 +280,24 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.object.checkProofOfWorkSufficient() self.object.checkEOLSanity() self.object.checkStream() + self.object.checkAlreadyHave() - try: - if self.object.objectType == protocol.OBJECT_GETPUBKEY: - self.object.checkGetpubkey() - elif self.object.objectType == protocol.OBJECT_PUBKEY: - self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32]) - elif self.object.objectType == protocol.OBJECT_MSG: - self.object.checkMessage() - elif self.object.objectType == protocol.OBJECT_BROADCAST: - self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32]) - # other objects don't require other types of tests - except BMObjectAlreadyHaveError: - pass - else: - Inventory()[self.object.inventoryHash] = ( - self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag) - objectProcessorQueue.put((self.object.objectType,self.object.data)) - #DownloadQueue().task_done(self.object.inventoryHash) - invQueue.put((self.object.streamNumber, self.object.inventoryHash, self)) - #ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash)) - #broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) + if self.object.objectType == protocol.OBJECT_GETPUBKEY: + self.object.checkGetpubkey() + elif self.object.objectType == protocol.OBJECT_PUBKEY: + self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32]) + elif self.object.objectType == protocol.OBJECT_MSG: + self.object.checkMessage() + elif self.object.objectType == protocol.OBJECT_BROADCAST: + self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32]) + # other objects don't require other types of tests + Inventory()[self.object.inventoryHash] = ( + self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag) + objectProcessorQueue.put((self.object.objectType,self.object.data)) + #DownloadQueue().task_done(self.object.inventoryHash) + invQueue.put((self.object.streamNumber, self.object.inventoryHash, self)) + #ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash)) + #broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) return True def _decode_addr(self): @@ -456,5 +453,4 @@ class BMProto(AdvancedDispatcher, ObjectTracker): #traceback.print_stack() else: logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason) - network.connectionpool.BMConnectionPool().removeConnection(self) AdvancedDispatcher.close(self) diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 1e2ed311..ef2fb26b 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -64,7 +64,9 @@ class BMConnectionPool(object): def removeConnection(self, connection): if isinstance(connection, network.udp.UDPSocket): - return + del self.udpSockets[connection.destination.host] + if isinstance(connection, network.tcp.TCPServer): + del self.listeningSockets[state.Peer(connection.destination.host, connection.destination.port)] elif connection.isOutbound: try: del self.outboundConnections[connection.destination] @@ -99,9 +101,10 @@ class BMConnectionPool(object): def startUDPSocket(self, bind=None): if bind is None: host = self.getListeningIP() - self.udpSockets[host] = network.udp.UDPSocket(host=host) + udpSocket = network.udp.UDPSocket(host=host) else: - self.udpSockets[bind] = network.udp.UDPSocket(host=bind) + udpSocket = network.udp.UDPSocket(host=bind) + self.udpSockets[udpSocket.destination.host] = udpSocket def loop(self): # defaults to empty loop if outbound connections are maxed @@ -164,12 +167,10 @@ class BMConnectionPool(object): if len(self.listeningSockets) > 0 and not acceptConnections: for i in self.listeningSockets: i.close() - self.listeningSockets = {} logger.info('Stopped listening for incoming connections.') if len(self.udpSockets) > 0 and not acceptConnections: for i in self.udpSockets: i.close() - self.udpSockets = {} logger.info('Stopped udp sockets.') # while len(asyncore.socket_map) > 0 and state.shutdown == 0: @@ -179,6 +180,7 @@ class BMConnectionPool(object): loopTime = 1.0 asyncore.loop(timeout=loopTime, count=10) + reaper = [] for i in self.inboundConnections.values() + self.outboundConnections.values(): minTx = time.time() - 20 if i.fullyEstablished: @@ -188,3 +190,8 @@ class BMConnectionPool(object): i.writeQueue.put(protocol.CreatePacket('ping')) else: i.close("Timeout (%is)" % (time.time() - i.lastTx)) + for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values(): + if not (i.accepting or i.connecting or i.connected): + reaper.append(i) + for i in reaper: + self.removeConnection(i) diff --git a/src/network/tcp.py b/src/network/tcp.py index 42d5a831..2a2188ea 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -198,6 +198,7 @@ class TCPServer(AdvancedDispatcher): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((host, port)) + self.destination = state.Peer(host, port) self.listen(5) def handle_accept(self):