From e6ce73f4bde0e9c0def34489be05180db0f6f066 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Wed, 5 Oct 2016 20:06:47 +0200 Subject: [PATCH] Multiple PoW updates - fixes "fast python" (multiprocessing) PoW - python PoW (both slow and fast) interruptible on *NIX - signal handler should handle multiple processes and threads correctly (only tested on Linux) - popul window asking whether to interrupt PoW when quitting QT GUI - PoW status in "sent" folder fixes and now also displays broadcast status which didn't exist before - Fixes #894 --- src/bitmessageqt/__init__.py | 110 ++++++++++++++++++---------------- src/class_objectHashHolder.py | 2 +- src/class_singleWorker.py | 36 +++++------ src/helper_generic.py | 28 +++++++++ src/proofofwork.py | 52 ++++++++++------ src/shared.py | 8 +++ 6 files changed, 149 insertions(+), 87 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 593a8b4b..6fa59542 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -74,6 +74,8 @@ from account import * from dialogs import AddAddressDialog from class_objectHashHolder import objectHashHolder from class_singleWorker import singleWorker +from helper_generic import powQueueSize, invQueueSize +from proofofwork import getPowType def _translate(context, text, disambiguation = None, encoding = None, number = None): if number is None: @@ -991,7 +993,7 @@ class MyForm(settingsmixin.SMainWindow): "MainWindow", "Waiting for their encryption key. Will request it again soon.") elif status == 'doingpowforpubkey': statusText = _translate( - "MainWindow", "Encryption key request queued.") + "MainWindow", "Doing work necessary to request encryption key.") elif status == 'msgqueued': statusText = _translate( "MainWindow", "Queued.") @@ -1003,13 +1005,16 @@ class MyForm(settingsmixin.SMainWindow): l10n.formatTimestamp(lastactiontime)) elif status == 'doingmsgpow': statusText = _translate( - "MainWindow", "Need to do work to send message. Work is queued.") + "MainWindow", "Doing work necessary to send message.") elif status == 'ackreceived': statusText = _translate("MainWindow", "Acknowledgement of the message received %1").arg( l10n.formatTimestamp(lastactiontime)) elif status == 'broadcastqueued': statusText = _translate( "MainWindow", "Broadcast queued.") + elif status == 'doingbroadcastpow': + statusText = _translate( + "MainWindow", "Doing work necessary to send broadcast.") elif status == 'broadcastsent': statusText = _translate("MainWindow", "Broadcast on %1").arg( l10n.formatTimestamp(lastactiontime)) @@ -2715,58 +2720,63 @@ class MyForm(settingsmixin.SMainWindow): return ''' + self.show() + self.raise_() + self.activateWindow() + self.statusBar().showMessage(_translate( "MainWindow", "Shutting down PyBitmessage... %1%").arg(str(0))) + + waitForPow = True + + # C PoW currently doesn't support interrupting and OpenCL is untested + # Since Windows doesn't have UNIX-style signals, it probably doesn't work on Win either, so disabling there + if getPowType == "python" and ('win32' in sys.platform or 'win64' in sys.platform) and (ppowQueueSize() > 0 or invQueueSize() > 0): + reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), + _translate("MainWindow", "%n object(s) pending proof of work", None, powQueueSize()) + ", " + + _translate("MainWindow", "%n object(s) waiting to be distributed", None, invQueueSize()) + "\n\n" + + _translate("MainWindow", "Wait until these tasks finish?"), + QtGui.QMessageBox.Yes, QtGui.QMessageBox.No) + if reply == QtGui.QMessageBox.No: + waitForPow = False + + if waitForPow: + # check if PoW queue empty + maxWorkerQueue = 0 + curWorkerQueue = powQueueSize() + while curWorkerQueue > 0: + # worker queue size + curWorkerQueue = powQueueSize() + if curWorkerQueue > maxWorkerQueue: + maxWorkerQueue = curWorkerQueue + if curWorkerQueue > 0: + self.statusBar().showMessage(_translate("MainWindow", "Waiting for PoW to finish... %1%").arg(str(50 * (maxWorkerQueue - curWorkerQueue) / maxWorkerQueue))) + time.sleep(0.5) + QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - # check if PoW queue empty - maxWorkerQueue = 0 - curWorkerQueue = 1 - while curWorkerQueue > 0: - # worker queue size - curWorkerQueue = shared.workerQueue.qsize() - # if worker is busy add 1 - for thread in threading.enumerate(): - try: - if isinstance(thread, singleWorker): - curWorkerQueue += thread.busy - except: - pass - if curWorkerQueue > maxWorkerQueue: - maxWorkerQueue = curWorkerQueue - if curWorkerQueue > 0: - self.statusBar().showMessage(_translate("MainWindow", "Waiting for PoW to finish... %1%").arg(str(50 * (maxWorkerQueue - curWorkerQueue) / maxWorkerQueue))) - time.sleep(0.5) - QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) + self.statusBar().showMessage(_translate("MainWindow", "Shutting down Pybitmessage... %1%").arg(str(50))) + + QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) + if maxWorkerQueue > 0: + time.sleep(0.5) # a bit of time so that the hashHolder is populated + QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) + + # check if objectHashHolder empty + self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50))) + maxWaitingObjects = 0 + curWaitingObjects = invQueueSize() + while curWaitingObjects > 0: + curWaitingObjects = invQueueSize() + if curWaitingObjects > maxWaitingObjects: + maxWaitingObjects = curWaitingObjects + if curWaitingObjects > 0: + self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50 + 20 * (maxWaitingObjects - curWaitingObjects) / maxWaitingObjects))) + time.sleep(0.5) + QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - self.statusBar().showMessage(_translate("MainWindow", "Shutting down Pybitmessage... %1%").arg(str(50))) - - QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - if maxWorkerQueue > 0: - time.sleep(0.5) # a bit of time so that the hashHolder is populated - QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - - # check if objectHashHolder empty - self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50))) - maxWaitingObjects = 0 - curWaitingObjects = 1 - while curWaitingObjects > 0: - curWaitingObjects = 0 - for thread in threading.enumerate(): - try: - if isinstance(thread, objectHashHolder): - curWaitingObjects += thread.hashCount() - except: - pass - if curWaitingObjects > maxWaitingObjects: - maxWaitingObjects = curWaitingObjects - if curWaitingObjects > 0: - self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50 + 20 * (maxWaitingObjects - curWaitingObjects) / maxWaitingObjects))) - time.sleep(0.5) - QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - - QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) - if maxWorkerQueue > 0 or maxWaitingObjects > 0: - time.sleep(10) # a bit of time so that the other nodes retrieve the objects + QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) + if maxWorkerQueue > 0 or maxWaitingObjects > 0: + time.sleep(10) # a bit of time so that the other nodes retrieve the objects QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 1000) # save state and geometry self and all widgets diff --git a/src/class_objectHashHolder.py b/src/class_objectHashHolder.py index e4e2c6d8..3fc260d2 100644 --- a/src/class_objectHashHolder.py +++ b/src/class_objectHashHolder.py @@ -14,7 +14,7 @@ import threading class objectHashHolder(threading.Thread): size = 10 def __init__(self, sendDataThreadMailbox): - threading.Thread.__init__(self) + threading.Thread.__init__(self, name="objectHashHolder") self.shutdown = False self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread. self.collectionOfHashLists = {} diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index f5ac16aa..f6fc9a80 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -63,7 +63,7 @@ class singleWorker(threading.Thread, StoppableThread): # Initialize the shared.ackdataForWhichImWatching data structure queryreturn = sqlQuery( - '''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') + '''SELECT ackdata FROM sent WHERE status = 'msgsent' ''') for row in queryreturn: ackdata, = row logger.info('Watching for ackdata ' + hexlify(ackdata)) @@ -73,18 +73,12 @@ class singleWorker(threading.Thread, StoppableThread): 10) # give some time for the GUI to start before we start on existing POW tasks. if shared.shutdown == 0: - queryreturn = sqlQuery( - '''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''') - for row in queryreturn: - toaddress, = row - logger.debug("c: %s", shared.shutdown) - self.requestPubKey(toaddress) # just in case there are any pending tasks for msg # messages that have yet to be sent. - self.sendMsg() + shared.workerQueue.put(('sendmessage', '')) # just in case there are any tasks for Broadcasts # that have yet to be sent. - self.sendBroadcast() + shared.workerQueue.put(('sendbroadcast', '')) while shared.shutdown == 0: self.busy = 0 @@ -122,6 +116,7 @@ class singleWorker(threading.Thread, StoppableThread): logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) shared.workerQueue.task_done() + logger.info("Quitting...") def doPOWForMyV2Pubkey(self, hash): # This function also broadcasts out the pubkey message once it is done with the POW # Look up my stream number based on my address hash @@ -371,8 +366,12 @@ class singleWorker(threading.Thread, StoppableThread): logger.error('Error: Couldn\'t add the lastpubkeysendtime to the keys.dat file. Error message: %s' % err) def sendBroadcast(self): + # Reset just in case + sqlExecute( + '''UPDATE sent SET status='broadcastqueued' WHERE status = 'doingbroadcastpow' ''') queryreturn = sqlQuery( '''SELECT fromaddress, subject, message, ackdata, ttl FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued') + for row in queryreturn: fromaddress, subject, body, ackdata, TTL = row status, addressVersionNumber, streamNumber, ripe = decodeAddress( @@ -392,6 +391,10 @@ class singleWorker(threading.Thread, StoppableThread): ackdata, tr._translate("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) continue + sqlExecute( + '''UPDATE sent SET status='doingbroadcastpow' WHERE ackdata=? AND status='broadcastqueued' ''', + ackdata) + privSigningKeyHex = hexlify(shared.decodeWalletImportFormat( privSigningKeyBase58)) privEncryptionKeyHex = hexlify(shared.decodeWalletImportFormat( @@ -496,15 +499,12 @@ class singleWorker(threading.Thread, StoppableThread): def sendMsg(self): - while True: # while we have a msg that needs some work - - # Select just one msg that needs work. - queryreturn = sqlQuery( - '''SELECT toaddress, fromaddress, subject, message, ackdata, status, ttl, retrynumber FROM sent WHERE (status='msgqueued' or status='doingmsgpow' or status='forcepow') and folder='sent' LIMIT 1''') - if len(queryreturn) == 0: # if there is no work to do then - break # break out of this sendMsg loop and - # wait for something to get put in the shared.workerQueue. - row = queryreturn[0] + # Reset just in case + sqlExecute( + '''UPDATE sent SET status='msgqueued' WHERE status IN ('doingpubkeypow', 'doingmsgpow')''') + queryreturn = sqlQuery( + '''SELECT toaddress, fromaddress, subject, message, ackdata, status, ttl, retrynumber FROM sent WHERE (status='msgqueued' or status='forcepow') and folder='sent' ''') + for row in queryreturn: # while we have a msg that needs some work toaddress, fromaddress, subject, message, ackdata, status, TTL, retryNumber = row toStatus, toAddressVersionNumber, toStreamNumber, toRipe = decodeAddress( toaddress) diff --git a/src/helper_generic.py b/src/helper_generic.py index b6516c05..7be8453a 100644 --- a/src/helper_generic.py +++ b/src/helper_generic.py @@ -1,10 +1,33 @@ +import os import socket import sys from binascii import hexlify, unhexlify +from multiprocessing import current_process +from threading import current_thread, enumerate from debug import logger import shared +def powQueueSize(): + curWorkerQueue = shared.workerQueue.qsize() + for thread in enumerate(): + try: + if thread.name == "singleWorker": + curWorkerQueue += thread.busy + except: + pass + return curWorkerQueue + +def invQueueSize(): + curInvQueue = 0 + for thread in enumerate(): + try: + if thread.name == "objectHashHolder": + curInvQueue += thread.hashCount() + except: + pass + return curInvQueue + def convertIntToString(n): a = __builtins__.hex(n) if a[-1:] == 'L': @@ -19,6 +42,11 @@ def convertStringToInt(s): return int(hexlify(s), 16) def signal_handler(signal, frame): + logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name) + if current_process().name != "MainProcess": + raise StopIteration("Interrupted") + if current_thread().name != "MainThread": + return logger.error("Got signal %i", signal) if shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'): shared.doCleanShutdown() diff --git a/src/proofofwork.py b/src/proofofwork.py index 3bc99c83..b83aea3e 100644 --- a/src/proofofwork.py +++ b/src/proofofwork.py @@ -2,6 +2,7 @@ #import time #from multiprocessing import Pool, cpu_count import hashlib +import signal from struct import unpack, pack import sys from debug import logger @@ -42,7 +43,7 @@ def _doSafePoW(target, initialHash): nonce += 1 trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) if shutdown != 0: - raise Exception("Interrupted") + raise StopIteration("Interrupted") logger.debug("Safe PoW done") return [trialValue, nonce] @@ -62,10 +63,10 @@ def _doFastPoW(target, initialHash): pool_size = maxCores # temporarily disable handlers - int_handler = signal.getsignal(signal.SIGINT) - term_handler = signal.getsignal(signal.SIGTERM) - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_IGN) + #int_handler = signal.getsignal(signal.SIGINT) + #term_handler = signal.getsignal(signal.SIGTERM) + #signal.signal(signal.SIGINT, signal.SIG_IGN) + #signal.signal(signal.SIGTERM, signal.SIG_IGN) pool = Pool(processes=pool_size) result = [] @@ -73,15 +74,19 @@ def _doFastPoW(target, initialHash): result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) # re-enable handlers - signal.signal(signal.SIGINT, int_handler) - signal.signal(signal.SIGTERM, term_handler) + #signal.signal(signal.SIGINT, int_handler) + #signal.signal(signal.SIGTERM, term_handler) while True: if shutdown >= 1: pool.terminate() - raise Exception("Interrupted") + raise StopIteration("Interrupted") for i in range(pool_size): if result[i].ready(): + try: + result[i].successful() + except AssertionError: + raise StopIteration("Interrupted") result = result[i].get() pool.terminate() pool.join() #Wait for the workers to exit... @@ -98,7 +103,7 @@ def _doCPoW(target, initialHash): nonce = bmpow(out_h, out_m) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) if shutdown != 0: - raise Exception("Interrupted") + raise StopIteration("Interrupted") logger.debug("C PoW done") return [trialValue, nonce] @@ -114,7 +119,7 @@ def _doGPUPoW(target, initialHash): openclpow.ctx = False raise Exception("GPU did not calculate correctly.") if shutdown != 0: - raise Exception("Interrupted") + raise StopIteration("Interrupted") logger.debug("GPU PoW done") return [trialValue, nonce] @@ -143,7 +148,16 @@ def estimate(difficulty, format = False): else: return ret +def getPowType(): + if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): + return "OpenCL" + if bmpow: + return "C" + return "python" + def run(target, initialHash): + if shutdown != 0: + raise target = int(target) if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): # trialvalue1, nonce1 = _doGPUPoW(target, initialHash) @@ -153,16 +167,16 @@ def run(target, initialHash): # return [trialvalue, nonce] try: return _doGPUPoW(target, initialHash) + except StopIteration: + raise except: - if shutdown != 0: - raise pass # fallback if bmpow: try: return _doCPoW(target, initialHash) + except StopIteration: + raise except: - if shutdown != 0: - raise pass # fallback if frozen == "macosx_app" or not frozen: # on my (Peter Surda) Windows 10, Windows Defender @@ -171,15 +185,17 @@ def run(target, initialHash): # added on 2015-11-29: multiprocesing.freeze_support() doesn't help try: return _doFastPoW(target, initialHash) + except StopIteration: + logger.error("Fast PoW got StopIteration") + raise except: - if shutdown != 0: - raise + logger.error("Fast PoW got exception:", exc_info=True) pass #fallback try: return _doSafePoW(target, initialHash) + except StopIteration: + raise except: - if shutdown != 0: - raise pass #fallback # init diff --git a/src/shared.py b/src/shared.py index 09433375..8e07cf7b 100644 --- a/src/shared.py +++ b/src/shared.py @@ -16,6 +16,8 @@ import os import pickle import Queue import random +from multiprocessing import active_children +from signal import SIGTERM import socket import sys import stat @@ -502,6 +504,12 @@ def isProofOfWorkSufficient(data, def doCleanShutdown(): global shutdown, thisapp shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. + for child in active_children(): + try: + logger.info("Killing PoW child %i", child.pid) + os.kill(child.pid, SIGTERM) + except: + pass broadcastToSendDataQueues((0, 'shutdown', 'no data')) objectProcessorQueue.put(('checkShutdownVariable', 'no data')) for thread in threading.enumerate():