Multiprocessing PoW fixes and improvements

- the multiprocessing PoW should now work correctly
- it also should be interruptible correctly and the GUI will ask about
  it during exit
This commit is contained in:
Peter Šurda 2016-10-22 05:00:35 +02:00
parent 9dd09a44fc
commit 32f1e0447a
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
4 changed files with 29 additions and 43 deletions

View File

@ -2735,8 +2735,7 @@ class MyForm(settingsmixin.SMainWindow):
waitForSync = False waitForSync = False
# C PoW currently doesn't support interrupting and OpenCL is untested # C PoW currently doesn't support interrupting and OpenCL is untested
# Since Windows doesn't have UNIX-style signals, it probably doesn't work on Win either, so disabling there if getPowType() == "python" and (powQueueSize() > 0 or invQueueSize() > 0):
if getPowType == "python" and ('win32' in sys.platform or 'win64' in sys.platform) and (powQueueSize() > 0 or invQueueSize() > 0):
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"),
_translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " + _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " +
_translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, invQueueSize()) + "\n\n" + _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, invQueueSize()) + "\n\n" +

View File

@ -45,10 +45,9 @@ def signal_handler(signal, frame):
logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name) logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name)
if current_process().name == "RegExParser": if current_process().name == "RegExParser":
# on Windows this isn't triggered, but it's fine, it has its own process termination thing # on Windows this isn't triggered, but it's fine, it has its own process termination thing
print "RegExParser interrupted"
raise SystemExit raise SystemExit
if current_process().name != "MainProcess": if "PoolWorker" in current_process().name:
raise StopIteration("Interrupted") raise SystemExit
if current_thread().name != "MainThread": if current_thread().name != "MainThread":
return return
logger.error("Got signal %i", signal) logger.error("Got signal %i", signal)

View File

@ -2,11 +2,11 @@
#import time #import time
#from multiprocessing import Pool, cpu_count #from multiprocessing import Pool, cpu_count
import hashlib import hashlib
import signal
from struct import unpack, pack from struct import unpack, pack
import sys import sys
import time
from debug import logger from debug import logger
from shared import config, frozen, codePath, shutdown, safeConfigGetBoolean, UISignalQueue import shared
import openclpow import openclpow
import tr import tr
import os import os
@ -30,7 +30,7 @@ def _set_idle():
def _pool_worker(nonce, initialHash, target, pool_size): def _pool_worker(nonce, initialHash, target, pool_size):
_set_idle() _set_idle()
trialValue = float('inf') trialValue = float('inf')
while trialValue > target and shutdown == 0: while trialValue > target:
nonce += pool_size nonce += pool_size
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
return [trialValue, nonce] return [trialValue, nonce]
@ -39,57 +39,52 @@ def _doSafePoW(target, initialHash):
logger.debug("Safe PoW start") logger.debug("Safe PoW start")
nonce = 0 nonce = 0
trialValue = float('inf') trialValue = float('inf')
while trialValue > target and shutdown == 0: while trialValue > target and shared.shutdown == 0:
nonce += 1 nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shutdown != 0: if shared.shutdown != 0:
raise StopIteration("Interrupted") raise StopIteration("Interrupted")
logger.debug("Safe PoW done") logger.debug("Safe PoW done")
return [trialValue, nonce] return [trialValue, nonce]
def _doFastPoW(target, initialHash): def _doFastPoW(target, initialHash):
logger.debug("Fast PoW start") logger.debug("Fast PoW start")
import time
from multiprocessing import Pool, cpu_count from multiprocessing import Pool, cpu_count
try: try:
pool_size = cpu_count() pool_size = cpu_count()
except: except:
pool_size = 4 pool_size = 4
try: try:
maxCores = config.getint('bitmessagesettings', 'maxcores') maxCores = shared.config.getint('bitmessagesettings', 'maxcores')
except: except:
maxCores = 99999 maxCores = 99999
if pool_size > maxCores: if pool_size > maxCores:
pool_size = maxCores pool_size = maxCores
# temporarily disable handlers
#int_handler = signal.getsignal(signal.SIGINT)
#term_handler = signal.getsignal(signal.SIGTERM)
#signal.signal(signal.SIGINT, signal.SIG_IGN)
#signal.signal(signal.SIGTERM, signal.SIG_IGN)
pool = Pool(processes=pool_size) pool = Pool(processes=pool_size)
result = [] result = []
for i in range(pool_size): for i in range(pool_size):
result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) result.append(pool.apply_async(_pool_worker, args=(i, initialHash, target, pool_size)))
# re-enable handlers
#signal.signal(signal.SIGINT, int_handler)
#signal.signal(signal.SIGTERM, term_handler)
while True: while True:
if shutdown >= 1: if shared.shutdown > 0:
try:
pool.terminate() pool.terminate()
pool.join()
except:
pass
raise StopIteration("Interrupted") raise StopIteration("Interrupted")
for i in range(pool_size): for i in range(pool_size):
if result[i].ready(): if result[i].ready():
try: try:
result[i].successful() result[i].successful()
except AssertionError: except AssertionError:
pool.terminate()
pool.join()
raise StopIteration("Interrupted") raise StopIteration("Interrupted")
result = result[i].get() result = result[i].get()
pool.terminate() pool.terminate()
pool.join() #Wait for the workers to exit... pool.join()
logger.debug("Fast PoW done") logger.debug("Fast PoW done")
return result[0], result[1] return result[0], result[1]
time.sleep(0.2) time.sleep(0.2)
@ -102,7 +97,7 @@ def _doCPoW(target, initialHash):
logger.debug("C PoW start") logger.debug("C PoW start")
nonce = bmpow(out_h, out_m) nonce = bmpow(out_h, out_m)
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shutdown != 0: if shared.shutdown != 0:
raise StopIteration("Interrupted") raise StopIteration("Interrupted")
logger.debug("C PoW done") logger.debug("C PoW done")
return [trialValue, nonce] return [trialValue, nonce]
@ -114,11 +109,11 @@ def _doGPUPoW(target, initialHash):
#print "{} - value {} < {}".format(nonce, trialValue, target) #print "{} - value {} < {}".format(nonce, trialValue, target)
if trialValue > target: if trialValue > target:
deviceNames = ", ".join(gpu.name for gpu in openclpow.gpus) deviceNames = ", ".join(gpu.name for gpu in openclpow.gpus)
UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.'))) shared.UISignalQueue.put(('updateStatusBar', tr._translate("MainWindow",'Your GPU(s) did not calculate correctly, disabling OpenCL. Please report to the developers.')))
logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames) logger.error("Your GPUs (%s) did not calculate correctly, disabling OpenCL. Please report to the developers.", deviceNames)
openclpow.ctx = False openclpow.ctx = False
raise Exception("GPU did not calculate correctly.") raise Exception("GPU did not calculate correctly.")
if shutdown != 0: if shared.shutdown != 0:
raise StopIteration("Interrupted") raise StopIteration("Interrupted")
logger.debug("GPU PoW done") logger.debug("GPU PoW done")
return [trialValue, nonce] return [trialValue, nonce]
@ -149,17 +144,17 @@ def estimate(difficulty, format = False):
return ret return ret
def getPowType(): def getPowType():
if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): if shared.safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
return "OpenCL" return "OpenCL"
if bmpow: if bmpow:
return "C" return "C"
return "python" return "python"
def run(target, initialHash): def run(target, initialHash):
if shutdown != 0: if shared.shutdown != 0:
raise raise
target = int(target) target = int(target)
if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): if shared.safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
# trialvalue1, nonce1 = _doGPUPoW(target, initialHash) # trialvalue1, nonce1 = _doGPUPoW(target, initialHash)
# trialvalue, nonce = _doFastPoW(target, initialHash) # trialvalue, nonce = _doFastPoW(target, initialHash)
# print "GPU: %s, %s" % (trialvalue1, nonce1) # print "GPU: %s, %s" % (trialvalue1, nonce1)
@ -178,7 +173,7 @@ def run(target, initialHash):
raise raise
except: except:
pass # fallback pass # fallback
if frozen == "macosx_app" or not frozen: if shared.frozen == "macosx_app" or not shared.frozen:
# on my (Peter Surda) Windows 10, Windows Defender # on my (Peter Surda) Windows 10, Windows Defender
# does not like this and fights with PyBitmessage # does not like this and fights with PyBitmessage
# over CPU, resulting in very slow PoW # over CPU, resulting in very slow PoW
@ -207,7 +202,7 @@ if "win32" == sys.platform:
bitmsglib = 'bitmsghash64.dll' bitmsglib = 'bitmsghash64.dll'
try: try:
# MSVS # MSVS
bso = ctypes.WinDLL(os.path.join(codePath(), "bitmsghash", bitmsglib)) bso = ctypes.WinDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib))
logger.info("Loaded C PoW DLL (stdcall) %s", bitmsglib) logger.info("Loaded C PoW DLL (stdcall) %s", bitmsglib)
bmpow = bso.BitmessagePOW bmpow = bso.BitmessagePOW
bmpow.restype = ctypes.c_ulonglong bmpow.restype = ctypes.c_ulonglong
@ -217,7 +212,7 @@ if "win32" == sys.platform:
logger.error("C PoW test fail.", exc_info=True) logger.error("C PoW test fail.", exc_info=True)
try: try:
# MinGW # MinGW
bso = ctypes.CDLL(os.path.join(codePath(), "bitmsghash", bitmsglib)) bso = ctypes.CDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib))
logger.info("Loaded C PoW DLL (cdecl) %s", bitmsglib) logger.info("Loaded C PoW DLL (cdecl) %s", bitmsglib)
bmpow = bso.BitmessagePOW bmpow = bso.BitmessagePOW
bmpow.restype = ctypes.c_ulonglong bmpow.restype = ctypes.c_ulonglong
@ -228,7 +223,7 @@ if "win32" == sys.platform:
bso = None bso = None
else: else:
try: try:
bso = ctypes.CDLL(os.path.join(codePath(), "bitmsghash", bitmsglib)) bso = ctypes.CDLL(os.path.join(shared.codePath(), "bitmsghash", bitmsglib))
logger.info("Loaded C PoW DLL %s", bitmsglib) logger.info("Loaded C PoW DLL %s", bitmsglib)
except: except:
bso = None bso = None

View File

@ -17,7 +17,6 @@ import pickle
import Queue import Queue
import random import random
from multiprocessing import active_children, Queue as mpQueue, Lock as mpLock from multiprocessing import active_children, Queue as mpQueue, Lock as mpLock
from signal import SIGTERM
import socket import socket
import sys import sys
import stat import stat
@ -512,12 +511,6 @@ def doCleanShutdown():
parserInputQueue.put(None, False) parserInputQueue.put(None, False)
except Queue.Full: except Queue.Full:
pass pass
for child in active_children():
try:
logger.info("Killing PoW child %i", child.pid)
os.kill(child.pid, SIGTERM)
except:
pass
broadcastToSendDataQueues((0, 'shutdown', 'no data')) broadcastToSendDataQueues((0, 'shutdown', 'no data'))
objectProcessorQueue.put(('checkShutdownVariable', 'no data')) objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
for thread in threading.enumerate(): for thread in threading.enumerate():