Socket closing changes

- explicit close only through asyncore error handler
- implicit close through garbage collector
- avoid duplicate closing
This commit is contained in:
Peter Šurda 2017-11-17 13:37:51 +01:00
parent 2da1115d17
commit 5a787f41d2
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
3 changed files with 10 additions and 7 deletions

View File

@ -78,7 +78,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
self.uploadChunk = asyncore.uploadBucket self.uploadChunk = asyncore.uploadBucket
self.uploadChunk = min(self.uploadChunk, len(self.write_buf)) self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
return asyncore.dispatcher.writable(self) and \ return asyncore.dispatcher.writable(self) and \
(self.connecting or self.uploadChunk > 0) (self.connecting or (self.connected and self.uploadChunk > 0))
def readable(self): def readable(self):
self.downloadChunk = AdvancedDispatcher._buf_len self.downloadChunk = AdvancedDispatcher._buf_len
@ -92,7 +92,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
except AttributeError: except AttributeError:
pass pass
return asyncore.dispatcher.readable(self) and \ return asyncore.dispatcher.readable(self) and \
(self.connecting or self.downloadChunk > 0) (self.connecting or self.accepting or (self.connected and self.downloadChunk > 0))
def handle_read(self): def handle_read(self):
self.lastTx = time.time() self.lastTx = time.time()
@ -127,5 +127,5 @@ class AdvancedDispatcher(asyncore.dispatcher):
self.read_buf = bytearray() self.read_buf = bytearray()
with self.writeLock: with self.writeLock:
self.write_buf = bytearray() self.write_buf = bytearray()
self.state = "close" self.set_state("close")
self.close() self.close()

View File

@ -546,6 +546,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def handle_close(self): def handle_close(self):
self.set_state("close") self.set_state("close")
if not (self.accepting or self.connecting or self.connected):
# already disconnected
return
try: try:
logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, self.close_reason) logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, self.close_reason)
except AttributeError: except AttributeError:

View File

@ -214,11 +214,13 @@ class BMConnectionPool(object):
else: else:
if self.listeningSockets: if self.listeningSockets:
for i in self.listeningSockets.values(): for i in self.listeningSockets.values():
i.handle_close() i.close_reason = "Stopping listening"
i.accepting = i.connecting = i.connected = False
logger.info('Stopped listening for incoming connections.') logger.info('Stopped listening for incoming connections.')
if self.udpSockets: if self.udpSockets:
for i in self.udpSockets.values(): for i in self.udpSockets.values():
i.handle_close() i.close_reason = "Stopping UDP socket"
i.accepting = i.connecting = i.connected = False
logger.info('Stopped udp sockets.') logger.info('Stopped udp sockets.')
loopTime = float(self.spawnWait) loopTime = float(self.spawnWait)
@ -238,8 +240,6 @@ class BMConnectionPool(object):
i.close_reason = "Timeout (%is)" % (time.time() - i.lastTx) i.close_reason = "Timeout (%is)" % (time.time() - i.lastTx)
i.set_state("close") i.set_state("close")
for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values(): for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values():
if i.state == "close":
i.handle_close()
if not (i.accepting or i.connecting or i.connected): if not (i.accepting or i.connecting or i.connected):
reaper.append(i) reaper.append(i)
for i in reaper: for i in reaper: