Download optimisation

- new data structure to handle download tracking; uses less CPU
Peter Šurda 2018-02-01 12:19:39 +01:00
parent d223bfc6f2
commit 68b58ce0c5
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
5 changed files with 129 additions and 36 deletions
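
In summary: each connection's objectsNewToMe dict and its RLock are replaced by a RandomTrackingDict (new file at the bottom), which pairs a plain dict with an index list of keys so that random selection, chunk truncation, and pending-request bookkeeping all happen inside one structure. Inserts and deletes are O(1), and picking a request chunk takes a single pass instead of the several filtered scans the old DownloadThread code made under the lock. A minimal usage sketch of the calling pattern the diff below relies on (illustrative only, not from the commit; the hash value is made up):

    from randomtrackingdict import RandomTrackingDict

    objectsNewToMe = RandomTrackingDict()
    objectsNewToMe[b'\x00' * 32] = True   # track an advertised inventory hash

    try:
        # take up to 100 random non-pending keys and mark them pending;
        # raises KeyError when empty or too many requests are outstanding
        request = objectsNewToMe.randomKeys(100)
    except KeyError:
        request = []

    for hashId in request:
        del objectsNewToMe[hashId]        # forget it once the object arrives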

src/network/bmproto.py

@@ -546,7 +546,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
         for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \
                 network.connectionpool.BMConnectionPool().outboundConnections.values():
             try:
-                with connection.objectsNewToMeLock:
-                    del connection.objectsNewToMe[hashId]
+                del connection.objectsNewToMe[hashId]
             except KeyError:
                 pass

src/network/connectionpool.py

@@ -40,7 +40,6 @@ class BMConnectionPool(object):
             if not i.fullyEstablished:
                 continue
             try:
-                with i.objectsNewToMeLock:
-                    del i.objectsNewToMe[hashid]
+                del i.objectsNewToMe[hashid]
             except KeyError:
                 with i.objectsNewToThemLock:

src/network/downloadthread.py

@@ -3,10 +3,8 @@ import threading
 import time

 import addresses
-#from bmconfigparser import BMConfigParser
 from debug import logger
 from helper_threading import StoppableThread
-#from inventory import Inventory
 from network.connectionpool import BMConnectionPool
 import protocol
 from state import missingObjects
@@ -49,32 +47,15 @@ class DownloadThread(threading.Thread, StoppableThread):
             for i in connections:
                 now = time.time()
                 timedOut = now - DownloadThread.requestTimeout
-                # this may take a while, but it needs a consistency so I think it's better to lock a bigger chunk
-                with i.objectsNewToMeLock:
-                    try:
-                        downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if k in missingObjects and missingObjects[k] > timedOut and not v)))
-                    except KeyError:
-                        continue
-                    if downloadPending >= DownloadThread.minPending:
-                        continue
-                    # keys with True values in the dict
-                    try:
-                        request = list((k for k, v in i.objectsNewToMe.iteritems() if k not in missingObjects or missingObjects[k] < timedOut))
-                    except KeyError:
-                        continue
-                    random.shuffle(request)
-                    if len(request) > requestChunk - downloadPending:
-                        request = request[:max(1, requestChunk - downloadPending)]
-                    if not request:
-                        continue
-                    # mark them as pending
-                    for k in request:
-                        i.objectsNewToMe[k] = False
-                        missingObjects[k] = now
+                try:
+                    request = i.objectsNewToMe.randomKeys(requestChunk)
+                except KeyError:
+                    continue
                 payload = bytearray()
                 payload.extend(addresses.encodeVarint(len(request)))
                 for chunk in request:
                     payload.extend(chunk)
+                    missingObjects[chunk] = now
                 i.append_write_buf(protocol.CreatePacket('getdata', payload))
                 logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request))
                 requested += len(request)

src/network/objectracker.py

@@ -5,6 +5,7 @@ from threading import RLock
 from debug import logger
 from inventory import Inventory
 from network.dandelion import Dandelion
+from randomtrackingdict import RandomTrackingDict
 from state import missingObjects

 haveBloom = False
@@ -32,8 +33,7 @@ class ObjectTracker(object):
     initialTimeOffset = 60

     def __init__(self):
-        self.objectsNewToMe = {}
-        self.objectsNewToMeLock = RLock()
+        self.objectsNewToMe = RandomTrackingDict()
         self.objectsNewToThem = {}
         self.objectsNewToThemLock = RLock()
         self.initInvBloom()
@@ -61,9 +61,6 @@ class ObjectTracker(object):
             self.initAddrBloom()
         else:
             # release memory
-            with self.objectsNewToMeLock:
-                tmp = self.objectsNewToMe.copy()
-                self.objectsNewToMe = tmp
             deadline = time.time() - ObjectTracker.trackingExpires
             with self.objectsNewToThemLock:
                 self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline}
@@ -88,8 +85,7 @@ class ObjectTracker(object):
         if hashId in Dandelion().hashMap:
             Dandelion().fluffTrigger(hashId)
         if hashId not in missingObjects:
-            missingObjects[hashId] = time.time() - ObjectTracker.initialTimeOffset
-        with self.objectsNewToMeLock:
-            self.objectsNewToMe[hashId] = True
+            missingObjects[hashId] = True
+        self.objectsNewToMe[hashId] = True

     def hasAddr(self, addr):

src/randomtrackingdict.py (new file, 118 lines)

@@ -0,0 +1,118 @@
import random
from threading import RLock
from time import time


class RandomTrackingDict(object):
    # A dict that additionally supports random sampling of its keys.
    # Keys handed out by randomKeys() are marked "pending" and kept in
    # the tail of indexDict, so they are excluded from further sampling
    # until they time out or are deleted.
    maxPending = 10
    pendingTimeout = 60

    def __init__(self):  # O(1)
        self.dictionary = {}   # key -> [position in indexDict, value]
        self.indexDict = []    # keys in sampling order; pending keys at the tail
        self.len = 0
        self.pendingLen = 0
        self.lastPoll = 0
        self.lock = RLock()
    def __len__(self):
        return self.len

    def __contains__(self, key):
        return key in self.dictionary

    def __getitem__(self, key):
        return self.dictionary[key][1]

    def __setitem__(self, key, value):
        with self.lock:
            if key in self.dictionary:
                self.dictionary[key][1] = value
            else:
                self.indexDict.append(key)
                self.dictionary[key] = [self.len, value]
                self.len += 1

    def __delitem__(self, key):
        if key not in self.dictionary:
            raise KeyError
        with self.lock:
            index = self.dictionary[key][0]
            # move the last key into the vacated slot, then drop the tail
            self.indexDict[index] = self.indexDict[self.len - 1]
            self.dictionary[self.indexDict[index]][0] = index
            # if the following del is batched, performance of this single
            # operation can improve 4x, but it's already very fast so we'll
            # ignore it for the time being
            del self.indexDict[-1]
            del self.dictionary[key]
            self.len -= 1
            # if the deleted key was pending, shrink the pending tail
            if index >= self.len - self.pendingLen:
                self.pendingLen -= 1
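    # Worked example of the delete swap (editor's illustration, not in
    # the committed file): with indexDict == [A, B, C, D] and nothing
    # pending, del d[B] moves the last key D into B's slot and drops
    # the tail, giving [A, D, C]. Because the dictionary stores each
    # key's current index, lookups and further deletes stay O(1).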
    def setMaxPending(self, maxPending):
        self.maxPending = maxPending

    def setPendingTimeout(self, pendingTimeout):
        self.pendingTimeout = pendingTimeout

    def randomKeys(self, count=1):
        # expire the whole pending tail if the last poll is too old
        if self.lastPoll + self.pendingTimeout < time():
            with self.lock:
                self.pendingLen = 0
        if self.len == 0 or self.pendingLen >= self.maxPending:
            raise KeyError
        with self.lock:
            available = self.len - self.pendingLen
            if count > available:
                count = available
            # sample only from the non-pending head of indexDict
            retval = random.sample(self.indexDict[:self.len - self.pendingLen], count)
            for i in retval[::-1]:
                # grow the pending tail by one slot, then swap the
                # sampled key with the one just below the old boundary
                self.pendingLen += 1
                swapKey = self.indexDict[-self.pendingLen]
                curIndex = self.dictionary[i][0]
                self.indexDict[-self.pendingLen] = i
                self.indexDict[curIndex] = swapKey
                self.dictionary[i][0] = self.len - self.pendingLen
                self.dictionary[swapKey][0] = curIndex
            self.lastPoll = time()
            return retval
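    # Worked example of the sampling swap (editor's illustration, not
    # in the committed file): with indexDict == [A, B, C, D, E] and
    # pendingLen == 1 (E pending), randomKeys(1) sampling B swaps B
    # with D, the key just below the pending tail, giving
    # [A, D, C, B, E] and pendingLen == 2. B and E are now excluded
    # from sampling until they expire via pendingTimeout or are
    # deleted once the requested objects arrive.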

if __name__ == '__main__':
    def randString():
        retval = b''
        for _ in range(32):
            retval += chr(random.randint(0, 255))
        return retval

    a = []
    k = RandomTrackingDict()
    d = {}

#    print "populating normal dict"
#    a.append(time())
#    for i in range(50000):
#        d[randString()] = True
#    a.append(time())
    print "populating random tracking dict"
    a.append(time())
    for i in range(50000):
        k[randString()] = True
    a.append(time())
    print "done"

    while len(k) > 0:
        retval = k.randomKeys(1000)
        if not retval:
            print "error getting random keys"
#        a.append(time())
        try:
            k.randomKeys(100)
            print "bad"
        except KeyError:
            pass
#        a.append(time())
        for i in retval:
            del k[i]
#        a.append(time())
    a.append(time())

    for x in range(len(a) - 1):
        print "%i: %.3f" % (x, a[x + 1] - a[x])