From 32f1e0447a69b4c62540b597d99ae6d60869c195 Mon Sep 17 00:00:00 2001 From: Peter Surda Date: Sat, 22 Oct 2016 05:00:35 +0200 Subject: [PATCH] Multiprocessing PoW fixes and improvements - the multiprocessing PoW should now work correctly - it also should be interruptible correctly and the GUI will ask about it during exit --- src/bitmessageqt/__init__.py | 3 +- src/helper_generic.py | 5 ++-- src/proofofwork.py | 57 ++++++++++++++++-------------------- src/shared.py | 7 ----- 4 files changed, 29 insertions(+), 43 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index e47ff0a8..5c407c34 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2735,8 +2735,7 @@ class MyForm(settingsmixin.SMainWindow): waitForSync = False # C PoW currently doesn't support interrupting and OpenCL is untested - # Since Windows doesn't have UNIX-style signals, it probably doesn't work on Win either, so disabling there - if getPowType == "python" and ('win32' in sys.platform or 'win64' in sys.platform) and (powQueueSize() > 0 or invQueueSize() > 0): + if getPowType() == "python" and (powQueueSize() > 0 or invQueueSize() > 0): reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " + _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, invQueueSize()) + "\n\n" + diff --git a/src/helper_generic.py b/src/helper_generic.py index bcac4ad4..5997a8ca 100644 --- a/src/helper_generic.py +++ b/src/helper_generic.py @@ -45,10 +45,9 @@ def signal_handler(signal, frame): logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name) if current_process().name == "RegExParser": # on Windows this isn't triggered, but it's fine, it has its own process termination thing - print "RegExParser interrupted" raise SystemExit - if current_process().name != "MainProcess": - raise StopIteration("Interrupted") + if "PoolWorker" in current_process().name: + raise SystemExit if current_thread().name != "MainThread": return logger.error("Got signal %i", signal) diff --git a/src/proofofwork.py b/src/proofofwork.py index b83aea3e..b4ce69b2 100644 --- a/src/proofofwork.py +++ b/src/proofofwork.py @@ -2,11 +2,11 @@ #import time #from multiprocessing import Pool, cpu_count import hashlib -import signal from struct import unpack, pack import sys +import time from debug import logger -from shared import config, frozen, codePath, shutdown, safeConfigGetBoolean, UISignalQueue +import shared import openclpow import tr import os @@ -30,7 +30,7 @@ def _set_idle(): def _pool_worker(nonce, initialHash, target, pool_size): _set_idle() trialValue = float('inf') - while trialValue > target and shutdown == 0: + while trialValue > target: nonce += pool_size trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) return [trialValue, nonce] @@ -39,57 +39,52 @@ def _doSafePoW(target, initialHash): logger.debug("Safe PoW start") nonce = 0 trialValue = float('inf') - while trialValue > target and shutdown == 0: + while trialValue > target and shared.shutdown == 0: nonce += 1 trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) - if shutdown != 0: + if shared.shutdown != 0: raise StopIteration("Interrupted") logger.debug("Safe PoW done") return [trialValue, nonce] def _doFastPoW(target, initialHash): logger.debug("Fast PoW start") - import time from multiprocessing import Pool, cpu_count try: pool_size = cpu_count() except: pool_size = 4 try: - maxCores = config.getint('bitmessagesettings', 'maxcores') + maxCores = shared.config.getint('bitmessagesettings', 'maxcores') except: maxCores = 99999 if pool_size > maxCores: pool_size = maxCores - # temporarily disable handlers - #int_handler = signal.getsignal(signal.SIGINT) - #term_handler = signal.getsignal(signal.SIGTERM) - #signal.signal(signal.SIGINT, signal.SIG_IGN) - #signal.signal(signal.SIGTERM, signal.SIG_IGN) - pool = Pool(processes=pool_size) result = [] for i in range(pool_size): - result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) - - # re-enable handlers - #signal.signal(signal.SIGINT, int_handler) - #signal.signal(signal.SIGTERM, term_handler) + result.append(pool.apply_async(_pool_worker, args=(i, initialHash, target, pool_size))) while True: - if shutdown >= 1: - pool.terminate() + if shared.shutdown > 0: + try: + pool.terminate() + pool.join() + except: + pass raise StopIteration("Interrupted") for i in range(pool_size): if result[i].ready(): try: result[i].successful() except AssertionError: + pool.terminate() + pool.join() raise StopIteration("Interrupted") result = result[i].get() pool.terminate() - pool.join() #Wait for the workers to exit... + pool.join() logger.debug("Fast PoW done") return result[0], result[1] time.sleep(0.2) @@ -102,7 +97,7 @@ def _doCPoW(target, initialHash): logger.debug("C PoW start") nonce = bmpow(out_h, out_m) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) - if shutdown != 0: + if shared.shutdown != 0: raise StopIteration("Interrupted") logger.debug("C PoW done") return [trialValue, nonce] @@ -114,11 +109,11 @@ def _doGPUPoW(target, initialHash): #print "{} - value {} < {}".format(nonce, trialValue, target) if trialValue > target: deviceNames = ", ".join(gpu.name for gpu in openclpow.gpus) - UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.'))) + shared.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.'))) logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames) openclpow.ctx = False raise Exception("GPU did not calculate correctly.") - if shutdown != 0: + if shared.shutdown != 0: raise StopIteration("Interrupted") logger.debug("GPU PoW done") return [trialValue, nonce] @@ -149,17 +144,17 @@ def estimate(difficulty, format = False): return ret def getPowType(): - if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): + if shared.safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): return "OpenCL" if bmpow: return "C" return "python" def run(target, initialHash): - if shutdown != 0: + if shared.shutdown != 0: raise target = int(target) - if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): + if shared.safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): # trialvalue1, nonce1 = _doGPUPoW(target, initialHash) # trialvalue, nonce = _doFastPoW(target, initialHash) # print "GPU: %s, %s" % (trialvalue1, nonce1) @@ -178,7 +173,7 @@ def run(target, initialHash): raise except: pass # fallback - if frozen == "macosx_app" or not frozen: + if shared.frozen == "macosx_app" or not shared.frozen: # on my (Peter Surda) Windows 10, Windows Defender # does not like this and fights with PyBitmessage # over CPU, resulting in very slow PoW @@ -207,7 +202,7 @@ if "win32" == sys.platform: bitmsglib = 'bitmsghash64.dll' try: # MSVS - bso = ctypes.WinDLL(os.path.join(codePath(), "bitmsghash", bitmsglib)) + bso = ctypes.WinDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib)) logger.info("Loaded C PoW DLL (stdcall) %s", bitmsglib) bmpow = bso.BitmessagePOW bmpow.restype = ctypes.c_ulonglong @@ -217,7 +212,7 @@ if "win32" == sys.platform: logger.error("C PoW test fail.", exc_info=True) try: # MinGW - bso = ctypes.CDLL(os.path.join(codePath(), "bitmsghash", bitmsglib)) + bso = ctypes.CDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib)) logger.info("Loaded C PoW DLL (cdecl) %s", bitmsglib) bmpow = bso.BitmessagePOW bmpow.restype = ctypes.c_ulonglong @@ -228,7 +223,7 @@ if "win32" == sys.platform: bso = None else: try: - bso = ctypes.CDLL(os.path.join(codePath(), "bitmsghash", bitmsglib)) + bso = ctypes.CDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib)) logger.info("Loaded C PoW DLL %s", bitmsglib) except: bso = None diff --git a/src/shared.py b/src/shared.py index de50d89d..3fdbc075 100644 --- a/src/shared.py +++ b/src/shared.py @@ -17,7 +17,6 @@ import pickle import Queue import random from multiprocessing import active_children, Queue as mpQueue, Lock as mpLock -from signal import SIGTERM import socket import sys import stat @@ -512,12 +511,6 @@ def doCleanShutdown(): parserInputQueue.put(None, False) except Queue.Full: pass - for child in active_children(): - try: - logger.info("Killing PoW child %i", child.pid) - os.kill(child.pid, SIGTERM) - except: - pass broadcastToSendDataQueues((0, 'shutdown', 'no data')) objectProcessorQueue.put(('checkShutdownVariable', 'no data')) for thread in threading.enumerate():