New network subsystem updates

- auto-select select/poll/epoll/kqueue depending on what's available
This commit is contained in:
Peter Šurda 2017-03-10 23:56:38 +01:00
parent 998935be5f
commit a1d1114cb2
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
2 changed files with 37 additions and 10 deletions

View File

@ -172,7 +172,10 @@ def poll_poller(timeout=0.0, map=None):
if timeout is not None: if timeout is not None:
# timeout is in milliseconds # timeout is in milliseconds
timeout = int(timeout*1000) timeout = int(timeout*1000)
pollster = select.poll() try:
poll_poller.pollster
except AttributeError:
poll_poller.pollster = select.poll()
if map: if map:
for fd, obj in list(map.items()): for fd, obj in list(map.items()):
flags = 0 flags = 0
@ -182,9 +185,12 @@ def poll_poller(timeout=0.0, map=None):
if obj.writable() and not obj.accepting: if obj.writable() and not obj.accepting:
flags |= select.POLLOUT flags |= select.POLLOUT
if flags: if flags:
pollster.register(fd, flags) try:
poll_poller.pollster.modify(fd, flags)
except IOError:
poll_poller.pollster.register(fd, flags)
try: try:
r = pollster.poll(timeout) r = poll_poller.pollster.poll(timeout)
except KeyboardInterrupt: except KeyboardInterrupt:
r = [] r = []
for fd, flags in r: for fd, flags in r:
@ -201,7 +207,10 @@ def epoll_poller(timeout=0.0, map=None):
"""A poller which uses epoll(), supported on Linux 2.5.44 and newer.""" """A poller which uses epoll(), supported on Linux 2.5.44 and newer."""
if map is None: if map is None:
map = socket_map map = socket_map
pollster = select.epoll() try:
epoll_poller.pollster
except AttributeError:
epoll_poller.pollster = select.epoll()
if map: if map:
for fd, obj in map.items(): for fd, obj in map.items():
flags = 0 flags = 0
@ -213,9 +222,12 @@ def epoll_poller(timeout=0.0, map=None):
# Only check for exceptions if object was either readable # Only check for exceptions if object was either readable
# or writable. # or writable.
flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
pollster.register(fd, flags) try:
epoll_poller.pollster.register(fd, flags)
except IOError:
epoll_poller.pollster.modify(fd, flags)
try: try:
r = pollster.poll(timeout) r = epoll_poller.pollster.poll(timeout)
except select.error, err: except select.error, err:
if err.args[0] != EINTR: if err.args[0] != EINTR:
raise raise
@ -265,9 +277,14 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None,
# code which grants backward compatibility with "use_poll" # code which grants backward compatibility with "use_poll"
# argument which should no longer be used in favor of # argument which should no longer be used in favor of
# "poller" # "poller"
if use_poll and hasattr(select, 'poll'):
if hasattr(select, 'epoll'):
poller = epoll_poller
elif hasattr(select, 'kqueue'):
poller = kqueue_poller
elif hasattr(select, 'poll'):
poller = poll_poller poller = poll_poller
else: elif hasattr(select, 'select'):
poller = select_poller poller = select_poller
if count is None: if count is None:
@ -349,6 +366,16 @@ class dispatcher:
#self.log_info('closing channel %d:%s' % (fd, self)) #self.log_info('closing channel %d:%s' % (fd, self))
del map[fd] del map[fd]
self._fileno = None self._fileno = None
try:
epoll_poller.pollster.unregister(fd)
except (AttributeError, KeyError):
# no epoll used, or not registered
pass
try:
poll_poller.pollster.unregister(fd)
except (AttributeError, KeyError):
# no poll used, or not registered
pass
def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM): def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
self.family_and_type = family, type self.family_and_type = family, type

View File

@ -20,13 +20,13 @@ class HttpConnection(AdvancedDispatcher):
def state_init(self): def state_init(self):
self.write_buf += "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (self.path, self.destination[0]) self.write_buf += "GET %s HTTP/1.1\r\nHost: %s\r\nConnection: close\r\n\r\n" % (self.path, self.destination[0])
print "\"%s\"" % (self.write_buf) print "Sending %ib" % (len(self.write_buf))
self.set_state("http_request_sent", 0) self.set_state("http_request_sent", 0)
return False return False
def state_http_request_sent(self): def state_http_request_sent(self):
if len(self.read_buf) > 0: if len(self.read_buf) > 0:
print self.read_buf print "Received %ib" % (len(self.read_buf))
self.read_buf = b"" self.read_buf = b""
if not self.connected: if not self.connected:
self.set_state("close", 0) self.set_state("close", 0)