diff --git a/src/api.py b/src/api.py index 968fb29d..edae0dc5 100644 --- a/src/api.py +++ b/src/api.py @@ -850,7 +850,6 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): TTL = 2.5 * 24 * 60 * 60 shared.inventory[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, int(time.time()) + TTL,'') - shared.inventorySets[toStreamNumber].add(inventoryHash) with shared.printLock: print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( @@ -898,7 +897,6 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): TTL = 28 * 24 * 60 * 60 shared.inventory[inventoryHash] = ( objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL,'') - shared.inventorySets[pubkeyStreamNumber].add(inventoryHash) with shared.printLock: print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 43db5ae1..abbcfff4 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -48,12 +48,7 @@ from helper_threading import * def connectToStream(streamNumber): shared.streamsInWhichIAmParticipating[streamNumber] = 'no data' selfInitiatedConnections[streamNumber] = {} - shared.inventorySets[streamNumber] = set() - queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber) - for row in queryData: - shared.inventorySets[streamNumber].add(row[0]) - if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections(): # Some XP and Vista systems can only have 10 outgoing connections at a time. maximumNumberOfHalfOpenConnections = 9 diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 430deac6..cfc5ceec 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2305,16 +2305,12 @@ class MyForm(settingsmixin.SMainWindow): # in the objectProcessorQueue to be processed if self.NewSubscriptionDialogInstance.ui.checkBoxDisplayMessagesAlreadyInInventory.isChecked(): status, addressVersion, streamNumber, ripe = decodeAddress(address) - shared.flushInventory() + shared.inventory.flush() doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest() tag = doubleHashOfAddressData[32:] - queryreturn = sqlQuery( - '''select payload from inventory where objecttype=3 and tag=?''', tag) - for row in queryreturn: - payload, = row - objectType = 3 - shared.objectProcessorQueue.put((objectType,payload)) + for value in shared.inventory.by_type_and_tag(3, tag): + shared.objectProcessorQueue.put((value.type, value.payload)) def click_pushButtonStatusIcon(self): logger.debug('click_pushButtonStatusIcon') @@ -4196,23 +4192,22 @@ class NewSubscriptionDialog(QtGui.QDialog): self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText( _translate("MainWindow", "Address is an old type. We cannot display its past broadcasts.")) else: - shared.flushInventory() + shared.inventory.flush() doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint( addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest() tag = doubleHashOfAddressData[32:] - queryreturn = sqlQuery( - '''select hash from inventory where objecttype=3 and tag=?''', tag) - if len(queryreturn) == 0: + count = len(shared.inventory.by_type_and_tag(3, tag)) + if count == 0: self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText( _translate("MainWindow", "There are no recent broadcasts from this address to display.")) - elif len(queryreturn) == 1: + elif count == 1: self.ui.checkBoxDisplayMessagesAlreadyInInventory.setEnabled(True) self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText( - _translate("MainWindow", "Display the %1 recent broadcast from this address.").arg(str(len(queryreturn)))) + _translate("MainWindow", "Display the %1 recent broadcast from this address.").arg(count)) else: self.ui.checkBoxDisplayMessagesAlreadyInInventory.setEnabled(True) self.ui.checkBoxDisplayMessagesAlreadyInInventory.setText( - _translate("MainWindow", "Display the %1 recent broadcasts from this address.").arg(str(len(queryreturn)))) + _translate("MainWindow", "Display the %1 recent broadcasts from this address.").arg(count)) class NewAddressDialog(QtGui.QDialog): diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index aec343d8..a8854e47 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -216,14 +216,8 @@ class receiveDataThread(threading.Thread): objectHash, = random.sample( self.objectsThatWeHaveYetToGetFromThisPeer, 1) if objectHash in shared.inventory: - logger.debug('Inventory (in memory) already has object listed in inv message.') - del self.objectsThatWeHaveYetToGetFromThisPeer[ - objectHash] - elif shared.isInSqlInventory(objectHash): - if shared.verbose >= 3: - logger.debug('Inventory (SQL on disk) already has object listed in inv message.') - del self.objectsThatWeHaveYetToGetFromThisPeer[ - objectHash] + logger.debug('Inventory already has object listed in inv message.') + del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash] else: # We don't have the object in our inventory. Let's request it. self.sendgetdata(objectHash) @@ -318,23 +312,10 @@ class receiveDataThread(threading.Thread): def sendBigInv(self): # Select all hashes for objects in this stream. - queryreturn = sqlQuery( - '''SELECT hash FROM inventory WHERE expirestime>? and streamnumber=?''', - int(time.time()), - self.streamNumber) bigInvList = {} - for row in queryreturn: - hash, = row + for hash in shared.inventory.unexpired_hashes_by_stream(self.streamNumber): if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash): bigInvList[hash] = 0 - # We also have messages in our inventory in memory (which is a python - # dictionary). Let's fetch those too. - with shared.inventoryLock: - for hash, storedValue in shared.inventory.items(): - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - objectType, streamNumber, payload, expiresTime, tag = storedValue - if streamNumber == self.streamNumber and expiresTime > int(time.time()): - bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' # Now let us start appending all of these hashes together. They will be @@ -440,9 +421,7 @@ class receiveDataThread(threading.Thread): data[lengthOfVarint:32 + lengthOfVarint]] = 0 shared.numberOfInventoryLookupsPerformed += 1 if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory: - logger.debug('Inventory (in memory) has inventory item already.') - elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): - logger.debug('Inventory (SQL on disk) has inventory item already.') + logger.debug('Inventory has inventory item already.') else: self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) else: @@ -453,7 +432,7 @@ class receiveDataThread(threading.Thread): advertisedSet = set() for i in range(numberOfItemsInInv): advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) - objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber] + objectsNewToMe = advertisedSet - shared.inventory.hashes_by_stream(self.streamNumber) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) for item in objectsNewToMe: if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation @@ -491,20 +470,10 @@ class receiveDataThread(threading.Thread): if self.objectHashHolderInstance.hasHash(hash): shared.inventoryLock.release() self.antiIntersectionDelay() - elif hash in shared.inventory: - objectType, streamNumber, payload, expiresTime, tag = shared.inventory[hash] - shared.inventoryLock.release() - self.sendObject(payload) else: shared.inventoryLock.release() - queryreturn = sqlQuery( - '''select payload from inventory where hash=? and expirestime>=?''', - hash, - int(time.time())) - if queryreturn != []: - for row in queryreturn: - payload, = row - self.sendObject(payload) + if hash in shared.inventory: + self.sendObject(shared.inventory[hash].payload) else: self.antiIntersectionDelay() logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index f9e87b1a..3467da96 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -46,19 +46,7 @@ class singleCleaner(threading.Thread, StoppableThread): while shared.shutdown == 0: shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. - with SqlBulkExecute() as sql: - for hash, storedValue in shared.inventory.items(): - objectType, streamNumber, payload, expiresTime, tag = storedValue - sql.execute( - '''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash, - objectType, - streamNumber, - payload, - expiresTime, - tag) - del shared.inventory[hash] + shared.inventory.flush() shared.UISignalQueue.put(('updateStatusBar', '')) shared.broadcastToSendDataQueues(( @@ -70,9 +58,7 @@ class singleCleaner(threading.Thread, StoppableThread): shared.UISignalQueue.queue.clear() if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) - sqlExecute( - '''DELETE FROM inventory WHERE expirestime t] + hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t)) + return hashes + + def flush(self): + with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. + with SqlBulkExecute() as sql: + for hash, value in self._inventory.items(): + sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value) + self._inventory.clear() + + def clean(self): + with inventoryLock: + sqlExecute('DELETE FROM inventory WHERE expirestime -1: @@ -417,8 +494,8 @@ def doCleanShutdown(): UISignalQueue.put(( 'updateStatusBar', 'Flushing inventory in memory out to disk. This should normally only take a second...')) - flushInventory() - + inventory.flush() + # Verify that the objectProcessor has finished exiting. It should have incremented the # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit. while shutdown == 1: @@ -452,15 +529,6 @@ def broadcastToSendDataQueues(data): # logger.debug('running broadcastToSendDataQueues') for q in sendDataQueues: q.put(data) - -def flushInventory(): - #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. - with SqlBulkExecute() as sql: - for hash, storedValue in inventory.items(): - objectType, streamNumber, payload, expiresTime, tag = storedValue - sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''', - hash,objectType,streamNumber,payload,expiresTime,tag) - del inventory[hash] def fixPotentiallyInvalidUTF8Data(text): try: @@ -703,14 +771,9 @@ def _checkAndShareUndefinedObjectWithPeers(data): logger.debug('We have already received this undefined object. Ignoring.') inventoryLock.release() return - elif isInSqlInventory(inventoryHash): - logger.debug('We have already received this undefined object (it is stored on disk in the SQL inventory). Ignoring it.') - inventoryLock.release() - return objectType, = unpack('>I', data[16:20]) inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') - inventorySets[streamNumber].add(inventoryHash) inventoryLock.release() logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex')) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) @@ -735,15 +798,10 @@ def _checkAndShareMsgWithPeers(data): logger.debug('We have already received this msg message. Ignoring.') inventoryLock.release() return - elif isInSqlInventory(inventoryHash): - logger.debug('We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.') - inventoryLock.release() - return # This msg message is valid. Let's let our peers know about it. objectType = 2 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') - inventorySets[streamNumber].add(inventoryHash) inventoryLock.release() logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex')) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) @@ -776,15 +834,10 @@ def _checkAndShareGetpubkeyWithPeers(data): logger.debug('We have already received this getpubkey request. Ignoring it.') inventoryLock.release() return - elif isInSqlInventory(inventoryHash): - logger.debug('We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.') - inventoryLock.release() - return objectType = 0 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') - inventorySets[streamNumber].add(inventoryHash) inventoryLock.release() # This getpubkey request is valid. Forward to peers. logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex')) @@ -820,14 +873,9 @@ def _checkAndSharePubkeyWithPeers(data): logger.debug('We have already received this pubkey. Ignoring it.') inventoryLock.release() return - elif isInSqlInventory(inventoryHash): - logger.debug('We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.') - inventoryLock.release() - return objectType = 1 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime, tag) - inventorySets[streamNumber].add(inventoryHash) inventoryLock.release() # This object is valid. Forward it to peers. logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex')) @@ -864,15 +912,10 @@ def _checkAndShareBroadcastWithPeers(data): logger.debug('We have already received this broadcast object. Ignoring.') inventoryLock.release() return - elif isInSqlInventory(inventoryHash): - logger.debug('We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.') - inventoryLock.release() - return # It is valid. Let's let our peers know about it. objectType = 3 inventory[inventoryHash] = ( objectType, streamNumber, data, embeddedTime, tag) - inventorySets[streamNumber].add(inventoryHash) inventoryLock.release() # This object is valid. Forward it to peers. logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))