2018-10-04 16:23:01 +02:00
|
|
|
"""
|
|
|
|
src/network/advanceddispatcher.py
|
|
|
|
=================================
|
|
|
|
"""
|
|
|
|
# pylint: disable=attribute-defined-outside-init
|
|
|
|
|
2017-06-21 12:16:33 +02:00
|
|
|
import socket
|
2017-07-06 19:45:36 +02:00
|
|
|
import threading
|
2017-05-24 16:51:49 +02:00
|
|
|
import time
|
|
|
|
|
2018-10-04 16:23:01 +02:00
|
|
|
import network.asyncore_pollchoose as asyncore
|
|
|
|
import state
|
2019-08-06 13:04:33 +02:00
|
|
|
from threads import BusyError, nonBlocking
|
2018-10-04 16:23:01 +02:00
|
|
|
|
2017-02-08 14:19:02 +01:00
|
|
|
|
2018-02-19 21:27:38 +01:00
|
|
|
class ProcessingError(Exception):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""General class for protocol parser exception, use as a base for others."""
|
2018-02-19 21:27:38 +01:00
|
|
|
pass
|
|
|
|
|
2018-10-04 16:23:01 +02:00
|
|
|
|
2018-02-19 21:27:38 +01:00
|
|
|
class UnknownStateError(ProcessingError):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Parser points to an unknown (unimplemented) state."""
|
2018-02-19 21:27:38 +01:00
|
|
|
pass
|
|
|
|
|
2018-10-04 16:23:01 +02:00
|
|
|
|
2017-01-10 21:20:49 +01:00
|
|
|
class AdvancedDispatcher(asyncore.dispatcher):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Improved version of asyncore dispatcher, with buffers and protocol state."""
|
|
|
|
# pylint: disable=too-many-instance-attributes
|
|
|
|
_buf_len = 131072 # 128kB
|
2017-01-10 21:20:49 +01:00
|
|
|
|
2017-03-20 18:32:26 +01:00
|
|
|
def __init__(self, sock=None):
|
2017-03-10 23:11:57 +01:00
|
|
|
if not hasattr(self, '_map'):
|
2017-03-20 18:32:26 +01:00
|
|
|
asyncore.dispatcher.__init__(self, sock)
|
2017-10-16 08:07:32 +02:00
|
|
|
self.read_buf = bytearray()
|
|
|
|
self.write_buf = bytearray()
|
2017-01-10 21:20:49 +01:00
|
|
|
self.state = "init"
|
2017-05-24 16:51:49 +02:00
|
|
|
self.lastTx = time.time()
|
2017-05-25 14:59:18 +02:00
|
|
|
self.sentBytes = 0
|
|
|
|
self.receivedBytes = 0
|
2017-05-27 22:30:30 +02:00
|
|
|
self.expectBytes = 0
|
2017-07-06 19:45:36 +02:00
|
|
|
self.readLock = threading.RLock()
|
|
|
|
self.writeLock = threading.RLock()
|
2017-07-10 07:08:10 +02:00
|
|
|
self.processingLock = threading.RLock()
|
2017-07-06 19:45:36 +02:00
|
|
|
|
|
|
|
def append_write_buf(self, data):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Append binary data to the end of stream write buffer."""
|
2017-07-06 19:45:36 +02:00
|
|
|
if data:
|
2017-10-16 08:07:32 +02:00
|
|
|
if isinstance(data, list):
|
|
|
|
with self.writeLock:
|
|
|
|
for chunk in data:
|
|
|
|
self.write_buf.extend(chunk)
|
|
|
|
else:
|
|
|
|
with self.writeLock:
|
|
|
|
self.write_buf.extend(data)
|
2017-01-10 21:20:49 +01:00
|
|
|
|
2017-02-08 14:19:02 +01:00
|
|
|
def slice_write_buf(self, length=0):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Cut the beginning of the stream write buffer."""
|
2017-04-16 18:27:15 +02:00
|
|
|
if length > 0:
|
2017-07-06 19:45:36 +02:00
|
|
|
with self.writeLock:
|
2017-10-19 09:02:33 +02:00
|
|
|
if length >= len(self.write_buf):
|
|
|
|
del self.write_buf[:]
|
|
|
|
else:
|
|
|
|
del self.write_buf[0:length]
|
2017-04-16 18:27:15 +02:00
|
|
|
|
|
|
|
def slice_read_buf(self, length=0):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Cut the beginning of the stream read buffer."""
|
2017-04-16 18:27:15 +02:00
|
|
|
if length > 0:
|
2017-07-06 19:45:36 +02:00
|
|
|
with self.readLock:
|
2017-10-19 09:02:33 +02:00
|
|
|
if length >= len(self.read_buf):
|
|
|
|
del self.read_buf[:]
|
|
|
|
else:
|
|
|
|
del self.read_buf[0:length]
|
2017-01-10 21:20:49 +01:00
|
|
|
|
|
|
|
def process(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Process (parse) data that's in the buffer, as long as there is enough data and the connection is open."""
|
2017-10-19 09:11:34 +02:00
|
|
|
while self.connected and not state.shutdown:
|
2017-01-10 21:20:49 +01:00
|
|
|
try:
|
2017-07-10 07:08:10 +02:00
|
|
|
with nonBlocking(self.processingLock):
|
2017-10-19 09:11:34 +02:00
|
|
|
if not self.connected or state.shutdown:
|
|
|
|
break
|
2017-07-10 23:18:58 +02:00
|
|
|
if len(self.read_buf) < self.expectBytes:
|
|
|
|
return False
|
2018-02-19 21:27:38 +01:00
|
|
|
try:
|
|
|
|
cmd = getattr(self, "state_" + str(self.state))
|
|
|
|
except AttributeError:
|
2019-08-06 13:04:33 +02:00
|
|
|
self.logger.error(
|
|
|
|
'Unknown state %s', self.state, exc_info=True)
|
2018-10-04 16:23:01 +02:00
|
|
|
raise UnknownStateError(self.state)
|
2018-02-19 21:27:38 +01:00
|
|
|
if not cmd():
|
2017-07-10 07:08:10 +02:00
|
|
|
break
|
2017-07-10 20:52:11 +02:00
|
|
|
except BusyError:
|
|
|
|
return False
|
2017-07-07 07:55:29 +02:00
|
|
|
return False
|
2017-01-10 21:20:49 +01:00
|
|
|
|
2018-10-04 16:23:01 +02:00
|
|
|
def set_state(self, state_str, length=0, expectBytes=0):
|
|
|
|
"""Set the next processing state."""
|
2017-05-27 22:30:30 +02:00
|
|
|
self.expectBytes = expectBytes
|
2017-01-10 21:20:49 +01:00
|
|
|
self.slice_read_buf(length)
|
2018-10-04 16:23:01 +02:00
|
|
|
self.state = state_str
|
2017-01-10 21:20:49 +01:00
|
|
|
|
|
|
|
def writable(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Is data from the write buffer ready to be sent to the network?"""
|
2017-07-07 07:55:29 +02:00
|
|
|
self.uploadChunk = AdvancedDispatcher._buf_len
|
|
|
|
if asyncore.maxUploadRate > 0:
|
2018-01-02 15:24:47 +01:00
|
|
|
self.uploadChunk = int(asyncore.uploadBucket)
|
2017-07-07 07:55:29 +02:00
|
|
|
self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
|
2017-05-29 00:24:07 +02:00
|
|
|
return asyncore.dispatcher.writable(self) and \
|
2018-10-04 16:23:01 +02:00
|
|
|
(self.connecting or (self.connected and self.uploadChunk > 0))
|
2017-01-10 21:20:49 +01:00
|
|
|
|
|
|
|
def readable(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Is the read buffer ready to accept data from the network?"""
|
2017-07-07 07:55:29 +02:00
|
|
|
self.downloadChunk = AdvancedDispatcher._buf_len
|
|
|
|
if asyncore.maxDownloadRate > 0:
|
2018-01-02 15:24:47 +01:00
|
|
|
self.downloadChunk = int(asyncore.downloadBucket)
|
2017-07-07 07:55:29 +02:00
|
|
|
try:
|
2017-10-20 01:07:30 +02:00
|
|
|
if self.expectBytes > 0 and not self.fullyEstablished:
|
2017-07-07 07:55:29 +02:00
|
|
|
self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf))
|
2017-10-19 09:00:54 +02:00
|
|
|
if self.downloadChunk < 0:
|
|
|
|
self.downloadChunk = 0
|
2017-07-07 07:55:29 +02:00
|
|
|
except AttributeError:
|
|
|
|
pass
|
2017-05-29 00:24:07 +02:00
|
|
|
return asyncore.dispatcher.readable(self) and \
|
2018-10-04 16:23:01 +02:00
|
|
|
(self.connecting or self.accepting or (self.connected and self.downloadChunk > 0))
|
2017-01-10 21:20:49 +01:00
|
|
|
|
|
|
|
def handle_read(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Append incoming data to the read buffer."""
|
2017-05-24 16:51:49 +02:00
|
|
|
self.lastTx = time.time()
|
2017-07-07 07:55:29 +02:00
|
|
|
newData = self.recv(self.downloadChunk)
|
|
|
|
self.receivedBytes += len(newData)
|
|
|
|
asyncore.update_received(len(newData))
|
|
|
|
with self.readLock:
|
2017-10-16 08:07:32 +02:00
|
|
|
self.read_buf.extend(newData)
|
2017-01-10 21:20:49 +01:00
|
|
|
|
|
|
|
def handle_write(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Send outgoing data from write buffer."""
|
2017-05-24 16:51:49 +02:00
|
|
|
self.lastTx = time.time()
|
2017-07-07 07:55:29 +02:00
|
|
|
written = self.send(self.write_buf[0:self.uploadChunk])
|
|
|
|
asyncore.update_sent(written)
|
|
|
|
self.sentBytes += written
|
|
|
|
self.slice_write_buf(written)
|
2017-03-10 23:11:57 +01:00
|
|
|
|
2017-06-03 16:30:05 +02:00
|
|
|
def handle_connect_event(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Callback for connection established event."""
|
2017-06-03 16:30:05 +02:00
|
|
|
try:
|
|
|
|
asyncore.dispatcher.handle_connect_event(self)
|
|
|
|
except socket.error as e:
|
2018-10-04 16:23:01 +02:00
|
|
|
if e.args[0] not in asyncore._DISCONNECTED: # pylint: disable=protected-access
|
2017-06-03 16:30:05 +02:00
|
|
|
raise
|
|
|
|
|
2017-03-10 23:11:57 +01:00
|
|
|
def handle_connect(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Method for handling connection established implementations."""
|
2017-05-24 16:51:49 +02:00
|
|
|
self.lastTx = time.time()
|
|
|
|
|
2017-05-25 23:04:33 +02:00
|
|
|
def state_close(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Signal to the processing loop to end."""
|
|
|
|
# pylint: disable=no-self-use
|
2017-07-06 19:45:36 +02:00
|
|
|
return False
|
2017-05-25 23:04:33 +02:00
|
|
|
|
2017-06-02 07:09:35 +02:00
|
|
|
def handle_close(self):
|
2018-10-04 16:23:01 +02:00
|
|
|
"""Callback for connection being closed, but can also be called directly when you want connection to close."""
|
2017-10-16 08:07:32 +02:00
|
|
|
with self.readLock:
|
|
|
|
self.read_buf = bytearray()
|
|
|
|
with self.writeLock:
|
|
|
|
self.write_buf = bytearray()
|
2017-11-17 13:37:51 +01:00
|
|
|
self.set_state("close")
|
2017-10-19 09:08:05 +02:00
|
|
|
self.close()
|