From ad2a2b3fb4e579ddc8794a6c2faa107f97e6c09a Mon Sep 17 00:00:00 2001 From: Dmitri Bogomolov <4glitch@gmail.com> Date: Thu, 1 Aug 2019 14:37:26 +0300 Subject: [PATCH] Inherit helper_threading.StoppableThread from threading.Thread and do random.seed() in its __init__ --- src/api.py | 10 ++++------ src/class_addressGenerator.py | 8 ++------ src/class_objectProcessor.py | 3 ++- src/class_singleCleaner.py | 8 ++------ src/class_singleWorker.py | 6 ++---- src/class_smtpDeliver.py | 8 ++------ src/class_smtpServer.py | 6 +++--- src/helper_random.py | 5 +++++ src/helper_threading.py | 16 ++++++++++++++-- src/network/addrthread.py | 13 ++++--------- src/network/announcethread.py | 8 +++----- src/network/downloadthread.py | 7 ++----- src/network/invthread.py | 31 ++++++++++++++----------------- src/network/networkthread.py | 7 ++----- src/network/receivequeuethread.py | 15 +++------------ src/network/uploadthread.py | 7 ++----- src/upnp.py | 6 ++---- 17 files changed, 68 insertions(+), 96 deletions(-) diff --git a/src/api.py b/src/api.py index d3e87dfd..fad5d623 100644 --- a/src/api.py +++ b/src/api.py @@ -21,7 +21,6 @@ import json import random # nosec import socket import subprocess -import threading import time from binascii import hexlify, unhexlify from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer @@ -32,7 +31,6 @@ from version import softwareVersion import defaults import helper_inbox import helper_sent -import helper_threading import network.stats import proofofwork import queues @@ -44,6 +42,7 @@ from bmconfigparser import BMConfigParser from debug import logger from helper_ackPayload import genAckPayload from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery, sqlStoredProcedure +from helper_threading import StoppableThread from inventory import Inventory str_chan = '[chan]' @@ -73,11 +72,10 @@ class StoppableXMLRPCServer(SimpleXMLRPCServer): # This thread, of which there is only one, runs the API. -class singleAPI(threading.Thread, helper_threading.StoppableThread): +class singleAPI(StoppableThread): """API thread""" - def __init__(self): - threading.Thread.__init__(self, name="singleAPI") - self.initStop() + + name = "singleAPI" def stopThread(self): super(singleAPI, self).stopThread() diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index 0893b73a..d930fc99 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -1,6 +1,5 @@ import time -import threading import hashlib from binascii import hexlify from pyelliptic import arithmetic @@ -19,12 +18,9 @@ from fallback import RIPEMD160Hash from helper_threading import StoppableThread -class addressGenerator(threading.Thread, StoppableThread): +class addressGenerator(StoppableThread): - def __init__(self): - # QThread.__init__(self, parent) - threading.Thread.__init__(self, name="addressGenerator") - self.initStop() + name = "addressGenerator" def stopThread(self): try: diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 720cdf02..1a9c0d81 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -35,12 +35,13 @@ class objectProcessor(threading.Thread): objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads. """ def __init__(self): + threading.Thread.__init__(self, name="objectProcessor") + random.seed() # It may be the case that the last time Bitmessage was running, # the user closed it before it finished processing everything in the # objectProcessorQueue. Assuming that Bitmessage wasn't closed # forcefully, it should have saved the data in the queue into the # objectprocessorqueue table. Let's pull it out. - threading.Thread.__init__(self, name="objectProcessor") queryreturn = sqlQuery( '''SELECT objecttype, data FROM objectprocessorqueue''') for row in queryreturn: diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index e2cdbb89..a5938716 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -21,7 +21,6 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...) import gc import os import shared -import threading import time import tr @@ -36,14 +35,11 @@ import queues import state -class singleCleaner(threading.Thread, StoppableThread): +class singleCleaner(StoppableThread): + name = "singleCleaner" cycleLength = 300 expireDiscoveredPeers = 300 - def __init__(self): - threading.Thread.__init__(self, name="singleCleaner") - self.initStop() - def run(self): gc.disable() timeWeLastClearedInventoryAndPubkeysTables = 0 diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index d979ae19..0798296e 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -7,7 +7,6 @@ src/class_singleWorker.py from __future__ import division import hashlib -import threading import time from binascii import hexlify, unhexlify from struct import pack @@ -43,12 +42,11 @@ def sizeof_fmt(num, suffix='h/s'): return "%.1f%s%s" % (num, 'Yi', suffix) -class singleWorker(threading.Thread, StoppableThread): +class singleWorker(StoppableThread): """Thread for performing PoW""" def __init__(self): - threading.Thread.__init__(self, name="singleWorker") - self.initStop() + super(singleWorker, self).__init__(name="singleWorker") proofofwork.init() def stopThread(self): diff --git a/src/class_smtpDeliver.py b/src/class_smtpDeliver.py index ef7a4363..fa607220 100644 --- a/src/class_smtpDeliver.py +++ b/src/class_smtpDeliver.py @@ -6,7 +6,6 @@ src/class_smtpDeliver.py import smtplib import sys -import threading import urlparse from email.header import Header from email.mime.text import MIMEText @@ -20,14 +19,11 @@ from helper_threading import StoppableThread SMTPDOMAIN = "bmaddr.lan" -class smtpDeliver(threading.Thread, StoppableThread): +class smtpDeliver(StoppableThread): """SMTP client thread for delivery""" + name = "smtpDeliver" _instance = None - def __init__(self): - threading.Thread.__init__(self, name="smtpDeliver") - self.initStop() - def stopThread(self): try: queues.UISignallerQueue.put(("stopThread", "data")) # pylint: disable=no-member diff --git a/src/class_smtpServer.py b/src/class_smtpServer.py index 216d35be..d87ab69b 100644 --- a/src/class_smtpServer.py +++ b/src/class_smtpServer.py @@ -154,10 +154,10 @@ class smtpServerPyBitmessage(smtpd.SMTPServer): continue return -class smtpServer(threading.Thread, StoppableThread): + +class smtpServer(StoppableThread): def __init__(self, parent=None): - threading.Thread.__init__(self, name="smtpServerThread") - self.initStop() + super(smtpServer, self).__init__(name="smtpServerThread") self.server = smtpServerPyBitmessage(('127.0.0.1', LISTENPORT), None) def stopThread(self): diff --git a/src/helper_random.py b/src/helper_random.py index bb173d1b..57f0ccb3 100644 --- a/src/helper_random.py +++ b/src/helper_random.py @@ -6,6 +6,11 @@ from pyelliptic.openssl import OpenSSL NoneType = type(None) +def seed(): + """Initialize random number generator""" + random.seed() + + def randomBytes(n): """Method randomBytes.""" try: diff --git a/src/helper_threading.py b/src/helper_threading.py index 6b6a5e25..4b0a074e 100644 --- a/src/helper_threading.py +++ b/src/helper_threading.py @@ -1,7 +1,9 @@ """Helper threading perform all the threading operations.""" -from contextlib import contextmanager import threading +from contextlib import contextmanager + +import helper_random try: import prctl @@ -22,7 +24,16 @@ else: threading.Thread._Thread__bootstrap = _thread_name_hack -class StoppableThread(object): +class StoppableThread(threading.Thread): + name = None + + def __init__(self, name=None): + if name: + self.name = name + super(StoppableThread, self).__init__(name=self.name) + self.initStop() + helper_random.seed() + def initStop(self): self.stop = threading.Event() self._stopped = False @@ -35,6 +46,7 @@ class StoppableThread(object): class BusyError(threading.ThreadError): pass + @contextmanager def nonBlocking(lock): locked = lock.acquire(False) diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 5b0ea638..9f516e80 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -1,18 +1,13 @@ import Queue -import threading -import addresses from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool from queues import addrQueue -import protocol import state -class AddrThread(threading.Thread, StoppableThread): - def __init__(self): - threading.Thread.__init__(self, name="AddrBroadcaster") - self.initStop() - self.name = "AddrBroadcaster" + +class AddrThread(StoppableThread): + name = "AddrBroadcaster" def run(self): while not state.shutdown: @@ -28,7 +23,7 @@ class AddrThread(threading.Thread, StoppableThread): except KeyError: continue - #finish + # finish addrQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/announcethread.py b/src/network/announcethread.py index a94eeb36..e7bec45a 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -1,4 +1,3 @@ -import threading import time from bmconfigparser import BMConfigParser @@ -9,11 +8,10 @@ from network.connectionpool import BMConnectionPool from network.udp import UDPSocket import state -class AnnounceThread(threading.Thread, StoppableThread): + +class AnnounceThread(StoppableThread): def __init__(self): - threading.Thread.__init__(self, name="Announcer") - self.initStop() - self.name = "Announcer" + super(AnnounceThread, self).__init__(name="Announcer") logger.info("init announce thread") def run(self): diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index babce5da..308dffcb 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -1,4 +1,3 @@ -import threading import time import addresses @@ -12,7 +11,7 @@ from network.connectionpool import BMConnectionPool from objectracker import missingObjects -class DownloadThread(threading.Thread, StoppableThread): +class DownloadThread(StoppableThread): minPending = 200 maxRequestChunk = 1000 requestTimeout = 60 @@ -20,9 +19,7 @@ class DownloadThread(threading.Thread, StoppableThread): requestExpires = 3600 def __init__(self): - threading.Thread.__init__(self, name="Downloader") - self.initStop() - self.name = "Downloader" + super(DownloadThread, self).__init__(name="Downloader") logger.info("init download thread") self.lastCleaned = time.time() diff --git a/src/network/invthread.py b/src/network/invthread.py index 6f6f1364..e79172ba 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -1,16 +1,14 @@ import Queue -from random import randint, shuffle -import threading +import random from time import time import addresses -from bmconfigparser import BMConfigParser +import protocol +import state from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool from network.dandelion import Dandelion from queues import invQueue -import protocol -import state def handleExpiredDandelion(expired): @@ -33,11 +31,8 @@ def handleExpiredDandelion(expired): i.objectsNewToThem[hashid] = time() -class InvThread(threading.Thread, StoppableThread): - def __init__(self): - threading.Thread.__init__(self, name="InvBroadcaster") - self.initStop() - self.name = "InvBroadcaster" +class InvThread(StoppableThread): + name = "InvBroadcaster" def handleLocallyGenerated(self, stream, hashId): Dandelion().addHash(hashId, stream=stream) @@ -80,7 +75,7 @@ class InvThread(threading.Thread, StoppableThread): if connection == Dandelion().objectChildStem(inv[1]): # Fluff trigger by RNG # auto-ignore if config set to 0, i.e. dandelion is off - if randint(1, 100) >= state.dandelion: + if random.randint(1, 100) >= state.dandelion: fluffs.append(inv[1]) # send a dinv only if the stem node supports dandelion elif connection.services & protocol.NODE_DANDELION > 0: @@ -91,13 +86,15 @@ class InvThread(threading.Thread, StoppableThread): fluffs.append(inv[1]) if fluffs: - shuffle(fluffs) - connection.append_write_buf(protocol.CreatePacket('inv', \ - addresses.encodeVarint(len(fluffs)) + "".join(fluffs))) + random.shuffle(fluffs) + connection.append_write_buf(protocol.CreatePacket( + 'inv', addresses.encodeVarint(len(fluffs)) + + "".join(fluffs))) if stems: - shuffle(stems) - connection.append_write_buf(protocol.CreatePacket('dinv', \ - addresses.encodeVarint(len(stems)) + "".join(stems))) + random.shuffle(stems) + connection.append_write_buf(protocol.CreatePacket( + 'dinv', addresses.encodeVarint(len(stems)) + + "".join(stems))) invQueue.iterate() for i in range(len(chunk)): diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 9ceb856b..433c771e 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -1,4 +1,3 @@ -import threading import network.asyncore_pollchoose as asyncore import state @@ -8,11 +7,9 @@ from network.connectionpool import BMConnectionPool from queues import excQueue -class BMNetworkThread(threading.Thread, StoppableThread): +class BMNetworkThread(StoppableThread): def __init__(self): - threading.Thread.__init__(self, name="Asyncore") - self.initStop() - self.name = "Asyncore" + super(BMNetworkThread, self).__init__(name="Asyncore") logger.info("init asyncore thread") def run(self): diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 0a7562cb..5d8cbd37 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -1,27 +1,18 @@ import errno import Queue import socket -import sys -import threading -import time -import addresses -from bmconfigparser import BMConfigParser from debug import logger from helper_threading import StoppableThread -from inventory import Inventory from network.connectionpool import BMConnectionPool -from network.bmproto import BMProto from network.advanceddispatcher import UnknownStateError from queues import receiveDataQueue -import protocol import state -class ReceiveQueueThread(threading.Thread, StoppableThread): + +class ReceiveQueueThread(StoppableThread): def __init__(self, num=0): - threading.Thread.__init__(self, name="ReceiveQueue_%i" %(num)) - self.initStop() - self.name = "ReceiveQueue_%i" % (num) + super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) logger.info("init receive queue thread %i", num) def run(self): diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 61ee6fab..9b29ef0a 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -2,7 +2,6 @@ src/network/uploadthread.py """ # pylint: disable=unsubscriptable-object -import threading import time import helper_random @@ -15,14 +14,12 @@ from network.dandelion import Dandelion from randomtrackingdict import RandomTrackingDict -class UploadThread(threading.Thread, StoppableThread): +class UploadThread(StoppableThread): """This is a thread that uploads the objects that the peers requested from me """ maxBufSize = 2097152 # 2MB def __init__(self): - threading.Thread.__init__(self, name="Uploader") - self.initStop() - self.name = "Uploader" + super(UploadThread, self).__init__(name="Uploader") logger.info("init upload thread") def run(self): diff --git a/src/upnp.py b/src/upnp.py index d4ffce36..fdc4bc1d 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -9,7 +9,6 @@ Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-por import httplib import socket -import threading import time import urllib2 from random import randint @@ -201,7 +200,7 @@ class Router: # pylint: disable=old-style-class return resp -class uPnPThread(threading.Thread, StoppableThread): +class uPnPThread(StoppableThread): """Start a thread to handle UPnP activity""" SSDP_ADDR = "239.255.255.250" @@ -211,7 +210,7 @@ class uPnPThread(threading.Thread, StoppableThread): SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" def __init__(self): - threading.Thread.__init__(self, name="uPnPThread") + super(uPnPThread, self).__init__(name="uPnPThread") try: self.extPort = BMConfigParser().getint('bitmessagesettings', 'extport') except: @@ -223,7 +222,6 @@ class uPnPThread(threading.Thread, StoppableThread): self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) self.sock.settimeout(5) self.sendSleep = 60 - self.initStop() def run(self): """Start the thread to manage UPnP activity"""