Fix kqueue poller

- separate read and write filters
- make filters peristent for reduced syscall count
This commit is contained in:
Peter Šurda 2018-01-31 22:25:23 +01:00
parent 222e666a60
commit d223bfc6f2
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87

View File

@ -361,35 +361,65 @@ def kqueue_poller(timeout=0.0, map=None):
"""A poller which uses kqueue(), BSD specific.""" """A poller which uses kqueue(), BSD specific."""
if map is None: if map is None:
map = socket_map map = socket_map
try:
kqueue_poller.pollster
except AttributeError:
kqueue_poller.pollster = select.kqueue()
if map: if map:
kqueue = select.kqueue() updates = []
flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
selectables = 0 selectables = 0
for fd, obj in map.items(): for fd, obj in map.items():
kq_filter = 0 kq_filter = 0
if obj.readable(): if obj.readable():
kq_filter |= select.KQ_FILTER_READ kq_filter |= 1
if obj.writable(): selectables += 1
kq_filter |= select.KQ_FILTER_WRITE if obj.writable() and not obj.accepting:
if kq_filter: kq_filter |= 2
try: selectables += 1
ev = select.kevent(fd, filter=kq_filter, flags=flags) if kq_filter != obj.poller_filter:
kqueue.control([ev], 0) # unlike other pollers, READ and WRITE aren't OR able but have
selectables += 1 # to be set and checked separately
except IOError: if kq_filter & 1 != obj.poller_filter & 1:
pass poller_flags = select.KQ_EV_ADD
if kq_filter & 1:
poller_flags |= select.KQ_EV_ENABLE
else:
poller_flags |= select.KQ_EV_DISABLE
updates.append(select.kevent(fd, filter=select.KQ_FILTER_READ, flags=poller_flags))
if kq_filter & 2 != obj.poller_filter & 2:
poller_flags = select.KQ_EV_ADD
if kq_filter & 2:
poller_flags |= select.KQ_EV_ENABLE
else:
poller_flags |= select.KQ_EV_DISABLE
updates.append(select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=poller_flags))
obj.poller_filter = kq_filter
events = kqueue.control(None, selectables, timeout) if not selectables:
for event in random.sample(events, len(events)): # unlike other pollers, kqueue poll does not wait if there are no
# filters setup
current_thread().stop.wait(timeout)
return
events = kqueue_poller.pollster.control(updates, selectables, timeout)
if len(events) > 1:
events = random.sample(events, len(events))
for event in events:
fd = event.ident fd = event.ident
obj = map.get(fd) obj = map.get(fd)
if obj is None: if obj is None:
continue continue
if event.flags & select.KQ_EV_ERROR:
_exception(obj)
continue
if event.flags & select.KQ_EV_EOF and event.data and event.fflags:
obj.handle_close()
continue
if event.filter == select.KQ_FILTER_READ: if event.filter == select.KQ_FILTER_READ:
read(obj) read(obj)
if event.filter == select.KQ_FILTER_WRITE: if event.filter == select.KQ_FILTER_WRITE:
write(obj) write(obj)
kqueue.close()
else: else:
current_thread().stop.wait(timeout) current_thread().stop.wait(timeout)
@ -499,27 +529,38 @@ class dispatcher:
map = self._map map = self._map
map[self._fileno] = self map[self._fileno] = self
self.poller_flags = 0 self.poller_flags = 0
self.poller_filter = 0
def del_channel(self, map=None): def del_channel(self, map=None):
fd = self._fileno fd = self._fileno
if map is None: if map is None:
map = self._map map = self._map
self.poller_flags = 0
self.poller_registered = False
if fd in map: if fd in map:
#self.log_info('closing channel %d:%s' % (fd, self)) #self.log_info('closing channel %d:%s' % (fd, self))
del map[fd] del map[fd]
if self._fileno:
try:
kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
except (AttributeError, KeyError, TypeError, IOError, OSError):
pass
try:
kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
except (AttributeError, KeyError, TypeError, IOError, OSError):
pass
try:
epoll_poller.pollster.unregister(fd)
except (AttributeError, KeyError, TypeError, IOError):
# no epoll used, or not registered
pass
try:
poll_poller.pollster.unregister(fd)
except (AttributeError, KeyError, TypeError, IOError):
# no poll used, or not registered
pass
self._fileno = None self._fileno = None
try: self.poller_flags = 0
epoll_poller.pollster.unregister(fd) self.poller_filter = 0
except (AttributeError, KeyError, TypeError, IOError): self.poller_registered = False
# no epoll used, or not registered
pass
try:
poll_poller.pollster.unregister(fd)
except (AttributeError, KeyError, TypeError, IOError):
# no poll used, or not registered
pass
def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM): def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
self.family_and_type = family, socket_type self.family_and_type = family, socket_type