2017-05-25 21:04:33 +00:00
|
|
|
import Queue
|
2017-06-21 10:16:33 +00:00
|
|
|
import socket
|
|
|
|
import sys
|
2017-07-06 17:45:36 +00:00
|
|
|
import threading
|
2017-05-24 14:51:49 +00:00
|
|
|
import time
|
|
|
|
|
2017-03-10 22:11:57 +00:00
|
|
|
import asyncore_pollchoose as asyncore
|
2017-05-28 22:24:07 +00:00
|
|
|
from debug import logger
|
2017-02-08 13:19:02 +00:00
|
|
|
|
2017-01-10 20:20:49 +00:00
|
|
|
class AdvancedDispatcher(asyncore.dispatcher):
|
2017-03-11 10:12:08 +00:00
|
|
|
_buf_len = 2097152 # 2MB
|
2017-01-10 20:20:49 +00:00
|
|
|
|
2017-03-20 17:32:26 +00:00
|
|
|
def __init__(self, sock=None):
|
2017-03-10 22:11:57 +00:00
|
|
|
if not hasattr(self, '_map'):
|
2017-03-20 17:32:26 +00:00
|
|
|
asyncore.dispatcher.__init__(self, sock)
|
2017-03-10 22:11:57 +00:00
|
|
|
self.read_buf = b""
|
|
|
|
self.write_buf = b""
|
2017-01-10 20:20:49 +00:00
|
|
|
self.state = "init"
|
2017-05-24 14:51:49 +00:00
|
|
|
self.lastTx = time.time()
|
2017-05-25 12:59:18 +00:00
|
|
|
self.sentBytes = 0
|
|
|
|
self.receivedBytes = 0
|
2017-05-27 20:30:30 +00:00
|
|
|
self.expectBytes = 0
|
2017-07-06 17:45:36 +00:00
|
|
|
self.readLock = threading.RLock()
|
|
|
|
self.writeLock = threading.RLock()
|
|
|
|
|
|
|
|
def append_write_buf(self, data):
|
|
|
|
if data:
|
|
|
|
with self.writeLock:
|
|
|
|
self.write_buf += data
|
2017-01-10 20:20:49 +00:00
|
|
|
|
2017-02-08 13:19:02 +00:00
|
|
|
def slice_write_buf(self, length=0):
|
2017-04-16 16:27:15 +00:00
|
|
|
if length > 0:
|
2017-07-06 17:45:36 +00:00
|
|
|
with self.writeLock:
|
|
|
|
self.write_buf = self.write_buf[length:]
|
2017-04-16 16:27:15 +00:00
|
|
|
|
|
|
|
def slice_read_buf(self, length=0):
|
|
|
|
if length > 0:
|
2017-07-06 17:45:36 +00:00
|
|
|
with self.readLock:
|
|
|
|
self.read_buf = self.read_buf[length:]
|
2017-01-10 20:20:49 +00:00
|
|
|
|
|
|
|
def process(self):
|
2017-05-27 17:09:21 +00:00
|
|
|
if not self.connected:
|
2017-07-07 05:55:29 +00:00
|
|
|
return False
|
2017-07-06 17:45:36 +00:00
|
|
|
while len(self.read_buf) >= self.expectBytes:
|
2017-01-10 20:20:49 +00:00
|
|
|
try:
|
|
|
|
if getattr(self, "state_" + str(self.state))() is False:
|
|
|
|
break
|
|
|
|
except AttributeError:
|
|
|
|
raise
|
2017-07-07 05:55:29 +00:00
|
|
|
return False
|
2017-01-10 20:20:49 +00:00
|
|
|
|
2017-05-27 20:30:30 +00:00
|
|
|
def set_state(self, state, length=0, expectBytes=0):
|
|
|
|
self.expectBytes = expectBytes
|
2017-01-10 20:20:49 +00:00
|
|
|
self.slice_read_buf(length)
|
|
|
|
self.state = state
|
|
|
|
|
|
|
|
def writable(self):
|
2017-07-07 05:55:29 +00:00
|
|
|
self.uploadChunk = AdvancedDispatcher._buf_len
|
|
|
|
if asyncore.maxUploadRate > 0:
|
|
|
|
self.uploadChunk = asyncore.uploadBucket
|
|
|
|
self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
|
2017-05-28 22:24:07 +00:00
|
|
|
return asyncore.dispatcher.writable(self) and \
|
2017-07-07 05:55:29 +00:00
|
|
|
(self.connecting or self.uploadChunk > 0)
|
2017-01-10 20:20:49 +00:00
|
|
|
|
|
|
|
def readable(self):
|
2017-07-07 05:55:29 +00:00
|
|
|
self.downloadChunk = AdvancedDispatcher._buf_len
|
|
|
|
if asyncore.maxDownloadRate > 0:
|
|
|
|
self.downloadChunk = asyncore.downloadBucket
|
|
|
|
try:
|
|
|
|
if self.expectBytes > 0 and not self.fullyEstablished:
|
|
|
|
self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf))
|
|
|
|
except AttributeError:
|
|
|
|
pass
|
2017-05-28 22:24:07 +00:00
|
|
|
return asyncore.dispatcher.readable(self) and \
|
2017-07-07 05:55:29 +00:00
|
|
|
(self.connecting or self.downloadChunk > 0)
|
2017-01-10 20:20:49 +00:00
|
|
|
|
|
|
|
def handle_read(self):
|
2017-05-24 14:51:49 +00:00
|
|
|
self.lastTx = time.time()
|
2017-07-07 05:55:29 +00:00
|
|
|
newData = self.recv(self.downloadChunk)
|
|
|
|
self.receivedBytes += len(newData)
|
|
|
|
asyncore.update_received(len(newData))
|
|
|
|
with self.readLock:
|
|
|
|
self.read_buf += newData
|
2017-01-10 20:20:49 +00:00
|
|
|
|
|
|
|
def handle_write(self):
|
2017-05-24 14:51:49 +00:00
|
|
|
self.lastTx = time.time()
|
2017-07-07 05:55:29 +00:00
|
|
|
written = self.send(self.write_buf[0:self.uploadChunk])
|
|
|
|
asyncore.update_sent(written)
|
|
|
|
self.sentBytes += written
|
|
|
|
self.slice_write_buf(written)
|
2017-03-10 22:11:57 +00:00
|
|
|
|
2017-06-03 14:30:05 +00:00
|
|
|
def handle_connect_event(self):
|
|
|
|
try:
|
|
|
|
asyncore.dispatcher.handle_connect_event(self)
|
|
|
|
except socket.error as e:
|
|
|
|
if e.args[0] not in asyncore._DISCONNECTED:
|
|
|
|
raise
|
|
|
|
|
2017-03-10 22:11:57 +00:00
|
|
|
def handle_connect(self):
|
2017-05-24 14:51:49 +00:00
|
|
|
self.lastTx = time.time()
|
|
|
|
|
2017-05-25 21:04:33 +00:00
|
|
|
def state_close(self):
|
2017-07-06 17:45:36 +00:00
|
|
|
return False
|
2017-05-25 21:04:33 +00:00
|
|
|
|
2017-06-02 05:09:35 +00:00
|
|
|
def handle_close(self):
|
2017-05-24 14:51:49 +00:00
|
|
|
self.read_buf = b""
|
|
|
|
self.write_buf = b""
|
2017-05-28 22:24:07 +00:00
|
|
|
self.state = "close"
|
2017-05-24 14:51:49 +00:00
|
|
|
asyncore.dispatcher.close(self)
|