From a5c1b0c52955e95da90c38f5f0522f912065bf3c Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Mon, 29 May 2017 12:56:59 +0200 Subject: [PATCH] Asyncore fixes - better handling of WSA* checks on non-windows systems - handle EBADF on Windows/select - better timeouts / loop lengths in main asyncore loop and spawning new connections - remove InvThread prints --- src/bitmessagemain.py | 6 ------ src/network/asyncore_pollchoose.py | 30 ++++++++++++++++++------------ src/network/connectionpool.py | 10 +++++++++- src/network/invthread.py | 3 +++ 4 files changed, 30 insertions(+), 19 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index e6b8b56b..57eab27a 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -102,12 +102,6 @@ def _fixSocket(): if sys.platform.startswith('linux'): socket.SO_BINDTODEVICE = 25 - if not sys.platform.startswith('win'): - errno.WSAEWOULDBLOCK = errno.EWOULDBLOCK - errno.WSAENETUNREACH = errno.ENETUNREACH - errno.WSAECONNREFUSED = errno.ECONNREFUSED - errno.WSAEHOSTUNREACH = errno.EHOSTUNREACH - if not sys.platform.startswith('win'): return diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 25a5b3fb..02a362a1 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -61,8 +61,9 @@ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \ errorcode try: from errno import WSAEWOULDBLOCK -except: - pass +except (ImportError, AttributeError): + WSAEWOULDBLOCK = EWOULDBLOCK + from ssl import SSLError, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, @@ -199,6 +200,9 @@ def select_poller(timeout=0.0, map=None): r, w, e = select.select(r, w, e, timeout) except KeyboardInterrupt: return + except socket.error as err: + if err.args[0] in (EBADF): + return for fd in random.sample(r, len(r)): obj = map.get(fd) @@ -369,12 +373,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, # then poll poller(timeout, map) else: - timeout /= count + if timeout == 0: + deadline = 0 + else: + deadline = time.time() + timeout while map and count > 0: # fill buckets first update_sent() update_received() - poller(timeout, map) + subtimeout = deadline - time.time() + if subtimeout <= 0: + break + poller(subtimeout, map) # then poll count = count - 1 @@ -555,10 +565,8 @@ class dispatcher: else: raise except socket.error as why: - if why.args[0] in (EAGAIN, EWOULDBLOCK) or \ - (sys.platform.startswith('win') and \ - err.errno == WSAEWOULDBLOCK): - return 0 + if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK): + return 0 elif why.args[0] in _DISCONNECTED: self.handle_close() return 0 @@ -582,10 +590,8 @@ class dispatcher: raise except socket.error as why: # winsock sometimes raises ENOTCONN - if why.args[0] in (EAGAIN, EWOULDBLOCK) or \ - (sys.platform.startswith('win') and \ - err.errno == WSAEWOULDBLOCK): - return b'' + if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK): + return b'' if why.args[0] in _DISCONNECTED: self.handle_close() return b'' diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index d1a6b6ee..e9bc56c8 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -19,6 +19,7 @@ import state @Singleton class BMConnectionPool(object): + def __init__(self): asyncore.set_rates( BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate") * 1024, @@ -28,6 +29,8 @@ class BMConnectionPool(object): self.listeningSockets = {} self.udpSockets = {} self.streams = [] + self.lastSpawned = 0 + self.spawnWait = 0.3 self.bootstrapped = False @@ -146,6 +149,8 @@ class BMConnectionPool(object): if e.errno == errno.ENETUNREACH: continue + self.lastSpawned = time.time() + if acceptConnections and len(self.listeningSockets) == 0: self.startListening() logger.info('Listening for incoming connections.') @@ -169,7 +174,10 @@ class BMConnectionPool(object): # while len(asyncore.socket_map) > 0 and state.shutdown == 0: # print "loop, state = %s" % (proxy.state) - asyncore.loop(timeout=2.0, count=1) + loopTime = float(self.spawnWait) + if self.lastSpawned < time.time() - self.spawnWait: + loopTime = 1.0 + asyncore.loop(timeout=loopTime, count=10) for i in self.inboundConnections.values() + self.outboundConnections.values(): minTx = time.time() - 20 diff --git a/src/network/invthread.py b/src/network/invthread.py index 37fb7094..a880607a 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -1,3 +1,4 @@ +from binascii import hexlify import collections import Queue import random @@ -35,6 +36,7 @@ class InvThread(threading.Thread, StoppableThread): try: (stream, hash) = invQueue.get(False) self.holdHash (stream, hash) + #print "Holding hash %i, %s" % (stream, hexlify(hash)) except Queue.Empty: break @@ -50,6 +52,7 @@ class InvThread(threading.Thread, StoppableThread): except KeyError: continue if len(hashes) > 0: + #print "sending inv of %i" % (len(hashes)) connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes))) self.collectionOfInvs[iterator] = {} iterator += 1