|
|
@ -1,20 +1,33 @@ |
|
|
|
""" |
|
|
|
src/network/advanceddispatcher.py |
|
|
|
================================= |
|
|
|
""" |
|
|
|
# pylint: disable=attribute-defined-outside-init |
|
|
|
|
|
|
|
import socket |
|
|
|
import threading |
|
|
|
import time |
|
|
|
|
|
|
|
import asyncore_pollchoose as asyncore |
|
|
|
import network.asyncore_pollchoose as asyncore |
|
|
|
import state |
|
|
|
from debug import logger |
|
|
|
from helper_threading import BusyError, nonBlocking |
|
|
|
import state |
|
|
|
|
|
|
|
|
|
|
|
class ProcessingError(Exception): |
|
|
|
"""General class for protocol parser exception, use as a base for others.""" |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
class UnknownStateError(ProcessingError): |
|
|
|
"""Parser points to an unknown (unimplemented) state.""" |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
_buf_len = 131072 # 128kB |
|
|
|
"""Improved version of asyncore dispatcher, with buffers and protocol state.""" |
|
|
|
# pylint: disable=too-many-instance-attributes |
|
|
|
_buf_len = 131072 # 128kB |
|
|
|
|
|
|
|
def __init__(self, sock=None): |
|
|
|
if not hasattr(self, '_map'): |
|
|
@ -31,6 +44,7 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
self.processingLock = threading.RLock() |
|
|
|
|
|
|
|
def append_write_buf(self, data): |
|
|
|
"""Append binary data to the end of stream write buffer.""" |
|
|
|
if data: |
|
|
|
if isinstance(data, list): |
|
|
|
with self.writeLock: |
|
|
@ -41,6 +55,7 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
self.write_buf.extend(data) |
|
|
|
|
|
|
|
def slice_write_buf(self, length=0): |
|
|
|
"""Cut the beginning of the stream write buffer.""" |
|
|
|
if length > 0: |
|
|
|
with self.writeLock: |
|
|
|
if length >= len(self.write_buf): |
|
|
@ -49,6 +64,7 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
del self.write_buf[0:length] |
|
|
|
|
|
|
|
def slice_read_buf(self, length=0): |
|
|
|
"""Cut the beginning of the stream read buffer.""" |
|
|
|
if length > 0: |
|
|
|
with self.readLock: |
|
|
|
if length >= len(self.read_buf): |
|
|
@ -57,6 +73,7 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
del self.read_buf[0:length] |
|
|
|
|
|
|
|
def process(self): |
|
|
|
"""Process (parse) data that's in the buffer, as long as there is enough data and the connection is open.""" |
|
|
|
while self.connected and not state.shutdown: |
|
|
|
try: |
|
|
|
with nonBlocking(self.processingLock): |
|
|
@ -68,27 +85,30 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
cmd = getattr(self, "state_" + str(self.state)) |
|
|
|
except AttributeError: |
|
|
|
logger.error("Unknown state %s", self.state, exc_info=True) |
|
|
|
raise UnknownState(self.state) |
|
|
|
raise UnknownStateError(self.state) |
|
|
|
if not cmd(): |
|
|
|
break |
|
|
|
except BusyError: |
|
|
|
return False |
|
|
|
return False |
|
|
|
|
|
|
|
def set_state(self, state, length=0, expectBytes=0): |
|
|
|
def set_state(self, state_str, length=0, expectBytes=0): |
|
|
|
"""Set the next processing state.""" |
|
|
|
self.expectBytes = expectBytes |
|
|
|
self.slice_read_buf(length) |
|
|
|
self.state = state |
|
|
|
self.state = state_str |
|
|
|
|
|
|
|
def writable(self): |
|
|
|
"""Is data from the write buffer ready to be sent to the network?""" |
|
|
|
self.uploadChunk = AdvancedDispatcher._buf_len |
|
|
|
if asyncore.maxUploadRate > 0: |
|
|
|
self.uploadChunk = int(asyncore.uploadBucket) |
|
|
|
self.uploadChunk = min(self.uploadChunk, len(self.write_buf)) |
|
|
|
return asyncore.dispatcher.writable(self) and \ |
|
|
|
(self.connecting or (self.connected and self.uploadChunk > 0)) |
|
|
|
(self.connecting or (self.connected and self.uploadChunk > 0)) |
|
|
|
|
|
|
|
def readable(self): |
|
|
|
"""Is the read buffer ready to accept data from the network?""" |
|
|
|
self.downloadChunk = AdvancedDispatcher._buf_len |
|
|
|
if asyncore.maxDownloadRate > 0: |
|
|
|
self.downloadChunk = int(asyncore.downloadBucket) |
|
|
@ -100,9 +120,10 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
except AttributeError: |
|
|
|
pass |
|
|
|
return asyncore.dispatcher.readable(self) and \ |
|
|
|
(self.connecting or self.accepting or (self.connected and self.downloadChunk > 0)) |
|
|
|
(self.connecting or self.accepting or (self.connected and self.downloadChunk > 0)) |
|
|
|
|
|
|
|
def handle_read(self): |
|
|
|
"""Append incoming data to the read buffer.""" |
|
|
|
self.lastTx = time.time() |
|
|
|
newData = self.recv(self.downloadChunk) |
|
|
|
self.receivedBytes += len(newData) |
|
|
@ -111,6 +132,7 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
self.read_buf.extend(newData) |
|
|
|
|
|
|
|
def handle_write(self): |
|
|
|
"""Send outgoing data from write buffer.""" |
|
|
|
self.lastTx = time.time() |
|
|
|
written = self.send(self.write_buf[0:self.uploadChunk]) |
|
|
|
asyncore.update_sent(written) |
|
|
@ -118,19 +140,24 @@ class AdvancedDispatcher(asyncore.dispatcher): |
|
|
|
self.slice_write_buf(written) |
|
|
|
|
|
|
|
def handle_connect_event(self): |
|
|
|
"""Callback for connection established event.""" |
|
|
|
try: |
|
|
|
asyncore.dispatcher.handle_connect_event(self) |
|
|
|
except socket.error as e: |
|
|
|
if e.args[0] not in asyncore._DISCONNECTED: |
|
|
|
if e.args[0] not in asyncore._DISCONNECTED: # pylint: disable=protected-access |
|
|
|
raise |
|
|
|
|
|
|
|
def handle_connect(self): |
|
|
|
"""Method for handling connection established implementations.""" |
|
|
|
self.lastTx = time.time() |
|
|
|
|
|
|
|
def state_close(self): |
|
|
|
"""Signal to the processing loop to end.""" |
|
|
|
# pylint: disable=no-self-use |
|
|
|
return False |
|
|
|
|
|
|
|
def handle_close(self): |
|
|
|
"""Callback for connection being closed, but can also be called directly when you want connection to close.""" |
|
|
|
with self.readLock: |
|
|
|
self.read_buf = bytearray() |
|
|
|
with self.writeLock: |
|
|
|