|
|
|
@ -77,7 +77,7 @@ class sendDataThread(threading.Thread):
|
|
|
|
|
def sendBytes(self, data = ""):
|
|
|
|
|
self.buffer += data
|
|
|
|
|
if len(self.buffer) < throttle.SendThrottle().chunkSize and self.sendDataThreadQueue.qsize() > 1:
|
|
|
|
|
return
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
while self.buffer and state.shutdown == 0:
|
|
|
|
|
isSSL = False
|
|
|
|
@ -104,16 +104,18 @@ class sendDataThread(threading.Thread):
|
|
|
|
|
select.select([], [self.sslSock if isSSL else self.sock], [], 10)
|
|
|
|
|
logger.debug('sock.recv retriable error')
|
|
|
|
|
continue
|
|
|
|
|
if e.errno in (errno.EPIPE):
|
|
|
|
|
logger.debug('Connection broken')
|
|
|
|
|
return False
|
|
|
|
|
raise
|
|
|
|
|
throttle.SendThrottle().wait(amountSent)
|
|
|
|
|
self.lastTimeISentData = int(time.time())
|
|
|
|
|
self.buffer = self.buffer[amountSent:]
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
logger.debug('sendDataThread starting. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(state.sendDataQueues)))
|
|
|
|
|
while True:
|
|
|
|
|
self.sendBytes()
|
|
|
|
|
while self.sendBytes():
|
|
|
|
|
deststream, command, data = self.sendDataThreadQueue.get()
|
|
|
|
|
|
|
|
|
|
if deststream == 0 or deststream in self.streamNumber:
|
|
|
|
|