@ -1,6 +1,11 @@
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init
# pylint: disable=global-statement
"""
src / network / asyncore_pollchoose . py
== == == == == == == == == == == == == == == == ==
# ======================================================================
# Copyright 1996 by Sam Rushing
@ -25,7 +30,7 @@
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
""" Basic infrastructure for asynchronous socket service clients and servers .
Basic infrastructure for asynchronous socket service clients and servers .
There are only two ways to have a program on a single processor do " more
than one thing at a time " . Multi-threaded programming is the simplest and
@ -46,22 +51,20 @@ many of the difficult problems for you, making the task of building
sophisticated high - performance network servers and clients a snap .
"""
# randomise object order for bandwidth balancing
import random
import os
import select
import socket
import sys
import time
from threading import current_thread
import warnings
from errno import (
EADDRINUSE , EAGAIN , EALREADY , EBADF , ECONNABORTED , ECONNREFUSED , ECONNRESET , EHOSTUNREACH , EINPROGRESS , EINTR ,
EINVAL , EISCONN , ENETUNREACH , ENOTCONN , ENOTSOCK , EPIPE , ESHUTDOWN , ETIMEDOUT , EWOULDBLOCK , errorcode
)
from threading import current_thread
import os
import helper_random
from errno import EALREADY , EINPROGRESS , EWOULDBLOCK , ECONNRESET , EINVAL , \
ENOTCONN , ESHUTDOWN , EISCONN , EBADF , ECONNABORTED , EPIPE , EAGAIN , \
ECONNREFUSED , EHOSTUNREACH , ENETUNREACH , ENOTSOCK , EINTR , ETIMEDOUT , \
EADDRINUSE , \
errorcode
try :
from errno import WSAEWOULDBLOCK
except ( ImportError , AttributeError ) :
@ -75,13 +78,15 @@ try:
except ( ImportError , AttributeError ) :
WSAECONNRESET = ECONNRESET
try :
from errno import WSAEADDRINUSE
# Desirable side-effects on Windows; imports winsock error numbers
from errno import WSAEADDRINUSE # pylint: disable=unused-import
except ( ImportError , AttributeError ) :
WSAEADDRINUSE = EADDRINUSE
_DISCONNECTED = frozenset ( ( ECONNRESET , ENOTCONN , ESHUTDOWN , ECONNABORTED , EPIPE ,
EBADF , ECONNREFUSED , EHOSTUNREACH , ENETUNREACH , ETIMEDOUT ,
WSAECONNRESET ) )
_DISCONNECTED = frozenset ( (
ECONNRESET , ENOTCONN , ESHUTDOWN , ECONNABORTED , EPIPE , EBADF , ECONNREFUSED ,
EHOSTUNREACH , ENETUNREACH , ETIMEDOUT , WSAECONNRESET ) )
OP_READ = 1
OP_WRITE = 2
@ -91,17 +96,21 @@ try:
except NameError :
socket_map = { }
def _strerror ( err ) :
try :
return os . strerror ( err )
except ( ValueError , OverflowError , NameError ) :
if err in errorcode :
return errorcode [ err ]
return " Unknown error %s " % err
return " Unknown error %s " % err
class ExitNow ( Exception ) :
""" We don ' t use directly but may be necessary as we replace asyncore due to some library raising or expecting it """
pass
_reraised_exceptions = ( ExitNow , KeyboardInterrupt , SystemExit )
maxDownloadRate = 0
@ -113,28 +122,38 @@ uploadTimestamp = 0
uploadBucket = 0
sentBytes = 0
def read ( obj ) :
""" Event to read from the object, i.e. its network socket. """
if not can_receive ( ) :
return
try :
obj . handle_read_event ( )
except _reraised_exceptions :
raise
except :
except BaseException :
obj . handle_error ( )
def write ( obj ) :
""" Event to write to the object, i.e. its network socket. """
if not can_send ( ) :
return
try :
obj . handle_write_event ( )
except _reraised_exceptions :
raise
except :
except BaseException :
obj . handle_error ( )
def set_rates ( download , upload ) :
""" Set throttling rates """
global maxDownloadRate , maxUploadRate , downloadBucket , uploadBucket , downloadTimestamp , uploadTimestamp
maxDownloadRate = float ( download ) * 1024
maxUploadRate = float ( upload ) * 1024
downloadBucket = maxDownloadRate
@ -142,14 +161,24 @@ def set_rates(download, upload):
downloadTimestamp = time . time ( )
uploadTimestamp = time . time ( )
def can_receive ( ) :
""" Predicate indicating whether the download throttle is in effect """
return maxDownloadRate == 0 or downloadBucket > 0
def can_send ( ) :
""" Predicate indicating whether the upload throttle is in effect """
return maxUploadRate == 0 or uploadBucket > 0
def update_received ( download = 0 ) :
""" Update the receiving throttle """
global receivedBytes , downloadBucket , downloadTimestamp
currentTimestamp = time . time ( )
receivedBytes + = download
if maxDownloadRate > 0 :
@ -160,8 +189,12 @@ def update_received(download=0):
downloadBucket - = download
downloadTimestamp = currentTimestamp
def update_sent ( upload = 0 ) :
""" Update the sending throttle """
global sentBytes , uploadBucket , uploadTimestamp
currentTimestamp = time . time ( )
sentBytes + = upload
if maxUploadRate > 0 :
@ -172,15 +205,21 @@ def update_sent(upload=0):
uploadBucket - = upload
uploadTimestamp = currentTimestamp
def _exception ( obj ) :
""" Handle exceptions as appropriate """
try :
obj . handle_expt_event ( )
except _reraised_exceptions :
raise
except :
except BaseException :
obj . handle_error ( )
def readwrite ( obj , flags ) :
""" Read and write any pending data to/from the object """
try :
if flags & select . POLLIN and can_receive ( ) :
obj . handle_read_event ( )
@ -197,15 +236,20 @@ def readwrite(obj, flags):
obj . handle_close ( )
except _reraised_exceptions :
raise
except :
except BaseException :
obj . handle_error ( )
def select_poller ( timeout = 0.0 , map = None ) :
""" A poller which uses select(), available on most platforms. """
# pylint: disable=redefined-builtin
if map is None :
map = socket_map
if map :
r = [ ] ; w = [ ] ; e = [ ]
r = [ ]
w = [ ]
e = [ ]
for fd , obj in list ( map . items ( ) ) :
is_r = obj . readable ( )
is_w = obj . writable ( )
@ -251,13 +295,16 @@ def select_poller(timeout=0.0, map=None):
else :
current_thread ( ) . stop . wait ( timeout )
def poll_poller ( timeout = 0.0 , map = None ) :
""" A poller which uses poll(), available on most UNIXen. """
# pylint: disable=redefined-builtin
if map is None :
map = socket_map
if timeout is not None :
# timeout is in milliseconds
timeout = int ( timeout * 1000 )
timeout = int ( timeout * 1000 )
try :
poll_poller . pollster
except AttributeError :
@ -301,12 +348,16 @@ def poll_poller(timeout=0.0, map=None):
else :
current_thread ( ) . stop . wait ( timeout )
# Aliases for backward compatibility
poll = select_poller
poll2 = poll3 = poll_poller
def epoll_poller ( timeout = 0.0 , map = None ) :
""" A poller which uses epoll(), supported on Linux 2.5.44 and newer. """
# pylint: disable=redefined-builtin
if map is None :
map = socket_map
try :
@ -346,7 +397,7 @@ def epoll_poller(timeout=0.0, map=None):
if e . errno != EINTR :
raise
r = [ ]
except select . error , err :
except select . error as err :
if err . args [ 0 ] != EINTR :
raise
r = [ ]
@ -354,12 +405,15 @@ def epoll_poller(timeout=0.0, map=None):
obj = map . get ( fd )
if obj is None :
continue
readwrite ( obj , flags )
readwrite ( obj , flags )
else :
current_thread ( ) . stop . wait ( timeout )
def kqueue_poller ( timeout = 0.0 , map = None ) :
""" A poller which uses kqueue(), BSD specific. """
# pylint: disable=redefined-builtin,no-member
if map is None :
map = socket_map
try :
@ -408,7 +462,7 @@ def kqueue_poller(timeout=0.0, map=None):
for event in events :
fd = event . ident
obj = map . get ( fd )
obj = map . get ( fd )
if obj is None :
continue
if event . flags & select . KQ_EV_ERROR :
@ -425,13 +479,15 @@ def kqueue_poller(timeout=0.0, map=None):
current_thread ( ) . stop . wait ( timeout )
def loop ( timeout = 30.0 , use_poll = False , map = None , count = None ,
poller = None ) :
def loop ( timeout = 30.0 , use_poll = False , map = None , count = None , poller = None ) :
""" Poll in a loop, until count or timeout is reached """
# pylint: disable=redefined-builtin
if map is None :
map = socket_map
if count is None :
count = True
# code which grants backward compatibility with "use_poll"
count = True
# code which grants backward compatibility with "use_poll"
# argument which should no longer be used in favor of
# "poller"
@ -460,10 +516,13 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
break
# then poll
poller ( subtimeout , map )
if type( count ) is int :
if isinstance( count , int ) :
count = count - 1
class dispatcher :
""" Dispatcher for socket objects """
# pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class
debug = False
connected = False
@ -478,6 +537,7 @@ class dispatcher:
minTx = 1500
def __init__ ( self , sock = None , map = None ) :
# pylint: disable=redefined-builtin
if map is None :
self . _map = socket_map
else :
@ -510,7 +570,7 @@ class dispatcher:
self . socket = None
def __repr__ ( self ) :
status = [ self . __class__ . __module__ + " . " + self . __class__ . __name__ ]
status = [ self . __class__ . __module__ + " . " + self . __class__ . __name__ ]
if self . accepting and self . addr :
status . append ( ' listening ' )
elif self . connected :
@ -525,7 +585,9 @@ class dispatcher:
__str__ = __repr__
def add_channel ( self , map = None ) :
#self.log_info('adding channel %s' % self)
""" Add a channel """
# pylint: disable=redefined-builtin
if map is None :
map = self . _map
map [ self . _fileno ] = self
@ -533,11 +595,13 @@ class dispatcher:
self . poller_filter = 0
def del_channel ( self , map = None ) :
""" Delete a channel """
# pylint: disable=redefined-builtin
fd = self . _fileno
if map is None :
map = self . _map
if fd in map :
#self.log_info('closing channel %d:%s' % (fd, self))
del map [ fd ]
if self . _fileno :
try :
@ -564,25 +628,29 @@ class dispatcher:
self . poller_registered = False
def create_socket ( self , family = socket . AF_INET , socket_type = socket . SOCK_STREAM ) :
""" Create a socket """
self . family_and_type = family , socket_type
sock = socket . socket ( family , socket_type )
sock . setblocking ( 0 )
self . set_socket ( sock )
def set_socket ( self , sock , map = None ) :
""" Set socket """
# pylint: disable=redefined-builtin
self . socket = sock
## self.__dict__['socket'] = sock
self . _fileno = sock . fileno ( )
self . add_channel ( map )
def set_reuse_addr ( self ) :
# try to re-use a server port if possible
""" try to re-use a server port if possible """
try :
self . socket . setsockopt (
socket . SOL_SOCKET , socket . SO_REUSEADDR ,
self . socket . getsockopt ( socket . SOL_SOCKET ,
socket . SO_REUSEADDR ) | 1
)
)
except socket . error :
pass
@ -593,11 +661,13 @@ class dispatcher:
# ==================================================
def readable ( self ) :
""" Predicate to indicate download throttle status """
if maxDownloadRate > 0 :
return downloadBucket > dispatcher . minTx
return True
def writable ( self ) :
""" Predicate to indicate upload throttle status """
if maxUploadRate > 0 :
return uploadBucket > dispatcher . minTx
return True
@ -607,21 +677,24 @@ class dispatcher:
# ==================================================
def listen ( self , num ) :
""" Listen on a port """
self . accepting = True
if os . name == ' nt ' and num > 5 :
num = 5
return self . socket . listen ( num )
def bind ( self , addr ) :
""" Bind to an address """
self . addr = addr
return self . socket . bind ( addr )
def connect ( self , address ) :
""" Connect to an address """
self . connected = False
self . connecting = True
err = self . socket . connect_ex ( address )
if err in ( EINPROGRESS , EALREADY , EWOULDBLOCK , WSAEWOULDBLOCK ) \
or err == EINVAL and os . name in ( ' nt ' , ' ce ' ) :
or err == EINVAL and os . name in ( ' nt ' , ' ce ' ) :
self . addr = address
return
if err in ( 0 , EISCONN ) :
@ -631,7 +704,7 @@ class dispatcher:
raise socket . error ( err , errorcode [ err ] )
def accept ( self ) :
# XXX can return either an address pair or None
""" Accept incoming connections. Returns either an address pair or None. """
try :
conn , addr = self . socket . accept ( )
except TypeError :
@ -645,6 +718,7 @@ class dispatcher:
return conn , addr
def send ( self , data ) :
""" Send data """
try :
result = self . socket . send ( data )
return result
@ -658,6 +732,7 @@ class dispatcher:
raise
def recv ( self , buffer_size ) :
""" Receive data """
try :
data = self . socket . recv ( buffer_size )
if not data :
@ -665,8 +740,7 @@ class dispatcher:
# a read condition, and having recv() return 0.
self . handle_close ( )
return b ' '
else :
return data
return data
except socket . error as why :
# winsock sometimes raises ENOTCONN
if why . args [ 0 ] in ( EAGAIN , EWOULDBLOCK , WSAEWOULDBLOCK ) :
@ -678,6 +752,7 @@ class dispatcher:
raise
def close ( self ) :
""" Close connection """
self . connected = False
self . accepting = False
self . connecting = False
@ -695,10 +770,10 @@ class dispatcher:
retattr = getattr ( self . socket , attr )
except AttributeError :
raise AttributeError ( " %s instance has no attribute ' %s ' "
% ( self . __class__ . __name__ , attr ) )
% ( self . __class__ . __name__ , attr ) )
else :
msg = " %(me)s . %(attr)s is deprecated; use %(me)s .socket. %(attr)s " \
" instead " % { ' me ' : self . __class__ . __name__ , ' attr ' : attr }
" instead " % { ' me ' : self . __class__ . __name__ , ' attr ' : attr }
warnings . warn ( msg , DeprecationWarning , stacklevel = 2 )
return retattr
@ -707,13 +782,16 @@ class dispatcher:
# and 'log_info' is for informational, warning and error logging.
def log ( self , message ) :
""" Log a message to stderr """
sys . stderr . write ( ' log: %s \n ' % str ( message ) )
def log_info ( self , message , log_type = ' info ' ) :
""" Conditionally print a message """
if log_type not in self . ignore_log_types :
print ( ' %s : %s ' % ( log_type , message ) )
print ' %s : %s ' % ( log_type , message )
def handle_read_event ( self ) :
""" Handle a read event """
if self . accepting :
# accepting sockets are never connected, they "spawn" new
# sockets that are connected
@ -726,6 +804,7 @@ class dispatcher:
self . handle_read ( )
def handle_connect_event ( self ) :
""" Handle a connection event """
err = self . socket . getsockopt ( socket . SOL_SOCKET , socket . SO_ERROR )
if err != 0 :
raise socket . error ( err , _strerror ( err ) )
@ -734,6 +813,7 @@ class dispatcher:
self . connecting = False
def handle_write_event ( self ) :
""" Handle a write event """
if self . accepting :
# Accepting sockets shouldn't get a write event.
# We will pretend it didn't happen.
@ -745,6 +825,7 @@ class dispatcher:
self . handle_write ( )
def handle_expt_event ( self ) :
""" Handle expected exceptions """
# handle_expt_event() is called if there might be an error on the
# socket, or if there is OOB data
# check for the error condition first
@ -763,12 +844,13 @@ class dispatcher:
self . handle_expt ( )
def handle_error ( self ) :
nil , t , v , tbinfo = compact_traceback ( )
""" Handle unexpected exceptions """
_ , t , v , tbinfo = compact_traceback ( )
# sometimes a user repr method will crash.
try :
self_repr = repr ( self )
except :
except BaseException :
self_repr = ' <__repr__(self) failed for object at %0x > ' % id ( self )
self . log_info (
@ -777,89 +859,110 @@ class dispatcher:
t ,
v ,
tbinfo
) ,
) ,
' error '
)
)
self . handle_close ( )
def handle_accept ( self ) :
""" Handle an accept event """
pair = self . accept ( )
if pair is not None :
self . handle_accepted ( * pair )
def handle_expt ( self ) :
""" Log that the subclass does not implement handle_expt """
self . log_info ( ' unhandled incoming priority event ' , ' warning ' )
def handle_read ( self ) :
""" Log that the subclass does not implement handle_read """
self . log_info ( ' unhandled read event ' , ' warning ' )
def handle_write ( self ) :
""" Log that the subclass does not implement handle_write """
self . log_info ( ' unhandled write event ' , ' warning ' )
def handle_connect ( self ) :
""" Log that the subclass does not implement handle_connect """
self . log_info ( ' unhandled connect event ' , ' warning ' )
def handle_accept ( self ) :
pair = self . accept ( )
if pair is not None :
self . handle_accepted ( * pair )
def handle_accepted ( self , sock , addr ) :
""" Log that the subclass does not implement handle_accepted """
sock . close ( )
self . log_info ( ' unhandled accepted event on %s ' % ( addr ) , ' warning ' )
def handle_close ( self ) :
""" Log that the subclass does not implement handle_close """
self . log_info ( ' unhandled close event ' , ' warning ' )
self . close ( )
# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------
class dispatcher_with_send ( dispatcher ) :
"""
adds simple buffered output capability , useful for simple clients .
[ for more sophisticated usage use asynchat . async_chat ]
"""
# pylint: disable=redefined-builtin
def __init__ ( self , sock = None , map = None ) :
# pylint: disable=redefined-builtin
dispatcher . __init__ ( self , sock , map )
self . out_buffer = b ' '
def initiate_send ( self ) :
""" Initiate a send """
num_sent = 0
num_sent = dispatcher . send ( self , self . out_buffer [ : 512 ] )
self . out_buffer = self . out_buffer [ num_sent : ]
def handle_write ( self ) :
""" Handle a write event """
self . initiate_send ( )
def writable ( self ) :
return ( not self . connected ) or len ( self . out_buffer )
""" Predicate to indicate if the object is writable """
return not self . connected or len ( self . out_buffer )
def send ( self , data ) :
""" Send data """
if self . debug :
self . log_info ( ' sending %s ' % repr ( data ) )
self . out_buffer = self . out_buffer + data
self . initiate_send ( )
# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------
def compact_traceback ( ) :
""" Return a compact traceback """
t , v , tb = sys . exc_info ( )
tbinfo = [ ]
if not tb : # Must have a traceback
if not tb : # Must have a traceback
raise AssertionError ( " traceback does not exist " )
while tb :
tbinfo . append ( (
tb . tb_frame . f_code . co_filename ,
tb . tb_frame . f_code . co_name ,
str ( tb . tb_lineno )
) )
) )
tb = tb . tb_next
# just to be safe
del tb
file , function , line = tbinfo [ - 1 ]
file name , function , line = tbinfo [ - 1 ]
info = ' ' . join ( [ ' [ %s | %s | %s ] ' % x for x in tbinfo ] )
return ( file , function , line ) , t , v , info
return ( filename , function , line ) , t , v , info
def close_all ( map = None , ignore_all = False ) :
""" Close all connections """
# pylint: disable=redefined-builtin
if map is None :
map = socket_map
for x in list ( map . values ( ) ) :
@ -872,11 +975,12 @@ def close_all(map=None, ignore_all=False):
raise
except _reraised_exceptions :
raise
except :
except BaseException :
if not ignore_all :
raise
map . clear ( )
# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
@ -890,27 +994,34 @@ def close_all(map=None, ignore_all=False):
#
# Regardless, this is useful for pipes, and stdin/stdout...
if os . name == ' posix ' :
import fcntl
class file_wrapper :
# Here we override just enough to make a file
# look like a socket for the purposes of asyncore.
# The passed fd is automatically os.dup()'d
"""
Here we override just enough to make a file look like a socket for the purposes of asyncore .
The passed fd is automatically os . dup ( ) ' d
"""
# pylint: disable=old-style-class
def __init__ ( self , fd ) :
self . fd = os . dup ( fd )
def recv ( self , * args ) :
""" Fake recv() """
return os . read ( self . fd , * args )
def send ( self , * args ) :
""" Fake send() """
return os . write ( self . fd , * args )
def getsockopt ( self , level , optname , buflen = None ) :
""" Fake getsockopt() """
if ( level == socket . SOL_SOCKET and
optname == socket . SO_ERROR and
not buflen ) :
optname == socket . SO_ERROR and
not buflen ) :
return 0
raise NotImplementedError ( " Only asyncore specific behaviour "
" implemented. " )
@ -919,14 +1030,19 @@ if os.name == 'posix':
write = send
def close ( self ) :
""" Fake close() """
os . close ( self . fd )
def fileno ( self ) :
""" Fake fileno() """
return self . fd
class file_dispatcher ( dispatcher ) :
""" A dispatcher for file_wrapper objects """
def __init__ ( self , fd , map = None ) :
# pylint: disable=redefined-builtin
dispatcher . __init__ ( self , None , map )
self . connected = True
try :
@ -940,6 +1056,7 @@ if os.name == 'posix':
fcntl . fcntl ( fd , fcntl . F_SETFL , flags )
def set_file ( self , fd ) :
""" Set file """
self . socket = file_wrapper ( fd )
self . _fileno = self . socket . fileno ( )
self . add_channel ( )