From 341651973a1ebbffed9e6ae2c133c25675848428 Mon Sep 17 00:00:00 2001
From: Dmitri Bogomolov <4glitch@gmail.com>
Date: Sun, 27 Oct 2019 15:15:45 +0200
Subject: [PATCH] Reduced imports:

- exported from network package all objects used outside;
- made all threads available in threads module.

Wrote some module docstrings.
---
 src/bitmessagemain.py             | 34 ++++++++---------------
 src/bitmessageqt/networkstatus.py |  2 +-
 src/class_addressGenerator.py     |  6 +++-
 src/class_objectProcessor.py      |  9 +++---
 src/class_singleCleaner.py        |  3 +-
 src/class_singleWorker.py         |  2 +-
 src/class_sqlThread.py            |  6 ++++
 src/helper_sql.py                 | 30 +++++++++++++++++---
 src/helper_threading.py           | 21 --------------
 src/network/__init__.py           | 17 ++++++++++++
 src/network/receivequeuethread.py |  6 ++--
 src/shutdown.py                   |  2 +-
 src/threads.py                    | 46 +++++++++++++++++++++++++++++++
 src/upnp.py                       |  3 +-
 14 files changed, 123 insertions(+), 64 deletions(-)
 delete mode 100644 src/helper_threading.py
 create mode 100644 src/threads.py

diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py
index 4ad9311f..81702783 100755
--- a/src/bitmessagemain.py
+++ b/src/bitmessagemain.py
@@ -41,30 +41,18 @@ import shared
 import knownnodes
 import state
 import shutdown
-from debug import logger  # this should go before any threads
-
-# Classes
-from class_sqlThread import sqlThread
-from class_singleCleaner import singleCleaner
-from class_objectProcessor import objectProcessor
-from class_singleWorker import singleWorker
-from class_addressGenerator import addressGenerator
 from bmconfigparser import BMConfigParser
-
+from debug import logger  # this should go before any threads
 from inventory import Inventory
-
-from network.connectionpool import BMConnectionPool
-from network.dandelion import Dandelion
-from network.networkthread import BMNetworkThread
-from network.receivequeuethread import ReceiveQueueThread
-from network.announcethread import AnnounceThread
-from network.invthread import InvThread
-from network.addrthread import AddrThread
-from network.downloadthread import DownloadThread
-from network.uploadthread import UploadThread
-
-# Helper Functions
-import helper_threading
+# Network objects and threads
+from network import (
+    BMConnectionPool, Dandelion,
+    AddrThread, AnnounceThread, BMNetworkThread, InvThread, ReceiveQueueThread,
+    DownloadThread, UploadThread)
+# Synchronous threads
+from threads import (
+    set_thread_name,
+    addressGenerator, objectProcessor, singleCleaner, singleWorker, sqlThread)
 
 
 def connectToStream(streamNumber):
@@ -275,7 +263,7 @@ class Main:
 
         self.setSignalHandler()
 
-        helper_threading.set_thread_name("PyBitmessage")
+        set_thread_name("PyBitmessage")
 
         state.dandelion = config.safeGetInt('network', 'dandelion')
         # dandelion requires outbound connections, without them,
diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py
index 5f014563..6fbf5df6 100644
--- a/src/bitmessageqt/networkstatus.py
+++ b/src/bitmessageqt/networkstatus.py
@@ -14,7 +14,7 @@ import network.stats
 import shared
 import widgets
 from inventory import Inventory
-from network.connectionpool import BMConnectionPool
+from network import BMConnectionPool
 from retranslateui import RetranslateMixin
 from tr import _translate
 from uisignaler import UISignaler
diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py
index fa268377..c7c7e261 100644
--- a/src/class_addressGenerator.py
+++ b/src/class_addressGenerator.py
@@ -14,7 +14,7 @@ import highlevelcrypto
 from bmconfigparser import BMConfigParser
 from addresses import decodeAddress, encodeAddress, encodeVarint
 from fallback import RIPEMD160Hash
-from network.threads import StoppableThread
+from network import StoppableThread
 
 
 class addressGenerator(StoppableThread):
@@ -29,6 +29,10 @@ class addressGenerator(StoppableThread):
         super(addressGenerator, self).stopThread()
 
     def run(self):
+        """
+        Process the requests for addresses generation
+        from `.queues.addressGeneratorQueue`
+        """
         while state.shutdown == 0:
             queueValue = queues.addressGeneratorQueue.get()
             nonceTrialsPerByte = 0
diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py
index 6ae46658..e2b95447 100644
--- a/src/class_objectProcessor.py
+++ b/src/class_objectProcessor.py
@@ -57,6 +57,7 @@ class objectProcessor(threading.Thread):
         self.successfullyDecryptMessageTimings = []
 
     def run(self):
+        """Process the objects from `.queues.objectProcessorQueue`"""
         while True:
             objectType, data = queues.objectProcessorQueue.get()
 
@@ -1051,7 +1052,8 @@ class objectProcessor(threading.Thread):
         # for it.
         elif addressVersion >= 4:
             tag = hashlib.sha512(hashlib.sha512(
-                encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe
+                encodeVarint(addressVersion) + encodeVarint(streamNumber)
+                + ripe
             ).digest()).digest()[32:]
             if tag in state.neededPubkeys:
                 del state.neededPubkeys[tag]
@@ -1059,9 +1061,8 @@ class objectProcessor(threading.Thread):
 
     def sendMessages(self, address):
         """
-        This function is called by the possibleNewPubkey function when
-        that function sees that we now have the necessary pubkey
-        to send one or more messages.
+        This method is called by the `possibleNewPubkey` when it sees
+        that we now have the necessary pubkey to send one or more messages.
         """
         logger.info('We have been awaiting the arrival of this pubkey.')
         sqlExecute(
diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py
index fc53a5b0..4717c3cb 100644
--- a/src/class_singleCleaner.py
+++ b/src/class_singleCleaner.py
@@ -31,8 +31,7 @@ import tr
 from bmconfigparser import BMConfigParser
 from helper_sql import sqlQuery, sqlExecute
 from inventory import Inventory
-from network.connectionpool import BMConnectionPool
-from network.threads import StoppableThread
+from network import BMConnectionPool, StoppableThread
 
 
 class singleCleaner(StoppableThread):
diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py
index 77fa18c0..60eabe2e 100644
--- a/src/class_singleWorker.py
+++ b/src/class_singleWorker.py
@@ -28,7 +28,7 @@ from addresses import calculateInventoryHash, decodeAddress, decodeVarint, encod
 from bmconfigparser import BMConfigParser
 from helper_sql import sqlExecute, sqlQuery
 from inventory import Inventory
-from network.threads import StoppableThread
+from network import StoppableThread
 
 
 def sizeof_fmt(num, suffix='h/s'):
diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py
index a45571e0..bcb56303 100644
--- a/src/class_sqlThread.py
+++ b/src/class_sqlThread.py
@@ -1,3 +1,7 @@
+"""
+sqlThread is defined here
+"""
+
 import threading
 from bmconfigparser import BMConfigParser
 import sqlite3
@@ -19,11 +23,13 @@ import tr
 
 
 class sqlThread(threading.Thread):
+    """A thread for all SQL operations"""
 
     def __init__(self):
        threading.Thread.__init__(self, name="SQL")
 
     def run(self):
+        """Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
        self.conn = sqlite3.connect(state.appdata + 'messages.dat')
        self.conn.text_factory = str
        self.cur = self.conn.cursor()
diff --git a/src/helper_sql.py b/src/helper_sql.py
index 2b558f62..138a9f50 100644
--- a/src/helper_sql.py
+++ b/src/helper_sql.py
@@ -1,17 +1,39 @@
-"""Helper Sql performs sql operations."""
+"""
+SQL-related functions defined here really just pass the queries (or other SQL
+commands) to :class:`.threads.sqlThread` through `sqlSubmitQueue` queue and check
+or return the result got from `sqlReturnQueue`.
+
+This is done that way because :mod:`sqlite3` is so thread-unsafe that they
+won't even let you call it from different threads using your own locks.
+SQLite objects can only be used from one thread.
+
+.. note:: This actually only applies for certain deployments, and/or
+    really old version of sqlite. I haven't actually seen it anywhere.
+    Current versions do have support for threading and multiprocessing.
+    I don't see an urgent reason to refactor this, but it should be noted
+    in the comment that the problem is mostly not valid. Sadly, last time
+    I checked, there is no reliable way to check whether the library is
+    or isn't thread-safe.
+"""
 
 import threading
 import Queue
 
 sqlSubmitQueue = Queue.Queue()
-# SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks.
-# SQL objects #can only be called from one thread.
+"""the queue for SQL"""
 sqlReturnQueue = Queue.Queue()
+"""the queue for results"""
 sqlLock = threading.Lock()
 
 
 def sqlQuery(sqlStatement, *args):
-    """SQLLITE execute statement and return query."""
+    """
+    Query sqlite and return results
+
+    :param str sqlStatement: SQL statement string
+    :param list args: SQL query parameters
+    :rtype: list
+    """
     sqlLock.acquire()
     sqlSubmitQueue.put(sqlStatement)
 
diff --git a/src/helper_threading.py b/src/helper_threading.py
deleted file mode 100644
index 56dd7063..00000000
--- a/src/helper_threading.py
+++ /dev/null
@@ -1,21 +0,0 @@
-"""set_thread_name for threads that don't use StoppableThread"""
-
-import threading
-
-try:
-    import prctl
-except ImportError:
-    def set_thread_name(name):
-        """Set the thread name for external use (visible from the OS)."""
-        threading.current_thread().name = name
-else:
-    def set_thread_name(name):
-        """Set a name for the thread for python internal use."""
-        prctl.set_name(name)
-
-    def _thread_name_hack(self):
-        set_thread_name(self.name)
-        threading.Thread.__bootstrap_original__(self)
-    # pylint: disable=protected-access
-    threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap
-    threading.Thread._Thread__bootstrap = _thread_name_hack
diff --git a/src/network/__init__.py b/src/network/__init__.py
index e69de29b..51c4c4da 100644
--- a/src/network/__init__.py
+++ b/src/network/__init__.py
@@ -0,0 +1,17 @@
+from addrthread import AddrThread
+from announcethread import AnnounceThread
+from connectionpool import BMConnectionPool
+from dandelion import Dandelion
+from downloadthread import DownloadThread
+from invthread import InvThread
+from networkthread import BMNetworkThread
+from receivequeuethread import ReceiveQueueThread
+from threads import StoppableThread
+from uploadthread import UploadThread
+
+
+__all__ = [
+    "BMConnectionPool", "Dandelion",
+    "AddrThread", "AnnounceThread", "BMNetworkThread", "DownloadThread",
+    "InvThread", "ReceiveQueueThread", "UploadThread", "StoppableThread"
+]
diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py
index 13c12ce2..cd904065 100644
--- a/src/network/receivequeuethread.py
+++ b/src/network/receivequeuethread.py
@@ -32,14 +32,12 @@ class ReceiveQueueThread(StoppableThread):
 
             try:
                 connection = BMConnectionPool().getConnectionByAddr(dest)
-            # KeyError = connection object not found
-            except KeyError:
+            except KeyError:  # connection object not found
                 receiveDataQueue.task_done()
                 continue
             try:
                 connection.process()
-            # UnknownStateError = state isn't implemented
-            except UnknownStateError:
+            except UnknownStateError:  # state isn't implemented
                 pass
             except socket.error as err:
                 if err.errno == errno.EBADF:
diff --git a/src/shutdown.py b/src/shutdown.py
index 1d40a90f..c81a519a 100644
--- a/src/shutdown.py
+++ b/src/shutdown.py
@@ -10,7 +10,7 @@ from debug import logger
 from helper_sql import sqlQuery, sqlStoredProcedure
 from inventory import Inventory
 from knownnodes import saveKnownNodes
-from network.threads import StoppableThread
+from network import StoppableThread
 from queues import (
     addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue)
 
diff --git a/src/threads.py b/src/threads.py
new file mode 100644
index 00000000..08d61196
--- /dev/null
+++ b/src/threads.py
@@ -0,0 +1,46 @@
+"""
+PyBitmessage does various tasks in separate threads. Most of them inherit
+from `.network.StoppableThread`. There are `addressGenerator` for
+addresses generation, `objectProcessor` for processing the network objects
+passed minimal validation, `singleCleaner` to periodically clean various
+internal storages (like inventory and knownnodes) and do forced garbage
+collection, `singleWorker` for doing PoW, `sqlThread` for querying sqlite
+database.
+
+There are also other threads in the `.network` package.
+
+:func:`set_thread_name` is defined here for the threads that don't inherit from
+:class:`.network.StoppableThread`
+"""
+
+import threading
+
+try:
+    import prctl
+except ImportError:
+    def set_thread_name(name):
+        """Set a name for the thread for python internal use."""
+        threading.current_thread().name = name
+else:
+    def set_thread_name(name):
+        """Set the thread name for external use (visible from the OS)."""
+        prctl.set_name(name)
+
+    def _thread_name_hack(self):
+        set_thread_name(self.name)
+        threading.Thread.__bootstrap_original__(self)
+    # pylint: disable=protected-access
+    threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap
+    threading.Thread._Thread__bootstrap = _thread_name_hack
+
+from class_addressGenerator import addressGenerator
+from class_objectProcessor import objectProcessor
+from class_singleCleaner import singleCleaner
+from class_singleWorker import singleWorker
+from class_sqlThread import sqlThread
+
+
+__all__ = [
+    "addressGenerator", "objectProcessor", "singleCleaner", "singleWorker",
+    "sqlThread"
+]
diff --git a/src/upnp.py b/src/upnp.py
index b1ee2e7b..979b4186 100644
--- a/src/upnp.py
+++ b/src/upnp.py
@@ -21,8 +21,7 @@ import state
 import tr
 from bmconfigparser import BMConfigParser
 from debug import logger
-from network.connectionpool import BMConnectionPool
-from network.threads import StoppableThread
+from network import BMConnectionPool, StoppableThread
 
 
 def createRequestXML(service, action, arguments=None):