This repository has been archived on 2025-01-19. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2025-01-19/src/network/advanceddispatcher.py
Peter Surda dcc181bf75
Asyncore processing thread synchronisation
- threre was a synchronisation problem where one thread could process
more data than another thread was expecting, leading to the thread
crashing
2017-07-10 23:18:58 +02:00

118 lines
3.6 KiB
Python

import Queue
import socket
import sys
import threading
import time
import asyncore_pollchoose as asyncore
from debug import logger
from helper_threading import BusyError, nonBlocking
class AdvancedDispatcher(asyncore.dispatcher):
_buf_len = 2097152 # 2MB
def __init__(self, sock=None):
if not hasattr(self, '_map'):
asyncore.dispatcher.__init__(self, sock)
self.read_buf = b""
self.write_buf = b""
self.state = "init"
self.lastTx = time.time()
self.sentBytes = 0
self.receivedBytes = 0
self.expectBytes = 0
self.readLock = threading.RLock()
self.writeLock = threading.RLock()
self.processingLock = threading.RLock()
def append_write_buf(self, data):
if data:
with self.writeLock:
self.write_buf += data
def slice_write_buf(self, length=0):
if length > 0:
with self.writeLock:
self.write_buf = self.write_buf[length:]
def slice_read_buf(self, length=0):
if length > 0:
with self.readLock:
self.read_buf = self.read_buf[length:]
def process(self):
if not self.connected:
return False
loop = 0
while True:
try:
with nonBlocking(self.processingLock):
if len(self.read_buf) < self.expectBytes:
return False
if getattr(self, "state_" + str(self.state))() is False:
break
except AttributeError:
raise
except BusyError:
return False
return False
def set_state(self, state, length=0, expectBytes=0):
self.expectBytes = expectBytes
self.slice_read_buf(length)
self.state = state
def writable(self):
self.uploadChunk = AdvancedDispatcher._buf_len
if asyncore.maxUploadRate > 0:
self.uploadChunk = asyncore.uploadBucket
self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
return asyncore.dispatcher.writable(self) and \
(self.connecting or self.uploadChunk > 0)
def readable(self):
self.downloadChunk = AdvancedDispatcher._buf_len
if asyncore.maxDownloadRate > 0:
self.downloadChunk = asyncore.downloadBucket
try:
if self.expectBytes > 0 and not self.fullyEstablished:
self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf))
except AttributeError:
pass
return asyncore.dispatcher.readable(self) and \
(self.connecting or self.downloadChunk > 0)
def handle_read(self):
self.lastTx = time.time()
newData = self.recv(self.downloadChunk)
self.receivedBytes += len(newData)
asyncore.update_received(len(newData))
with self.readLock:
self.read_buf += newData
def handle_write(self):
self.lastTx = time.time()
written = self.send(self.write_buf[0:self.uploadChunk])
asyncore.update_sent(written)
self.sentBytes += written
self.slice_write_buf(written)
def handle_connect_event(self):
try:
asyncore.dispatcher.handle_connect_event(self)
except socket.error as e:
if e.args[0] not in asyncore._DISCONNECTED:
raise
def handle_connect(self):
self.lastTx = time.time()
def state_close(self):
return False
def handle_close(self):
self.read_buf = b""
self.write_buf = b""
self.state = "close"
asyncore.dispatcher.close(self)