Inherit helper_threading.StoppableThread from threading.Thread

and do random.seed() in its __init__
This commit is contained in:
Dmitri Bogomolov 2019-08-01 14:37:26 +03:00
parent a7a634be1b
commit ad2a2b3fb4
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
17 changed files with 68 additions and 96 deletions

View File

@ -21,7 +21,6 @@ import json
import random # nosec import random # nosec
import socket import socket
import subprocess import subprocess
import threading
import time import time
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
@ -32,7 +31,6 @@ from version import softwareVersion
import defaults import defaults
import helper_inbox import helper_inbox
import helper_sent import helper_sent
import helper_threading
import network.stats import network.stats
import proofofwork import proofofwork
import queues import queues
@ -44,6 +42,7 @@ from bmconfigparser import BMConfigParser
from debug import logger from debug import logger
from helper_ackPayload import genAckPayload from helper_ackPayload import genAckPayload
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery, sqlStoredProcedure from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery, sqlStoredProcedure
from helper_threading import StoppableThread
from inventory import Inventory from inventory import Inventory
str_chan = '[chan]' str_chan = '[chan]'
@ -73,11 +72,10 @@ class StoppableXMLRPCServer(SimpleXMLRPCServer):
# This thread, of which there is only one, runs the API. # This thread, of which there is only one, runs the API.
class singleAPI(threading.Thread, helper_threading.StoppableThread): class singleAPI(StoppableThread):
"""API thread""" """API thread"""
def __init__(self):
threading.Thread.__init__(self, name="singleAPI") name = "singleAPI"
self.initStop()
def stopThread(self): def stopThread(self):
super(singleAPI, self).stopThread() super(singleAPI, self).stopThread()

View File

@ -1,6 +1,5 @@
import time import time
import threading
import hashlib import hashlib
from binascii import hexlify from binascii import hexlify
from pyelliptic import arithmetic from pyelliptic import arithmetic
@ -19,12 +18,9 @@ from fallback import RIPEMD160Hash
from helper_threading import StoppableThread from helper_threading import StoppableThread
class addressGenerator(threading.Thread, StoppableThread): class addressGenerator(StoppableThread):
def __init__(self): name = "addressGenerator"
# QThread.__init__(self, parent)
threading.Thread.__init__(self, name="addressGenerator")
self.initStop()
def stopThread(self): def stopThread(self):
try: try:

View File

@ -35,12 +35,13 @@ class objectProcessor(threading.Thread):
objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads. objects (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads.
""" """
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="objectProcessor")
random.seed()
# It may be the case that the last time Bitmessage was running, # It may be the case that the last time Bitmessage was running,
# the user closed it before it finished processing everything in the # the user closed it before it finished processing everything in the
# objectProcessorQueue. Assuming that Bitmessage wasn't closed # objectProcessorQueue. Assuming that Bitmessage wasn't closed
# forcefully, it should have saved the data in the queue into the # forcefully, it should have saved the data in the queue into the
# objectprocessorqueue table. Let's pull it out. # objectprocessorqueue table. Let's pull it out.
threading.Thread.__init__(self, name="objectProcessor")
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''') '''SELECT objecttype, data FROM objectprocessorqueue''')
for row in queryreturn: for row in queryreturn:

View File

@ -21,7 +21,6 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...)
import gc import gc
import os import os
import shared import shared
import threading
import time import time
import tr import tr
@ -36,14 +35,11 @@ import queues
import state import state
class singleCleaner(threading.Thread, StoppableThread): class singleCleaner(StoppableThread):
name = "singleCleaner"
cycleLength = 300 cycleLength = 300
expireDiscoveredPeers = 300 expireDiscoveredPeers = 300
def __init__(self):
threading.Thread.__init__(self, name="singleCleaner")
self.initStop()
def run(self): def run(self):
gc.disable() gc.disable()
timeWeLastClearedInventoryAndPubkeysTables = 0 timeWeLastClearedInventoryAndPubkeysTables = 0

View File

@ -7,7 +7,6 @@ src/class_singleWorker.py
from __future__ import division from __future__ import division
import hashlib import hashlib
import threading
import time import time
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from struct import pack from struct import pack
@ -43,12 +42,11 @@ def sizeof_fmt(num, suffix='h/s'):
return "%.1f%s%s" % (num, 'Yi', suffix) return "%.1f%s%s" % (num, 'Yi', suffix)
class singleWorker(threading.Thread, StoppableThread): class singleWorker(StoppableThread):
"""Thread for performing PoW""" """Thread for performing PoW"""
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="singleWorker") super(singleWorker, self).__init__(name="singleWorker")
self.initStop()
proofofwork.init() proofofwork.init()
def stopThread(self): def stopThread(self):

View File

@ -6,7 +6,6 @@ src/class_smtpDeliver.py
import smtplib import smtplib
import sys import sys
import threading
import urlparse import urlparse
from email.header import Header from email.header import Header
from email.mime.text import MIMEText from email.mime.text import MIMEText
@ -20,14 +19,11 @@ from helper_threading import StoppableThread
SMTPDOMAIN = "bmaddr.lan" SMTPDOMAIN = "bmaddr.lan"
class smtpDeliver(threading.Thread, StoppableThread): class smtpDeliver(StoppableThread):
"""SMTP client thread for delivery""" """SMTP client thread for delivery"""
name = "smtpDeliver"
_instance = None _instance = None
def __init__(self):
threading.Thread.__init__(self, name="smtpDeliver")
self.initStop()
def stopThread(self): def stopThread(self):
try: try:
queues.UISignallerQueue.put(("stopThread", "data")) # pylint: disable=no-member queues.UISignallerQueue.put(("stopThread", "data")) # pylint: disable=no-member

View File

@ -154,10 +154,10 @@ class smtpServerPyBitmessage(smtpd.SMTPServer):
continue continue
return return
class smtpServer(threading.Thread, StoppableThread):
class smtpServer(StoppableThread):
def __init__(self, parent=None): def __init__(self, parent=None):
threading.Thread.__init__(self, name="smtpServerThread") super(smtpServer, self).__init__(name="smtpServerThread")
self.initStop()
self.server = smtpServerPyBitmessage(('127.0.0.1', LISTENPORT), None) self.server = smtpServerPyBitmessage(('127.0.0.1', LISTENPORT), None)
def stopThread(self): def stopThread(self):

View File

@ -6,6 +6,11 @@ from pyelliptic.openssl import OpenSSL
NoneType = type(None) NoneType = type(None)
def seed():
"""Initialize random number generator"""
random.seed()
def randomBytes(n): def randomBytes(n):
"""Method randomBytes.""" """Method randomBytes."""
try: try:

View File

@ -1,7 +1,9 @@
"""Helper threading perform all the threading operations.""" """Helper threading perform all the threading operations."""
from contextlib import contextmanager
import threading import threading
from contextlib import contextmanager
import helper_random
try: try:
import prctl import prctl
@ -22,7 +24,16 @@ else:
threading.Thread._Thread__bootstrap = _thread_name_hack threading.Thread._Thread__bootstrap = _thread_name_hack
class StoppableThread(object): class StoppableThread(threading.Thread):
name = None
def __init__(self, name=None):
if name:
self.name = name
super(StoppableThread, self).__init__(name=self.name)
self.initStop()
helper_random.seed()
def initStop(self): def initStop(self):
self.stop = threading.Event() self.stop = threading.Event()
self._stopped = False self._stopped = False
@ -35,6 +46,7 @@ class StoppableThread(object):
class BusyError(threading.ThreadError): class BusyError(threading.ThreadError):
pass pass
@contextmanager @contextmanager
def nonBlocking(lock): def nonBlocking(lock):
locked = lock.acquire(False) locked = lock.acquire(False)

View File

@ -1,18 +1,13 @@
import Queue import Queue
import threading
import addresses
from helper_threading import StoppableThread from helper_threading import StoppableThread
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from queues import addrQueue from queues import addrQueue
import protocol
import state import state
class AddrThread(threading.Thread, StoppableThread):
def __init__(self): class AddrThread(StoppableThread):
threading.Thread.__init__(self, name="AddrBroadcaster") name = "AddrBroadcaster"
self.initStop()
self.name = "AddrBroadcaster"
def run(self): def run(self):
while not state.shutdown: while not state.shutdown:
@ -28,7 +23,7 @@ class AddrThread(threading.Thread, StoppableThread):
except KeyError: except KeyError:
continue continue
#finish # finish
addrQueue.iterate() addrQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):

View File

@ -1,4 +1,3 @@
import threading
import time import time
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
@ -9,11 +8,10 @@ from network.connectionpool import BMConnectionPool
from network.udp import UDPSocket from network.udp import UDPSocket
import state import state
class AnnounceThread(threading.Thread, StoppableThread):
class AnnounceThread(StoppableThread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="Announcer") super(AnnounceThread, self).__init__(name="Announcer")
self.initStop()
self.name = "Announcer"
logger.info("init announce thread") logger.info("init announce thread")
def run(self): def run(self):

View File

@ -1,4 +1,3 @@
import threading
import time import time
import addresses import addresses
@ -12,7 +11,7 @@ from network.connectionpool import BMConnectionPool
from objectracker import missingObjects from objectracker import missingObjects
class DownloadThread(threading.Thread, StoppableThread): class DownloadThread(StoppableThread):
minPending = 200 minPending = 200
maxRequestChunk = 1000 maxRequestChunk = 1000
requestTimeout = 60 requestTimeout = 60
@ -20,9 +19,7 @@ class DownloadThread(threading.Thread, StoppableThread):
requestExpires = 3600 requestExpires = 3600
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="Downloader") super(DownloadThread, self).__init__(name="Downloader")
self.initStop()
self.name = "Downloader"
logger.info("init download thread") logger.info("init download thread")
self.lastCleaned = time.time() self.lastCleaned = time.time()

View File

@ -1,16 +1,14 @@
import Queue import Queue
from random import randint, shuffle import random
import threading
from time import time from time import time
import addresses import addresses
from bmconfigparser import BMConfigParser import protocol
import state
from helper_threading import StoppableThread from helper_threading import StoppableThread
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion from network.dandelion import Dandelion
from queues import invQueue from queues import invQueue
import protocol
import state
def handleExpiredDandelion(expired): def handleExpiredDandelion(expired):
@ -33,11 +31,8 @@ def handleExpiredDandelion(expired):
i.objectsNewToThem[hashid] = time() i.objectsNewToThem[hashid] = time()
class InvThread(threading.Thread, StoppableThread): class InvThread(StoppableThread):
def __init__(self): name = "InvBroadcaster"
threading.Thread.__init__(self, name="InvBroadcaster")
self.initStop()
self.name = "InvBroadcaster"
def handleLocallyGenerated(self, stream, hashId): def handleLocallyGenerated(self, stream, hashId):
Dandelion().addHash(hashId, stream=stream) Dandelion().addHash(hashId, stream=stream)
@ -80,7 +75,7 @@ class InvThread(threading.Thread, StoppableThread):
if connection == Dandelion().objectChildStem(inv[1]): if connection == Dandelion().objectChildStem(inv[1]):
# Fluff trigger by RNG # Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off # auto-ignore if config set to 0, i.e. dandelion is off
if randint(1, 100) >= state.dandelion: if random.randint(1, 100) >= state.dandelion:
fluffs.append(inv[1]) fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion # send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0: elif connection.services & protocol.NODE_DANDELION > 0:
@ -91,13 +86,15 @@ class InvThread(threading.Thread, StoppableThread):
fluffs.append(inv[1]) fluffs.append(inv[1])
if fluffs: if fluffs:
shuffle(fluffs) random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket('inv', \ connection.append_write_buf(protocol.CreatePacket(
addresses.encodeVarint(len(fluffs)) + "".join(fluffs))) 'inv', addresses.encodeVarint(len(fluffs)) +
"".join(fluffs)))
if stems: if stems:
shuffle(stems) random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket('dinv', \ connection.append_write_buf(protocol.CreatePacket(
addresses.encodeVarint(len(stems)) + "".join(stems))) 'dinv', addresses.encodeVarint(len(stems)) +
"".join(stems)))
invQueue.iterate() invQueue.iterate()
for i in range(len(chunk)): for i in range(len(chunk)):

View File

@ -1,4 +1,3 @@
import threading
import network.asyncore_pollchoose as asyncore import network.asyncore_pollchoose as asyncore
import state import state
@ -8,11 +7,9 @@ from network.connectionpool import BMConnectionPool
from queues import excQueue from queues import excQueue
class BMNetworkThread(threading.Thread, StoppableThread): class BMNetworkThread(StoppableThread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="Asyncore") super(BMNetworkThread, self).__init__(name="Asyncore")
self.initStop()
self.name = "Asyncore"
logger.info("init asyncore thread") logger.info("init asyncore thread")
def run(self): def run(self):

View File

@ -1,27 +1,18 @@
import errno import errno
import Queue import Queue
import socket import socket
import sys
import threading
import time
import addresses
from bmconfigparser import BMConfigParser
from debug import logger from debug import logger
from helper_threading import StoppableThread from helper_threading import StoppableThread
from inventory import Inventory
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.bmproto import BMProto
from network.advanceddispatcher import UnknownStateError from network.advanceddispatcher import UnknownStateError
from queues import receiveDataQueue from queues import receiveDataQueue
import protocol
import state import state
class ReceiveQueueThread(threading.Thread, StoppableThread):
class ReceiveQueueThread(StoppableThread):
def __init__(self, num=0): def __init__(self, num=0):
threading.Thread.__init__(self, name="ReceiveQueue_%i" %(num)) super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num)
self.initStop()
self.name = "ReceiveQueue_%i" % (num)
logger.info("init receive queue thread %i", num) logger.info("init receive queue thread %i", num)
def run(self): def run(self):

View File

@ -2,7 +2,6 @@
src/network/uploadthread.py src/network/uploadthread.py
""" """
# pylint: disable=unsubscriptable-object # pylint: disable=unsubscriptable-object
import threading
import time import time
import helper_random import helper_random
@ -15,14 +14,12 @@ from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict from randomtrackingdict import RandomTrackingDict
class UploadThread(threading.Thread, StoppableThread): class UploadThread(StoppableThread):
"""This is a thread that uploads the objects that the peers requested from me """ """This is a thread that uploads the objects that the peers requested from me """
maxBufSize = 2097152 # 2MB maxBufSize = 2097152 # 2MB
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="Uploader") super(UploadThread, self).__init__(name="Uploader")
self.initStop()
self.name = "Uploader"
logger.info("init upload thread") logger.info("init upload thread")
def run(self): def run(self):

View File

@ -9,7 +9,6 @@ Reference: http://mattscodecave.com/posts/using-python-and-upnp-to-forward-a-por
import httplib import httplib
import socket import socket
import threading
import time import time
import urllib2 import urllib2
from random import randint from random import randint
@ -201,7 +200,7 @@ class Router: # pylint: disable=old-style-class
return resp return resp
class uPnPThread(threading.Thread, StoppableThread): class uPnPThread(StoppableThread):
"""Start a thread to handle UPnP activity""" """Start a thread to handle UPnP activity"""
SSDP_ADDR = "239.255.255.250" SSDP_ADDR = "239.255.255.250"
@ -211,7 +210,7 @@ class uPnPThread(threading.Thread, StoppableThread):
SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" SSDP_ST = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
def __init__(self): def __init__(self):
threading.Thread.__init__(self, name="uPnPThread") super(uPnPThread, self).__init__(name="uPnPThread")
try: try:
self.extPort = BMConfigParser().getint('bitmessagesettings', 'extport') self.extPort = BMConfigParser().getint('bitmessagesettings', 'extport')
except: except:
@ -223,7 +222,6 @@ class uPnPThread(threading.Thread, StoppableThread):
self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
self.sock.settimeout(5) self.sock.settimeout(5)
self.sendSleep = 60 self.sendSleep = 60
self.initStop()
def run(self): def run(self):
"""Start the thread to manage UPnP activity""" """Start the thread to manage UPnP activity"""