diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index 92faaf17..327656db 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -39,6 +39,12 @@ class objectHashHolder(threading.Thread): def holdHash(self,hash): self.collectionOfHashLists[random.randrange(0, self.size)].append(hash) + def hasHash(self, hash): + if hash in (hashlist for hashlist in self.collectionOfHashLists): + logger.debug("Hash in hashHolder") + return True + return False + def holdPeer(self,peerDetails): self.collectionOfPeerLists[random.randrange(0, self.size)].append(peerDetails) diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index 58db78b8..81403053 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -156,27 +156,29 @@ class outgoingSynSender(threading.Thread, StoppableThread): try: self.sock.connect((peer.host, peer.port)) - rd = receiveDataThread() - rd.daemon = True # close the main program even if there are threads left someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection. + + sd = sendDataThread(sendDataThreadQueue) + sd.setup(self.sock, peer.host, peer.port, self.streamNumber, + someObjectsOfWhichThisRemoteNodeIsAlreadyAware) + sd.start() + + rd = receiveDataThread() + rd.daemon = True # close the main program even if there are threads left rd.setup(self.sock, peer.host, peer.port, self.streamNumber, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, - sendDataThreadQueue) + sendDataThreadQueue, + sd.objectHashHolderInstance) rd.start() - logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.') - - sd = sendDataThread(sendDataThreadQueue) - sd.setup(self.sock, peer.host, peer.port, self.streamNumber, - someObjectsOfWhichThisRemoteNodeIsAlreadyAware) - sd.start() sd.sendVersionMessage() + logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.') except socks.GeneralProxyError as err: if shared.verbose >= 2: logger.debug('Could NOT connect to ' + str(peer) + ' during outgoing attempt. ' + str(err)) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index e7574bc5..38b9aafe 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -44,7 +44,8 @@ class receiveDataThread(threading.Thread): streamNumber, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, selfInitiatedConnections, - sendDataThreadQueue): + sendDataThreadQueue, + objectHashHolderInstance): self.sock = sock self.peer = shared.Peer(HOST, port) @@ -63,6 +64,7 @@ class receiveDataThread(threading.Thread): self.initiatedConnection = True self.selfInitiatedConnections[streamNumber][self] = 0 self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware + self.objectHashHolderInstance = objectHashHolderInstance self.startTime = time.time() def run(self): @@ -136,10 +138,10 @@ class receiveDataThread(threading.Thread): # 0.2 is avg message transmission time now = time.time() if initial and now - delay < self.startTime: - logger.info("Sleeping for %.2fs", delay - (now - self.startTime)) + logger.debug("Initial sleeping for %.2fs", delay - (now - self.startTime)) time.sleep(delay - (now - self.startTime)) elif not initial: - logger.info("Sleeping for %.2fs", delay) + logger.debug("Sleeping due to missing object for %.2fs", delay) time.sleep(delay) def processData(self): @@ -322,7 +324,7 @@ class receiveDataThread(threading.Thread): bigInvList = {} for row in queryreturn: hash, = row - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash): bigInvList[hash] = 0 # We also have messages in our inventory in memory (which is a python # dictionary). Let's fetch those too. @@ -334,7 +336,6 @@ class receiveDataThread(threading.Thread): bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' - self.antiIntersectionDelay(True) # Now let us start appending all of these hashes together. They will be # sent out in a big inv message to our new peer. for hash, storedValue in bigInvList.items(): @@ -478,6 +479,7 @@ class receiveDataThread(threading.Thread): if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems): logger.debug('getdata message does not contain enough data. Ignoring.') return + self.antiIntersectionDelay(True) # only handle getdata requests if we have been connected long enough for i in xrange(numberOfRequestedInventoryItems): hash = data[lengthOfVarint + ( i * 32):32 + lengthOfVarint + (i * 32)] @@ -485,7 +487,10 @@ class receiveDataThread(threading.Thread): shared.numberOfInventoryLookupsPerformed += 1 shared.inventoryLock.acquire() - if hash in shared.inventory: + if self.objectHashHolderInstance.hasHash(hash): + shared.inventoryLock.release() + self.antiIntersectionDelay() + elif hash in shared.inventory: objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash] shared.inventoryLock.release() self.sendObject(payload) diff --git a/src/class_singleListener.py b/src/class_singleListener.py index a571867a..807570dc 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -130,7 +130,7 @@ class singleListener(threading.Thread, StoppableThread): rd = receiveDataThread() rd.daemon = True # close the main program even if there are threads left rd.setup( - socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue) + socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue, sd.objectHashHolderInstance) rd.start() logger.info('connected to ' + HOST + ' during INCOMING request.')