PyBitmessage-2021-04-27/src/network/asyncore_pollchoose.py

1013 lines
31 KiB
Python
Raw Permalink Normal View History

"""
Basic infrastructure for asynchronous socket service clients and servers.
"""
2020-01-06 11:44:47 +00:00
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# pylint: disable=too-many-branches,too-many-lines,global-statement
# pylint: disable=redefined-builtin,no-self-use
import os
import select
import socket
import sys
import time
import warnings
from errno import (
2020-01-06 11:44:47 +00:00
EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED,
ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, EINVAL, EISCONN, ENETUNREACH,
ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode
)
from threading import current_thread
import helper_random
try:
from errno import WSAEWOULDBLOCK
except (ImportError, AttributeError):
WSAEWOULDBLOCK = EWOULDBLOCK
2017-05-29 11:14:25 +00:00
try:
from errno import WSAENOTSOCK
except (ImportError, AttributeError):
WSAENOTSOCK = ENOTSOCK
try:
from errno import WSAECONNRESET
except (ImportError, AttributeError):
2017-08-06 19:26:25 +00:00
WSAECONNRESET = ECONNRESET
try:
# Desirable side-effects on Windows; imports winsock error numbers
from errno import WSAEADDRINUSE # pylint: disable=unused-import
except (ImportError, AttributeError):
WSAEADDRINUSE = EADDRINUSE
_DISCONNECTED = frozenset((
ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE, EBADF, ECONNREFUSED,
EHOSTUNREACH, ENETUNREACH, ETIMEDOUT, WSAECONNRESET))
2017-04-16 16:27:15 +00:00
OP_READ = 1
OP_WRITE = 2
try:
socket_map
except NameError:
socket_map = {}
def _strerror(err):
try:
return os.strerror(err)
except (ValueError, OverflowError, NameError):
if err in errorcode:
return errorcode[err]
return "Unknown error %s" % err
class ExitNow(Exception):
2020-01-06 11:44:47 +00:00
"""We don't use directly but may be necessary as we replace
asyncore due to some library raising or expecting it"""
pass
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
maxDownloadRate = 0
downloadTimestamp = 0
downloadBucket = 0
receivedBytes = 0
maxUploadRate = 0
uploadTimestamp = 0
uploadBucket = 0
sentBytes = 0
def read(obj):
"""Event to read from the object, i.e. its network socket."""
if not can_receive():
return
try:
obj.handle_read_event()
except _reraised_exceptions:
raise
except BaseException:
obj.handle_error()
def write(obj):
"""Event to write to the object, i.e. its network socket."""
if not can_send():
return
try:
obj.handle_write_event()
except _reraised_exceptions:
raise
except BaseException:
obj.handle_error()
def set_rates(download, upload):
"""Set throttling rates"""
2020-01-06 11:44:47 +00:00
global maxDownloadRate, maxUploadRate, downloadBucket
global uploadBucket, downloadTimestamp, uploadTimestamp
maxDownloadRate = float(download) * 1024
maxUploadRate = float(upload) * 1024
downloadBucket = maxDownloadRate
uploadBucket = maxUploadRate
downloadTimestamp = time.time()
uploadTimestamp = time.time()
def can_receive():
"""Predicate indicating whether the download throttle is in effect"""
return maxDownloadRate == 0 or downloadBucket > 0
def can_send():
"""Predicate indicating whether the upload throttle is in effect"""
return maxUploadRate == 0 or uploadBucket > 0
def update_received(download=0):
"""Update the receiving throttle"""
2017-08-22 11:49:27 +00:00
global receivedBytes, downloadBucket, downloadTimestamp
currentTimestamp = time.time()
receivedBytes += download
if maxDownloadRate > 0:
2020-01-06 11:44:47 +00:00
bucketIncrease = \
maxDownloadRate * (currentTimestamp - downloadTimestamp)
downloadBucket += bucketIncrease
if downloadBucket > maxDownloadRate:
downloadBucket = int(maxDownloadRate)
downloadBucket -= download
downloadTimestamp = currentTimestamp
def update_sent(upload=0):
"""Update the sending throttle"""
2017-08-22 11:49:27 +00:00
global sentBytes, uploadBucket, uploadTimestamp
currentTimestamp = time.time()
sentBytes += upload
if maxUploadRate > 0:
bucketIncrease = maxUploadRate * (currentTimestamp - uploadTimestamp)
uploadBucket += bucketIncrease
if uploadBucket > maxUploadRate:
uploadBucket = int(maxUploadRate)
uploadBucket -= upload
uploadTimestamp = currentTimestamp
def _exception(obj):
"""Handle exceptions as appropriate"""
try:
obj.handle_expt_event()
except _reraised_exceptions:
raise
except BaseException:
obj.handle_error()
def readwrite(obj, flags):
"""Read and write any pending data to/from the object"""
try:
if flags & select.POLLIN and can_receive():
obj.handle_read_event()
if flags & select.POLLOUT and can_send():
obj.handle_write_event()
if flags & select.POLLPRI:
obj.handle_expt_event()
if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
obj.handle_close()
except socket.error as e:
if e.args[0] not in _DISCONNECTED:
obj.handle_error()
else:
obj.handle_close()
except _reraised_exceptions:
raise
except BaseException:
obj.handle_error()
def select_poller(timeout=0.0, map=None):
"""A poller which uses select(), available on most platforms."""
if map is None:
map = socket_map
if map:
r = []
w = []
e = []
for fd, obj in list(map.items()):
is_r = obj.readable()
is_w = obj.writable()
if is_r:
r.append(fd)
# accepting sockets should not be writable
if is_w and not obj.accepting:
w.append(fd)
if is_r or is_w:
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
return
try:
r, w, e = select.select(r, w, e, timeout)
except KeyboardInterrupt:
return
except socket.error as err:
2017-08-06 18:40:35 +00:00
if err.args[0] in (EBADF, EINTR):
return
except Exception as err:
if err.args[0] in (WSAENOTSOCK, ):
return
for fd in helper_random.randomsample(r, len(r)):
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in helper_random.randomsample(w, len(w)):
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
else:
current_thread().stop.wait(timeout)
def poll_poller(timeout=0.0, map=None):
"""A poller which uses poll(), available on most UNIXen."""
if map is None:
map = socket_map
if timeout is not None:
# timeout is in milliseconds
timeout = int(timeout * 1000)
try:
poll_poller.pollster
except AttributeError:
poll_poller.pollster = select.poll()
if map:
for fd, obj in list(map.items()):
2017-04-16 16:27:15 +00:00
flags = newflags = 0
if obj.readable():
flags |= select.POLLIN | select.POLLPRI
2017-04-16 16:27:15 +00:00
newflags |= OP_READ
else:
newflags &= ~ OP_READ
# accepting sockets should not be writable
if obj.writable() and not obj.accepting:
flags |= select.POLLOUT
2017-04-16 16:27:15 +00:00
newflags |= OP_WRITE
else:
newflags &= ~ OP_WRITE
if newflags != obj.poller_flags:
obj.poller_flags = newflags
try:
if obj.poller_registered:
poll_poller.pollster.modify(fd, flags)
else:
poll_poller.pollster.register(fd, flags)
obj.poller_registered = True
except IOError:
pass
try:
r = poll_poller.pollster.poll(timeout)
except KeyboardInterrupt:
r = []
except socket.error as err:
if err.args[0] in (EBADF, WSAENOTSOCK, EINTR):
return
for fd, flags in helper_random.randomsample(r, len(r)):
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
else:
current_thread().stop.wait(timeout)
# Aliases for backward compatibility
poll = select_poller
poll2 = poll3 = poll_poller
def epoll_poller(timeout=0.0, map=None):
"""A poller which uses epoll(), supported on Linux 2.5.44 and newer."""
if map is None:
map = socket_map
try:
epoll_poller.pollster
except AttributeError:
epoll_poller.pollster = select.epoll()
if map:
for fd, obj in map.items():
2017-04-16 16:27:15 +00:00
flags = newflags = 0
if obj.readable():
flags |= select.POLLIN | select.POLLPRI
2017-04-16 16:27:15 +00:00
newflags |= OP_READ
else:
newflags &= ~ OP_READ
# accepting sockets should not be writable
if obj.writable() and not obj.accepting:
flags |= select.POLLOUT
2017-04-16 16:27:15 +00:00
newflags |= OP_WRITE
else:
newflags &= ~ OP_WRITE
if newflags != obj.poller_flags:
obj.poller_flags = newflags
# Only check for exceptions if object was either readable
# or writable.
flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
try:
if obj.poller_registered:
epoll_poller.pollster.modify(fd, flags)
else:
epoll_poller.pollster.register(fd, flags)
obj.poller_registered = True
except IOError:
pass
try:
r = epoll_poller.pollster.poll(timeout)
except IOError as e:
if e.errno != EINTR:
raise
r = []
except select.error as err:
if err.args[0] != EINTR:
raise
r = []
for fd, flags in helper_random.randomsample(r, len(r)):
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
else:
current_thread().stop.wait(timeout)
def kqueue_poller(timeout=0.0, map=None):
"""A poller which uses kqueue(), BSD specific."""
2020-01-06 11:44:47 +00:00
# pylint: disable=no-member,too-many-statements
if map is None:
map = socket_map
try:
kqueue_poller.pollster
except AttributeError:
kqueue_poller.pollster = select.kqueue()
if map:
updates = []
selectables = 0
for fd, obj in map.items():
2017-08-22 11:49:27 +00:00
kq_filter = 0
if obj.readable():
kq_filter |= 1
selectables += 1
if obj.writable() and not obj.accepting:
kq_filter |= 2
selectables += 1
if kq_filter != obj.poller_filter:
# unlike other pollers, READ and WRITE aren't OR able but have
# to be set and checked separately
if kq_filter & 1 != obj.poller_filter & 1:
poller_flags = select.KQ_EV_ADD
if kq_filter & 1:
poller_flags |= select.KQ_EV_ENABLE
else:
poller_flags |= select.KQ_EV_DISABLE
2020-01-06 11:44:47 +00:00
updates.append(
select.kevent(
fd, filter=select.KQ_FILTER_READ,
flags=poller_flags))
if kq_filter & 2 != obj.poller_filter & 2:
poller_flags = select.KQ_EV_ADD
if kq_filter & 2:
poller_flags |= select.KQ_EV_ENABLE
else:
poller_flags |= select.KQ_EV_DISABLE
2020-01-06 11:44:47 +00:00
updates.append(
select.kevent(
fd, filter=select.KQ_FILTER_WRITE,
flags=poller_flags))
obj.poller_filter = kq_filter
if not selectables:
# unlike other pollers, kqueue poll does not wait if there are no
# filters setup
current_thread().stop.wait(timeout)
return
events = kqueue_poller.pollster.control(updates, selectables, timeout)
if len(events) > 1:
events = helper_random.randomsample(events, len(events))
for event in events:
fd = event.ident
obj = map.get(fd)
if obj is None:
continue
if event.flags & select.KQ_EV_ERROR:
_exception(obj)
continue
if event.flags & select.KQ_EV_EOF and event.data and event.fflags:
obj.handle_close()
continue
if event.filter == select.KQ_FILTER_READ:
read(obj)
if event.filter == select.KQ_FILTER_WRITE:
write(obj)
else:
current_thread().stop.wait(timeout)
def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
"""Poll in a loop, until count or timeout is reached"""
if map is None:
map = socket_map
if count is None:
count = True
# code which grants backward compatibility with "use_poll"
# argument which should no longer be used in favor of
# "poller"
if poller is None:
2017-08-22 11:49:27 +00:00
if use_poll:
poller = poll_poller
elif hasattr(select, 'epoll'):
poller = epoll_poller
elif hasattr(select, 'kqueue'):
poller = kqueue_poller
elif hasattr(select, 'poll'):
poller = poll_poller
elif hasattr(select, 'select'):
poller = select_poller
if timeout == 0:
deadline = 0
else:
deadline = time.time() + timeout
while count:
# fill buckets first
update_sent()
update_received()
subtimeout = deadline - time.time()
if subtimeout <= 0:
break
# then poll
poller(subtimeout, map)
if isinstance(count, int):
count = count - 1
2020-01-06 11:44:47 +00:00
class dispatcher(object):
"""Dispatcher for socket objects"""
2020-01-06 11:44:47 +00:00
# pylint: disable=too-many-public-methods,too-many-instance-attributes
debug = False
connected = False
accepting = False
connecting = False
closing = False
addr = None
ignore_log_types = frozenset(['warning'])
2017-04-16 16:27:15 +00:00
poller_registered = False
poller_flags = 0
# don't do network IO with a smaller bucket than this
minTx = 1500
def __init__(self, sock=None, map=None):
if map is None:
self._map = socket_map
else:
self._map = map
self._fileno = None
if sock:
# Set to nonblocking just to make sure for cases where we
# get a socket from a blocking source.
sock.setblocking(0)
self.set_socket(sock, map)
self.connected = True
# The constructor no longer requires that the socket
# passed be connected.
try:
self.addr = sock.getpeername()
except socket.error as err:
if err.args[0] in (ENOTCONN, EINVAL):
# To handle the case where we got an unconnected
# socket.
self.connected = False
else:
# The socket is broken in some unknown way, alert
# the user and remove it from the map (to prevent
# polling of broken sockets).
self.del_channel(map)
raise
else:
self.socket = None
def __repr__(self):
status = [self.__class__.__module__ + "." + self.__class__.__name__]
if self.accepting and self.addr:
status.append('listening')
elif self.connected:
status.append('connected')
if self.addr is not None:
try:
status.append('%s:%d' % self.addr)
except TypeError:
status.append(repr(self.addr))
return '<%s at %#x>' % (' '.join(status), id(self))
__str__ = __repr__
def add_channel(self, map=None):
"""Add a channel"""
2020-01-06 11:44:47 +00:00
# pylint: disable=attribute-defined-outside-init
if map is None:
map = self._map
map[self._fileno] = self
self.poller_flags = 0
self.poller_filter = 0
def del_channel(self, map=None):
"""Delete a channel"""
fd = self._fileno
if map is None:
map = self._map
if fd in map:
del map[fd]
if self._fileno:
try:
2020-01-06 11:44:47 +00:00
kqueue_poller.pollster.control([select.kevent(
fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
except(AttributeError, KeyError, TypeError, IOError, OSError):
pass
try:
2020-01-06 11:44:47 +00:00
kqueue_poller.pollster.control([select.kevent(
fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
except(AttributeError, KeyError, TypeError, IOError, OSError):
pass
try:
epoll_poller.pollster.unregister(fd)
except (AttributeError, KeyError, TypeError, IOError):
# no epoll used, or not registered
pass
try:
poll_poller.pollster.unregister(fd)
except (AttributeError, KeyError, TypeError, IOError):
# no poll used, or not registered
pass
self._fileno = None
self.poller_flags = 0
self.poller_filter = 0
self.poller_registered = False
2020-01-06 11:44:47 +00:00
def create_socket(
self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
"""Create a socket"""
2020-01-06 11:44:47 +00:00
# pylint: disable=attribute-defined-outside-init
2017-08-22 11:49:27 +00:00
self.family_and_type = family, socket_type
sock = socket.socket(family, socket_type)
sock.setblocking(0)
self.set_socket(sock)
def set_socket(self, sock, map=None):
"""Set socket"""
self.socket = sock
self._fileno = sock.fileno()
self.add_channel(map)
def set_reuse_addr(self):
"""try to re-use a server port if possible"""
try:
self.socket.setsockopt(
2020-01-06 11:44:47 +00:00
socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
)
except socket.error:
pass
# ==================================================
# predicates for select()
# these are used as filters for the lists of sockets
# to pass to select().
# ==================================================
def readable(self):
"""Predicate to indicate download throttle status"""
if maxDownloadRate > 0:
return downloadBucket > dispatcher.minTx
return True
def writable(self):
"""Predicate to indicate upload throttle status"""
if maxUploadRate > 0:
return uploadBucket > dispatcher.minTx
return True
# ==================================================
# socket object methods.
# ==================================================
def listen(self, num):
"""Listen on a port"""
self.accepting = True
if os.name == 'nt' and num > 5:
num = 5
return self.socket.listen(num)
def bind(self, addr):
"""Bind to an address"""
self.addr = addr
return self.socket.bind(addr)
def connect(self, address):
"""Connect to an address"""
self.connected = False
self.connecting = True
err = self.socket.connect_ex(address)
if err in (EINPROGRESS, EALREADY, EWOULDBLOCK, WSAEWOULDBLOCK) \
or err == EINVAL and os.name in ('nt', 'ce'):
self.addr = address
return
if err in (0, EISCONN):
self.addr = address
self.handle_connect_event()
else:
raise socket.error(err, errorcode[err])
def accept(self):
2020-01-06 11:44:47 +00:00
"""Accept incoming connections.
Returns either an address pair or None."""
try:
conn, addr = self.socket.accept()
except TypeError:
return None
except socket.error as why:
2020-01-06 11:44:47 +00:00
if why.args[0] in (
EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED,
EAGAIN, ENOTCONN):
return None
else:
raise
else:
return conn, addr
def send(self, data):
"""Send data"""
try:
result = self.socket.send(data)
return result
except socket.error as why:
if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
return 0
elif why.args[0] in _DISCONNECTED:
self.handle_close()
return 0
else:
raise
def recv(self, buffer_size):
"""Receive data"""
try:
data = self.socket.recv(buffer_size)
if not data:
# a closed connection is indicated by signaling
# a read condition, and having recv() return 0.
self.handle_close()
return b''
return data
except socket.error as why:
# winsock sometimes raises ENOTCONN
if why.args[0] in (EAGAIN, EWOULDBLOCK, WSAEWOULDBLOCK):
return b''
if why.args[0] in _DISCONNECTED:
self.handle_close()
return b''
else:
raise
def close(self):
"""Close connection"""
self.connected = False
self.accepting = False
self.connecting = False
self.del_channel()
try:
self.socket.close()
except socket.error as why:
if why.args[0] not in (ENOTCONN, EBADF):
raise