Asyncore update
- duplicate checking implemented - connection pool vs. socket closing cleanup
This commit is contained in:
parent
abaa2c72e5
commit
fa9811f426
|
@ -3,6 +3,7 @@ import time
|
|||
|
||||
from addresses import calculateInventoryHash
|
||||
from debug import logger
|
||||
from inventory import Inventory
|
||||
import protocol
|
||||
import state
|
||||
|
||||
|
@ -64,6 +65,10 @@ class BMObject(object):
|
|||
logger.debug('The streamNumber %s isn\'t one we are interested in.' % self.streamNumber)
|
||||
raise BMObjectUnwantedStreamError()
|
||||
|
||||
def checkAlreadyHave(self):
|
||||
if self.inventoryHash in Inventory():
|
||||
raise BMObjectAlreadyHaveError()
|
||||
|
||||
def checkMessage(self):
|
||||
return
|
||||
|
||||
|
|
|
@ -280,27 +280,24 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
self.object.checkProofOfWorkSufficient()
|
||||
self.object.checkEOLSanity()
|
||||
self.object.checkStream()
|
||||
self.object.checkAlreadyHave()
|
||||
|
||||
try:
|
||||
if self.object.objectType == protocol.OBJECT_GETPUBKEY:
|
||||
self.object.checkGetpubkey()
|
||||
elif self.object.objectType == protocol.OBJECT_PUBKEY:
|
||||
self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32])
|
||||
elif self.object.objectType == protocol.OBJECT_MSG:
|
||||
self.object.checkMessage()
|
||||
elif self.object.objectType == protocol.OBJECT_BROADCAST:
|
||||
self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32])
|
||||
# other objects don't require other types of tests
|
||||
except BMObjectAlreadyHaveError:
|
||||
pass
|
||||
else:
|
||||
Inventory()[self.object.inventoryHash] = (
|
||||
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
|
||||
objectProcessorQueue.put((self.object.objectType,self.object.data))
|
||||
#DownloadQueue().task_done(self.object.inventoryHash)
|
||||
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
|
||||
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
|
||||
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
||||
if self.object.objectType == protocol.OBJECT_GETPUBKEY:
|
||||
self.object.checkGetpubkey()
|
||||
elif self.object.objectType == protocol.OBJECT_PUBKEY:
|
||||
self.object.checkPubkey(self.payload[self.payloadOffset:self.payloadOffset+32])
|
||||
elif self.object.objectType == protocol.OBJECT_MSG:
|
||||
self.object.checkMessage()
|
||||
elif self.object.objectType == protocol.OBJECT_BROADCAST:
|
||||
self.object.checkBroadcast(self.payload[self.payloadOffset:self.payloadOffset+32])
|
||||
# other objects don't require other types of tests
|
||||
Inventory()[self.object.inventoryHash] = (
|
||||
self.object.objectType, self.object.streamNumber, self.payload[objectOffset:], self.object.expiresTime, self.object.tag)
|
||||
objectProcessorQueue.put((self.object.objectType,self.object.data))
|
||||
#DownloadQueue().task_done(self.object.inventoryHash)
|
||||
invQueue.put((self.object.streamNumber, self.object.inventoryHash, self))
|
||||
#ObjUploadQueue().put(UploadElem(self.object.streamNumber, self.object.inventoryHash))
|
||||
#broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
|
||||
return True
|
||||
|
||||
def _decode_addr(self):
|
||||
|
@ -456,5 +453,4 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
#traceback.print_stack()
|
||||
else:
|
||||
logger.debug("%s:%i: closing, %s", self.destination.host, self.destination.port, reason)
|
||||
network.connectionpool.BMConnectionPool().removeConnection(self)
|
||||
AdvancedDispatcher.close(self)
|
||||
|
|
|
@ -64,7 +64,9 @@ class BMConnectionPool(object):
|
|||
|
||||
def removeConnection(self, connection):
|
||||
if isinstance(connection, network.udp.UDPSocket):
|
||||
return
|
||||
del self.udpSockets[connection.destination.host]
|
||||
if isinstance(connection, network.tcp.TCPServer):
|
||||
del self.listeningSockets[state.Peer(connection.destination.host, connection.destination.port)]
|
||||
elif connection.isOutbound:
|
||||
try:
|
||||
del self.outboundConnections[connection.destination]
|
||||
|
@ -99,9 +101,10 @@ class BMConnectionPool(object):
|
|||
def startUDPSocket(self, bind=None):
|
||||
if bind is None:
|
||||
host = self.getListeningIP()
|
||||
self.udpSockets[host] = network.udp.UDPSocket(host=host)
|
||||
udpSocket = network.udp.UDPSocket(host=host)
|
||||
else:
|
||||
self.udpSockets[bind] = network.udp.UDPSocket(host=bind)
|
||||
udpSocket = network.udp.UDPSocket(host=bind)
|
||||
self.udpSockets[udpSocket.destination.host] = udpSocket
|
||||
|
||||
def loop(self):
|
||||
# defaults to empty loop if outbound connections are maxed
|
||||
|
@ -164,12 +167,10 @@ class BMConnectionPool(object):
|
|||
if len(self.listeningSockets) > 0 and not acceptConnections:
|
||||
for i in self.listeningSockets:
|
||||
i.close()
|
||||
self.listeningSockets = {}
|
||||
logger.info('Stopped listening for incoming connections.')
|
||||
if len(self.udpSockets) > 0 and not acceptConnections:
|
||||
for i in self.udpSockets:
|
||||
i.close()
|
||||
self.udpSockets = {}
|
||||
logger.info('Stopped udp sockets.')
|
||||
|
||||
# while len(asyncore.socket_map) > 0 and state.shutdown == 0:
|
||||
|
@ -179,6 +180,7 @@ class BMConnectionPool(object):
|
|||
loopTime = 1.0
|
||||
asyncore.loop(timeout=loopTime, count=10)
|
||||
|
||||
reaper = []
|
||||
for i in self.inboundConnections.values() + self.outboundConnections.values():
|
||||
minTx = time.time() - 20
|
||||
if i.fullyEstablished:
|
||||
|
@ -188,3 +190,8 @@ class BMConnectionPool(object):
|
|||
i.writeQueue.put(protocol.CreatePacket('ping'))
|
||||
else:
|
||||
i.close("Timeout (%is)" % (time.time() - i.lastTx))
|
||||
for i in self.inboundConnections.values() + self.outboundConnections.values() + self.listeningSockets.values() + self.udpSockets.values():
|
||||
if not (i.accepting or i.connecting or i.connected):
|
||||
reaper.append(i)
|
||||
for i in reaper:
|
||||
self.removeConnection(i)
|
||||
|
|
|
@ -198,6 +198,7 @@ class TCPServer(AdvancedDispatcher):
|
|||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.set_reuse_addr()
|
||||
self.bind((host, port))
|
||||
self.destination = state.Peer(host, port)
|
||||
self.listen(5)
|
||||
|
||||
def handle_accept(self):
|
||||
|
|
Reference in New Issue
Block a user