Multiple PoW updates

- fixes "fast python" (multiprocessing) PoW
- python PoW (both slow and fast) interruptible on *NIX
- signal handler should handle multiple processes and threads correctly
(only tested on Linux)
- popul window asking whether to interrupt PoW when quitting QT GUI
- PoW status in "sent" folder fixes and now also displays broadcast
status which didn't exist before
- Fixes #894
This commit is contained in:
Peter Šurda 2016-10-05 20:06:47 +02:00
parent 029b0525de
commit e6ce73f4bd
Signed by: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
6 changed files with 149 additions and 87 deletions

View File

@ -74,6 +74,8 @@ from account import *
from dialogs import AddAddressDialog from dialogs import AddAddressDialog
from class_objectHashHolder import objectHashHolder from class_objectHashHolder import objectHashHolder
from class_singleWorker import singleWorker from class_singleWorker import singleWorker
from helper_generic import powQueueSize, invQueueSize
from proofofwork import getPowType
def _translate(context, text, disambiguation = None, encoding = None, number = None): def _translate(context, text, disambiguation = None, encoding = None, number = None):
if number is None: if number is None:
@ -991,7 +993,7 @@ class MyForm(settingsmixin.SMainWindow):
"MainWindow", "Waiting for their encryption key. Will request it again soon.") "MainWindow", "Waiting for their encryption key. Will request it again soon.")
elif status == 'doingpowforpubkey': elif status == 'doingpowforpubkey':
statusText = _translate( statusText = _translate(
"MainWindow", "Encryption key request queued.") "MainWindow", "Doing work necessary to request encryption key.")
elif status == 'msgqueued': elif status == 'msgqueued':
statusText = _translate( statusText = _translate(
"MainWindow", "Queued.") "MainWindow", "Queued.")
@ -1003,13 +1005,16 @@ class MyForm(settingsmixin.SMainWindow):
l10n.formatTimestamp(lastactiontime)) l10n.formatTimestamp(lastactiontime))
elif status == 'doingmsgpow': elif status == 'doingmsgpow':
statusText = _translate( statusText = _translate(
"MainWindow", "Need to do work to send message. Work is queued.") "MainWindow", "Doing work necessary to send message.")
elif status == 'ackreceived': elif status == 'ackreceived':
statusText = _translate("MainWindow", "Acknowledgement of the message received %1").arg( statusText = _translate("MainWindow", "Acknowledgement of the message received %1").arg(
l10n.formatTimestamp(lastactiontime)) l10n.formatTimestamp(lastactiontime))
elif status == 'broadcastqueued': elif status == 'broadcastqueued':
statusText = _translate( statusText = _translate(
"MainWindow", "Broadcast queued.") "MainWindow", "Broadcast queued.")
elif status == 'doingbroadcastpow':
statusText = _translate(
"MainWindow", "Doing work necessary to send broadcast.")
elif status == 'broadcastsent': elif status == 'broadcastsent':
statusText = _translate("MainWindow", "Broadcast on %1").arg( statusText = _translate("MainWindow", "Broadcast on %1").arg(
l10n.formatTimestamp(lastactiontime)) l10n.formatTimestamp(lastactiontime))
@ -2715,22 +2720,33 @@ class MyForm(settingsmixin.SMainWindow):
return return
''' '''
self.show()
self.raise_()
self.activateWindow()
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
"MainWindow", "Shutting down PyBitmessage... %1%").arg(str(0))) "MainWindow", "Shutting down PyBitmessage... %1%").arg(str(0)))
waitForPow = True
# C PoW currently doesn't support interrupting and OpenCL is untested
# Since Windows doesn't have UNIX-style signals, it probably doesn't work on Win either, so disabling there
if getPowType == "python" and ('win32' in sys.platform or 'win64' in sys.platform) and (ppowQueueSize() > 0 or invQueueSize() > 0):
reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"),
_translate("MainWindow", "%n object(s) pending proof of work", None, powQueueSize()) + ", " +
_translate("MainWindow", "%n object(s) waiting to be distributed", None, invQueueSize()) + "\n\n" +
_translate("MainWindow", "Wait until these tasks finish?"),
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if reply == QtGui.QMessageBox.No:
waitForPow = False
if waitForPow:
# check if PoW queue empty # check if PoW queue empty
maxWorkerQueue = 0 maxWorkerQueue = 0
curWorkerQueue = 1 curWorkerQueue = powQueueSize()
while curWorkerQueue > 0: while curWorkerQueue > 0:
# worker queue size # worker queue size
curWorkerQueue = shared.workerQueue.qsize() curWorkerQueue = powQueueSize()
# if worker is busy add 1
for thread in threading.enumerate():
try:
if isinstance(thread, singleWorker):
curWorkerQueue += thread.busy
except:
pass
if curWorkerQueue > maxWorkerQueue: if curWorkerQueue > maxWorkerQueue:
maxWorkerQueue = curWorkerQueue maxWorkerQueue = curWorkerQueue
if curWorkerQueue > 0: if curWorkerQueue > 0:
@ -2748,15 +2764,9 @@ class MyForm(settingsmixin.SMainWindow):
# check if objectHashHolder empty # check if objectHashHolder empty
self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50))) self.statusBar().showMessage(_translate("MainWindow", "Waiting for objects to be sent... %1%").arg(str(50)))
maxWaitingObjects = 0 maxWaitingObjects = 0
curWaitingObjects = 1 curWaitingObjects = invQueueSize()
while curWaitingObjects > 0: while curWaitingObjects > 0:
curWaitingObjects = 0 curWaitingObjects = invQueueSize()
for thread in threading.enumerate():
try:
if isinstance(thread, objectHashHolder):
curWaitingObjects += thread.hashCount()
except:
pass
if curWaitingObjects > maxWaitingObjects: if curWaitingObjects > maxWaitingObjects:
maxWaitingObjects = curWaitingObjects maxWaitingObjects = curWaitingObjects
if curWaitingObjects > 0: if curWaitingObjects > 0:

View File

@ -14,7 +14,7 @@ import threading
class objectHashHolder(threading.Thread): class objectHashHolder(threading.Thread):
size = 10 size = 10
def __init__(self, sendDataThreadMailbox): def __init__(self, sendDataThreadMailbox):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="objectHashHolder")
self.shutdown = False self.shutdown = False
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread. self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
self.collectionOfHashLists = {} self.collectionOfHashLists = {}

View File

@ -63,7 +63,7 @@ class singleWorker(threading.Thread, StoppableThread):
# Initialize the shared.ackdataForWhichImWatching data structure # Initialize the shared.ackdataForWhichImWatching data structure
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') '''SELECT ackdata FROM sent WHERE status = 'msgsent' ''')
for row in queryreturn: for row in queryreturn:
ackdata, = row ackdata, = row
logger.info('Watching for ackdata ' + hexlify(ackdata)) logger.info('Watching for ackdata ' + hexlify(ackdata))
@ -73,18 +73,12 @@ class singleWorker(threading.Thread, StoppableThread):
10) # give some time for the GUI to start before we start on existing POW tasks. 10) # give some time for the GUI to start before we start on existing POW tasks.
if shared.shutdown == 0: if shared.shutdown == 0:
queryreturn = sqlQuery(
'''SELECT DISTINCT toaddress FROM sent WHERE (status='doingpubkeypow' AND folder='sent')''')
for row in queryreturn:
toaddress, = row
logger.debug("c: %s", shared.shutdown)
self.requestPubKey(toaddress)
# just in case there are any pending tasks for msg # just in case there are any pending tasks for msg
# messages that have yet to be sent. # messages that have yet to be sent.
self.sendMsg() shared.workerQueue.put(('sendmessage', ''))
# just in case there are any tasks for Broadcasts # just in case there are any tasks for Broadcasts
# that have yet to be sent. # that have yet to be sent.
self.sendBroadcast() shared.workerQueue.put(('sendbroadcast', ''))
while shared.shutdown == 0: while shared.shutdown == 0:
self.busy = 0 self.busy = 0
@ -122,6 +116,7 @@ class singleWorker(threading.Thread, StoppableThread):
logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
shared.workerQueue.task_done() shared.workerQueue.task_done()
logger.info("Quitting...")
def doPOWForMyV2Pubkey(self, hash): # This function also broadcasts out the pubkey message once it is done with the POW def doPOWForMyV2Pubkey(self, hash): # This function also broadcasts out the pubkey message once it is done with the POW
# Look up my stream number based on my address hash # Look up my stream number based on my address hash
@ -371,8 +366,12 @@ class singleWorker(threading.Thread, StoppableThread):
logger.error('Error: Couldn\'t add the lastpubkeysendtime to the keys.dat file. Error message: %s' % err) logger.error('Error: Couldn\'t add the lastpubkeysendtime to the keys.dat file. Error message: %s' % err)
def sendBroadcast(self): def sendBroadcast(self):
# Reset just in case
sqlExecute(
'''UPDATE sent SET status='broadcastqueued' WHERE status = 'doingbroadcastpow' ''')
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''SELECT fromaddress, subject, message, ackdata, ttl FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued') '''SELECT fromaddress, subject, message, ackdata, ttl FROM sent WHERE status=? and folder='sent' ''', 'broadcastqueued')
for row in queryreturn: for row in queryreturn:
fromaddress, subject, body, ackdata, TTL = row fromaddress, subject, body, ackdata, TTL = row
status, addressVersionNumber, streamNumber, ripe = decodeAddress( status, addressVersionNumber, streamNumber, ripe = decodeAddress(
@ -392,6 +391,10 @@ class singleWorker(threading.Thread, StoppableThread):
ackdata, tr._translate("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) ackdata, tr._translate("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
continue continue
sqlExecute(
'''UPDATE sent SET status='doingbroadcastpow' WHERE ackdata=? AND status='broadcastqueued' ''',
ackdata)
privSigningKeyHex = hexlify(shared.decodeWalletImportFormat( privSigningKeyHex = hexlify(shared.decodeWalletImportFormat(
privSigningKeyBase58)) privSigningKeyBase58))
privEncryptionKeyHex = hexlify(shared.decodeWalletImportFormat( privEncryptionKeyHex = hexlify(shared.decodeWalletImportFormat(
@ -496,15 +499,12 @@ class singleWorker(threading.Thread, StoppableThread):
def sendMsg(self): def sendMsg(self):
while True: # while we have a msg that needs some work # Reset just in case
sqlExecute(
# Select just one msg that needs work. '''UPDATE sent SET status='msgqueued' WHERE status IN ('doingpubkeypow', 'doingmsgpow')''')
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''SELECT toaddress, fromaddress, subject, message, ackdata, status, ttl, retrynumber FROM sent WHERE (status='msgqueued' or status='doingmsgpow' or status='forcepow') and folder='sent' LIMIT 1''') '''SELECT toaddress, fromaddress, subject, message, ackdata, status, ttl, retrynumber FROM sent WHERE (status='msgqueued' or status='forcepow') and folder='sent' ''')
if len(queryreturn) == 0: # if there is no work to do then for row in queryreturn: # while we have a msg that needs some work
break # break out of this sendMsg loop and
# wait for something to get put in the shared.workerQueue.
row = queryreturn[0]
toaddress, fromaddress, subject, message, ackdata, status, TTL, retryNumber = row toaddress, fromaddress, subject, message, ackdata, status, TTL, retryNumber = row
toStatus, toAddressVersionNumber, toStreamNumber, toRipe = decodeAddress( toStatus, toAddressVersionNumber, toStreamNumber, toRipe = decodeAddress(
toaddress) toaddress)

View File

@ -1,10 +1,33 @@
import os
import socket import socket
import sys import sys
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
from multiprocessing import current_process
from threading import current_thread, enumerate
from debug import logger from debug import logger
import shared import shared
def powQueueSize():
curWorkerQueue = shared.workerQueue.qsize()
for thread in enumerate():
try:
if thread.name == "singleWorker":
curWorkerQueue += thread.busy
except:
pass
return curWorkerQueue
def invQueueSize():
curInvQueue = 0
for thread in enumerate():
try:
if thread.name == "objectHashHolder":
curInvQueue += thread.hashCount()
except:
pass
return curInvQueue
def convertIntToString(n): def convertIntToString(n):
a = __builtins__.hex(n) a = __builtins__.hex(n)
if a[-1:] == 'L': if a[-1:] == 'L':
@ -19,6 +42,11 @@ def convertStringToInt(s):
return int(hexlify(s), 16) return int(hexlify(s), 16)
def signal_handler(signal, frame): def signal_handler(signal, frame):
logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name)
if current_process().name != "MainProcess":
raise StopIteration("Interrupted")
if current_thread().name != "MainThread":
return
logger.error("Got signal %i", signal) logger.error("Got signal %i", signal)
if shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'): if shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'):
shared.doCleanShutdown() shared.doCleanShutdown()

View File

@ -2,6 +2,7 @@
#import time #import time
#from multiprocessing import Pool, cpu_count #from multiprocessing import Pool, cpu_count
import hashlib import hashlib
import signal
from struct import unpack, pack from struct import unpack, pack
import sys import sys
from debug import logger from debug import logger
@ -42,7 +43,7 @@ def _doSafePoW(target, initialHash):
nonce += 1 nonce += 1
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shutdown != 0: if shutdown != 0:
raise Exception("Interrupted") raise StopIteration("Interrupted")
logger.debug("Safe PoW done") logger.debug("Safe PoW done")
return [trialValue, nonce] return [trialValue, nonce]
@ -62,10 +63,10 @@ def _doFastPoW(target, initialHash):
pool_size = maxCores pool_size = maxCores
# temporarily disable handlers # temporarily disable handlers
int_handler = signal.getsignal(signal.SIGINT) #int_handler = signal.getsignal(signal.SIGINT)
term_handler = signal.getsignal(signal.SIGTERM) #term_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGINT, signal.SIG_IGN) #signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN) #signal.signal(signal.SIGTERM, signal.SIG_IGN)
pool = Pool(processes=pool_size) pool = Pool(processes=pool_size)
result = [] result = []
@ -73,15 +74,19 @@ def _doFastPoW(target, initialHash):
result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size)))
# re-enable handlers # re-enable handlers
signal.signal(signal.SIGINT, int_handler) #signal.signal(signal.SIGINT, int_handler)
signal.signal(signal.SIGTERM, term_handler) #signal.signal(signal.SIGTERM, term_handler)
while True: while True:
if shutdown >= 1: if shutdown >= 1:
pool.terminate() pool.terminate()
raise Exception("Interrupted") raise StopIteration("Interrupted")
for i in range(pool_size): for i in range(pool_size):
if result[i].ready(): if result[i].ready():
try:
result[i].successful()
except AssertionError:
raise StopIteration("Interrupted")
result = result[i].get() result = result[i].get()
pool.terminate() pool.terminate()
pool.join() #Wait for the workers to exit... pool.join() #Wait for the workers to exit...
@ -98,7 +103,7 @@ def _doCPoW(target, initialHash):
nonce = bmpow(out_h, out_m) nonce = bmpow(out_h, out_m)
trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8])
if shutdown != 0: if shutdown != 0:
raise Exception("Interrupted") raise StopIteration("Interrupted")
logger.debug("C PoW done") logger.debug("C PoW done")
return [trialValue, nonce] return [trialValue, nonce]
@ -114,7 +119,7 @@ def _doGPUPoW(target, initialHash):
openclpow.ctx = False openclpow.ctx = False
raise Exception("GPU did not calculate correctly.") raise Exception("GPU did not calculate correctly.")
if shutdown != 0: if shutdown != 0:
raise Exception("Interrupted") raise StopIteration("Interrupted")
logger.debug("GPU PoW done") logger.debug("GPU PoW done")
return [trialValue, nonce] return [trialValue, nonce]
@ -143,7 +148,16 @@ def estimate(difficulty, format = False):
else: else:
return ret return ret
def getPowType():
if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
return "OpenCL"
if bmpow:
return "C"
return "python"
def run(target, initialHash): def run(target, initialHash):
if shutdown != 0:
raise
target = int(target) target = int(target)
if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl(): if safeConfigGetBoolean('bitmessagesettings', 'opencl') and openclpow.has_opencl():
# trialvalue1, nonce1 = _doGPUPoW(target, initialHash) # trialvalue1, nonce1 = _doGPUPoW(target, initialHash)
@ -153,16 +167,16 @@ def run(target, initialHash):
# return [trialvalue, nonce] # return [trialvalue, nonce]
try: try:
return _doGPUPoW(target, initialHash) return _doGPUPoW(target, initialHash)
except: except StopIteration:
if shutdown != 0:
raise raise
except:
pass # fallback pass # fallback
if bmpow: if bmpow:
try: try:
return _doCPoW(target, initialHash) return _doCPoW(target, initialHash)
except: except StopIteration:
if shutdown != 0:
raise raise
except:
pass # fallback pass # fallback
if frozen == "macosx_app" or not frozen: if frozen == "macosx_app" or not frozen:
# on my (Peter Surda) Windows 10, Windows Defender # on my (Peter Surda) Windows 10, Windows Defender
@ -171,15 +185,17 @@ def run(target, initialHash):
# added on 2015-11-29: multiprocesing.freeze_support() doesn't help # added on 2015-11-29: multiprocesing.freeze_support() doesn't help
try: try:
return _doFastPoW(target, initialHash) return _doFastPoW(target, initialHash)
except: except StopIteration:
if shutdown != 0: logger.error("Fast PoW got StopIteration")
raise raise
except:
logger.error("Fast PoW got exception:", exc_info=True)
pass #fallback pass #fallback
try: try:
return _doSafePoW(target, initialHash) return _doSafePoW(target, initialHash)
except: except StopIteration:
if shutdown != 0:
raise raise
except:
pass #fallback pass #fallback
# init # init

View File

@ -16,6 +16,8 @@ import os
import pickle import pickle
import Queue import Queue
import random import random
from multiprocessing import active_children
from signal import SIGTERM
import socket import socket
import sys import sys
import stat import stat
@ -502,6 +504,12 @@ def isProofOfWorkSufficient(data,
def doCleanShutdown(): def doCleanShutdown():
global shutdown, thisapp global shutdown, thisapp
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
for child in active_children():
try:
logger.info("Killing PoW child %i", child.pid)
os.kill(child.pid, SIGTERM)
except:
pass
broadcastToSendDataQueues((0, 'shutdown', 'no data')) broadcastToSendDataQueues((0, 'shutdown', 'no data'))
objectProcessorQueue.put(('checkShutdownVariable', 'no data')) objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
for thread in threading.enumerate(): for thread in threading.enumerate():