Asyncore updates

- clean object tracking dictionaries in the cleaner thread
- clean up close / handle_close
- add locking to tracking dictionaries
Peter Šurda 2017-06-02 07:09:35 +02:00
parent 4c17a18006
commit d75d920a68
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
6 changed files with 40 additions and 25 deletions


@@ -119,6 +119,10 @@ class singleCleaner(threading.Thread, StoppableThread):
                 if thread.isAlive() and hasattr(thread, 'downloadQueue'):
                     thread.downloadQueue.clear()
+            # inv/object tracking
+            for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values():
+                connection.clean()
             # TODO: cleanup pending upload / download
             if state.shutdown == 0:

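The cleaner thread is where the per-connection housekeeping now gets triggered: on every pass it asks each pooled connection to prune its inventory-tracking dictionaries. A minimal, standalone sketch of that idea (the TrackerCleaner name, the connections() callable and the 300-second interval are illustrative assumptions, not part of this commit):

import threading
import time

class TrackerCleaner(threading.Thread):
    """Hypothetical periodic cleaner that asks every tracked
    connection to prune its inventory dictionaries."""
    def __init__(self, connections, interval=300):
        threading.Thread.__init__(self, name="trackerCleaner")
        self.daemon = True
        self.connections = connections   # callable returning the current connections
        self.interval = interval

    def run(self):
        while True:
            for connection in self.connections():
                connection.clean()       # same hook singleCleaner calls above
            time.sleep(self.interval)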

@@ -106,7 +106,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
     def state_close(self):
         pass

-    def close(self):
+    def handle_close(self):
         self.read_buf = b""
         self.write_buf = b""
         self.state = "close"

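For context: in the standard library's asyncore, close() removes the socket from the map and closes the file descriptor, while handle_close() is the callback the event loop fires when the peer disconnects (its default implementation simply calls close()). Moving the buffer and state reset into handle_close(), as above, keeps the low-level close() semantics intact. A minimal sketch of the same pattern outside PyBitmessage (class and attribute names are illustrative):

import asyncore

class BufferedDispatcher(asyncore.dispatcher):
    def __init__(self, sock=None):
        asyncore.dispatcher.__init__(self, sock)
        self.read_buf = b""
        self.write_buf = b""
        self.state = "init"

    def handle_close(self):
        # reset protocol state first, then let asyncore do the real teardown
        self.read_buf = b""
        self.write_buf = b""
        self.state = "close"
        asyncore.dispatcher.close(self)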

@@ -80,7 +80,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
             self.bm_proto_reset()
             self.set_state("bm_header", length=1, expectBytes=protocol.Header.size)
             logger.debug("Bad magic")
-            self.close()
+            self.handle_close("Bad magic")
             return False
         if self.payloadLength > BMProto.maxMessageSize:
             self.invalid = True
@@ -127,7 +127,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
             else:
                 #print "Skipping command %s due to invalid data" % (self.command)
                 logger.debug("Closing due to invalid command %s", self.command)
-                self.close()
+                self.handle_close("Invalid command %s" % (self.command))
                 return False
         if retval:
             self.set_state("bm_header", length=self.payloadLength, expectBytes=protocol.Header.size)
@@ -445,7 +445,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
             payload += struct.pack('>H', peer.port) # remote port
         return protocol.CreatePacket('addr', payload)

-    def close(self, reason=None):
+    def handle_close(self, reason=None):
         self.set_state("close")
         if reason is None:
             #logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, ''.join(traceback.format_stack()))
@@ -453,4 +453,4 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
             #traceback.print_stack()
         else:
             logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason)
-        AdvancedDispatcher.close(self)
+        AdvancedDispatcher.handle_close(self)

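Note that asyncore itself invokes handle_close() with no arguments, so the new reason parameter has to stay optional; only explicit protocol-level callers, such as the "Bad magic" and invalid-command paths above, pass a string. A generic sketch of that override pattern (the class names here are illustrative):

import logging

logger = logging.getLogger("network")

class Base(object):
    def handle_close(self):
        logger.debug("closing connection")

class Peer(Base):
    def handle_close(self, reason=None):
        # the event loop calls this without arguments; protocol code may pass a reason
        if reason is not None:
            logger.debug("closing: %s", reason)
        Base.handle_close(self)

Peer().handle_close("Bad magic")   # explicit close with a reason
Peer().handle_close()              # event-loop style close, no reason given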

@@ -31,7 +31,6 @@ class BMConnectionPool(object):
         self.streams = []
         self.lastSpawned = 0
         self.spawnWait = 0.3
         self.bootstrapped = False

     def handleReceivedObject(self, streamNumber, hashid, connection = None):
@@ -41,12 +40,15 @@ class BMConnectionPool(object):
             if not i.fullyEstablished:
                 continue
             try:
-                del i.objectsNewToMe[hashid]
+                with i.objectsNewToMeLock:
+                    del i.objectsNewToMe[hashid]
             except KeyError:
-                i.objectsNewToThem[hashid] = True
+                with i.objectsNewToThemLock:
+                    i.objectsNewToThem[hashid] = True
             if i == connection:
                 try:
-                    del i.objectsNewToThem[hashid]
+                    with i.objectsNewToThemLock:
+                        del i.objectsNewToThem[hashid]
                 except KeyError:
                     pass
@@ -171,11 +173,11 @@ class BMConnectionPool(object):
             logger.info('Starting UDP socket(s).')
         if len(self.listeningSockets) > 0 and not acceptConnections:
             for i in self.listeningSockets:
-                i.close()
+                i.handle_close()
             logger.info('Stopped listening for incoming connections.')
         if len(self.udpSockets) > 0 and not acceptConnections:
             for i in self.udpSockets:
-                i.close()
+                i.handle_close()
             logger.info('Stopped udp sockets.')
         # while len(asyncore.socket_map) > 0 and state.shutdown == 0:
@@ -194,7 +196,7 @@ class BMConnectionPool(object):
                 if i.fullyEstablished:
                     i.writeQueue.put(protocol.CreatePacket('ping'))
                 else:
-                    i.close("Timeout (%is)" % (time.time() - i.lastTx))
+                    i.handle_close("Timeout (%is)" % (time.time() - i.lastTx))
         for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values():
             if not (i.accepting or i.connecting or i.connected):
                 reaper.append(i)

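handleReceivedObject now touches the shared tracking dictionaries only while holding the owning connection's lock; the try/except KeyError stays outside the with block, so the lock is already released by the time the fallback branch runs. The pattern in isolation (a minimal sketch, not PyBitmessage code):

from threading import RLock

objects = {}
objects_lock = RLock()

def forget(hashid):
    # delete under the lock; a missing key is expected and not an error
    try:
        with objects_lock:
            del objects[hashid]
    except KeyError:
        pass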

@@ -1,5 +1,6 @@
 from Queue import Queue
 import time
+from threading import RLock
 from inventory import Inventory
 from network.downloadqueue import DownloadQueue
@@ -29,11 +30,14 @@ class ObjectTracker(object):
     def __init__(self):
         self.objectsNewToMe = {}
+        self.objectsNewToMeLock = RLock()
         self.objectsNewToThem = {}
+        self.objectsNewToThemLock = RLock()
         self.downloadPending = 0
         self.downloadQueue = Queue()
         self.initInvBloom()
         self.initAddrBloom()
+        self.lastCleaned = time.time()

     def initInvBloom(self):
         if haveBloom:
@@ -48,15 +52,20 @@ class ObjectTracker(object):
                 error_rate=ObjectTracker.invErrorRate)

     def clean(self):
-        if self.lastcleaned < time.time() - BMQueues.invCleanPeriod:
+        if self.lastCleaned < time.time() - ObjectTracker.invCleanPeriod:
             if haveBloom:
                 if PendingDownloadQueue().size() == 0:
                     self.initInvBloom()
                     self.initAddrBloom()
             else:
                 # release memory
-                self.objectsNewToMe = self.objectsNewToMe.copy()
-                self.objectsNewToThem = self.objectsNewToThem.copy()
+                with self.objectsNewToMeLock:
+                    tmp = self.objectsNewToMe.copy()
+                    self.objectsNewToMe = tmp
+                with self.objectsNewToThemLock:
+                    tmp = self.objectsNewToThem.copy()
+                    self.objectsNewToThem = tmp
+            self.lastCleaned = time.time()

     def hasObj(self, hashid):
         if haveBloom:
@@ -69,11 +78,13 @@ class ObjectTracker(object):
             self.invBloom.add(hashId)
         elif hashId in Inventory():
             try:
-                del self.objectsNewToThem[hashId]
+                with self.objectsNewToThemLock:
+                    del self.objectsNewToThem[hashId]
             except KeyError:
                 pass
         else:
-            self.objectsNewToMe[hashId] = True
+            with self.objectsNewToMeLock:
+                self.objectsNewToMe[hashId] = True
             # self.DownloadQueue.put(hashId)

     def hasAddr(self, addr):

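Two details worth spelling out. The locks are threading.RLock rather than Lock, so a thread that already holds one can re-enter a guarded section without deadlocking (a plain Lock cannot be re-acquired by its owner). And clean() swaps each dictionary for a copy of itself because, in CPython, a dict that has had many keys deleted does not shrink in place; copying rebuilds a compact table, which is what the "release memory" comment refers to. A small demonstration of the RLock/Lock difference:

from threading import Lock, RLock

rlock = RLock()
with rlock:
    with rlock:                      # fine: an RLock can be re-acquired by its owner
        pass

lock = Lock()
with lock:
    reacquired = lock.acquire(False) # non-blocking attempt fails: Lock is not reentrant
print(reacquired)                    # False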

@@ -149,10 +149,10 @@ class TCPConnection(BMProto, TLSDispatcher):
     def handle_connect_event(self):
         try:
-            asyncore.dispatcher.handle_connect_event(self)
+            AdvancedDispatcher.handle_connect_event(self)
         except socket.error as e:
             if e.errno in asyncore._DISCONNECTED:
-                self.close("Connection failed")
+                logger.debug("%s:%i: Connection failed: %s" % (self.destination.host, self.destination.port, str(e)))
                 return
         self.writeQueue.put(protocol.assembleVersionMessage(self.destination.host, self.destination.port, network.connectionpool.BMConnectionPool().streams, False))
         #print "%s:%i: Sending version" % (self.destination.host, self.destination.port)
@@ -162,15 +162,13 @@ class TCPConnection(BMProto, TLSDispatcher):
         try:
             TLSDispatcher.handle_read(self)
         except socket.error as e:
-            #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
-            self.close()
+            logger.debug("%s:%i: Handle read fail: %s" % (self.destination.host, self.destination.port, str(e)))

     def handle_write(self):
         try:
             TLSDispatcher.handle_write(self)
         except socket.error as e:
-            #print "%s:%i: socket error: %s" % (self.destination.host, self.destination.port, str(e))
-            self.close()
+            logger.debug("%s:%i: Handle write fail: %s" % (self.destination.host, self.destination.port, str(e)))

 class Socks5BMConnection(Socks5Connection, TCPConnection):
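handle_connect_event now only logs when the errno signals that the peer went away; asyncore._DISCONNECTED is a private frozenset of exactly those errnos (ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE and EBADF in CPython 2.7). A sketch of the same check without relying on the private name (the connect_checked helper is illustrative):

import errno
import socket

DISCONNECTED = frozenset((errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN,
                          errno.ECONNABORTED, errno.EPIPE, errno.EBADF))

def connect_checked(sock, address):
    try:
        sock.connect(address)
    except socket.error as e:
        if e.errno in DISCONNECTED:
            return False             # remote side unreachable or gone away
        raise                        # anything else is unexpected
    return True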