Move start of network threads to the network package,

reduce exported symbols and imports. Remove unused thread variables from state.
This commit is contained in:
Dmitri Bogomolov 2020-11-14 17:00:00 +02:00 committed by Lee Miller
parent 0632441143
commit a3d0e24623
Signed by untrusted user: lee.miller
GPG Key ID: 4F97A5EA88F4AB63
3 changed files with 68 additions and 75 deletions

View File

@ -30,6 +30,8 @@ import time
import traceback import traceback
import defaults import defaults
# Network subsystem
import network
import shared import shared
import shutdown import shutdown
import state import state
@ -40,12 +42,6 @@ from debug import logger # this should go before any threads
from helper_startup import ( from helper_startup import (
adjustHalfOpenConnectionsLimit, fixSocket, start_proxyconfig) adjustHalfOpenConnectionsLimit, fixSocket, start_proxyconfig)
from inventory import Inventory from inventory import Inventory
# Network objects and threads
from network import (
BMConnectionPool, Dandelion, AddrThread, AnnounceThread, BMNetworkThread,
InvThread, ReceiveQueueThread, DownloadThread, UploadThread
)
from network.knownnodes import readKnownNodes
from singleinstance import singleinstance from singleinstance import singleinstance
# Synchronous threads # Synchronous threads
from threads import ( from threads import (
@ -175,10 +171,18 @@ class Main(object):
defaults.networkDefaultPayloadLengthExtraBytes = int( defaults.networkDefaultPayloadLengthExtraBytes = int(
defaults.networkDefaultPayloadLengthExtraBytes / 100) defaults.networkDefaultPayloadLengthExtraBytes / 100)
readKnownNodes() # Start the SQL thread
sqlLookup = sqlThread()
# DON'T close the main program even if there are threads left.
# The closeEvent should command this thread to exit gracefully.
sqlLookup.daemon = False
sqlLookup.start()
# Not needed if objproc is disabled Inventory() # init
if state.enableObjProc:
if state.enableObjProc: # Not needed if objproc is disabled
shared.reloadMyAddressHashes()
shared.reloadBroadcastSendersForWhichImWatching()
# Start the address generation thread # Start the address generation thread
addressGeneratorThread = addressGenerator() addressGeneratorThread = addressGenerator()
@ -192,19 +196,13 @@ class Main(object):
singleWorkerThread.daemon = True singleWorkerThread.daemon = True
singleWorkerThread.start() singleWorkerThread.start()
# Start the SQL thread # Start the object processing thread
sqlLookup = sqlThread() objectProcessorThread = objectProcessor()
# DON'T close the main program even if there are threads left. # DON'T close the main program even if the thread remains.
# The closeEvent should command this thread to exit gracefully. # This thread checks the shutdown variable after processing
sqlLookup.daemon = False # each object.
sqlLookup.start() objectProcessorThread.daemon = False
objectProcessorThread.start()
Inventory() # init
# init, needs to be early because other thread may access it early
Dandelion()
# Enable object processor and SMTP only if objproc enabled
if state.enableObjProc:
# SMTP delivery thread # SMTP delivery thread
if daemon and config.safeGet( if daemon and config.safeGet(
@ -220,25 +218,6 @@ class Main(object):
smtpServerThread = smtpServer() smtpServerThread = smtpServer()
smtpServerThread.start() smtpServerThread.start()
# Start the thread that calculates POWs
objectProcessorThread = objectProcessor()
# DON'T close the main program even the thread remains.
# This thread checks the shutdown variable after processing
# each object.
objectProcessorThread.daemon = False
objectProcessorThread.start()
# Start the cleanerThread
singleCleanerThread = singleCleaner()
# close the main program even if there are threads left
singleCleanerThread.daemon = True
singleCleanerThread.start()
# Not needed if objproc disabled
if state.enableObjProc:
shared.reloadMyAddressHashes()
shared.reloadBroadcastSendersForWhichImWatching()
# API is also objproc dependent # API is also objproc dependent
if config.safeGetBoolean('bitmessagesettings', 'apienabled'): if config.safeGetBoolean('bitmessagesettings', 'apienabled'):
import api # pylint: disable=relative-import import api # pylint: disable=relative-import
@ -247,34 +226,26 @@ class Main(object):
singleAPIThread.daemon = True singleAPIThread.daemon = True
singleAPIThread.start() singleAPIThread.start()
# Start the cleanerThread
singleCleanerThread = singleCleaner()
# close the main program even if there are threads left
singleCleanerThread.daemon = True
singleCleanerThread.start()
# start network components if networking is enabled # start network components if networking is enabled
if state.enableNetwork: if state.enableNetwork:
start_proxyconfig() start_proxyconfig()
BMConnectionPool().connectToStream(1) network.start()
asyncoreThread = BMNetworkThread()
asyncoreThread.daemon = True # Optional components
asyncoreThread.start()
for i in range(config.getint('threads', 'receive')): for i in range(config.getint('threads', 'receive')):
receiveQueueThread = ReceiveQueueThread(i) receiveQueueThread = network.ReceiveQueueThread(i)
receiveQueueThread.daemon = True receiveQueueThread.daemon = True
receiveQueueThread.start() receiveQueueThread.start()
if config.safeGetBoolean('bitmessagesettings', 'udp'): if config.safeGetBoolean('bitmessagesettings', 'udp'):
state.announceThread = AnnounceThread() state.announceThread = network.AnnounceThread()
state.announceThread.daemon = True state.announceThread.daemon = True
state.announceThread.start() state.announceThread.start()
state.invThread = InvThread()
state.invThread.daemon = True
state.invThread.start()
state.addrThread = AddrThread()
state.addrThread.daemon = True
state.addrThread.start()
state.downloadThread = DownloadThread()
state.downloadThread.daemon = True
state.downloadThread.start()
state.uploadThread = UploadThread()
state.uploadThread.daemon = True
state.uploadThread.start()
if config.safeGetBoolean('bitmessagesettings', 'upnp'): if config.safeGetBoolean('bitmessagesettings', 'upnp'):
import upnp import upnp
upnpThread = upnp.uPnPThread() upnpThread = upnp.uPnPThread()

View File

@ -1,20 +1,47 @@
""" """
Network subsystem packages Network subsystem package
""" """
from addrthread import AddrThread
from announcethread import AnnounceThread from announcethread import AnnounceThread
from connectionpool import BMConnectionPool from connectionpool import BMConnectionPool
from receivequeuethread import ReceiveQueueThread
from threads import StoppableThread
__all__ = [
"AnnounceThread", "BMConnectionPool",
"ReceiveQueueThread", "StoppableThread"
# "AddrThread", "AnnounceThread", "BMNetworkThread", "Dandelion",
# "DownloadThread", "InvThread", "UploadThread",
]
def start():
"""Start network threads"""
from addrthread import AddrThread
from dandelion import Dandelion from dandelion import Dandelion
from downloadthread import DownloadThread from downloadthread import DownloadThread
from invthread import InvThread from invthread import InvThread
from networkthread import BMNetworkThread from networkthread import BMNetworkThread
from receivequeuethread import ReceiveQueueThread from knownnodes import readKnownNodes
from threads import StoppableThread
from uploadthread import UploadThread from uploadthread import UploadThread
readKnownNodes()
__all__ = [ # init, needs to be early because other thread may access it early
"BMConnectionPool", "Dandelion", Dandelion()
"AddrThread", "AnnounceThread", "BMNetworkThread", "DownloadThread", BMConnectionPool().connectToStream(1)
"InvThread", "ReceiveQueueThread", "UploadThread", "StoppableThread" asyncoreThread = BMNetworkThread()
] asyncoreThread.daemon = True
asyncoreThread.start()
invThread = InvThread()
invThread.daemon = True
invThread.start()
addrThread = AddrThread()
addrThread.daemon = True
addrThread.start()
downloadThread = DownloadThread()
downloadThread.daemon = True
downloadThread.start()
uploadThread = UploadThread()
uploadThread.daemon = True
uploadThread.start()

View File

@ -40,11 +40,6 @@ maximumNumberOfHalfOpenConnections = 0
maximumLengthOfTimeToBotherResendingMessages = 0 maximumLengthOfTimeToBotherResendingMessages = 0
invThread = None
addrThread = None
downloadThread = None
uploadThread = None
ownAddresses = {} ownAddresses = {}
discoveredPeers = {} discoveredPeers = {}