Init singleWorker with queue and inventory

This commit is contained in:
Dmitri Bogomolov 2021-09-05 16:56:23 +03:00
parent a7d6037fc5
commit d1bf8dc711
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
3 changed files with 43 additions and 36 deletions

View File

@ -44,6 +44,7 @@ from network import (
InvThread, ReceiveQueueThread, DownloadThread, UploadThread InvThread, ReceiveQueueThread, DownloadThread, UploadThread
) )
from network.knownnodes import readKnownNodes from network.knownnodes import readKnownNodes
from queues import workerQueue
from singleinstance import singleinstance from singleinstance import singleinstance
# Synchronous threads # Synchronous threads
from threads import ( from threads import (
@ -186,10 +187,12 @@ class Main(object):
addressGeneratorThread.start() addressGeneratorThread.start()
# Start the thread that calculates POWs # Start the thread that calculates POWs
singleWorkerThread = singleWorker() singleWorkerThread = singleWorker(workerQueue, Inventory())
# close the main program even if there are threads left # close the main program even if there are threads left
singleWorkerThread.daemon = True singleWorkerThread.daemon = True
singleWorkerThread.start() singleWorkerThread.start()
else:
Inventory() # init
# Start the SQL thread # Start the SQL thread
sqlLookup = sqlThread() sqlLookup = sqlThread()
@ -198,7 +201,6 @@ class Main(object):
sqlLookup.daemon = False sqlLookup.daemon = False
sqlLookup.start() sqlLookup.start()
Inventory() # init
# init, needs to be early because other thread may access it early # init, needs to be early because other thread may access it early
Dandelion() Dandelion()

View File

@ -12,6 +12,8 @@ from binascii import hexlify, unhexlify
from struct import pack from struct import pack
from subprocess import call # nosec from subprocess import call # nosec
from six.moves import configparser, queue
import defaults import defaults
import helper_inbox import helper_inbox
import helper_msgcoding import helper_msgcoding
@ -30,9 +32,7 @@ from addresses import (
) )
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from helper_sql import sqlExecute, sqlQuery from helper_sql import sqlExecute, sqlQuery
from inventory import Inventory
from network import knownnodes, StoppableThread from network import knownnodes, StoppableThread
from six.moves import configparser, queue
def sizeof_fmt(num, suffix='h/s'): def sizeof_fmt(num, suffix='h/s'):
@ -48,15 +48,17 @@ def sizeof_fmt(num, suffix='h/s'):
class singleWorker(StoppableThread): class singleWorker(StoppableThread):
"""Thread for performing PoW""" """Thread for performing PoW"""
def __init__(self): def __init__(self, queue, inventory):
super(singleWorker, self).__init__(name="singleWorker") super(singleWorker, self).__init__(name="singleWorker")
self.inventory = inventory
self.queue = queue
proofofwork.init() proofofwork.init()
def stopThread(self): def stopThread(self):
"""Signal through the queue that the thread should be stopped""" """Signal through the queue that the thread should be stopped"""
try: try:
queues.workerQueue.put(("stopThread", "data")) self.queue.put(("stopThread", "data"))
except queue.Full: except queue.Full:
self.logger.error('workerQueue is Full') self.logger.error('workerQueue is Full')
super(singleWorker, self).stopThread() super(singleWorker, self).stopThread()
@ -119,7 +121,8 @@ class singleWorker(StoppableThread):
# For the case if user deleted knownnodes # For the case if user deleted knownnodes
# but is still having onionpeer objects in inventory # but is still having onionpeer objects in inventory
if not knownnodes.knownNodesActual: if not knownnodes.knownNodesActual:
for item in Inventory().by_type_and_tag(protocol.OBJECT_ONIONPEER): for item in self.inventory.by_type_and_tag(
protocol.OBJECT_ONIONPEER):
queues.objectProcessorQueue.put(( queues.objectProcessorQueue.put((
protocol.OBJECT_ONIONPEER, item.payload protocol.OBJECT_ONIONPEER, item.payload
)) ))
@ -134,17 +137,17 @@ class singleWorker(StoppableThread):
# just in case there are any pending tasks for msg # just in case there are any pending tasks for msg
# messages that have yet to be sent. # messages that have yet to be sent.
queues.workerQueue.put(('sendmessage', '')) self.queue.put(('sendmessage', ''))
# just in case there are any tasks for Broadcasts # just in case there are any tasks for Broadcasts
# that have yet to be sent. # that have yet to be sent.
queues.workerQueue.put(('sendbroadcast', '')) self.queue.put(('sendbroadcast', ''))
# send onionpeer object # send onionpeer object
queues.workerQueue.put(('sendOnionPeerObj', '')) self.queue.put(('sendOnionPeerObj', ''))
while state.shutdown == 0: while state.shutdown == 0:
self.busy = 0 self.busy = 0
command, data = queues.workerQueue.get() command, data = self.queue.get()
self.busy = 1 self.busy = 1
if command == 'sendmessage': if command == 'sendmessage':
try: try:
@ -191,7 +194,7 @@ class singleWorker(StoppableThread):
command command
) )
queues.workerQueue.task_done() self.queue.task_done()
self.logger.info("Quitting...") self.logger.info("Quitting...")
def _getKeysForAddress(self, address): def _getKeysForAddress(self, address):
@ -290,13 +293,12 @@ class singleWorker(StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( self.inventory.put(
objectType, streamNumber, payload, embeddedTime, '') inventoryHash, objectType, streamNumber, payload, embeddedTime)
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
BMConfigParser().set( BMConfigParser().set(
@ -378,13 +380,11 @@ class singleWorker(StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( self.inventory.put(objectType, streamNumber, payload, embeddedTime)
objectType, streamNumber, payload, embeddedTime, '')
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
BMConfigParser().set( BMConfigParser().set(
@ -471,15 +471,13 @@ class singleWorker(StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( self.inventory.put(
objectType, streamNumber, payload, embeddedTime, objectType, streamNumber, payload, embeddedTime,
doubleHashOfAddressData[32:] doubleHashOfAddressData[32:])
)
self.logger.info( self.logger.info(
'broadcasting inv with hash: %s', hexlify(inventoryHash)) 'broadcasting inv with hash: %s', hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
try: try:
BMConfigParser().set( BMConfigParser().set(
@ -507,7 +505,7 @@ class singleWorker(StoppableThread):
objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host) objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host)
tag = calculateInventoryHash(objectPayload) tag = calculateInventoryHash(objectPayload)
if Inventory().by_type_and_tag(objectType, tag): if self.inventory.by_type_and_tag(objectType, tag):
return # not expired return # not expired
payload = pack('>Q', embeddedTime) payload = pack('>Q', embeddedTime)
@ -520,14 +518,14 @@ class singleWorker(StoppableThread):
payload, TTL, log_prefix='(For onionpeer object)') payload, TTL, log_prefix='(For onionpeer object)')
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
Inventory()[inventoryHash] = ( self.inventory.put(
objectType, streamNumber, buffer(payload), objectType, streamNumber, buffer(payload),
embeddedTime, buffer(tag) embeddedTime, buffer(tag))
)
self.logger.info( self.logger.info(
'sending inv (within sendOnionPeerObj function) for object: %s', 'sending inv (within sendOnionPeerObj function) for object: %s',
hexlify(inventoryHash)) hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', ''))
def sendBroadcast(self): def sendBroadcast(self):
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
@ -688,14 +686,14 @@ class singleWorker(StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 3 objectType = 3
Inventory()[inventoryHash] = ( self.inventory.put(
objectType, streamNumber, payload, embeddedTime, tag) objectType, streamNumber, payload, embeddedTime, tag)
self.logger.info( self.logger.info(
'sending inv (within sendBroadcast function)' 'sending inv (within sendBroadcast function)'
' for object: %s', ' for object: %s',
hexlify(inventoryHash) hexlify(inventoryHash)
) )
queues.invQueue.put((streamNumber, inventoryHash))
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateSentItemStatusByAckdata', ( 'updateSentItemStatusByAckdata', (
@ -847,7 +845,8 @@ class singleWorker(StoppableThread):
hexlify(privEncryptionKey)) hexlify(privEncryptionKey))
) )
for value in Inventory().by_type_and_tag(1, toTag): for value in self.inventory.by_type_and_tag(
1, toTag):
# if valid, this function also puts it # if valid, this function also puts it
# in the pubkeys table. # in the pubkeys table.
if protocol.decryptAndCheckPubkeyPayload( if protocol.decryptAndCheckPubkeyPayload(
@ -1303,8 +1302,9 @@ class singleWorker(StoppableThread):
inventoryHash = calculateInventoryHash(encryptedPayload) inventoryHash = calculateInventoryHash(encryptedPayload)
objectType = 2 objectType = 2
Inventory()[inventoryHash] = ( self.inventory.put(
objectType, toStreamNumber, encryptedPayload, embeddedTime, '') objectType, toStreamNumber, encryptedPayload, embeddedTime)
if BMConfigParser().has_section(toaddress) or \ if BMConfigParser().has_section(toaddress) or \
not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK): not protocol.checkBitfield(behaviorBitfield, protocol.BITFIELD_DOESACK):
queues.UISignalQueue.put(( queues.UISignalQueue.put((
@ -1329,7 +1329,6 @@ class singleWorker(StoppableThread):
'Broadcasting inv for my msg(within sendmsg function): %s', 'Broadcasting inv for my msg(within sendmsg function): %s',
hexlify(inventoryHash) hexlify(inventoryHash)
) )
queues.invQueue.put((toStreamNumber, inventoryHash))
# Update the sent message in the sent table with the # Update the sent message in the sent table with the
# necessary information. # necessary information.
@ -1461,10 +1460,10 @@ class singleWorker(StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
Inventory()[inventoryHash] = ( self.inventory.put(
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime)
self.logger.info('sending inv (for the getpubkey message)') self.logger.info('sending inv (for the getpubkey message)')
queues.invQueue.put((streamNumber, inventoryHash))
# wait 10% past expiration # wait 10% past expiration
sleeptill = int(time.time() + TTL * 1.1) sleeptill = int(time.time() + TTL * 1.1)

View File

@ -4,6 +4,8 @@
import storage.filesystem import storage.filesystem
import storage.sqlite import storage.sqlite
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
# TODO: init with queue
from queues import invQueue
from singleton import Singleton from singleton import Singleton
@ -39,3 +41,7 @@ class Inventory():
# hint for pylint: this is dictionary like object # hint for pylint: this is dictionary like object
def __getitem__(self, key): def __getitem__(self, key):
return self._realInventory[key] return self._realInventory[key]
def put(self, invhash, obj_type, stream, payload, embedded_time, tag=''):
self[invhash] = (obj_type, stream, payload, embedded_time, tag)
invQueue.put((stream, invhash))