Asyncore fixes

- better handling of WSA* checks on non-windows systems
- handle EBADF on Windows/select
- better timeouts / loop lengths in main asyncore loop and
spawning new connections
- remove InvThread prints
This commit is contained in:
Peter Šurda 2017-05-29 12:56:59 +02:00
parent 74f1a74a8c
commit a5c1b0c529
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 30 additions and 19 deletions

View File

@ -102,12 +102,6 @@ def _fixSocket():
if sys.platform.startswith('linux'): if sys.platform.startswith('linux'):
socket.SO_BINDTODEVICE = 25 socket.SO_BINDTODEVICE = 25
if not sys.platform.startswith('win'):
errno.WSAEWOULDBLOCK = errno.EWOULDBLOCK
errno.WSAENETUNREACH = errno.ENETUNREACH
errno.WSAECONNREFUSED = errno.ECONNREFUSED
errno.WSAEHOSTUNREACH = errno.EHOSTUNREACH
if not sys.platform.startswith('win'): if not sys.platform.startswith('win'):
return return

View File

@ -61,8 +61,9 @@ from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
errorcode errorcode
try: try:
from errno import WSAEWOULDBLOCK from errno import WSAEWOULDBLOCK
except: except (ImportError, AttributeError):
pass WSAEWOULDBLOCK = EWOULDBLOCK
from ssl import SSLError, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE from ssl import SSLError, SSL_ERROR_WANT_READ, SSL_ERROR_WANT_WRITE
_DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, _DISCONNECTED = frozenset((ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
@ -199,6 +200,9 @@ def select_poller(timeout=0.0, map=None):
r, w, e = select.select(r, w, e, timeout) r, w, e = select.select(r, w, e, timeout)
except KeyboardInterrupt: except KeyboardInterrupt:
return return
except socket.error as err:
if err.args[0] in (EBADF):
return
for fd in random.sample(r, len(r)): for fd in random.sample(r, len(r)):
obj = map.get(fd) obj = map.get(fd)
@ -369,12 +373,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
# then poll # then poll
poller(timeout, map) poller(timeout, map)
else: else:
timeout /= count if timeout == 0:
deadline = 0
else:
deadline = time.time() + timeout
while map and count > 0: while map and count > 0:
# fill buckets first # fill buckets first
update_sent() update_sent()
update_received() update_received()
poller(timeout, map) subtimeout = deadline - time.time()
if subtimeout <= 0:
break
poller(subtimeout, map)
# then poll # then poll
count = count - 1 count = count - 1
@ -555,9 +565,7 @@ class dispatcher:
else: else:
raise raise
except socket.error as why: except socket.error as why:
if why.args[0] in (EAGAIN, EWOULDBLOCK) or \ if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
(sys.platform.startswith('win') and \
err.errno == WSAEWOULDBLOCK):
return 0 return 0
elif why.args[0] in _DISCONNECTED: elif why.args[0] in _DISCONNECTED:
self.handle_close() self.handle_close()
@ -582,9 +590,7 @@ class dispatcher:
raise raise
except socket.error as why: except socket.error as why:
# winsock sometimes raises ENOTCONN # winsock sometimes raises ENOTCONN
if why.args[0] in (EAGAIN, EWOULDBLOCK) or \ if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
(sys.platform.startswith('win') and \
err.errno == WSAEWOULDBLOCK):
return b'' return b''
if why.args[0] in _DISCONNECTED: if why.args[0] in _DISCONNECTED:
self.handle_close() self.handle_close()

View File

@ -19,6 +19,7 @@ import state
@Singleton @Singleton
class BMConnectionPool(object): class BMConnectionPool(object):
def __init__(self): def __init__(self):
asyncore.set_rates( asyncore.set_rates(
BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate") * 1024, BMConfigParser().safeGetInt("bitmessagesettings", "maxdownloadrate") * 1024,
@ -28,6 +29,8 @@ class BMConnectionPool(object):
self.listeningSockets = {} self.listeningSockets = {}
self.udpSockets = {} self.udpSockets = {}
self.streams = [] self.streams = []
self.lastSpawned = 0
self.spawnWait = 0.3
self.bootstrapped = False self.bootstrapped = False
@ -146,6 +149,8 @@ class BMConnectionPool(object):
if e.errno == errno.ENETUNREACH: if e.errno == errno.ENETUNREACH:
continue continue
self.lastSpawned = time.time()
if acceptConnections and len(self.listeningSockets) == 0: if acceptConnections and len(self.listeningSockets) == 0:
self.startListening() self.startListening()
logger.info('Listening for incoming connections.') logger.info('Listening for incoming connections.')
@ -169,7 +174,10 @@ class BMConnectionPool(object):
# while len(asyncore.socket_map) > 0 and state.shutdown == 0: # while len(asyncore.socket_map) > 0 and state.shutdown == 0:
# print "loop, state = %s" % (proxy.state) # print "loop, state = %s" % (proxy.state)
asyncore.loop(timeout=2.0, count=1) loopTime = float(self.spawnWait)
if self.lastSpawned < time.time() - self.spawnWait:
loopTime = 1.0
asyncore.loop(timeout=loopTime, count=10)
for i in self.inboundConnections.values() + self.outboundConnections.values(): for i in self.inboundConnections.values() + self.outboundConnections.values():
minTx = time.time() - 20 minTx = time.time() - 20

View File

@ -1,3 +1,4 @@
from binascii import hexlify
import collections import collections
import Queue import Queue
import random import random
@ -35,6 +36,7 @@ class InvThread(threading.Thread, StoppableThread):
try: try:
(stream, hash) = invQueue.get(False) (stream, hash) = invQueue.get(False)
self.holdHash (stream, hash) self.holdHash (stream, hash)
#print "Holding hash %i, %s" % (stream, hexlify(hash))
except Queue.Empty: except Queue.Empty:
break break
@ -50,6 +52,7 @@ class InvThread(threading.Thread, StoppableThread):
except KeyError: except KeyError:
continue continue
if len(hashes) > 0: if len(hashes) > 0:
#print "sending inv of %i" % (len(hashes))
connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes))) connection.writeQueue.put(protocol.CreatePacket('inv', addresses.encodeVarint(len(hashes)) + b"".join(hashes)))
self.collectionOfInvs[iterator] = {} self.collectionOfInvs[iterator] = {}
iterator += 1 iterator += 1