Add task_done to asyncore-related queues

This commit is contained in:
Peter Šurda 2017-05-27 19:39:19 +02:00
parent f8b4b427fc
commit fa9ad537a5
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 13 additions and 4 deletions

View File

@ -80,6 +80,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
while len(self.write_buf) < bufSize: while len(self.write_buf) < bufSize:
try: try:
self.write_buf += self.writeQueue.get(False) self.write_buf += self.writeQueue.get(False)
self.writeQueue.task_done()
except Queue.Empty: except Queue.Empty:
break break
if len(self.write_buf) > 0: if len(self.write_buf) > 0:
@ -103,11 +104,13 @@ class AdvancedDispatcher(asyncore.dispatcher):
while True: while True:
try: try:
self.writeQueue.get(False) self.writeQueue.get(False)
self.writeQueue.task_done()
except Queue.Empty: except Queue.Empty:
break break
while True: while True:
try: try:
self.receiveQueue.get(False) self.receiveQueue.get(False)
self.receiveQueue.task_done()
except Queue.Empty: except Queue.Empty:
break break
asyncore.dispatcher.close(self) asyncore.dispatcher.close(self)

View File

@ -11,9 +11,12 @@ def chooseConnection(stream):
return state.trustedPeer return state.trustedPeer
else: else:
try: try:
return portCheckerQueue.get(False) retval = portCheckerQueue.get(False)
portCheckerQueue.task_done()
except Queue.Empty: except Queue.Empty:
try: try:
return peerDiscoveryQueue.get(False) retval = peerDiscoveryQueue.get(False)
peerDiscoveryQueue.task_done()
except Queue.Empty: except Queue.Empty:
return random.choice(knownnodes.knownNodes[stream].keys()) return random.choice(knownnodes.knownNodes[stream].keys())
return retval

View File

@ -35,7 +35,9 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
processed += 1 processed += 1
try: try:
getattr(self, "command_" + str(command))(i, args) getattr(self, "command_" + str(command))(i, args)
i.receiveQueue.task_done()
except AttributeError: except AttributeError:
i.receiveQueue.task_done()
# missing command # missing command
raise raise
if processed == 0: if processed == 0:

View File

@ -137,7 +137,6 @@ class UDPSocket(BMProto):
return len(self.read_buf) < AdvancedDispatcher._buf_len return len(self.read_buf) < AdvancedDispatcher._buf_len
def handle_read(self): def handle_read(self):
print "read!"
try: try:
(addr, recdata) = self.socket.recvfrom(AdvancedDispatcher._buf_len) (addr, recdata) = self.socket.recvfrom(AdvancedDispatcher._buf_len)
except socket.error as e: except socket.error as e:
@ -150,6 +149,7 @@ class UDPSocket(BMProto):
self.local = True self.local = True
else: else:
self.local = False self.local = False
print "read %ib" % (len(recdata))
# overwrite the old buffer to avoid mixing data and so that self.local works correctly # overwrite the old buffer to avoid mixing data and so that self.local works correctly
self.read_buf = data self.read_buf = data
self.process() self.process()
@ -162,9 +162,10 @@ class UDPSocket(BMProto):
return return
try: try:
retval = self.socket.sendto(data, ('<broadcast>', UDPSocket.port)) retval = self.socket.sendto(data, ('<broadcast>', UDPSocket.port))
# print "broadcasted %ib" % (retval) print "broadcasted %ib" % (retval)
except socket.error as e: except socket.error as e:
print "socket error on sendato: %s" % (e) print "socket error on sendato: %s" % (e)
self.writeQueue.task_done()
def close(self, reason=None): def close(self, reason=None):
self.set_state("close") self.set_state("close")