moved inventory in state - global runtime variable from singleton
This commit is contained in:
parent
d555a79200
commit
5faef8d40e
|
@ -94,7 +94,6 @@ from defaults import (
|
|||
from helper_sql import (
|
||||
SqlBulkExecute, sqlExecute, sqlQuery, sqlStoredProcedure, sql_ready)
|
||||
from highlevelcrypto import calculateInventoryHash
|
||||
from inventory import Inventory
|
||||
|
||||
try:
|
||||
from network import BMConnectionPool
|
||||
|
@ -1340,7 +1339,7 @@ class BMRPCDispatcher(object):
|
|||
encryptedPayload = pack('>Q', nonce) + encryptedPayload
|
||||
|
||||
inventoryHash = calculateInventoryHash(encryptedPayload)
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, toStreamNumber, encryptedPayload,
|
||||
expiresTime, b''
|
||||
)
|
||||
|
@ -1396,7 +1395,7 @@ class BMRPCDispatcher(object):
|
|||
inventoryHash = calculateInventoryHash(payload)
|
||||
objectType = 1 # .. todo::: support v4 pubkeys
|
||||
TTL = 28 * 24 * 60 * 60
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, pubkeyStreamNumber, payload, int(time.time()) + TTL, ''
|
||||
)
|
||||
logger.info(
|
||||
|
|
|
@ -30,7 +30,6 @@ import state
|
|||
from addresses import addBMIfNotPresent, decodeAddress
|
||||
from bmconfigparser import config
|
||||
from helper_sql import sqlExecute, sqlQuery
|
||||
from inventory import Inventory
|
||||
|
||||
# pylint: disable=global-statement
|
||||
|
||||
|
@ -145,8 +144,8 @@ def scrollbox(d, text, height=None, width=None):
|
|||
def resetlookups():
|
||||
"""Reset the Inventory Lookups"""
|
||||
global inventorydata
|
||||
inventorydata = Inventory().numberOfInventoryLookupsPerformed
|
||||
Inventory().numberOfInventoryLookupsPerformed = 0
|
||||
inventorydata = state.Inventory.numberOfInventoryLookupsPerformed
|
||||
state.Inventory.numberOfInventoryLookupsPerformed = 0
|
||||
Timer(1, resetlookups, ()).start()
|
||||
|
||||
|
||||
|
|
|
@ -176,8 +176,7 @@ class Main(object):
|
|||
# The closeEvent should command this thread to exit gracefully.
|
||||
sqlLookup.daemon = False
|
||||
sqlLookup.start()
|
||||
|
||||
Inventory() # init
|
||||
state.Inventory = Inventory() # init
|
||||
|
||||
if state.enableObjProc: # Not needed if objproc is disabled
|
||||
# Start the address generation thread
|
||||
|
|
|
@ -9,10 +9,10 @@ from PyQt4 import QtCore, QtGui
|
|||
|
||||
import queues
|
||||
import widgets
|
||||
import state
|
||||
from account import AccountMixin, GatewayAccount, MailchuckAccount, accountClass
|
||||
from addresses import addBMIfNotPresent, decodeAddress, encodeVarint
|
||||
from bmconfigparser import config as global_config
|
||||
from inventory import Inventory
|
||||
from tr import _translate
|
||||
|
||||
|
||||
|
@ -190,13 +190,13 @@ class NewSubscriptionDialog(AddressDataDialog):
|
|||
" broadcasts."
|
||||
))
|
||||
else:
|
||||
Inventory().flush()
|
||||
state.Inventory.flush()
|
||||
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(
|
||||
encodeVarint(addressVersion)
|
||||
+ encodeVarint(streamNumber) + ripe
|
||||
).digest()).digest()
|
||||
tag = doubleHashOfAddressData[32:]
|
||||
self.recent = Inventory().by_type_and_tag(3, tag)
|
||||
self.recent = state.Inventory.by_type_and_tag(3, tag)
|
||||
count = len(self.recent)
|
||||
if count == 0:
|
||||
self.checkBoxDisplayMessagesAlreadyInInventory.setText(
|
||||
|
|
|
@ -10,7 +10,6 @@ import l10n
|
|||
import network.stats
|
||||
import state
|
||||
import widgets
|
||||
from inventory import Inventory
|
||||
from network import BMConnectionPool, knownnodes
|
||||
from retranslateui import RetranslateMixin
|
||||
from tr import _translate
|
||||
|
@ -50,7 +49,7 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
|
|||
|
||||
def startUpdate(self):
|
||||
"""Start a timer to update counters every 2 seconds"""
|
||||
Inventory().numberOfInventoryLookupsPerformed = 0
|
||||
state.Inventory.numberOfInventoryLookupsPerformed = 0
|
||||
self.runEveryTwoSeconds()
|
||||
self.timer.start(2000) # milliseconds
|
||||
|
||||
|
@ -229,8 +228,8 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
|
|||
def runEveryTwoSeconds(self):
|
||||
"""Updates counters, runs every 2 seconds if the timer is running"""
|
||||
self.labelLookupsPerSecond.setText(_translate("networkstatus", "Inventory lookups per second: %1").arg(
|
||||
str(Inventory().numberOfInventoryLookupsPerformed / 2)))
|
||||
Inventory().numberOfInventoryLookupsPerformed = 0
|
||||
str(state.Inventory.numberOfInventoryLookupsPerformed / 2)))
|
||||
state.Inventory.numberOfInventoryLookupsPerformed = 0
|
||||
self.updateNumberOfBytes()
|
||||
self.updateNumberOfObjectsToBeSynced()
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ from bmconfigparser import config
|
|||
from fallback import RIPEMD160Hash
|
||||
from helper_sql import (
|
||||
sql_ready, sql_timeout, SqlBulkExecute, sqlExecute, sqlQuery)
|
||||
from inventory import Inventory
|
||||
from network import knownnodes
|
||||
from network.node import Peer
|
||||
from tr import _translate
|
||||
|
@ -736,7 +735,7 @@ class objectProcessor(threading.Thread):
|
|||
objectType, toStreamNumber, expiresTime = \
|
||||
protocol.decodeObjectParameters(ackPayload)
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(ackPayload)
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, toStreamNumber, ackPayload, expiresTime, b'')
|
||||
queues.invQueue.put((toStreamNumber, inventoryHash))
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import queues
|
|||
import state
|
||||
from bmconfigparser import config
|
||||
from helper_sql import sqlExecute, sqlQuery
|
||||
from inventory import Inventory
|
||||
from network import BMConnectionPool, knownnodes, StoppableThread
|
||||
from tr import _translate
|
||||
|
||||
|
@ -69,7 +68,7 @@ class singleCleaner(StoppableThread):
|
|||
'updateStatusBar',
|
||||
'Doing housekeeping (Flushing inventory in memory to disk...)'
|
||||
))
|
||||
Inventory().flush()
|
||||
state.Inventory.flush()
|
||||
queues.UISignalQueue.put(('updateStatusBar', ''))
|
||||
|
||||
# If we are running as a daemon then we are going to fill up the UI
|
||||
|
@ -82,7 +81,7 @@ class singleCleaner(StoppableThread):
|
|||
tick = int(time.time())
|
||||
if timeWeLastClearedInventoryAndPubkeysTables < tick - 7380:
|
||||
timeWeLastClearedInventoryAndPubkeysTables = tick
|
||||
Inventory().clean()
|
||||
state.Inventory.clean()
|
||||
queues.workerQueue.put(('sendOnionPeerObj', ''))
|
||||
# pubkeys
|
||||
sqlExecute(
|
||||
|
|
|
@ -28,7 +28,6 @@ import tr
|
|||
from addresses import decodeAddress, decodeVarint, encodeVarint
|
||||
from bmconfigparser import config
|
||||
from helper_sql import sqlExecute, sqlQuery
|
||||
from inventory import Inventory
|
||||
from network import knownnodes, StoppableThread
|
||||
from six.moves import configparser, queue
|
||||
|
||||
|
@ -117,7 +116,7 @@ class singleWorker(StoppableThread):
|
|||
# For the case if user deleted knownnodes
|
||||
# but is still having onionpeer objects in inventory
|
||||
if not knownnodes.knownNodesActual:
|
||||
for item in Inventory().by_type_and_tag(protocol.OBJECT_ONIONPEER):
|
||||
for item in state.Inventory.by_type_and_tag(protocol.OBJECT_ONIONPEER):
|
||||
queues.objectProcessorQueue.put((
|
||||
protocol.OBJECT_ONIONPEER, item.payload
|
||||
))
|
||||
|
@ -288,7 +287,7 @@ class singleWorker(StoppableThread):
|
|||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||
objectType = 1
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime, '')
|
||||
|
||||
self.logger.info(
|
||||
|
@ -376,7 +375,7 @@ class singleWorker(StoppableThread):
|
|||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||
objectType = 1
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime, '')
|
||||
|
||||
self.logger.info(
|
||||
|
@ -467,7 +466,7 @@ class singleWorker(StoppableThread):
|
|||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||
objectType = 1
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime,
|
||||
doubleHashOfAddressData[32:]
|
||||
)
|
||||
|
@ -503,7 +502,7 @@ class singleWorker(StoppableThread):
|
|||
objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host)
|
||||
tag = highlevelcrypto.calculateInventoryHash(objectPayload)
|
||||
|
||||
if Inventory().by_type_and_tag(objectType, tag):
|
||||
if state.Inventory.by_type_and_tag(objectType, tag):
|
||||
return # not expired
|
||||
|
||||
payload = pack('>Q', embeddedTime)
|
||||
|
@ -516,7 +515,7 @@ class singleWorker(StoppableThread):
|
|||
payload, TTL, log_prefix='(For onionpeer object)')
|
||||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, streamNumber, buffer(payload), # noqa: F821
|
||||
embeddedTime, buffer(tag) # noqa: F821
|
||||
)
|
||||
|
@ -684,7 +683,7 @@ class singleWorker(StoppableThread):
|
|||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||
objectType = 3
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime, tag)
|
||||
self.logger.info(
|
||||
'sending inv (within sendBroadcast function)'
|
||||
|
@ -843,7 +842,7 @@ class singleWorker(StoppableThread):
|
|||
hexlify(privEncryptionKey))
|
||||
)
|
||||
|
||||
for value in Inventory().by_type_and_tag(1, toTag):
|
||||
for value in state.Inventory.by_type_and_tag(1, toTag):
|
||||
# if valid, this function also puts it
|
||||
# in the pubkeys table.
|
||||
if protocol.decryptAndCheckPubkeyPayload(
|
||||
|
@ -1301,7 +1300,7 @@ class singleWorker(StoppableThread):
|
|||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(encryptedPayload)
|
||||
objectType = 2
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
|
||||
if config.has_section(toaddress) or \
|
||||
not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
|
||||
|
@ -1457,7 +1456,7 @@ class singleWorker(StoppableThread):
|
|||
|
||||
inventoryHash = highlevelcrypto.calculateInventoryHash(payload)
|
||||
objectType = 1
|
||||
Inventory()[inventoryHash] = (
|
||||
state.Inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime, '')
|
||||
self.logger.info('sending inv (for the getpubkey message)')
|
||||
queues.invQueue.put((streamNumber, inventoryHash))
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
"""The Inventory singleton"""
|
||||
"""The Inventory"""
|
||||
|
||||
# TODO make this dynamic, and watch out for frozen, like with messagetypes
|
||||
import storage.filesystem
|
||||
import storage.sqlite
|
||||
from bmconfigparser import config
|
||||
from singleton import Singleton
|
||||
|
||||
|
||||
def create_inventory_instance(backend="sqlite"):
|
||||
|
@ -17,10 +16,9 @@ def create_inventory_instance(backend="sqlite"):
|
|||
"{}Inventory".format(backend.title()))()
|
||||
|
||||
|
||||
@Singleton
|
||||
class Inventory():
|
||||
class Inventory:
|
||||
"""
|
||||
Inventory singleton class which uses storage backends
|
||||
Inventory class which uses storage backends
|
||||
to manage the inventory.
|
||||
"""
|
||||
def __init__(self):
|
||||
|
|
|
@ -7,7 +7,6 @@ import time
|
|||
import protocol
|
||||
import state
|
||||
from highlevelcrypto import calculateInventoryHash
|
||||
from inventory import Inventory
|
||||
from network.dandelion import Dandelion
|
||||
|
||||
logger = logging.getLogger('default')
|
||||
|
@ -115,7 +114,7 @@ class BMObject(object): # pylint: disable=too-many-instance-attributes
|
|||
# if it's a stem duplicate, pretend we don't have it
|
||||
if Dandelion().hasHash(self.inventoryHash):
|
||||
return
|
||||
if self.inventoryHash in Inventory():
|
||||
if self.inventoryHash in state.Inventory:
|
||||
raise BMObjectAlreadyHaveError()
|
||||
|
||||
def checkObjectByType(self):
|
||||
|
|
|
@ -17,7 +17,6 @@ import knownnodes
|
|||
import protocol
|
||||
import state
|
||||
from bmconfigparser import config
|
||||
from inventory import Inventory
|
||||
from queues import invQueue, objectProcessorQueue, portCheckerQueue
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
from network.advanceddispatcher import AdvancedDispatcher
|
||||
|
@ -356,7 +355,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
return True
|
||||
|
||||
for i in map(str, items):
|
||||
if i in Inventory() and not Dandelion().hasHash(i):
|
||||
if i in state.Inventory and not Dandelion().hasHash(i):
|
||||
continue
|
||||
if dandelion and not Dandelion().hasHash(i):
|
||||
Dandelion().addHash(i, self)
|
||||
|
@ -421,12 +420,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(
|
||||
if self.object.inventoryHash in state.Inventory and Dandelion().hasHash(
|
||||
self.object.inventoryHash):
|
||||
Dandelion().removeHash(
|
||||
self.object.inventoryHash, "cycle detection")
|
||||
|
||||
Inventory()[self.object.inventoryHash] = (
|
||||
state.Inventory[self.object.inventoryHash] = (
|
||||
self.object.objectType, self.object.streamNumber,
|
||||
buffer(self.payload[objectOffset:]), self.object.expiresTime, # noqa: F821
|
||||
buffer(self.object.tag) # noqa: F821
|
||||
|
|
|
@ -2,12 +2,11 @@
|
|||
`DownloadThread` class definition
|
||||
"""
|
||||
import time
|
||||
|
||||
import state
|
||||
import addresses
|
||||
import helper_random
|
||||
import protocol
|
||||
from dandelion import Dandelion
|
||||
from inventory import Inventory
|
||||
from network.connectionpool import BMConnectionPool
|
||||
from objectracker import missingObjects
|
||||
from threads import StoppableThread
|
||||
|
@ -61,7 +60,7 @@ class DownloadThread(StoppableThread):
|
|||
payload = bytearray()
|
||||
chunkCount = 0
|
||||
for chunk in request:
|
||||
if chunk in Inventory() and not Dandelion().hasHash(chunk):
|
||||
if chunk in state.Inventory and not Dandelion().hasHash(chunk):
|
||||
try:
|
||||
del i.objectsNewToMe[chunk]
|
||||
except KeyError:
|
||||
|
|
|
@ -17,7 +17,6 @@ import protocol
|
|||
import state
|
||||
from bmconfigparser import config
|
||||
from highlevelcrypto import randomBytes
|
||||
from inventory import Inventory
|
||||
from queues import invQueue, receiveDataQueue, UISignalQueue
|
||||
from tr import _translate
|
||||
|
||||
|
@ -230,7 +229,7 @@ class TCPConnection(BMProto, TLSDispatcher):
|
|||
# may lock for a long time, but I think it's better than
|
||||
# thousands of small locks
|
||||
with self.objectsNewToThemLock:
|
||||
for objHash in Inventory().unexpired_hashes_by_stream(stream):
|
||||
for objHash in state.Inventory.unexpired_hashes_by_stream(stream):
|
||||
# don't advertise stem objects on bigInv
|
||||
if Dandelion().hasHash(objHash):
|
||||
continue
|
||||
|
|
|
@ -5,7 +5,7 @@ import time
|
|||
|
||||
import helper_random
|
||||
import protocol
|
||||
from inventory import Inventory
|
||||
import state
|
||||
from network.connectionpool import BMConnectionPool
|
||||
from network.dandelion import Dandelion
|
||||
from randomtrackingdict import RandomTrackingDict
|
||||
|
@ -50,7 +50,7 @@ class UploadThread(StoppableThread):
|
|||
break
|
||||
try:
|
||||
payload.extend(protocol.CreatePacket(
|
||||
'object', Inventory()[chunk].payload))
|
||||
'object', state.Inventory[chunk].payload))
|
||||
chunk_count += 1
|
||||
except KeyError:
|
||||
i.antiIntersectionDelay()
|
||||
|
|
|
@ -9,7 +9,6 @@ from six.moves import queue
|
|||
import state
|
||||
from debug import logger
|
||||
from helper_sql import sqlQuery, sqlStoredProcedure
|
||||
from inventory import Inventory
|
||||
from network import StoppableThread
|
||||
from network.knownnodes import saveKnownNodes
|
||||
from queues import (
|
||||
|
@ -41,7 +40,7 @@ def doCleanShutdown():
|
|||
'updateStatusBar',
|
||||
'Flushing inventory in memory out to disk.'
|
||||
' This should normally only take a second...'))
|
||||
Inventory().flush()
|
||||
state.Inventory.flush()
|
||||
|
||||
# Verify that the objectProcessor has finished exiting. It should have
|
||||
# incremented the shutdown variable from 1 to 2. This must finish before
|
||||
|
|
30
src/state.py
30
src/state.py
|
@ -11,7 +11,7 @@ extPort = None
|
|||
socksIP = None
|
||||
"""for Tor hidden service"""
|
||||
|
||||
appdata = ''
|
||||
appdata = ""
|
||||
"""holds the location of the application data storage directory"""
|
||||
|
||||
shutdown = 0
|
||||
|
@ -59,7 +59,7 @@ numberOfMessagesProcessed = 0
|
|||
numberOfBroadcastsProcessed = 0
|
||||
numberOfPubkeysProcessed = 0
|
||||
|
||||
statusIconColor = 'red'
|
||||
statusIconColor = "red"
|
||||
"""
|
||||
GUI status icon color
|
||||
.. note:: bad style, refactor it
|
||||
|
@ -71,3 +71,29 @@ thisapp = None
|
|||
"""Singleton instance"""
|
||||
|
||||
backend_py3_compatible = False
|
||||
|
||||
|
||||
class Placeholder(object): # pylint:disable=too-few-public-methods
|
||||
"""Placeholder class"""
|
||||
|
||||
def __init__(self, className):
|
||||
self.className = className
|
||||
|
||||
def __getattr__(self, name):
|
||||
self._raise()
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._raise()
|
||||
|
||||
def __getitem__(self, key):
|
||||
self._raise()
|
||||
|
||||
def _raise(self):
|
||||
raise NotImplementedError(
|
||||
"Probabaly you forgot to initialize state variable for {}".format(
|
||||
self.className
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
Inventory = Placeholder("Inventory")
|
||||
|
|
|
@ -76,6 +76,8 @@ class TestAPIThread(TestPartialRun):
|
|||
"""Call disseminatePreEncryptedMsg API command and check inventory"""
|
||||
import proofofwork
|
||||
from inventory import Inventory
|
||||
import state
|
||||
state.Inventory = Inventory()
|
||||
|
||||
proofofwork.init()
|
||||
self.assertEqual(
|
||||
|
@ -87,7 +89,7 @@ class TestAPIThread(TestPartialRun):
|
|||
invhash = unhexlify(self.api.disseminatePreEncryptedMsg(
|
||||
hexlify(update_object).decode()
|
||||
))
|
||||
obj_type, obj_stream, obj_data = Inventory()[invhash][:3]
|
||||
obj_type, obj_stream, obj_data = state.Inventory[invhash][:3]
|
||||
self.assertEqual(obj_type, 42)
|
||||
self.assertEqual(obj_stream, 2)
|
||||
self.assertEqual(sample_object_data[16:], obj_data[16:])
|
||||
|
|
Reference in New Issue
Block a user