Reduced imports:

- exported from network package all objects used outside;
  - made all threads available in threads module.

Wrote some module docstrings.
This commit is contained in:
Dmitri Bogomolov 2019-10-27 15:15:45 +02:00
parent 061a9ef973
commit 341651973a
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
14 changed files with 123 additions and 64 deletions

View File

@ -41,30 +41,18 @@ import shared
import knownnodes import knownnodes
import state import state
import shutdown import shutdown
from debug import logger # this should go before any threads
# Classes
from class_sqlThread import sqlThread
from class_singleCleaner import singleCleaner
from class_objectProcessor import objectProcessor
from class_singleWorker import singleWorker
from class_addressGenerator import addressGenerator
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from debug import logger # this should go before any threads
from inventory import Inventory from inventory import Inventory
# Network objects and threads
from network.connectionpool import BMConnectionPool from network import (
from network.dandelion import Dandelion BMConnectionPool, Dandelion,
from network.networkthread import BMNetworkThread AddrThread, AnnounceThread, BMNetworkThread, InvThread, ReceiveQueueThread,
from network.receivequeuethread import ReceiveQueueThread DownloadThread, UploadThread)
from network.announcethread import AnnounceThread # Synchronous threads
from network.invthread import InvThread from threads import (
from network.addrthread import AddrThread set_thread_name,
from network.downloadthread import DownloadThread addressGenerator, objectProcessor, singleCleaner, singleWorker, sqlThread)
from network.uploadthread import UploadThread
# Helper Functions
import helper_threading
def connectToStream(streamNumber): def connectToStream(streamNumber):
@ -275,7 +263,7 @@ class Main:
self.setSignalHandler() self.setSignalHandler()
helper_threading.set_thread_name("PyBitmessage") set_thread_name("PyBitmessage")
state.dandelion = config.safeGetInt('network', 'dandelion') state.dandelion = config.safeGetInt('network', 'dandelion')
# dandelion requires outbound connections, without them, # dandelion requires outbound connections, without them,

View File

@ -14,7 +14,7 @@ import network.stats
import shared import shared
import widgets import widgets
from inventory import Inventory from inventory import Inventory
from network.connectionpool import BMConnectionPool from network import BMConnectionPool
from retranslateui import RetranslateMixin from retranslateui import RetranslateMixin
from tr import _translate from tr import _translate
from uisignaler import UISignaler from uisignaler import UISignaler

View File

@ -14,7 +14,7 @@ import highlevelcrypto
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from addresses import decodeAddress, encodeAddress, encodeVarint from addresses import decodeAddress, encodeAddress, encodeVarint
from fallback import RIPEMD160Hash from fallback import RIPEMD160Hash
from network.threads import StoppableThread from network import StoppableThread
class addressGenerator(StoppableThread): class addressGenerator(StoppableThread):
@ -29,6 +29,10 @@ class addressGenerator(StoppableThread):
super(addressGenerator, self).stopThread() super(addressGenerator, self).stopThread()
def run(self): def run(self):
"""
Process the requests for addresses generation
from `.queues.addressGeneratorQueue`
"""
while state.shutdown == 0: while state.shutdown == 0:
queueValue = queues.addressGeneratorQueue.get() queueValue = queues.addressGeneratorQueue.get()
nonceTrialsPerByte = 0 nonceTrialsPerByte = 0

View File

@ -57,6 +57,7 @@ class objectProcessor(threading.Thread):
self.successfullyDecryptMessageTimings = [] self.successfullyDecryptMessageTimings = []
def run(self): def run(self):
"""Process the objects from `.queues.objectProcessorQueue`"""
while True: while True:
objectType, data = queues.objectProcessorQueue.get() objectType, data = queues.objectProcessorQueue.get()
@ -1051,7 +1052,8 @@ class objectProcessor(threading.Thread):
# for it. # for it.
elif addressVersion >= 4: elif addressVersion >= 4:
tag = hashlib.sha512(hashlib.sha512( tag = hashlib.sha512(hashlib.sha512(
encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe encodeVarint(addressVersion) + encodeVarint(streamNumber)
+ ripe
).digest()).digest()[32:] ).digest()).digest()[32:]
if tag in state.neededPubkeys: if tag in state.neededPubkeys:
del state.neededPubkeys[tag] del state.neededPubkeys[tag]
@ -1059,9 +1061,8 @@ class objectProcessor(threading.Thread):
def sendMessages(self, address): def sendMessages(self, address):
""" """
This function is called by the possibleNewPubkey function when This method is called by the `possibleNewPubkey` when it sees
that function sees that we now have the necessary pubkey that we now have the necessary pubkey to send one or more messages.
to send one or more messages.
""" """
logger.info('We have been awaiting the arrival of this pubkey.') logger.info('We have been awaiting the arrival of this pubkey.')
sqlExecute( sqlExecute(

View File

@ -31,8 +31,7 @@ import tr
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from helper_sql import sqlQuery, sqlExecute from helper_sql import sqlQuery, sqlExecute
from inventory import Inventory from inventory import Inventory
from network.connectionpool import BMConnectionPool from network import BMConnectionPool, StoppableThread
from network.threads import StoppableThread
class singleCleaner(StoppableThread): class singleCleaner(StoppableThread):

View File

@ -28,7 +28,7 @@ from addresses import calculateInventoryHash, decodeAddress, decodeVarint, encod
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from helper_sql import sqlExecute, sqlQuery from helper_sql import sqlExecute, sqlQuery
from inventory import Inventory from inventory import Inventory
from network.threads import StoppableThread from network import StoppableThread
def sizeof_fmt(num, suffix='h/s'): def sizeof_fmt(num, suffix='h/s'):

View File

@ -1,3 +1,7 @@
"""
sqlThread is defined here
"""
import threading import threading
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
import sqlite3 import sqlite3
@ -19,11 +23,13 @@ import tr
class sqlThread(threading.Thread): class sqlThread(threading.Thread):
"""A thread for all SQL operations"""
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="SQL") threading.Thread.__init__(self, name="SQL")
def run(self): def run(self):
"""Process SQL queries from `.helper_sql.sqlSubmitQueue`"""
self.conn = sqlite3.connect(state.appdata + 'messages.dat') self.conn = sqlite3.connect(state.appdata + 'messages.dat')
self.conn.text_factory = str self.conn.text_factory = str
self.cur = self.conn.cursor() self.cur = self.conn.cursor()

View File

@ -1,17 +1,39 @@
"""Helper Sql performs sql operations.""" """
SQL-related functions defined here are really pass the queries (or other SQL
commands) to :class:`.threads.sqlThread` through `sqlSubmitQueue` queue and check
or return the result got from `sqlReturnQueue`.
This is done that way because :mod:`sqlite3` is so thread-unsafe that they
won't even let you call it from different threads using your own locks.
SQLite objects can only be used from one thread.
.. note:: This actually only applies for certain deployments, and/or
really old version of sqlite. I haven't actually seen it anywhere.
Current versions do have support for threading and multiprocessing.
I don't see an urgent reason to refactor this, but it should be noted
in the comment that the problem is mostly not valid. Sadly, last time
I checked, there is no reliable way to check whether the library is
or isn't thread-safe.
"""
import threading import threading
import Queue import Queue
sqlSubmitQueue = Queue.Queue() sqlSubmitQueue = Queue.Queue()
# SQLITE3 is so thread-unsafe that they won't even let you call it from different threads using your own locks. """the queue for SQL"""
# SQL objects #can only be called from one thread.
sqlReturnQueue = Queue.Queue() sqlReturnQueue = Queue.Queue()
"""the queue for results"""
sqlLock = threading.Lock() sqlLock = threading.Lock()
def sqlQuery(sqlStatement, *args): def sqlQuery(sqlStatement, *args):
"""SQLLITE execute statement and return query.""" """
Query sqlite and return results
:param str sqlStatement: SQL statement string
:param list args: SQL query parameters
:rtype: list
"""
sqlLock.acquire() sqlLock.acquire()
sqlSubmitQueue.put(sqlStatement) sqlSubmitQueue.put(sqlStatement)

View File

@ -1,21 +0,0 @@
"""set_thread_name for threads that don't use StoppableThread"""
import threading
try:
import prctl
except ImportError:
def set_thread_name(name):
"""Set the thread name for external use (visible from the OS)."""
threading.current_thread().name = name
else:
def set_thread_name(name):
"""Set a name for the thread for python internal use."""
prctl.set_name(name)
def _thread_name_hack(self):
set_thread_name(self.name)
threading.Thread.__bootstrap_original__(self)
# pylint: disable=protected-access
threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap
threading.Thread._Thread__bootstrap = _thread_name_hack

View File

@ -0,0 +1,17 @@
from addrthread import AddrThread
from announcethread import AnnounceThread
from connectionpool import BMConnectionPool
from dandelion import Dandelion
from downloadthread import DownloadThread
from invthread import InvThread
from networkthread import BMNetworkThread
from receivequeuethread import ReceiveQueueThread
from threads import StoppableThread
from uploadthread import UploadThread
__all__ = [
"BMConnectionPool", "Dandelion",
"AddrThread", "AnnounceThread", "BMNetworkThread", "DownloadThread",
"InvThread", "ReceiveQueueThread", "UploadThread", "StoppableThread"
]

View File

@ -32,14 +32,12 @@ class ReceiveQueueThread(StoppableThread):
try: try:
connection = BMConnectionPool().getConnectionByAddr(dest) connection = BMConnectionPool().getConnectionByAddr(dest)
# KeyError = connection object not found except KeyError: # connection object not found
except KeyError:
receiveDataQueue.task_done() receiveDataQueue.task_done()
continue continue
try: try:
connection.process() connection.process()
# UnknownStateError = state isn't implemented except UnknownStateError: # state isn't implemented
except UnknownStateError:
pass pass
except socket.error as err: except socket.error as err:
if err.errno == errno.EBADF: if err.errno == errno.EBADF:

View File

@ -10,7 +10,7 @@ from debug import logger
from helper_sql import sqlQuery, sqlStoredProcedure from helper_sql import sqlQuery, sqlStoredProcedure
from inventory import Inventory from inventory import Inventory
from knownnodes import saveKnownNodes from knownnodes import saveKnownNodes
from network.threads import StoppableThread from network import StoppableThread
from queues import ( from queues import (
addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue) addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue)

46
src/threads.py Normal file
View File

@ -0,0 +1,46 @@
"""
PyBitmessage does various tasks in separate threads. Most of them inherit
from `.network.StoppableThread`. There are `addressGenerator` for
addresses generation, `objectProcessor` for processing the network objects
passed minimal validation, `singleCleaner` to periodically clean various
internal storages (like inventory and knownnodes) and do forced garbage
collection, `singleWorker` for doing PoW, `sqlThread` for querying sqlite
database.
There are also other threads in the `.network` package.
:func:`set_thread_name` is defined here for the threads that don't inherit from
:class:`.network.StoppableThread`
"""
import threading
try:
import prctl
except ImportError:
def set_thread_name(name):
"""Set a name for the thread for python internal use."""
threading.current_thread().name = name
else:
def set_thread_name(name):
"""Set the thread name for external use (visible from the OS)."""
prctl.set_name(name)
def _thread_name_hack(self):
set_thread_name(self.name)
threading.Thread.__bootstrap_original__(self)
# pylint: disable=protected-access
threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap
threading.Thread._Thread__bootstrap = _thread_name_hack
from class_addressGenerator import addressGenerator
from class_objectProcessor import objectProcessor
from class_singleCleaner import singleCleaner
from class_singleWorker import singleWorker
from class_sqlThread import sqlThread
__all__ = [
"addressGenerator", "objectProcessor", "singleCleaner", "singleWorker",
"sqlThread"
]

View File

@ -21,8 +21,7 @@ import state
import tr import tr
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from debug import logger from debug import logger
from network.connectionpool import BMConnectionPool from network import BMConnectionPool, StoppableThread
from network.threads import StoppableThread
def createRequestXML(service, action, arguments=None): def createRequestXML(service, action, arguments=None):