From d223bfc6f29367b7d70e35f7bf1185a7bc3ca239 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 31 Jan 2018 22:25:23 +0100 Subject: [PATCH] Fix kqueue poller - separate read and write filters - make filters peristent for reduced syscall count --- src/network/asyncore_pollchoose.py | 95 +++++++++++++++++++++--------- 1 file changed, 68 insertions(+), 27 deletions(-) diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index f9dc9ee5..cd19063a 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -361,35 +361,65 @@ def kqueue_poller(timeout=0.0, map=None): """A poller which uses kqueue(), BSD specific.""" if map is None: map = socket_map + try: + kqueue_poller.pollster + except AttributeError: + kqueue_poller.pollster = select.kqueue() if map: - kqueue = select.kqueue() - flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE + updates = [] selectables = 0 for fd, obj in map.items(): kq_filter = 0 if obj.readable(): - kq_filter |= select.KQ_FILTER_READ - if obj.writable(): - kq_filter |= select.KQ_FILTER_WRITE - if kq_filter: - try: - ev = select.kevent(fd, filter=kq_filter, flags=flags) - kqueue.control([ev], 0) - selectables += 1 - except IOError: - pass + kq_filter |= 1 + selectables += 1 + if obj.writable() and not obj.accepting: + kq_filter |= 2 + selectables += 1 + if kq_filter != obj.poller_filter: + # unlike other pollers, READ and WRITE aren't OR able but have + # to be set and checked separately + if kq_filter & 1 != obj.poller_filter & 1: + poller_flags = select.KQ_EV_ADD + if kq_filter & 1: + poller_flags |= select.KQ_EV_ENABLE + else: + poller_flags |= select.KQ_EV_DISABLE + updates.append(select.kevent(fd, filter=select.KQ_FILTER_READ, flags=poller_flags)) + if kq_filter & 2 != obj.poller_filter & 2: + poller_flags = select.KQ_EV_ADD + if kq_filter & 2: + poller_flags |= select.KQ_EV_ENABLE + else: + poller_flags |= select.KQ_EV_DISABLE + updates.append(select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=poller_flags)) + obj.poller_filter = kq_filter - events = kqueue.control(None, selectables, timeout) - for event in random.sample(events, len(events)): + if not selectables: + # unlike other pollers, kqueue poll does not wait if there are no + # filters setup + current_thread().stop.wait(timeout) + return + + events = kqueue_poller.pollster.control(updates, selectables, timeout) + if len(events) > 1: + events = random.sample(events, len(events)) + + for event in events: fd = event.ident obj = map.get(fd) if obj is None: continue + if event.flags & select.KQ_EV_ERROR: + _exception(obj) + continue + if event.flags & select.KQ_EV_EOF and event.data and event.fflags: + obj.handle_close() + continue if event.filter == select.KQ_FILTER_READ: read(obj) if event.filter == select.KQ_FILTER_WRITE: write(obj) - kqueue.close() else: current_thread().stop.wait(timeout) @@ -499,27 +529,38 @@ class dispatcher: map = self._map map[self._fileno] = self self.poller_flags = 0 + self.poller_filter = 0 def del_channel(self, map=None): fd = self._fileno if map is None: map = self._map - self.poller_flags = 0 - self.poller_registered = False if fd in map: #self.log_info('closing channel %d:%s' % (fd, self)) del map[fd] + if self._fileno: + try: + kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0) + except (AttributeError, KeyError, TypeError, IOError, OSError): + pass + try: + kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0) + except (AttributeError, KeyError, TypeError, IOError, OSError): + pass + try: + epoll_poller.pollster.unregister(fd) + except (AttributeError, KeyError, TypeError, IOError): + # no epoll used, or not registered + pass + try: + poll_poller.pollster.unregister(fd) + except (AttributeError, KeyError, TypeError, IOError): + # no poll used, or not registered + pass self._fileno = None - try: - epoll_poller.pollster.unregister(fd) - except (AttributeError, KeyError, TypeError, IOError): - # no epoll used, or not registered - pass - try: - poll_poller.pollster.unregister(fd) - except (AttributeError, KeyError, TypeError, IOError): - # no poll used, or not registered - pass + self.poller_flags = 0 + self.poller_filter = 0 + self.poller_registered = False def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): self.family_and_type = family, socket_type