Migrate antiIntersectionDelay to asyncore
- implemented by ignoring getdata during the delay rather than sleeping as it was in the threaded model - it can happen that a valid getdata request is received during the delay. A node should be implemented in a way that retries to download, that may not be the case with older PyBitmessage versions or other implementations
This commit is contained in:
parent
fc19e4119a
commit
fe0664640e
|
@ -234,17 +234,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
||||||
|
|
||||||
def bm_command_getdata(self):
|
def bm_command_getdata(self):
|
||||||
items = self.decode_payload_content("L32s")
|
items = self.decode_payload_content("L32s")
|
||||||
# if time.time() < self.skipUntil:
|
# skip?
|
||||||
# print "skipping getdata"
|
if time.time() < self.skipUntil:
|
||||||
# return True
|
return True
|
||||||
for i in items:
|
for i in items:
|
||||||
#print "received getdata request for item %s" % (hexlify(i))
|
self.receiveQueue.put(("object", i))
|
||||||
#logger.debug('received getdata request for item:' + hexlify(i))
|
|
||||||
#if i in ObjUploadQueue.streamElems(1):
|
|
||||||
if False:
|
|
||||||
self.antiIntersectionDelay()
|
|
||||||
else:
|
|
||||||
self.receiveQueue.put(("object", i))
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def bm_command_inv(self):
|
def bm_command_inv(self):
|
||||||
|
|
|
@ -54,7 +54,7 @@ class ReceiveQueueThread(threading.Thread, StoppableThread):
|
||||||
connection.writeQueue.put(protocol.CreatePacket('object', Inventory()[objHash].payload))
|
connection.writeQueue.put(protocol.CreatePacket('object', Inventory()[objHash].payload))
|
||||||
except KeyError:
|
except KeyError:
|
||||||
connection.antiIntersectionDelay()
|
connection.antiIntersectionDelay()
|
||||||
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (connection.destination,))
|
logger.info('%s asked for an object we don\'t have.', connection.destination)
|
||||||
|
|
||||||
def command_biginv(self, connection, dummy):
|
def command_biginv(self, connection, dummy):
|
||||||
def sendChunk():
|
def sendChunk():
|
||||||
|
|
|
@ -29,7 +29,7 @@ from network.tls import TLSDispatcher
|
||||||
|
|
||||||
import addresses
|
import addresses
|
||||||
from bmconfigparser import BMConfigParser
|
from bmconfigparser import BMConfigParser
|
||||||
from queues import objectProcessorQueue, portCheckerQueue, UISignalQueue
|
from queues import invQueue, objectProcessorQueue, portCheckerQueue, UISignalQueue
|
||||||
import shared
|
import shared
|
||||||
import state
|
import state
|
||||||
import protocol
|
import protocol
|
||||||
|
@ -77,7 +77,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
|
|
||||||
def antiIntersectionDelay(self, initial = False):
|
def antiIntersectionDelay(self, initial = False):
|
||||||
# estimated time for a small object to propagate across the whole network
|
# estimated time for a small object to propagate across the whole network
|
||||||
delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + UploadQueue.queueCount/2)
|
delay = math.ceil(math.log(max(len(knownnodes.knownNodes[x]) for x in knownnodes.knownNodes) + 2, 20)) * (0.2 + invQueue.queueCount/2.0)
|
||||||
# take the stream with maximum amount of nodes
|
# take the stream with maximum amount of nodes
|
||||||
# +2 is to avoid problems with log(0) and log(1)
|
# +2 is to avoid problems with log(0) and log(1)
|
||||||
# 20 is avg connected nodes count
|
# 20 is avg connected nodes count
|
||||||
|
@ -86,9 +86,9 @@ class TCPConnection(BMProto, TLSDispatcher):
|
||||||
if initial:
|
if initial:
|
||||||
self.skipUntil = self.connectedAt + delay
|
self.skipUntil = self.connectedAt + delay
|
||||||
if self.skipUntil > time.time():
|
if self.skipUntil > time.time():
|
||||||
logger.debug("Skipping processing for %.2fs", self.skipUntil - time.time())
|
logger.debug("Initial skipping processing getdata for %.2fs", self.skipUntil - time.time())
|
||||||
else:
|
else:
|
||||||
logger.debug("Skipping processing due to missing object for %.2fs", self.skipUntil - time.time())
|
logger.debug("Skipping processing getdata due to missing object for %.2fs", self.skipUntil - time.time())
|
||||||
self.skipUntil = time.time() + delay
|
self.skipUntil = time.time() + delay
|
||||||
|
|
||||||
def set_connection_fully_established(self):
|
def set_connection_fully_established(self):
|
||||||
|
|
Reference in New Issue
Block a user