Sleep on invalid getdata

- postpone initial sleep until the first getdata is received
- also sleep when received a getdata request for an object that hasn't
been advertised to the other node yet
This commit is contained in:
Peter Šurda 2016-02-18 00:53:13 +01:00
parent 1a92db54c9
commit 4c2ce7208c
4 changed files with 29 additions and 16 deletions

View File

@ -39,6 +39,12 @@ class objectHashHolder(threading.Thread):
def holdHash(self,hash): def holdHash(self,hash):
self.collectionOfHashLists[random.randrange(0, self.size)].append(hash) self.collectionOfHashLists[random.randrange(0, self.size)].append(hash)
def hasHash(self, hash):
if hash in (hashlist for hashlist in self.collectionOfHashLists):
logger.debug("Hash in hashHolder")
return True
return False
def holdPeer(self,peerDetails): def holdPeer(self,peerDetails):
self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails) self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails)

View File

@ -156,27 +156,29 @@ class outgoingSynSender(threading.Thread, StoppableThread):
try: try:
self.sock.connect((peer.host, peer.port)) self.sock.connect((peer.host, peer.port))
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
rd.setup(self.sock, rd.setup(self.sock,
peer.host, peer.host,
peer.port, peer.port,
self.streamNumber, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
self.selfInitiatedConnections, self.selfInitiatedConnections,
sendDataThreadQueue) sendDataThreadQueue,
sd.objectHashHolderInstance)
rd.start() rd.start()
logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.')
sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
sd.sendVersionMessage() sd.sendVersionMessage()
logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.')
except socks.GeneralProxyError as err: except socks.GeneralProxyError as err:
if shared.verbose >= 2: if shared.verbose >= 2:
logger.debug('Could NOT connect to ' + str(peer) + ' during outgoing attempt. ' + str(err)) logger.debug('Could NOT connect to ' + str(peer) + ' during outgoing attempt. ' + str(err))

View File

@ -44,7 +44,8 @@ class receiveDataThread(threading.Thread):
streamNumber, streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections, selfInitiatedConnections,
sendDataThreadQueue): sendDataThreadQueue,
objectHashHolderInstance):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, port) self.peer = shared.Peer(HOST, port)
@ -63,6 +64,7 @@ class receiveDataThread(threading.Thread):
self.initiatedConnection = True self.initiatedConnection = True
self.selfInitiatedConnections[streamNumber][self] = 0 self.selfInitiatedConnections[streamNumber][self] = 0
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
self.objectHashHolderInstance = objectHashHolderInstance
self.startTime = time.time() self.startTime = time.time()
def run(self): def run(self):
@ -136,10 +138,10 @@ class receiveDataThread(threading.Thread):
# 0.2 is avg message transmission time # 0.2 is avg message transmission time
now = time.time() now = time.time()
if initial and now - delay < self.startTime: if initial and now - delay < self.startTime:
logger.info("Sleeping for %.2fs", delay - (now - self.startTime)) logger.debug("Initial sleeping for %.2fs", delay - (now - self.startTime))
time.sleep(delay - (now - self.startTime)) time.sleep(delay - (now - self.startTime))
elif not initial: elif not initial:
logger.info("Sleeping for %.2fs", delay) logger.debug("Sleeping due to missing object for %.2fs", delay)
time.sleep(delay) time.sleep(delay)
def processData(self): def processData(self):
@ -322,7 +324,7 @@ class receiveDataThread(threading.Thread):
bigInvList = {} bigInvList = {}
for row in queryreturn: for row in queryreturn:
hash, = row hash, = row
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
bigInvList[hash] = 0 bigInvList[hash] = 0
# We also have messages in our inventory in memory (which is a python # We also have messages in our inventory in memory (which is a python
# dictionary). Let's fetch those too. # dictionary). Let's fetch those too.
@ -334,7 +336,6 @@ class receiveDataThread(threading.Thread):
bigInvList[hash] = 0 bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0 numberOfObjectsInInvMessage = 0
payload = '' payload = ''
self.antiIntersectionDelay(True)
# Now let us start appending all of these hashes together. They will be # Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer. # sent out in a big inv message to our new peer.
for hash, storedValue in bigInvList.items(): for hash, storedValue in bigInvList.items():
@ -478,6 +479,7 @@ class receiveDataThread(threading.Thread):
if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems): if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems):
logger.debug('getdata message does not contain enough data. Ignoring.') logger.debug('getdata message does not contain enough data. Ignoring.')
return return
self.antiIntersectionDelay(True) # only handle getdata requests if we have been connected long enough
for i in xrange(numberOfRequestedInventoryItems): for i in xrange(numberOfRequestedInventoryItems):
hash = data[lengthOfVarint + ( hash = data[lengthOfVarint + (
i * 32):32 + lengthOfVarint + (i * 32)] i * 32):32 + lengthOfVarint + (i * 32)]
@ -485,7 +487,10 @@ class receiveDataThread(threading.Thread):
shared.numberOfInventoryLookupsPerformed += 1 shared.numberOfInventoryLookupsPerformed += 1
shared.inventoryLock.acquire() shared.inventoryLock.acquire()
if hash in shared.inventory: if self.objectHashHolderInstance.hasHash(hash):
shared.inventoryLock.release()
self.antiIntersectionDelay()
elif hash in shared.inventory:
objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash] objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash]
shared.inventoryLock.release() shared.inventoryLock.release()
self.sendObject(payload) self.sendObject(payload)

View File

@ -130,7 +130,7 @@ class singleListener(threading.Thread, StoppableThread):
rd = receiveDataThread() rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left rd.daemon = True # close the main program even if there are threads left
rd.setup( rd.setup(
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue) socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance)
rd.start() rd.start()
logger.info('connected to ' + HOST + ' during INCOMING request.') logger.info('connected to ' + HOST + ' during INCOMING request.')