Sleep on invalid getdata

- postpone initial sleep until the first getdata is received
- also sleep when received a getdata request for an object that hasn't
been advertised to the other node yet
This commit is contained in:
Peter Šurda 2016-02-18 00:53:13 +01:00
parent f99d499d85
commit 0e59102f11
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 29 additions and 16 deletions

View File

@ -39,6 +39,12 @@ class objectHashHolder(threading.Thread):
def holdHash(self,hash):
self.collectionOfHashLists[random.randrange(0, self.size)].append(hash)
def hasHash(self, hash):
if hash in (hashlist for hashlist in self.collectionOfHashLists):
logger.debug("Hash in hashHolder")
return True
return False
def holdPeer(self,peerDetails):
self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails)

View File

@ -156,27 +156,29 @@ class outgoingSynSender(threading.Thread, StoppableThread):
try:
self.sock.connect((peer.host, peer.port))
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
rd.setup(self.sock,
peer.host,
peer.port,
self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
self.selfInitiatedConnections,
sendDataThreadQueue)
sendDataThreadQueue,
sd.objectHashHolderInstance)
rd.start()
logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.')
sd = sendDataThread(sendDataThreadQueue)
sd.setup(self.sock, peer.host, peer.port, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
sd.sendVersionMessage()
logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.')
except socks.GeneralProxyError as err:
if shared.verbose >= 2:
logger.debug('Could NOT connect to ' + str(peer) + ' during outgoing attempt. ' + str(err))

View File

@ -44,7 +44,8 @@ class receiveDataThread(threading.Thread):
streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections,
sendDataThreadQueue):
sendDataThreadQueue,
objectHashHolderInstance):
self.sock = sock
self.peer = shared.Peer(HOST, port)
@ -63,6 +64,7 @@ class receiveDataThread(threading.Thread):
self.initiatedConnection = True
self.selfInitiatedConnections[streamNumber][self] = 0
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
self.objectHashHolderInstance = objectHashHolderInstance
self.startTime = time.time()
def run(self):
@ -136,10 +138,10 @@ class receiveDataThread(threading.Thread):
# 0.2 is avg message transmission time
now = time.time()
if initial and now - delay < self.startTime:
logger.info("Sleeping for %.2fs", delay - (now - self.startTime))
logger.debug("Initial sleeping for %.2fs", delay - (now - self.startTime))
time.sleep(delay - (now - self.startTime))
elif not initial:
logger.info("Sleeping for %.2fs", delay)
logger.debug("Sleeping due to missing object for %.2fs", delay)
time.sleep(delay)
def processData(self):
@ -322,7 +324,7 @@ class receiveDataThread(threading.Thread):
bigInvList = {}
for row in queryreturn:
hash, = row
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
bigInvList[hash] = 0
# We also have messages in our inventory in memory (which is a python
# dictionary). Let's fetch those too.
@ -334,7 +336,6 @@ class receiveDataThread(threading.Thread):
bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0
payload = ''
self.antiIntersectionDelay(True)
# Now let us start appending all of these hashes together. They will be
# sent out in a big inv message to our new peer.
for hash, storedValue in bigInvList.items():
@ -478,6 +479,7 @@ class receiveDataThread(threading.Thread):
if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems):
logger.debug('getdata message does not contain enough data. Ignoring.')
return
self.antiIntersectionDelay(True) # only handle getdata requests if we have been connected long enough
for i in xrange(numberOfRequestedInventoryItems):
hash = data[lengthOfVarint + (
i * 32):32 + lengthOfVarint + (i * 32)]
@ -485,7 +487,10 @@ class receiveDataThread(threading.Thread):
shared.numberOfInventoryLookupsPerformed += 1
shared.inventoryLock.acquire()
if hash in shared.inventory:
if self.objectHashHolderInstance.hasHash(hash):
shared.inventoryLock.release()
self.antiIntersectionDelay()
elif hash in shared.inventory:
objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash]
shared.inventoryLock.release()
self.sendObject(payload)

View File

@ -130,7 +130,7 @@ class singleListener(threading.Thread, StoppableThread):
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
rd.setup(
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue)
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance)
rd.start()
logger.info('connected to ' + HOST + ' during INCOMING request.')