From 68b58ce0c5c288e964e0036aee284a50d51f0b5a Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Thu, 1 Feb 2018 12:19:39 +0100 Subject: [PATCH] Download optimisation - new data structure to handle download tracking, uses less CPU --- src/network/bmproto.py | 3 +- src/network/connectionpool.py | 3 +- src/network/downloadthread.py | 29 ++------- src/network/objectracker.py | 12 ++-- src/randomtrackingdict.py | 118 ++++++++++++++++++++++++++++++++++ 5 files changed, 129 insertions(+), 36 deletions(-) create mode 100644 src/randomtrackingdict.py diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 21ec692c..4bf63bca 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -546,8 +546,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): for connection in network.connectionpool.BMConnectionPool().inboundConnections.values() + \ network.connectionpool.BMConnectionPool().outboundConnections.values(): try: - with connection.objectsNewToMeLock: - del connection.objectsNewToMe[hashId] + del connection.objectsNewToMe[hashId] except KeyError: pass if not forwardAnyway: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 793e284f..2f34e485 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -40,8 +40,7 @@ class BMConnectionPool(object): if not i.fullyEstablished: continue try: - with i.objectsNewToMeLock: - del i.objectsNewToMe[hashid] + del i.objectsNewToMe[hashid] except KeyError: with i.objectsNewToThemLock: i.objectsNewToThem[hashid] = time.time() diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index 35616f1b..ec921c58 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -3,10 +3,8 @@ import threading import time import addresses -#from bmconfigparser import BMConfigParser from debug import logger from helper_threading import StoppableThread -#from inventory import Inventory from network.connectionpool import BMConnectionPool import 
protocol from state import missingObjects @@ -49,32 +47,15 @@ class DownloadThread(threading.Thread, StoppableThread): for i in connections: now = time.time() timedOut = now - DownloadThread.requestTimeout - # this may take a while, but it needs a consistency so I think it's better to lock a bigger chunk - with i.objectsNewToMeLock: - try: - downloadPending = len(list((k for k, v in i.objectsNewToMe.iteritems() if k in missingObjects and missingObjects[k] > timedOut and not v))) - except KeyError: - continue - if downloadPending >= DownloadThread.minPending: - continue - # keys with True values in the dict - try: - request = list((k for k, v in i.objectsNewToMe.iteritems() if k not in missingObjects or missingObjects[k] < timedOut)) - except KeyError: - continue - random.shuffle(request) - if len(request) > requestChunk - downloadPending: - request = request[:max(1, requestChunk - downloadPending)] - if not request: - continue - # mark them as pending - for k in request: - i.objectsNewToMe[k] = False - missingObjects[k] = now + try: + request = i.objectsNewToMe.randomKeys(requestChunk) + except KeyError: + continue payload = bytearray() payload.extend(addresses.encodeVarint(len(request))) for chunk in request: payload.extend(chunk) + missingObjects[chunk] = now i.append_write_buf(protocol.CreatePacket('getdata', payload)) logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, len(request)) requested += len(request) diff --git a/src/network/objectracker.py b/src/network/objectracker.py index 62f01e4f..a786c696 100644 --- a/src/network/objectracker.py +++ b/src/network/objectracker.py @@ -5,6 +5,7 @@ from threading import RLock from debug import logger from inventory import Inventory from network.dandelion import Dandelion +from randomtrackingdict import RandomTrackingDict from state import missingObjects haveBloom = False @@ -32,8 +33,7 @@ class ObjectTracker(object): initialTimeOffset = 60 def __init__(self): - self.objectsNewToMe = {} - 
self.objectsNewToMeLock = RLock() + self.objectsNewToMe = RandomTrackingDict() self.objectsNewToThem = {} self.objectsNewToThemLock = RLock() self.initInvBloom() @@ -61,9 +61,6 @@ class ObjectTracker(object): self.initAddrBloom() else: # release memory - with self.objectsNewToMeLock: - tmp = self.objectsNewToMe.copy() - self.objectsNewToMe = tmp deadline = time.time() - ObjectTracker.trackingExpires with self.objectsNewToThemLock: self.objectsNewToThem = {k: v for k, v in self.objectsNewToThem.iteritems() if v >= deadline} @@ -88,9 +85,8 @@ class ObjectTracker(object): if hashId in Dandelion().hashMap: Dandelion().fluffTrigger(hashId) if hashId not in missingObjects: - missingObjects[hashId] = time.time() - ObjectTracker.initialTimeOffset - with self.objectsNewToMeLock: - self.objectsNewToMe[hashId] = True + missingObjects[hashId] = time.time() + self.objectsNewToMe[hashId] = True def hasAddr(self, addr): if haveBloom: diff --git a/src/randomtrackingdict.py b/src/randomtrackingdict.py new file mode 100644 index 00000000..466acc41 --- /dev/null +++ b/src/randomtrackingdict.py @@ -0,0 +1,118 @@ +import random +from threading import RLock +from time import time + +class RandomTrackingDict(object): + maxPending = 10 + pendingTimeout = 60 + def __init__(self): # O(1) + self.dictionary = {} + self.indexDict = [] + self.len = 0 + self.pendingLen = 0 + self.lastPoll = 0 + self.lock = RLock() + + def __len__(self): + return self.len + + def __contains__(self, key): + return key in self.dictionary + + def __getitem__(self, key): + return self.dictionary[key][1] + + def __setitem__(self, key, value): + with self.lock: + if key in self.dictionary: + self.dictionary[key][1] = value + else: + self.indexDict.append(key) + self.dictionary[key] = [self.len, value] + self.len += 1 + + def __delitem__(self, key): + if not key in self.dictionary: + raise KeyError + with self.lock: + index = self.dictionary[key][0] + self.indexDict[index] = self.indexDict[self.len - 1] + 
self.dictionary[self.indexDict[index]][0] = index + # if the following del is batched, performance of this single + # operation can improve 4x, but it's already very fast so we'll + # ignore it for the time being + del self.indexDict[-1] + del self.dictionary[key] + self.len -= 1 + if index >= self.len - self.pendingLen: + self.pendingLen -= 1 + + def setMaxPending(self, maxPending): + self.maxPending = maxPending + + def setPendingTimeout(self, pendingTimeout): + self.pendingTimeout = pendingTimeout + + def randomKeys(self, count=1): + if self.lastPoll + self.pendingTimeout < time(): + with self.lock: + self.pendingLen = 0 + if self.len == 0 or self.pendingLen >= self.maxPending: + raise KeyError + with self.lock: + available = self.len - self.pendingLen + if count > available: + count = available + retval = random.sample(self.indexDict[:self.len - self.pendingLen], count) + for i in retval[::-1]: + # swap with one below lowest pending + self.pendingLen += 1 + swapKey = self.indexDict[-self.pendingLen] + curIndex = self.dictionary[i][0] + self.indexDict[-self.pendingLen] = i + self.indexDict[curIndex] = swapKey + self.dictionary[i][0] = self.len - self.pendingLen + self.dictionary[swapKey][0] = curIndex + self.lastPoll = time() + return retval + +if __name__ == '__main__': + def randString(): + retval = b'' + for _ in range(32): + retval += chr(random.randint(0,255)) + return retval + + a = [] + k = RandomTrackingDict() + d = {} + +# print "populating normal dict" +# a.append(time()) +# for i in range(50000): +# d[randString()] = True +# a.append(time()) + print "populating random tracking dict" + a.append(time()) + for i in range(50000): + k[randString()] = True + a.append(time()) + print "done" + while len(k) > 0: + retval = k.randomKeys(1000) + if not retval: + print "error getting random keys" + #a.append(time()) + try: + k.randomKeys(100) + print "bad" + except KeyError: + pass + #a.append(time()) + for i in retval: + del k[i] + #a.append(time()) + 
a.append(time()) + + for x in range(len(a) - 1): + print "%i: %.3f" % (x, a[x+1] - a[x])