Asyncore fixes
- fix broken loops - optimise I/O tests
This commit is contained in:
parent
de22e547c5
commit
a98b8690d3
|
@ -40,18 +40,14 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
if not self.connected:
|
if not self.connected:
|
||||||
return
|
return False
|
||||||
loop = 0
|
|
||||||
while len(self.read_buf) >= self.expectBytes:
|
while len(self.read_buf) >= self.expectBytes:
|
||||||
loop += 1
|
|
||||||
if loop > 1000:
|
|
||||||
logger.error("Stuck at state %s, report this bug please", self.state)
|
|
||||||
break
|
|
||||||
try:
|
try:
|
||||||
if getattr(self, "state_" + str(self.state))() is False:
|
if getattr(self, "state_" + str(self.state))() is False:
|
||||||
break
|
break
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
raise
|
raise
|
||||||
|
return False
|
||||||
|
|
||||||
def set_state(self, state, length=0, expectBytes=0):
|
def set_state(self, state, length=0, expectBytes=0):
|
||||||
self.expectBytes = expectBytes
|
self.expectBytes = expectBytes
|
||||||
|
@ -59,39 +55,39 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
||||||
self.state = state
|
self.state = state
|
||||||
|
|
||||||
def writable(self):
|
def writable(self):
|
||||||
|
self.uploadChunk = AdvancedDispatcher._buf_len
|
||||||
|
if asyncore.maxUploadRate > 0:
|
||||||
|
self.uploadChunk = asyncore.uploadBucket
|
||||||
|
self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
|
||||||
return asyncore.dispatcher.writable(self) and \
|
return asyncore.dispatcher.writable(self) and \
|
||||||
(self.connecting or self.write_buf)
|
(self.connecting or self.uploadChunk > 0)
|
||||||
|
|
||||||
def readable(self):
|
def readable(self):
|
||||||
|
self.downloadChunk = AdvancedDispatcher._buf_len
|
||||||
|
if asyncore.maxDownloadRate > 0:
|
||||||
|
self.downloadChunk = asyncore.downloadBucket
|
||||||
|
try:
|
||||||
|
if self.expectBytes > 0 and not self.fullyEstablished:
|
||||||
|
self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf))
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
return asyncore.dispatcher.readable(self) and \
|
return asyncore.dispatcher.readable(self) and \
|
||||||
(self.connecting or len(self.read_buf) < AdvancedDispatcher._buf_len)
|
(self.connecting or self.downloadChunk > 0)
|
||||||
|
|
||||||
def handle_read(self):
|
def handle_read(self):
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
downloadBytes = AdvancedDispatcher._buf_len
|
newData = self.recv(self.downloadChunk)
|
||||||
if asyncore.maxDownloadRate > 0:
|
self.receivedBytes += len(newData)
|
||||||
downloadBytes = asyncore.downloadBucket
|
asyncore.update_received(len(newData))
|
||||||
if self.expectBytes > 0 and downloadBytes > self.expectBytes - len(self.read_buf):
|
with self.readLock:
|
||||||
downloadBytes = self.expectBytes - len(self.read_buf)
|
self.read_buf += newData
|
||||||
if downloadBytes > 0:
|
|
||||||
newData = self.recv(downloadBytes)
|
|
||||||
self.receivedBytes += len(newData)
|
|
||||||
asyncore.update_received(len(newData))
|
|
||||||
with self.readLock:
|
|
||||||
self.read_buf += newData
|
|
||||||
|
|
||||||
def handle_write(self):
|
def handle_write(self):
|
||||||
self.lastTx = time.time()
|
self.lastTx = time.time()
|
||||||
bufSize = AdvancedDispatcher._buf_len
|
written = self.send(self.write_buf[0:self.uploadChunk])
|
||||||
if asyncore.maxUploadRate > 0:
|
asyncore.update_sent(written)
|
||||||
bufSize = asyncore.uploadBucket
|
self.sentBytes += written
|
||||||
if bufSize <= 0:
|
self.slice_write_buf(written)
|
||||||
return
|
|
||||||
if self.write_buf:
|
|
||||||
written = self.send(self.write_buf[0:bufSize])
|
|
||||||
asyncore.update_sent(written)
|
|
||||||
self.sentBytes += written
|
|
||||||
self.slice_write_buf(written)
|
|
||||||
|
|
||||||
def handle_connect_event(self):
|
def handle_connect_event(self):
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -34,11 +34,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
|
||||||
# cycle as long as there is data
|
# cycle as long as there is data
|
||||||
# methods should return False if there isn't enough data, or the connection is to be aborted
|
# methods should return False if there isn't enough data, or the connection is to be aborted
|
||||||
try:
|
try:
|
||||||
while connection.process():
|
connection.process()
|
||||||
pass
|
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
# missing command
|
# missing command
|
||||||
logger.error("Unknown state %s, ignoring", connection.state)
|
logger.error("Unknown state %s, ignoring", connection.state)
|
||||||
|
|
||||||
def stopThread(self):
|
|
||||||
super(ReceiveQueueThread, self).stopThread()
|
|
||||||
|
|
Reference in New Issue
Block a user