diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index dc7426ac..0922d805 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -44,6 +44,7 @@ from network import ( InvThread, ReceiveQueueThread, DownloadThread, UploadThread ) from network.knownnodes import readKnownNodes +from queues import workerQueue from singleinstance import singleinstance # Synchronous threads from threads import ( @@ -186,10 +187,12 @@ class Main(object): addressGeneratorThread.start() # Start the thread that calculates POWs - singleWorkerThread = singleWorker() + singleWorkerThread = singleWorker(workerQueue, Inventory()) # close the main program even if there are threads left singleWorkerThread.daemon = True singleWorkerThread.start() + else: + Inventory() # init # Start the SQL thread sqlLookup = sqlThread() @@ -198,7 +201,6 @@ class Main(object): sqlLookup.daemon = False sqlLookup.start() - Inventory() # init # init, needs to be early because other thread may access it early Dandelion() diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index f302300d..f21802f3 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -12,6 +12,8 @@ from binascii import hexlify, unhexlify from struct import pack from subprocess import call # nosec +from six.moves import configparser, queue + import defaults import helper_inbox import helper_msgcoding @@ -30,9 +32,7 @@ from addresses import ( ) from bmconfigparser import BMConfigParser from helper_sql import sqlExecute, sqlQuery -from inventory import Inventory from network import knownnodes, StoppableThread -from six.moves import configparser, queue def sizeof_fmt(num, suffix='h/s'): @@ -48,15 +48,17 @@ def sizeof_fmt(num, suffix='h/s'): class singleWorker(StoppableThread): """Thread for performing PoW""" - def __init__(self): + def __init__(self, queue, inventory): super(singleWorker, self).__init__(name="singleWorker") + self.inventory = inventory + self.queue = queue proofofwork.init() def stopThread(self): """Signal through the queue that the thread should be stopped""" try: - queues.workerQueue.put(("stopThread", "data")) + self.queue.put(("stopThread", "data")) except queue.Full: self.logger.error('workerQueue is Full') super(singleWorker, self).stopThread() @@ -119,7 +121,8 @@ class singleWorker(StoppableThread): # For the case if user deleted knownnodes # but is still having onionpeer objects in inventory if not knownnodes.knownNodesActual: - for item in Inventory().by_type_and_tag(protocol.OBJECT_ONIONPEER): + for item in self.inventory.by_type_and_tag( + protocol.OBJECT_ONIONPEER): queues.objectProcessorQueue.put(( protocol.OBJECT_ONIONPEER, item.payload )) @@ -134,17 +137,17 @@ class singleWorker(StoppableThread): # just in case there are any pending tasks for msg # messages that have yet to be sent. - queues.workerQueue.put(('sendmessage', '')) + self.queue.put(('sendmessage', '')) # just in case there are any tasks for Broadcasts # that have yet to be sent. - queues.workerQueue.put(('sendbroadcast', '')) + self.queue.put(('sendbroadcast', '')) # send onionpeer object - queues.workerQueue.put(('sendOnionPeerObj', '')) + self.queue.put(('sendOnionPeerObj', '')) while state.shutdown == 0: self.busy = 0 - command, data = queues.workerQueue.get() + command, data = self.queue.get() self.busy = 1 if command == 'sendmessage': try: @@ -191,7 +194,7 @@ class singleWorker(StoppableThread): command ) - queues.workerQueue.task_done() + self.queue.task_done() self.logger.info("Quitting...") def _getKeysForAddress(self, address): @@ -290,13 +293,12 @@ class singleWorker(StoppableThread): inventoryHash = calculateInventoryHash(payload) objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, '') + self.inventory.put( + inventoryHash, objectType, streamNumber, payload, embeddedTime) self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -378,13 +380,11 @@ class singleWorker(StoppableThread): inventoryHash = calculateInventoryHash(payload) objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, '') + self.inventory.put(objectType, streamNumber, payload, embeddedTime) self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -471,15 +471,13 @@ class singleWorker(StoppableThread): inventoryHash = calculateInventoryHash(payload) objectType = 1 - Inventory()[inventoryHash] = ( + self.inventory.put( objectType, streamNumber, payload, embeddedTime, - doubleHashOfAddressData[32:] - ) + doubleHashOfAddressData[32:]) self.logger.info( 'broadcasting inv with hash: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) try: BMConfigParser().set( @@ -507,7 +505,7 @@ class singleWorker(StoppableThread): objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host) tag = calculateInventoryHash(objectPayload) - if Inventory().by_type_and_tag(objectType, tag): + if self.inventory.by_type_and_tag(objectType, tag): return # not expired payload = pack('>Q', embeddedTime) @@ -520,14 +518,14 @@ class singleWorker(StoppableThread): payload, TTL, log_prefix='(For onionpeer object)') inventoryHash = calculateInventoryHash(payload) - Inventory()[inventoryHash] = ( + self.inventory.put( objectType, streamNumber, buffer(payload), - embeddedTime, buffer(tag) - ) + embeddedTime, buffer(tag)) + self.logger.info( 'sending inv (within sendOnionPeerObj function) for object: %s', hexlify(inventoryHash)) - queues.invQueue.put((streamNumber, inventoryHash)) + queues.UISignalQueue.put(('updateStatusBar', '')) def sendBroadcast(self): """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" @@ -688,14 +686,14 @@ class singleWorker(StoppableThread): inventoryHash = calculateInventoryHash(payload) objectType = 3 - Inventory()[inventoryHash] = ( + self.inventory.put( objectType, streamNumber, payload, embeddedTime, tag) + self.logger.info( 'sending inv (within sendBroadcast function)' ' for object: %s', hexlify(inventoryHash) ) - queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(( 'updateSentItemStatusByAckdata', ( @@ -847,7 +845,8 @@ class singleWorker(StoppableThread): hexlify(privEncryptionKey)) ) - for value in Inventory().by_type_and_tag(1, toTag): + for value in self.inventory.by_type_and_tag( + 1, toTag): # if valid, this function also puts it # in the pubkeys table. if protocol.decryptAndCheckPubkeyPayload( @@ -1303,8 +1302,9 @@ class singleWorker(StoppableThread): inventoryHash = calculateInventoryHash(encryptedPayload) objectType = 2 - Inventory()[inventoryHash] = ( - objectType, toStreamNumber, encryptedPayload, embeddedTime, '') + self.inventory.put( + objectType, toStreamNumber, encryptedPayload, embeddedTime) + if BMConfigParser().has_section(toaddress) or \ not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): queues.UISignalQueue.put(( @@ -1329,7 +1329,6 @@ class singleWorker(StoppableThread): 'Broadcasting inv for my msg(within sendmsg function): %s', hexlify(inventoryHash) ) - queues.invQueue.put((toStreamNumber, inventoryHash)) # Update the sent message in the sent table with the # necessary information. @@ -1461,10 +1460,10 @@ class singleWorker(StoppableThread): inventoryHash = calculateInventoryHash(payload) objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, '') + self.inventory.put( + objectType, streamNumber, payload, embeddedTime) + self.logger.info('sending inv (for the getpubkey message)') - queues.invQueue.put((streamNumber, inventoryHash)) # wait 10% past expiration sleeptill = int(time.time() + TTL * 1.1) diff --git a/src/inventory.py b/src/inventory.py index fc06e455..17ae45b1 100644 --- a/src/inventory.py +++ b/src/inventory.py @@ -4,6 +4,8 @@ import storage.filesystem import storage.sqlite from bmconfigparser import BMConfigParser +# TODO: init with queue +from queues import invQueue from singleton import Singleton @@ -39,3 +41,7 @@ class Inventory(): # hint for pylint: this is dictionary like object def __getitem__(self, key): return self._realInventory[key] + + def put(self, invhash, obj_type, stream, payload, embedded_time, tag=''): + self[invhash] = (obj_type, stream, payload, embedded_time, tag) + invQueue.put((stream, invhash))