Refactor objects to be downloaded

- moved logic into a Missing singleton
- shouldn't try to download duplicates anymore, only requests a hash
  once every 5 minutes and not from the same host
- removed obsoleted variables
- the "Objects to be synced" in the Network tab should now be correct
- removed some checks which aren't necessary anymore in my opinion
- fix missing self in Throttle (thanks landscape.io)
This commit is contained in:
Peter Šurda 2017-01-15 19:21:24 +01:00
parent 6d2a75bfc9
commit f079ff5b99
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
6 changed files with 99 additions and 97 deletions

View File

@ -77,6 +77,7 @@ from dialogs import AddAddressDialog
from class_objectHashHolder import objectHashHolder
from class_singleWorker import singleWorker
from helper_generic import powQueueSize, invQueueSize
from inventory import Missing
import paths
from proofofwork import getPowType
import protocol
@ -2712,11 +2713,9 @@ class MyForm(settingsmixin.SMainWindow):
elif reply == QtGui.QMessage.Cancel:
return
toBeDownloaded = sum(shared.numberOfObjectsThatWeHaveYetToGetPerPeer.itervalues())
if toBeDownloaded > 0:
if Missing().len() > 0:
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Synchronisation pending"),
_translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, toBeDownloaded),
_translate("MainWindow", "Bitmessage hasn't synchronised with the network, %n object(s) to be downloaded. If you quit now, it may cause delivery delays. Wait until the synchronisation finishes?", None, QtCore.QCoreApplication.CodecForTr, Missing().len()),
QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel)
if reply == QtGui.QMessageBox.Yes:
waitForSync = True
@ -2747,7 +2746,7 @@ class MyForm(settingsmixin.SMainWindow):
if waitForSync:
self.statusBar().showMessage(_translate(
"MainWindow", "Waiting for finishing synchronisation..."))
while sum(shared.numberOfObjectsThatWeHaveYetToGetPerPeer.itervalues()) > 0:
while Missing().len() > 0:
time.sleep(0.5)
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000)

View File

@ -2,7 +2,7 @@ from PyQt4 import QtCore, QtGui
import time
import shared
from tr import _translate
from inventory import Inventory
from inventory import Inventory, Missing
import l10n
from retranslateui import RetranslateMixin
from uisignaler import UISignaler
@ -45,7 +45,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
return "%4.0f kB" % num
def updateNumberOfObjectsToBeSynced(self):
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, sum(shared.numberOfObjectsThatWeHaveYetToGetPerPeer.itervalues())))
self.labelSyncStatus.setText(_translate("networkstatus", "Object(s) to be synced: %n", None, QtCore.QCoreApplication.CodecForTr, Missing().len()))
def updateNumberOfMessagesProcessed(self):
self.updateNumberOfObjectsToBeSynced()

View File

@ -30,7 +30,7 @@ from helper_sql import sqlQuery
from debug import logger
import paths
import protocol
from inventory import Inventory
from inventory import Inventory, Missing
import state
import throttle
import tr
@ -62,7 +62,6 @@ class receiveDataThread(threading.Thread):
self.peer = state.Peer(HOST, port)
self.name = "receiveData-" + self.peer.host.replace(":", ".") # ":" log parser field separator
self.streamNumber = streamNumber
self.objectsThatWeHaveYetToGetFromThisPeer = {}
self.selfInitiatedConnections = selfInitiatedConnections
self.sendDataThreadQueue = sendDataThreadQueue # used to send commands and data to the sendDataThread
self.hostIdent = self.peer.port if ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname') and protocol.checkSocksIP(self.peer.host) else self.peer.host
@ -129,11 +128,7 @@ class receiveDataThread(threading.Thread):
except Exception as err:
logger.error('Could not delete ' + str(self.hostIdent) + ' from shared.connectedHostsList.' + str(err))
try:
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
self.peer]
except:
pass
Missing().threadEnd()
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
self.checkTimeOffsetNotification()
logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
@ -222,37 +217,16 @@ class receiveDataThread(threading.Thread):
self.data = self.data[payloadLength + protocol.Header.size:] # take this message out and then process the next message
if self.data == '': # if there are no more messages
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0 and not self.sendDataThreadQueue.full():
objectHash, = random.sample(
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
while Missing().len() > 0 and not self.sendDataThreadQueue.full():
objectHash = Missing().pull()
if objectHash is None:
break
if objectHash in Inventory():
logger.debug('Inventory already has object listed in inv message.')
del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash]
Missing().delete(objectHash)
else:
# We don't have the object in our inventory. Let's request it.
self.sendgetdata(objectHash)
del self.objectsThatWeHaveYetToGetFromThisPeer[
objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
try:
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
except:
pass
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
try:
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
except:
pass
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)))
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len(
self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
self.processData()
@ -425,13 +399,6 @@ class receiveDataThread(threading.Thread):
# We have received an inv message
def recinv(self, data):
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = 0 # this counts duplicates separately because they take up memory
if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0:
for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items():
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value
logger.debug('number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: ' + str(len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer)) + "\n" + \
'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers))
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
if numberOfItemsInInv > 50000:
sys.stderr.write('Too many items in inv message!')
@ -439,35 +406,16 @@ class receiveDataThread(threading.Thread):
if len(data) < lengthOfVarint + (numberOfItemsInInv * 32):
logger.info('inv message doesn\'t contain enough data. Ignoring.')
return
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and state.trustedPeer == None: # inv flooding attack mitigation
logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.')
return
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
data[lengthOfVarint:32 + lengthOfVarint]] = 0
if data[lengthOfVarint:32 + lengthOfVarint] in Inventory():
logger.debug('Inventory has inventory item already.')
else:
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
else:
# There are many items listed in this inv message. Let us create a
# 'set' of objects we are aware of and a set of objects in this inv
# message so that we can diff one from the other cheaply.
startTime = time.time()
advertisedSet = set()
for i in range(numberOfItemsInInv):
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe:
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and state.trustedPeer == None: # inv flooding attack mitigation
logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)), ' from this node in particular. Ignoring the rest of this inv message.')
break
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
self.peer] = len(self.objectsThatWeHaveYetToGetFromThisPeer)
startTime = time.time()
advertisedSet = set()
for i in range(numberOfItemsInInv):
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe:
Missing().add(item)
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
# Send a getdata message to our peer to request the object with the given
# hash

View File

@ -1,13 +1,11 @@
import collections
from threading import RLock
import random
from threading import current_thread, RLock
import time
from helper_sql import *
from singleton import Singleton
inventoryLock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
@Singleton
class Inventory(collections.MutableMapping):
@ -16,26 +14,28 @@ class Inventory(collections.MutableMapping):
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
self.numberOfInventoryLookupsPerformed = 0
self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
self.lock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
self.InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
def __contains__(self, hash):
with inventoryLock:
with self.lock:
self.numberOfInventoryLookupsPerformed += 1
if hash in self._inventory:
return True
return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
def __getitem__(self, hash):
with inventoryLock:
with self.lock:
if hash in self._inventory:
return self._inventory[hash]
rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
if not rows:
raise KeyError(hash)
return InventoryItem(*rows[0])
return self.InventoryItem(*rows[0])
def __setitem__(self, hash, value):
with inventoryLock:
value = InventoryItem(*value)
with self.lock:
value = self.InventoryItem(*value)
self._inventory[hash] = value
self._streams[value.stream].add(hash)
@ -43,42 +43,93 @@ class Inventory(collections.MutableMapping):
raise NotImplementedError
def __iter__(self):
with inventoryLock:
with self.lock:
hashes = self._inventory.keys()[:]
hashes += (hash for hash, in sqlQuery('SELECT hash FROM inventory'))
return hashes.__iter__()
def __len__(self):
with inventoryLock:
with self.lock:
return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
def by_type_and_tag(self, type, tag):
with inventoryLock:
with self.lock:
values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
values += (self.InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
return values
def hashes_by_stream(self, stream):
with inventoryLock:
with self.lock:
return self._streams[stream]
def unexpired_hashes_by_stream(self, stream):
with inventoryLock:
with self.lock:
t = int(time.time())
hashes = [hash for hash, value in self._inventory.items() if value.stream == stream and value.expires > t]
hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
return hashes
def flush(self):
with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with self.lock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute() as sql:
for hash, value in self._inventory.items():
sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value)
self._inventory.clear()
def clean(self):
with inventoryLock:
with self.lock:
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
self._streams.clear()
for hash, value in self.items():
self._streams[value.stream].add(hash)
@Singleton
class Missing(object):
def __init__(self):
super(self.__class__, self).__init__()
self.lock = RLock()
self.hashes = {}
def add(self, objectHash):
with self.lock:
if not objectHash in self.hashes:
self.hashes[objectHash] = {'peers':[], 'requested':0}
self.hashes[objectHash]['peers'].append(current_thread().peer)
def len(self):
with self.lock:
return len(self.hashes)
def pull(self):
with self.lock:
now = time.time()
since = now - 300 # once every 5 minutes
try:
objectHash = random.choice({k:v for k, v in self.hashes.iteritems() if current_thread().peer in self.hashes[k]['peers'] and self.hashes[k]['requested'] < since}.keys())
except (IndexError, KeyError): # list is empty
return None
try:
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except ValueError:
pass
if len(self.hashes[objectHash]['peers']) == 0:
self.delete(objectHash)
else:
self.hashes[objectHash]['requested'] = now
return objectHash
def delete(self, objectHash):
with self.lock:
if objectHash in self.hashes:
del self.hashes[objectHash]
def threadEnd(self):
with self.lock:
for objectHash in self.hashes:
try:
self.hashes[objectHash]['peers'].remove(current_thread().peer)
except ValueError:
pass
# current_thread().peer

View File

@ -27,7 +27,7 @@ import highlevelcrypto
#import helper_startup
from helper_sql import *
from helper_threading import *
from inventory import Inventory
from inventory import Inventory, Missing
import protocol
import state
@ -55,7 +55,6 @@ alreadyAttemptedConnectionsList = {
alreadyAttemptedConnectionsListLock = threading.Lock()
alreadyAttemptedConnectionsListResetTime = int(
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
numberOfObjectsThatWeHaveYetToGetPerPeer = {}
successfullyDecryptMessageTimings = [
] # A list of the amounts of time it took to successfully decrypt msg messages
apiAddressGeneratorReturnQueue = Queue.Queue(
@ -475,6 +474,7 @@ def _checkAndShareUndefinedObjectWithPeers(data):
return
inventoryHash = calculateInventoryHash(data)
Missing().delete(inventoryHash)
if inventoryHash in Inventory():
logger.debug('We have already received this undefined object. Ignoring.')
return
@ -498,6 +498,7 @@ def _checkAndShareMsgWithPeers(data):
return
readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data)
Missing().delete(inventoryHash)
if inventoryHash in Inventory():
logger.debug('We have already received this msg message. Ignoring.')
return
@ -530,6 +531,7 @@ def _checkAndShareGetpubkeyWithPeers(data):
readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data)
Missing().delete(inventoryHash)
if inventoryHash in Inventory():
logger.debug('We have already received this getpubkey request. Ignoring it.')
return
@ -565,6 +567,7 @@ def _checkAndSharePubkeyWithPeers(data):
tag = ''
inventoryHash = calculateInventoryHash(data)
Missing().delete(inventoryHash)
if inventoryHash in Inventory():
logger.debug('We have already received this pubkey. Ignoring it.')
return
@ -600,6 +603,7 @@ def _checkAndShareBroadcastWithPeers(data):
else:
tag = ''
inventoryHash = calculateInventoryHash(data)
Missing().delete(inventoryHash)
if inventoryHash in Inventory():
logger.debug('We have already received this broadcast object. Ignoring.')
return

View File

@ -68,7 +68,7 @@ class SendThrottle(Throttle):
def resetLimit(self):
with self.lock:
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxuploadrate')*1024
Throttle.resetChunkSize()
Throttle.resetChunkSize(self)
@Singleton
class ReceiveThrottle(Throttle):
@ -78,4 +78,4 @@ class ReceiveThrottle(Throttle):
def resetLimit(self):
with self.lock:
self.limit = BMConfigParser().safeGetInt('bitmessagesettings', 'maxdownloadrate')*1024
Throttle.resetChunkSize()
Throttle.resetChunkSize(self)