Move shutdown from shared.py to state.py

This commit is contained in:
Peter Šurda 2017-01-14 23:20:15 +01:00
parent bcc7692e14
commit ad75552b5c
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
14 changed files with 55 additions and 49 deletions

View File

@ -33,6 +33,7 @@ from struct import pack
from helper_sql import sqlQuery,sqlExecute,SqlBulkExecute,sqlStoredProcedure
from debug import logger
from inventory import Inventory
import state
from version import softwareVersion
# Helper Functions
@ -52,7 +53,7 @@ class APIError(Exception):
class StoppableXMLRPCServer(SimpleXMLRPCServer):
def serve_forever(self):
while shared.shutdown == 0:
while state.shutdown == 0:
self.handle_request()

View File

@ -68,7 +68,7 @@ def checkHasNormalAddress():
def createAddressIfNeeded(myapp):
if not checkHasNormalAddress():
shared.addressGeneratorQueue.put(('createRandomAddress', 4, 1, str(QtGui.QApplication.translate("Support", SUPPORT_MY_LABEL)), 1, "", False, protocol.networkDefaultProofOfWorkNonceTrialsPerByte, protocol.networkDefaultPayloadLengthExtraBytes))
while shared.shutdown == 0 and not checkHasNormalAddress():
while state.shutdown == 0 and not checkHasNormalAddress():
time.sleep(.2)
myapp.rerenderComboBoxSendFrom()
return checkHasNormalAddress()
@ -76,7 +76,7 @@ def createAddressIfNeeded(myapp):
def createSupportMessage(myapp):
checkAddressBook(myapp)
address = createAddressIfNeeded(myapp)
if shared.shutdown:
if state.shutdown:
return
myapp.ui.lineEditSubject.setText(str(QtGui.QApplication.translate("Support", SUPPORT_SUBJECT)))

View File

@ -14,6 +14,7 @@ import protocol
from pyelliptic import arithmetic
import tr
from binascii import hexlify
import state
class addressGenerator(threading.Thread, StoppableThread):
@ -30,7 +31,7 @@ class addressGenerator(threading.Thread, StoppableThread):
super(addressGenerator, self).stopThread()
def run(self):
while shared.shutdown == 0:
while state.shutdown == 0:
queueValue = shared.addressGeneratorQueue.get()
nonceTrialsPerByte = 0
payloadLengthExtraBytes = 0

View File

@ -22,7 +22,7 @@ import helper_msgcoding
import helper_sent
from helper_sql import *
import protocol
from state import neededPubkeys
import state
import tr
from debug import logger
import l10n
@ -73,7 +73,7 @@ class objectProcessor(threading.Thread):
except Exception as e:
logger.critical("Critical error within objectProcessorThread: \n%s" % traceback.format_exc())
if shared.shutdown:
if state.shutdown:
time.sleep(.5) # Wait just a moment for most of the connections to close
numberOfObjectsThatWereInTheObjectProcessorQueue = 0
with SqlBulkExecute() as sql:
@ -83,7 +83,7 @@ class objectProcessor(threading.Thread):
objectType,data)
numberOfObjectsThatWereInTheObjectProcessorQueue += 1
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
shared.shutdown = 2
state.shutdown = 2
break
def processgetpubkey(self, data):
@ -286,12 +286,12 @@ class objectProcessor(threading.Thread):
return
tag = data[readPosition:readPosition + 32]
if tag not in neededPubkeys:
if tag not in state.neededPubkeys:
logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.')
return
# Let us try to decrypt the pubkey
toAddress, cryptorObject = neededPubkeys[tag]
toAddress, cryptorObject = state.neededPubkeys[tag]
if shared.decryptAndCheckPubkeyPayload(data, toAddress) == 'successful':
# At this point we know that we have been waiting on this pubkey.
# This function will command the workerThread to start work on
@ -783,8 +783,8 @@ class objectProcessor(threading.Thread):
# stream number, and RIPE hash.
status, addressVersion, streamNumber, ripe = decodeAddress(address)
if addressVersion <=3:
if address in neededPubkeys:
del neededPubkeys[address]
if address in state.neededPubkeys:
del state.neededPubkeys[address]
self.sendMessages(address)
else:
logger.debug('We don\'t need this pub key. We didn\'t ask for it. For address: %s' % address)
@ -794,8 +794,8 @@ class objectProcessor(threading.Thread):
elif addressVersion >= 4:
tag = hashlib.sha512(hashlib.sha512(encodeVarint(
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()[32:]
if tag in neededPubkeys:
del neededPubkeys[tag]
if tag in state.neededPubkeys:
del state.neededPubkeys[tag]
self.sendMessages(address)
def sendMessages(self, address):

View File

@ -38,7 +38,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
shared.knownNodes[self.streamNumber][peer] = time.time()
shared.knownNodesLock.release()
else:
while not shared.shutdown:
while not state.shutdown:
shared.knownNodesLock.acquire()
try:
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
@ -82,7 +82,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
maximumConnections = 1 if state.trustedPeer else 8 # maximum number of outgoing connections = 8
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
self.stop.wait(10)
if shared.shutdown:
if state.shutdown:
break
random.seed()
peer = self._getPeer()
@ -93,7 +93,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
random.seed()
peer = self._getPeer()
self.stop.wait(1)
if shared.shutdown:
if state.shutdown:
break
# Clear out the shared.alreadyAttemptedConnectionsList every half
# hour so that this program will again attempt a connection
@ -108,7 +108,7 @@ class outgoingSynSender(threading.Thread, StoppableThread):
shared.alreadyAttemptedConnectionsListLock.release()
except threading.ThreadError as e:
pass
if shared.shutdown:
if state.shutdown:
break
self.name = "outgoingSynSender-" + peer.host.replace(":", ".") # log parser field separator
address_family = socket.AF_INET

View File

@ -48,7 +48,7 @@ class singleCleaner(threading.Thread, StoppableThread):
# Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file.
shared.maximumLengthOfTimeToBotherResendingMessages = float('inf')
while shared.shutdown == 0:
while state.shutdown == 0:
shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
Inventory().flush()

View File

@ -65,7 +65,7 @@ class singleListener(threading.Thread, StoppableThread):
if state.trustedPeer:
return
while BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect') and shared.shutdown == 0:
while BMConfigParser().safeGetBoolean('bitmessagesettings', 'dontconnect') and state.shutdown == 0:
self.stop.wait(1)
helper_bootstrap.dns()
# We typically don't want to accept incoming connections if the user is using a
@ -76,7 +76,7 @@ class singleListener(threading.Thread, StoppableThread):
while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and \
(not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and \
".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname')) and \
shared.shutdown == 0:
state.shutdown == 0:
self.stop.wait(5)
logger.info('Listening for incoming connections.')
@ -99,19 +99,19 @@ class singleListener(threading.Thread, StoppableThread):
# regexp to match an IPv4-mapped IPv6 address
mappedAddressRegexp = re.compile(r'^::ffff:([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)$')
while shared.shutdown == 0:
while state.shutdown == 0:
# We typically don't want to accept incoming connections if the user is using a
# SOCKS proxy, unless they have configured otherwise. If they eventually select
# proxy 'none' or configure SOCKS listening then this will start listening for
# connections.
while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and shared.shutdown == 0:
while BMConfigParser().get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not BMConfigParser().getboolean('bitmessagesettings', 'sockslisten') and ".onion" not in BMConfigParser().get('bitmessagesettings', 'onionhostname') and state.shutdown == 0:
self.stop.wait(10)
while len(shared.connectedHostsList) > 220 and shared.shutdown == 0:
while len(shared.connectedHostsList) > 220 and state.shutdown == 0:
logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
self.stop.wait(10)
while shared.shutdown == 0:
while state.shutdown == 0:
socketObject, sockaddr = sock.accept()
(HOST, PORT) = sockaddr[0:2]

View File

@ -21,7 +21,7 @@ from helper_threading import *
from inventory import Inventory
import l10n
import protocol
from state import neededPubkeys
import state
from binascii import hexlify, unhexlify
# This thread, of which there is only one, does the heavy lifting:
@ -57,13 +57,13 @@ class singleWorker(threading.Thread, StoppableThread):
toAddress, = row
toStatus, toAddressVersionNumber, toStreamNumber, toRipe = decodeAddress(toAddress)
if toAddressVersionNumber <= 3 :
neededPubkeys[toAddress] = 0
state.neededPubkeys[toAddress] = 0
elif toAddressVersionNumber >= 4:
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe).digest()).digest()
privEncryptionKey = doubleHashOfAddressData[:32] # Note that this is the first half of the sha512 hash.
tag = doubleHashOfAddressData[32:]
neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
state.neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
# Initialize the shared.ackdataForWhichImWatching data structure
queryreturn = sqlQuery(
@ -76,7 +76,7 @@ class singleWorker(threading.Thread, StoppableThread):
self.stop.wait(
10) # give some time for the GUI to start before we start on existing POW tasks.
if shared.shutdown == 0:
if state.shutdown == 0:
# just in case there are any pending tasks for msg
# messages that have yet to be sent.
shared.workerQueue.put(('sendmessage', ''))
@ -84,7 +84,7 @@ class singleWorker(threading.Thread, StoppableThread):
# that have yet to be sent.
shared.workerQueue.put(('sendbroadcast', ''))
while shared.shutdown == 0:
while state.shutdown == 0:
self.busy = 0
command, data = shared.workerQueue.get()
self.busy = 1
@ -553,7 +553,7 @@ class singleWorker(threading.Thread, StoppableThread):
toTag = ''
else:
toTag = hashlib.sha512(hashlib.sha512(encodeVarint(toAddressVersionNumber)+encodeVarint(toStreamNumber)+toRipe).digest()).digest()[32:]
if toaddress in neededPubkeys or toTag in neededPubkeys:
if toaddress in state.neededPubkeys or toTag in state.neededPubkeys:
# We already sent a request for the pubkey
sqlExecute(
'''UPDATE sent SET status='awaitingpubkey', sleeptill=? WHERE toaddress=? AND status='msgqueued' ''',
@ -577,7 +577,7 @@ class singleWorker(threading.Thread, StoppableThread):
toAddressVersionNumber) + encodeVarint(toStreamNumber) + toRipe).digest()).digest()
privEncryptionKey = doubleHashOfToAddressData[:32] # The first half of the sha512 hash.
tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash.
neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)))
state.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)))
for value in Inventory().by_type_and_tag(1, toTag):
if shared.decryptAndCheckPubkeyPayload(value.payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table.
@ -585,7 +585,7 @@ class singleWorker(threading.Thread, StoppableThread):
sqlExecute(
'''UPDATE sent SET status='doingmsgpow', retrynumber=0 WHERE toaddress=? AND (status='msgqueued' or status='awaitingpubkey' or status='doingpubkeypow')''',
toaddress)
del neededPubkeys[tag]
del state.neededPubkeys[tag]
break
#else: # There was something wrong with this pubkey object even
# though it had the correct tag- almost certainly because
@ -879,15 +879,15 @@ class singleWorker(threading.Thread, StoppableThread):
retryNumber = queryReturn[0][0]
if addressVersionNumber <= 3:
neededPubkeys[toAddress] = 0
state.neededPubkeys[toAddress] = 0
elif addressVersionNumber >= 4:
# If the user just clicked 'send' then the tag (and other information) will already
# be in the neededPubkeys dictionary. But if we are recovering from a restart
# of the client then we have to put it in now.
privEncryptionKey = hashlib.sha512(hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+ripe).digest()).digest()[:32] # Note that this is the first half of the sha512 hash.
tag = hashlib.sha512(hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+ripe).digest()).digest()[32:] # Note that this is the second half of the sha512 hash.
if tag not in neededPubkeys:
neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
if tag not in state.neededPubkeys:
state.neededPubkeys[tag] = (toAddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) # We'll need this for when we receive a pubkey reply: it will be encrypted and we'll need to decrypt it.
if retryNumber == 0:
TTL = 2.5*24*60*60 # 2.5 days. This was chosen fairly arbitrarily.

View File

@ -10,6 +10,7 @@ from debug import logger
from helper_threading import *
from bitmessageqt.uisignaler import UISignaler
import shared
import state
SMTPDOMAIN = "bmaddr.lan"
@ -34,7 +35,7 @@ class smtpDeliver(threading.Thread, StoppableThread):
return cls._instance
def run(self):
while shared.shutdown == 0:
while state.shutdown == 0:
command, data = shared.UISignalQueue.get()
if command == 'writeNewAddressToTable':
label, address, streamNumber = data

View File

@ -7,7 +7,7 @@ import os
from configparser import BMConfigParser
import paths
from shared import shutdown
from state import shutdown
from debug import logger
libAvailable = True

View File

@ -15,6 +15,8 @@ import tr
import os
import ctypes
import state
bitmsglib = 'bitmsghash.so'
def _set_idle():
@ -43,10 +45,10 @@ def _doSafePoW(target, initialHash):
logger.debug("Safe PoW start")
nonce = 0
trialValue = float('inf')
while trialValue > target and shared.shutdown == 0:
while trialValue > target and state.shutdown == 0:
nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shared.shutdown != 0:
if state.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("Safe PoW done")
return [trialValue, nonce]
@ -71,7 +73,7 @@ def _doFastPoW(target, initialHash):
result.append(pool.apply_async(_pool_worker, args=(i, initialHash, target, pool_size)))
while True:
if shared.shutdown > 0:
if state.shutdown > 0:
try:
pool.terminate()
pool.join()
@ -101,7 +103,7 @@ def _doCPoW(target, initialHash):
logger.debug("C PoW start")
nonce = bmpow(out_h, out_m)
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shared.shutdown != 0:
if state.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("C PoW done")
return [trialValue, nonce]
@ -117,7 +119,7 @@ def _doGPUPoW(target, initialHash):
logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames)
openclpow.enabledGpus = []
raise Exception("GPU did not calculate correctly.")
if shared.shutdown != 0:
if state.shutdown != 0:
raise StopIteration("Interrupted")
logger.debug("GPU PoW done")
return [trialValue, nonce]
@ -186,7 +188,7 @@ def buildCPoW():
notifyBuild(True)
def run(target, initialHash):
if shared.shutdown != 0:
if state.shutdown != 0:
raise
target = int(target)
if openclpow.openclEnabled():

View File

@ -51,7 +51,6 @@ knownNodes = {}
printLock = threading.Lock()
statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
thisapp = None # singleton lock instance
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
@ -197,8 +196,7 @@ def reloadBroadcastSendersForWhichImWatching():
MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
def doCleanShutdown():
global shutdown
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
state.shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
try:
parserInputQueue.put(None, False)
except Queue.Full:
@ -228,7 +226,7 @@ def doCleanShutdown():
# Verify that the objectProcessor has finished exiting. It should have incremented the
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
while shutdown == 1:
while state.shutdown == 1:
time.sleep(.1)
# This one last useless query will guarantee that the previous flush committed and that the

View File

@ -15,6 +15,8 @@ networkProtocolAvailability = None
appdata = '' #holds the location of the application data storage directory
shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
# If the trustedpeer option is specified in keys.dat then this will
# contain a Peer which will be connected to instead of using the
# addresses advertised by other peers. The client will only connect to

View File

@ -9,6 +9,7 @@ import time
from configparser import BMConfigParser
from helper_threading import *
import shared
import state
import tr
def createRequestXML(service, action, arguments=None):
@ -197,7 +198,7 @@ class uPnPThread(threading.Thread, StoppableThread):
logger.debug("Starting UPnP thread")
logger.debug("Local IP: %s", self.localIP)
lastSent = 0
while shared.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
if time.time() - lastSent > self.sendSleep and len(self.routers) == 0:
try:
self.sendSearchRouter()
@ -205,7 +206,7 @@ class uPnPThread(threading.Thread, StoppableThread):
pass
lastSent = time.time()
try:
while shared.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
while state.shutdown == 0 and BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
resp,(ip,port) = self.sock.recvfrom(1000)
if resp is None:
continue