From 8e0bff7d52658353c7fe1cccc77fbcd15853383b Mon Sep 17 00:00:00 2001 From: Biryuzovye Kleshni Date: Sun, 15 Jul 2018 18:37:23 +0000 Subject: [PATCH] Connect WorkProver to SingleWorker --- src/bitmessagecurses/__init__.py | 2 +- src/bitmessagemain.py | 3 +- src/bitmessageqt/__init__.py | 18 +- src/bitmessageqt/uisignaler.py | 2 + src/class_addressGenerator.py | 24 +- src/class_objectProcessor.py | 150 ++-- src/class_singleCleaner.py | 100 +-- src/class_singleWorker.py | 1418 ------------------------------ src/class_sqlThread.py | 79 +- src/helper_generic.py | 12 - src/network/bmobject.py | 1 + src/network/bmproto.py | 2 +- src/protocol.py | 213 ++--- src/shared.py | 145 --- src/singleworker.py | 942 ++++++++++++++++++++ src/state.py | 5 + src/workprover/__init__.py | 12 +- src/workprover/forkingsolver.py | 23 +- src/workprover/test.py | 2 +- 19 files changed, 1268 insertions(+), 1885 deletions(-) delete mode 100644 src/class_singleWorker.py create mode 100644 src/singleworker.py diff --git a/src/bitmessagecurses/__init__.py b/src/bitmessagecurses/__init__.py index fe9a77d2..df654391 100644 --- a/src/bitmessagecurses/__init__.py +++ b/src/bitmessagecurses/__init__.py @@ -918,7 +918,7 @@ def loadSent(): # Set status string if status == "awaitingpubkey": statstr = "Waiting for their public key. Will request it again soon" - elif status == "doingpowforpubkey": + elif status == "doingpubkeypow": statstr = "Encryption key request queued" elif status == "msgqueued": statstr = "Message queued" diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 507ddf14..aa7a0ffe 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -201,7 +201,6 @@ class Main: smtpServerThread = smtpServer() smtpServerThread.start() - # Start the thread that calculates POWs objectProcessorThread = objectProcessor() # DON'T close the main program even the thread remains. # This thread checks the shutdown variable after processing @@ -430,7 +429,7 @@ if __name__ == "__main__": from class_sqlThread import sqlThread from class_singleCleaner import singleCleaner from class_objectProcessor import objectProcessor - from class_singleWorker import singleWorker + from singleworker import singleWorker from class_addressGenerator import addressGenerator from bmconfigparser import BMConfigParser diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 208923fc..f0932446 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -44,7 +44,6 @@ from account import ( getSortedAccounts, getSortedSubscriptions, accountClass, BMAccount, GatewayAccount, MailchuckAccount, AccountColor) import dialogs -from helper_generic import powQueueSize from network.stats import pendingDownload, pendingUpload from uisignaler import UISignaler import knownnodes @@ -760,6 +759,8 @@ class MyForm(settingsmixin.SMainWindow): "newVersionAvailable(PyQt_PyObject)"), self.newVersionAvailable) QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL( "displayAlert(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), self.displayAlert) + QtCore.QObject.connect(self.UISignalThread, QtCore.SIGNAL( + "updateWorkProverStatus(PyQt_PyObject)"), self.updateWorkProverStatus) self.UISignalThread.start() # Key press in tree view @@ -809,6 +810,8 @@ class MyForm(settingsmixin.SMainWindow): ' Fetch Namecoin ID button') self.ui.pushButtonFetchNamecoinID.hide() + self.POWTasksCount = 0 + def updateTTL(self, sliderPosition): TTL = int(sliderPosition ** 3.199 + 3600) self.updateHumanFriendlyTTLDescription(TTL) @@ -1036,7 +1039,7 @@ class MyForm(settingsmixin.SMainWindow): if status == 'awaitingpubkey': statusText = _translate( "MainWindow", "Waiting for their encryption key. Will request it again soon.") - elif status == 'doingpowforpubkey': + elif status == 'doingpubkeypow': statusText = _translate( "MainWindow", "Doing work necessary to request encryption key.") elif status == 'msgqueued': @@ -1790,6 +1793,9 @@ class MyForm(settingsmixin.SMainWindow): if exitAfterUserClicksOk: os._exit(0) + def updateWorkProverStatus(self, status): + self.POWTasksCount = status[3] + def rerenderMessagelistFromLabels(self): for messagelist in (self.ui.tableWidgetInbox, self.ui.tableWidgetInboxChans, self.ui.tableWidgetInboxSubscriptions): for i in range(messagelist.rowCount()): @@ -2712,9 +2718,9 @@ class MyForm(settingsmixin.SMainWindow): waitForSync = False # C PoW currently doesn't support interrupting and OpenCL is untested - if getPowType() == "python" and (powQueueSize() > 0 or pendingUpload() > 0): + if getPowType() == "python" and (self.POWTasksCount > 0 or pendingUpload() > 0): reply = QtGui.QMessageBox.question(self, _translate("MainWindow", "Proof of work pending"), - _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, powQueueSize()) + ", " + + _translate("MainWindow", "%n object(s) pending proof of work", None, QtCore.QCoreApplication.CodecForTr, self.POWTasksCount) + ", " + _translate("MainWindow", "%n object(s) waiting to be distributed", None, QtCore.QCoreApplication.CodecForTr, pendingUpload()) + "\n\n" + _translate("MainWindow", "Wait until these tasks finish?"), QtGui.QMessageBox.Yes|QtGui.QMessageBox.No|QtGui.QMessageBox.Cancel, QtGui.QMessageBox.Cancel) @@ -2770,10 +2776,10 @@ class MyForm(settingsmixin.SMainWindow): if waitForPow: # check if PoW queue empty maxWorkerQueue = 0 - curWorkerQueue = powQueueSize() + curWorkerQueue = self.POWTasksCount while curWorkerQueue > 0: # worker queue size - curWorkerQueue = powQueueSize() + curWorkerQueue = self.POWTasksCount if curWorkerQueue > maxWorkerQueue: maxWorkerQueue = curWorkerQueue if curWorkerQueue > 0: diff --git a/src/bitmessageqt/uisignaler.py b/src/bitmessageqt/uisignaler.py index 055f9097..fbb24328 100644 --- a/src/bitmessageqt/uisignaler.py +++ b/src/bitmessageqt/uisignaler.py @@ -74,6 +74,8 @@ class UISignaler(QThread): elif command == 'alert': title, text, exitAfterUserClicksOk = data self.emit(SIGNAL("displayAlert(PyQt_PyObject, PyQt_PyObject, PyQt_PyObject)"), title, text, exitAfterUserClicksOk) + elif command == "updateWorkProverStatus": + self.emit(SIGNAL("updateWorkProverStatus(PyQt_PyObject)"), data) else: sys.stderr.write( 'Command sent to UISignaler not recognized: %s\n' % command) diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index a5750813..134b876a 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -201,12 +201,10 @@ class addressGenerator(threading.Thread, StoppableThread): queues.UISignalQueue.put(('writeNewAddressToTable', ( label, address, streamNumber))) shared.reloadMyAddressHashes() - if addressVersionNumber == 3: - queues.workerQueue.put(( - 'sendOutOrStoreMyV3Pubkey', ripe.digest())) - elif addressVersionNumber == 4: - queues.workerQueue.put(( - 'sendOutOrStoreMyV4Pubkey', address)) + + # If this is a chan address, the worker thread won't send out the pubkey over the network + + queues.workerQueue.put(("sendMyPubkey", address)) elif command == 'createDeterministicAddresses' \ or command == 'getDeterministicAddress' \ @@ -366,15 +364,11 @@ class addressGenerator(threading.Thread, StoppableThread): encodeVarint(streamNumber) + ripe.digest() ).digest()).digest()[32:] shared.myAddressesByTag[tag] = address - if addressVersionNumber == 3: - # If this is a chan address, - # the worker thread won't send out - # the pubkey over the network. - queues.workerQueue.put(( - 'sendOutOrStoreMyV3Pubkey', ripe.digest())) - elif addressVersionNumber == 4: - queues.workerQueue.put(( - 'sendOutOrStoreMyV4Pubkey', address)) + + # If this is a chan address, the worker thread won't send out the pubkey over the network + + queues.workerQueue.put(("sendMyPubkey", address)) + queues.UISignalQueue.put(( 'updateStatusBar', tr._translate( diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 9ae2093b..4a4224e7 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -114,30 +114,29 @@ class objectProcessor(threading.Thread): break def checkackdata(self, data): - # Let's check whether this is a message acknowledgement bound for us. - if len(data) < 32: + ackData = data[16: ] + + if len(ackData) < 16 or ackData not in state.watchedAckData: + logger.debug("This object is not an acknowledgement bound for us") + return - # bypass nonce and time, retain object type/version/stream + body - readPosition = 16 + logger.info("This object is an acknowledgement bound for us") - if data[readPosition:] in shared.ackdataForWhichImWatching: - logger.info('This object is an acknowledgement bound for me.') - del shared.ackdataForWhichImWatching[data[readPosition:]] - sqlExecute( - 'UPDATE sent SET status=?, lastactiontime=?' - ' WHERE ackdata=?', - 'ackreceived', int(time.time()), data[readPosition:]) - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', - (data[readPosition:], - tr._translate( - "MainWindow", - "Acknowledgement of the message received %1" - ).arg(l10n.formatTimestamp())) - )) - else: - logger.debug('This object is not an acknowledgement bound for me.') + state.watchedAckData -= {ackData} + + sqlExecute(""" + UPDATE "sent" SET "status" = 'ackreceived', "lastactiontime" = ? + WHERE "status" IN ('doingmsgpow', 'msgsent') AND "ackdata" == ?; + """, int(time.time()), ackData) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Acknowledgement of the message received %1" + ).arg(l10n.formatTimestamp()) + ))) def processgetpubkey(self, data): if len(data) > 200: @@ -237,12 +236,8 @@ class objectProcessor(threading.Thread): 'Found getpubkey-requested-hash in my list of EC hashes.' ' Telling Worker thread to do the POW for a pubkey message' ' and send it out.') - if requestedAddressVersionNumber == 2: - queues.workerQueue.put(('doPOWForMyV2Pubkey', requestedHash)) - elif requestedAddressVersionNumber == 3: - queues.workerQueue.put(('sendOutOrStoreMyV3Pubkey', requestedHash)) - elif requestedAddressVersionNumber == 4: - queues.workerQueue.put(('sendOutOrStoreMyV4Pubkey', myAddress)) + + queues.workerQueue.put(("sendMyPubkey", myAddress)) def processpubkey(self, data): pubkeyProcessingStartTime = time.time() @@ -396,20 +391,23 @@ class objectProcessor(threading.Thread): ' Sanity check failed.') return - tag = data[readPosition:readPosition + 32] - if tag not in state.neededPubkeys: - logger.info( - 'We don\'t need this v4 pubkey. We didn\'t ask for it.') - return + tag = data[readPosition: readPosition + 32] + attributes = state.neededPubkeys.get(tag, None) - # Let us try to decrypt the pubkey - toAddress, _ = state.neededPubkeys[tag] - if shared.decryptAndCheckPubkeyPayload(data, toAddress) == \ - 'successful': - # At this point we know that we have been waiting on this - # pubkey. This function will command the workerThread - # to start work on the messages that require it. - self.possibleNewPubkey(toAddress) + if attributes is None: + logger.info("We don't need this v4 pubkey. We didn't ask for it") + else: + address, cryptor = attributes + + storedData = protocol.decryptAndCheckV4Pubkey(data, address, cryptor) + + if storedData is not None: + sqlExecute(""" + INSERT INTO "pubkeys" ("address", "addressversion", "transmitdata", "time", "usedpersonally") + VALUES (?, 4, ?, ?, 'yes'); + """, address, storedData, int(time.time())) + + self.possibleNewPubkey(address) # Display timing data timeRequiredToProcessPubkey = time.time( @@ -608,7 +606,7 @@ class objectProcessor(threading.Thread): # If the toAddress version number is 3 or higher and not one of # my chan addresses: if decodeAddress(toAddress)[1] >= 3 \ - and not BMConfigParser().safeGetBoolean(toAddress, 'chan'): + and not BMConfigParser().has_section(toAddress): # If I'm not friendly with this person: if not shared.isAddressInMyAddressBookSubscriptionsListOrWhitelist(fromAddress): requiredNonceTrialsPerByte = BMConfigParser().getint( @@ -1015,42 +1013,42 @@ class objectProcessor(threading.Thread): have been waiting for. Let's check. """ - # For address versions <= 3, we wait on a key with the correct - # address version, stream number and RIPE hash. - _, addressVersion, streamNumber, ripe = decodeAddress(address) - if addressVersion <= 3: - if address in state.neededPubkeys: - del state.neededPubkeys[address] - self.sendMessages(address) - else: - logger.debug( - 'We don\'t need this pub key. We didn\'t ask for it.' - ' For address: %s', address) - # For address versions >= 4, we wait on a pubkey with the correct tag. - # Let us create the tag from the address and see if we were waiting - # for it. - elif addressVersion >= 4: - tag = hashlib.sha512(hashlib.sha512( - encodeVarint(addressVersion) + encodeVarint(streamNumber) - + ripe).digest() - ).digest()[32:] - if tag in state.neededPubkeys: - del state.neededPubkeys[tag] - self.sendMessages(address) + status, version, stream, ripe = decodeAddress(address) - def sendMessages(self, address): - """ - This function is called by the possibleNewPubkey function when - that function sees that we now have the necessary pubkey - to send one or more messages. - """ - logger.info('We have been awaiting the arrival of this pubkey.') - sqlExecute( - "UPDATE sent SET status='doingmsgpow', retrynumber=0" - " WHERE toaddress=?" - " AND (status='awaitingpubkey' OR status='doingpubkeypow')" - " AND folder='sent'", address) - queues.workerQueue.put(('sendmessage', '')) + if version <= 3: + needed = address in state.neededPubkeys + state.neededPubkeys.pop(address, None) + elif version == 4: + secretEncryptionKey, tag = protocol.calculateAddressTag(version, stream, ripe) + + needed = tag in state.neededPubkeys + state.neededPubkeys.pop(tag, None) + + if needed: + logger.info("We have been awaiting the arrival of this pubkey") + + sqlExecute(""" + UPDATE "sent" SET "status" = 'msgqueued' + WHERE "status" IN ('doingpubkeypow', 'awaitingpubkey') AND "toaddress" == ? AND "folder" == 'sent'; + """, address) + + queues.workerQueue.put(("sendmessage", )) + + queued = sqlQuery(""" + SELECT "ackdata" FROM "sent" + WHERE "status" == 'msgqueued' AND "toaddress" == ? AND "folder" == 'sent'; + """, address) + + for i, in queued: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + i, + tr._translate( + "MainWindow", + "Queued." + ) + ))) + else: + logger.debug("We don't need this pubkey, we didn't ask for it: %s", address) def ackDataHasAValidHeader(self, ackData): if len(ackData) < protocol.Header.size: diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 1ba342b6..ee7dfed4 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -35,6 +35,44 @@ import knownnodes import queues import state +def resendStaleMessages(): + staleMessages = sqlQuery(""" + SELECT "toaddress", "ackdata", "status" FROM "sent" + WHERE "status" IN ('awaitingpubkey', 'msgsent') AND "sleeptill" < ? AND "senttime" > ? AND "folder" == 'sent'; + """, int(time.time()), int(time.time()) - shared.maximumLengthOfTimeToBotherResendingMessages) + + resendMessages = False + + for destination, ackData, status in staleMessages: + if status == "awaitingpubkey": + logger.info("Retrying getpubkey request for %s", destination) + + sqlExecute(""" + UPDATE "sent" SET "status" = 'msgqueued' + WHERE "status" == 'awaitingpubkey' AND "ackdata" == ?; + """, ackData) + elif status == "msgsent": + state.watchedAckData -= {ackData} + + sqlExecute(""" + UPDATE "sent" SET "status" = 'msgqueued' + WHERE "status" == 'msgsent' AND "ackdata" == ?; + """, ackData) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Queued." + ) + ))) + + resendMessages = True + + if resendMessages: + logger.info("Resending old messages with undelivered acks or unknown pubkeys") + + queues.workerQueue.put(("sendmessage", )) class singleCleaner(threading.Thread, StoppableThread): cycleLength = 300 @@ -92,28 +130,8 @@ class singleCleaner(threading.Thread, StoppableThread): # Let us resend getpubkey objects if we have not yet heard # a pubkey, and also msg objects if we have not yet heard # an acknowledgement - queryreturn = sqlQuery( - "SELECT toaddress, ackdata, status FROM sent" - " WHERE ((status='awaitingpubkey' OR status='msgsent')" - " AND folder='sent' AND sleeptill?)", - int(time.time()), - int(time.time()) - - shared.maximumLengthOfTimeToBotherResendingMessages - ) - for row in queryreturn: - if len(row) < 2: - logger.error( - 'Something went wrong in the singleCleaner thread:' - ' a query did not return the requested fields. %r', - row - ) - self.stop.wait(3) - break - toAddress, ackData, status = row - if status == 'awaitingpubkey': - resendPubkeyRequest(toAddress) - elif status == 'msgsent': - resendMsg(ackData) + + resendStaleMessages() # cleanup old nodes now = int(time.time()) @@ -189,41 +207,3 @@ class singleCleaner(threading.Thread, StoppableThread): if state.shutdown == 0: self.stop.wait(singleCleaner.cycleLength) - - -def resendPubkeyRequest(address): - logger.debug( - 'It has been a long time and we haven\'t heard a response to our' - ' getpubkey request. Sending again.' - ) - try: - # We need to take this entry out of the neededPubkeys structure - # because the queues.workerQueue checks to see whether the entry - # is already present and will not do the POW and send the message - # because it assumes that it has already done it recently. - del state.neededPubkeys[address] - except: - pass - - queues.UISignalQueue.put(( - 'updateStatusBar', - 'Doing work necessary to again attempt to request a public key...')) - sqlExecute( - '''UPDATE sent SET status='msgqueued' WHERE toaddress=?''', - address) - queues.workerQueue.put(('sendmessage', '')) - - -def resendMsg(ackdata): - logger.debug( - 'It has been a long time and we haven\'t heard an acknowledgement' - ' to our msg. Sending again.' - ) - sqlExecute( - '''UPDATE sent SET status='msgqueued' WHERE ackdata=?''', - ackdata) - queues.workerQueue.put(('sendmessage', '')) - queues.UISignalQueue.put(( - 'updateStatusBar', - 'Doing work necessary to again attempt to deliver a message...' - )) diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py deleted file mode 100644 index d51e124a..00000000 --- a/src/class_singleWorker.py +++ /dev/null @@ -1,1418 +0,0 @@ -from __future__ import division - -import time -import threading -import hashlib -from struct import pack -# used when the API must execute an outside program -from subprocess import call # nosec -from binascii import hexlify, unhexlify - -import tr -import l10n -import protocol -import queues -import state -import shared -import defaults -import highlevelcrypto -import proofofwork -import helper_inbox -import helper_random -import helper_msgcoding -from bmconfigparser import BMConfigParser -from debug import logger -from inventory import Inventory -from addresses import ( - decodeAddress, encodeVarint, decodeVarint, calculateInventoryHash -) -# from helper_generic import addDataPadding -from helper_threading import StoppableThread -from helper_sql import sqlQuery, sqlExecute - - -# This thread, of which there is only one, does the heavy lifting: -# calculating POWs. - -def sizeof_fmt(num, suffix='h/s'): - for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']: - if abs(num) < 1000.0: - return "%3.1f%s%s" % (num, unit, suffix) - num /= 1024.0 - return "%.1f%s%s" % (num, 'Yi', suffix) - - -class singleWorker(threading.Thread, StoppableThread): - - def __init__(self): - # QThread.__init__(self, parent) - threading.Thread.__init__(self, name="singleWorker") - self.initStop() - proofofwork.init() - - def stopThread(self): - try: - queues.workerQueue.put(("stopThread", "data")) - except: - pass - super(singleWorker, self).stopThread() - - def run(self): - - while not state.sqlReady and state.shutdown == 0: - self.stop.wait(2) - if state.shutdown > 0: - return - - # Initialize the neededPubkeys dictionary. - queryreturn = sqlQuery( - '''SELECT DISTINCT toaddress FROM sent''' - ''' WHERE (status='awaitingpubkey' AND folder='sent')''') - for row in queryreturn: - toAddress, = row - # toStatus - _, toAddressVersionNumber, toStreamNumber, toRipe = \ - decodeAddress(toAddress) - if toAddressVersionNumber <= 3: - state.neededPubkeys[toAddress] = 0 - elif toAddressVersionNumber >= 4: - doubleHashOfAddressData = hashlib.sha512(hashlib.sha512( - encodeVarint(toAddressVersionNumber) + - encodeVarint(toStreamNumber) + toRipe - ).digest()).digest() - # Note that this is the first half of the sha512 hash. - privEncryptionKey = doubleHashOfAddressData[:32] - tag = doubleHashOfAddressData[32:] - # We'll need this for when we receive a pubkey reply: - # it will be encrypted and we'll need to decrypt it. - state.neededPubkeys[tag] = ( - toAddress, - highlevelcrypto.makeCryptor( - hexlify(privEncryptionKey)) - ) - - # Initialize the shared.ackdataForWhichImWatching data structure - queryreturn = sqlQuery( - '''SELECT ackdata FROM sent WHERE status = 'msgsent' ''') - for row in queryreturn: - ackdata, = row - logger.info('Watching for ackdata ' + hexlify(ackdata)) - shared.ackdataForWhichImWatching[ackdata] = 0 - - # Fix legacy (headerless) watched ackdata to include header - for oldack in shared.ackdataForWhichImWatching.keys(): - if (len(oldack) == 32): - # attach legacy header, always constant (msg/1/1) - newack = '\x00\x00\x00\x02\x01\x01' + oldack - shared.ackdataForWhichImWatching[newack] = 0 - sqlExecute( - 'UPDATE sent SET ackdata=? WHERE ackdata=?', - newack, oldack - ) - del shared.ackdataForWhichImWatching[oldack] - - # give some time for the GUI to start - # before we start on existing POW tasks. - self.stop.wait(10) - - if state.shutdown == 0: - # just in case there are any pending tasks for msg - # messages that have yet to be sent. - queues.workerQueue.put(('sendmessage', '')) - # just in case there are any tasks for Broadcasts - # that have yet to be sent. - queues.workerQueue.put(('sendbroadcast', '')) - - while state.shutdown == 0: - self.busy = 0 - command, data = queues.workerQueue.get() - self.busy = 1 - if command == 'sendmessage': - try: - self.sendMsg() - except: - pass - elif command == 'sendbroadcast': - try: - self.sendBroadcast() - except: - pass - elif command == 'doPOWForMyV2Pubkey': - try: - self.doPOWForMyV2Pubkey(data) - except: - pass - elif command == 'sendOutOrStoreMyV3Pubkey': - try: - self.sendOutOrStoreMyV3Pubkey(data) - except: - pass - elif command == 'sendOutOrStoreMyV4Pubkey': - try: - self.sendOutOrStoreMyV4Pubkey(data) - except: - pass - elif command == 'resetPoW': - try: - proofofwork.resetPoW() - except: - pass - elif command == 'stopThread': - self.busy = 0 - return - else: - logger.error( - 'Probable programming error: The command sent' - ' to the workerThread is weird. It is: %s\n', - command - ) - - queues.workerQueue.task_done() - logger.info("Quitting...") - - def _getKeysForAddress(self, address): - privSigningKeyBase58 = BMConfigParser().get( - address, 'privsigningkey') - privEncryptionKeyBase58 = BMConfigParser().get( - address, 'privencryptionkey') - - privSigningKeyHex = hexlify(shared.decodeWalletImportFormat( - privSigningKeyBase58)) - privEncryptionKeyHex = hexlify(shared.decodeWalletImportFormat( - privEncryptionKeyBase58)) - - # The \x04 on the beginning of the public keys are not sent. - # This way there is only one acceptable way to encode - # and send a public key. - pubSigningKey = unhexlify(highlevelcrypto.privToPub( - privSigningKeyHex))[1:] - pubEncryptionKey = unhexlify(highlevelcrypto.privToPub( - privEncryptionKeyHex))[1:] - - return privSigningKeyHex, privEncryptionKeyHex, \ - pubSigningKey, pubEncryptionKey - - def _doPOWDefaults(self, payload, TTL, - log_prefix='', - log_time=False): - target = 2 ** 64 / ( - defaults.networkDefaultProofOfWorkNonceTrialsPerByte * ( - len(payload) + 8 + - defaults.networkDefaultPayloadLengthExtraBytes + (( - TTL * ( - len(payload) + 8 + - defaults.networkDefaultPayloadLengthExtraBytes - )) / (2 ** 16)) - )) - initialHash = hashlib.sha512(payload).digest() - logger.info( - '%s Doing proof of work... TTL set to %s', log_prefix, TTL) - if log_time: - start_time = time.time() - trialValue, nonce = proofofwork.run(target, initialHash) - logger.info( - '%s Found proof of work %s Nonce: %s', - log_prefix, trialValue, nonce - ) - try: - delta = time.time() - start_time - logger.info( - 'PoW took %.1f seconds, speed %s.', - delta, sizeof_fmt(nonce / delta) - ) - except: # NameError - pass - payload = pack('>Q', nonce) + payload - # inventoryHash = calculateInventoryHash(payload) - return payload - - # This function also broadcasts out the pubkey message - # once it is done with the POW - def doPOWForMyV2Pubkey(self, adressHash): - # Look up my stream number based on my address hash - """configSections = shared.config.addresses() - for addressInKeysFile in configSections: - if addressInKeysFile != 'bitmessagesettings': - status, addressVersionNumber, streamNumber, \ - hashFromThisParticularAddress = \ - decodeAddress(addressInKeysFile) - if hash == hashFromThisParticularAddress: - myAddress = addressInKeysFile - break""" - myAddress = shared.myAddressesByHash[adressHash] - # status - _, addressVersionNumber, streamNumber, adressHash = decodeAddress(myAddress) - - # 28 days from now plus or minus five minutes - TTL = int(28 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300)) - embeddedTime = int(time.time() + TTL) - payload = pack('>Q', (embeddedTime)) - payload += '\x00\x00\x00\x01' # object type: pubkey - payload += encodeVarint(addressVersionNumber) # Address version number - payload += encodeVarint(streamNumber) - # bitfield of features supported by me (see the wiki). - payload += protocol.getBitfield(myAddress) - - try: - # privSigningKeyHex, privEncryptionKeyHex - _, _, pubSigningKey, pubEncryptionKey = \ - self._getKeysForAddress(myAddress) - except Exception as err: - logger.error( - 'Error within doPOWForMyV2Pubkey. Could not read' - ' the keys from the keys.dat file for a requested' - ' address. %s\n', err - ) - return - - payload += pubSigningKey + pubEncryptionKey - - # Do the POW for this pubkey message - payload = self._doPOWDefaults( - payload, TTL, log_prefix='(For pubkey message)') - - inventoryHash = calculateInventoryHash(payload) - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, '') - - logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash)) - - queues.invQueue.put((streamNumber, inventoryHash)) - queues.UISignalQueue.put(('updateStatusBar', '')) - try: - BMConfigParser().set( - myAddress, 'lastpubkeysendtime', str(int(time.time()))) - BMConfigParser().save() - except: - # The user deleted the address out of the keys.dat file - # before this finished. - pass - - # If this isn't a chan address, this function assembles the pubkey data, - # does the necessary POW and sends it out. If it *is* a chan then it - # assembles the pubkey and stores is in the pubkey table so that we can - # send messages to "ourselves". - def sendOutOrStoreMyV3Pubkey(self, adressHash): - try: - myAddress = shared.myAddressesByHash[adressHash] - except: - # The address has been deleted. - return - if BMConfigParser().safeGetBoolean(myAddress, 'chan'): - logger.info('This is a chan address. Not sending pubkey.') - return - _, addressVersionNumber, streamNumber, adressHash = decodeAddress( - myAddress) - - # 28 days from now plus or minus five minutes - TTL = int(28 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300)) - embeddedTime = int(time.time() + TTL) - - # signedTimeForProtocolV2 = embeddedTime - TTL - # According to the protocol specification, the expiresTime - # along with the pubkey information is signed. But to be - # backwards compatible during the upgrade period, we shall sign - # not the expiresTime but rather the current time. There must be - # precisely a 28 day difference between the two. After the upgrade - # period we'll switch to signing the whole payload with the - # expiresTime time. - - payload = pack('>Q', (embeddedTime)) - payload += '\x00\x00\x00\x01' # object type: pubkey - payload += encodeVarint(addressVersionNumber) # Address version number - payload += encodeVarint(streamNumber) - # bitfield of features supported by me (see the wiki). - payload += protocol.getBitfield(myAddress) - - try: - # , privEncryptionKeyHex - privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \ - self._getKeysForAddress(myAddress) - except Exception as err: - logger.error( - 'Error within sendOutOrStoreMyV3Pubkey. Could not read' - ' the keys from the keys.dat file for a requested' - ' address. %s\n', err - ) - return - - payload += pubSigningKey + pubEncryptionKey - - payload += encodeVarint(BMConfigParser().getint( - myAddress, 'noncetrialsperbyte')) - payload += encodeVarint(BMConfigParser().getint( - myAddress, 'payloadlengthextrabytes')) - - signature = highlevelcrypto.sign(payload, privSigningKeyHex) - payload += encodeVarint(len(signature)) - payload += signature - - # Do the POW for this pubkey message - payload = self._doPOWDefaults( - payload, TTL, log_prefix='(For pubkey message)') - - inventoryHash = calculateInventoryHash(payload) - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, '') - - logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - - queues.invQueue.put((streamNumber, inventoryHash)) - queues.UISignalQueue.put(('updateStatusBar', '')) - try: - BMConfigParser().set( - myAddress, 'lastpubkeysendtime', str(int(time.time()))) - BMConfigParser().save() - except: - # The user deleted the address out of the keys.dat file - # before this finished. - pass - - # If this isn't a chan address, this function assembles - # the pubkey data, does the necessary POW and sends it out. - def sendOutOrStoreMyV4Pubkey(self, myAddress): - if not BMConfigParser().has_section(myAddress): - # The address has been deleted. - return - if shared.BMConfigParser().safeGetBoolean(myAddress, 'chan'): - logger.info('This is a chan address. Not sending pubkey.') - return - _, addressVersionNumber, streamNumber, addressHash = decodeAddress( - myAddress) - - # 28 days from now plus or minus five minutes - TTL = int(28 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300)) - embeddedTime = int(time.time() + TTL) - payload = pack('>Q', (embeddedTime)) - payload += '\x00\x00\x00\x01' # object type: pubkey - payload += encodeVarint(addressVersionNumber) # Address version number - payload += encodeVarint(streamNumber) - dataToEncrypt = protocol.getBitfield(myAddress) - - try: - # , privEncryptionKeyHex - privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \ - self._getKeysForAddress(myAddress) - except Exception as err: - logger.error( - 'Error within sendOutOrStoreMyV4Pubkey. Could not read' - ' the keys from the keys.dat file for a requested' - ' address. %s\n', err - ) - return - - dataToEncrypt += pubSigningKey + pubEncryptionKey - - dataToEncrypt += encodeVarint(BMConfigParser().getint( - myAddress, 'noncetrialsperbyte')) - dataToEncrypt += encodeVarint(BMConfigParser().getint( - myAddress, 'payloadlengthextrabytes')) - - # When we encrypt, we'll use a hash of the data - # contained in an address as a decryption key. This way - # in order to read the public keys in a pubkey message, - # a node must know the address first. We'll also tag, - # unencrypted, the pubkey with part of the hash so that nodes - # know which pubkey object to try to decrypt - # when they want to send a message. - doubleHashOfAddressData = hashlib.sha512(hashlib.sha512( - encodeVarint(addressVersionNumber) + - encodeVarint(streamNumber) + addressHash - ).digest()).digest() - payload += doubleHashOfAddressData[32:] # the tag - signature = highlevelcrypto.sign( - payload + dataToEncrypt, privSigningKeyHex - ) - dataToEncrypt += encodeVarint(len(signature)) - dataToEncrypt += signature - - privEncryptionKey = doubleHashOfAddressData[:32] - pubEncryptionKey = highlevelcrypto.pointMult(privEncryptionKey) - payload += highlevelcrypto.encrypt( - dataToEncrypt, hexlify(pubEncryptionKey)) - - # Do the POW for this pubkey message - payload = self._doPOWDefaults( - payload, TTL, log_prefix='(For pubkey message)') - - inventoryHash = calculateInventoryHash(payload) - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, - doubleHashOfAddressData[32:] - ) - - logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) - - queues.invQueue.put((streamNumber, inventoryHash)) - queues.UISignalQueue.put(('updateStatusBar', '')) - try: - BMConfigParser().set( - myAddress, 'lastpubkeysendtime', str(int(time.time()))) - BMConfigParser().save() - except Exception as err: - logger.error( - 'Error: Couldn\'t add the lastpubkeysendtime' - ' to the keys.dat file. Error message: %s', err - ) - - def sendBroadcast(self): - # Reset just in case - sqlExecute( - '''UPDATE sent SET status='broadcastqueued' ''' - - '''WHERE status = 'doingbroadcastpow' ''') - queryreturn = sqlQuery( - '''SELECT fromaddress, subject, message, ''' - ''' ackdata, ttl, encodingtype FROM sent ''' - ''' WHERE status=? and folder='sent' ''', 'broadcastqueued') - - for row in queryreturn: - fromaddress, subject, body, ackdata, TTL, encoding = row - # status - _, addressVersionNumber, streamNumber, ripe = \ - decodeAddress(fromaddress) - if addressVersionNumber <= 1: - logger.error( - 'Error: In the singleWorker thread, the ' - ' sendBroadcast function doesn\'t understand' - ' the address version.\n') - return - # We need to convert our private keys to public keys in order - # to include them. - try: - # , privEncryptionKeyHex - privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \ - self._getKeysForAddress(fromaddress) - except: - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Error! Could not find sender address" - " (your address) in the keys.dat file.")) - )) - continue - - sqlExecute( - '''UPDATE sent SET status='doingbroadcastpow' ''' - ''' WHERE ackdata=? AND status='broadcastqueued' ''', - ackdata) - - # At this time these pubkeys are 65 bytes long - # because they include the encoding byte which we won't - # be sending in the broadcast message. - # pubSigningKey = \ - # highlevelcrypto.privToPub(privSigningKeyHex).decode('hex') - - if TTL > 28 * 24 * 60 * 60: - TTL = 28 * 24 * 60 * 60 - if TTL < 60 * 60: - TTL = 60 * 60 - # add some randomness to the TTL - TTL = int(TTL + helper_random.randomrandrange(-300, 300)) - embeddedTime = int(time.time() + TTL) - payload = pack('>Q', embeddedTime) - payload += '\x00\x00\x00\x03' # object type: broadcast - - if addressVersionNumber <= 3: - payload += encodeVarint(4) # broadcast version - else: - payload += encodeVarint(5) # broadcast version - - payload += encodeVarint(streamNumber) - if addressVersionNumber >= 4: - doubleHashOfAddressData = hashlib.sha512(hashlib.sha512( - encodeVarint(addressVersionNumber) + - encodeVarint(streamNumber) + ripe - ).digest()).digest() - tag = doubleHashOfAddressData[32:] - payload += tag - else: - tag = '' - - dataToEncrypt = encodeVarint(addressVersionNumber) - dataToEncrypt += encodeVarint(streamNumber) - # behavior bitfield - dataToEncrypt += protocol.getBitfield(fromaddress) - dataToEncrypt += pubSigningKey + pubEncryptionKey - if addressVersionNumber >= 3: - dataToEncrypt += encodeVarint(BMConfigParser().getint( - fromaddress, 'noncetrialsperbyte')) - dataToEncrypt += encodeVarint(BMConfigParser().getint( - fromaddress, 'payloadlengthextrabytes')) - # message encoding type - dataToEncrypt += encodeVarint(encoding) - encodedMessage = helper_msgcoding.MsgEncode( - {"subject": subject, "body": body}, encoding) - dataToEncrypt += encodeVarint(encodedMessage.length) - dataToEncrypt += encodedMessage.data - dataToSign = payload + dataToEncrypt - - signature = highlevelcrypto.sign( - dataToSign, privSigningKeyHex) - dataToEncrypt += encodeVarint(len(signature)) - dataToEncrypt += signature - - # Encrypt the broadcast with the information - # contained in the broadcaster's address. - # Anyone who knows the address can generate - # the private encryption key to decrypt the broadcast. - # This provides virtually no privacy; its purpose is to keep - # questionable and illegal content from flowing through the - # Internet connections and being stored on the disk of 3rd parties. - if addressVersionNumber <= 3: - privEncryptionKey = hashlib.sha512( - encodeVarint(addressVersionNumber) + - encodeVarint(streamNumber) + ripe - ).digest()[:32] - else: - privEncryptionKey = doubleHashOfAddressData[:32] - - pubEncryptionKey = highlevelcrypto.pointMult(privEncryptionKey) - payload += highlevelcrypto.encrypt( - dataToEncrypt, hexlify(pubEncryptionKey)) - - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Doing work necessary to send broadcast...")) - )) - payload = self._doPOWDefaults( - payload, TTL, log_prefix='(For broadcast message)') - - # Sanity check. The payload size should never be larger - # than 256 KiB. There should be checks elsewhere in the code - # to not let the user try to send a message this large - # until we implement message continuation. - if len(payload) > 2 ** 18: # 256 KiB - logger.critical( - 'This broadcast object is too large to send.' - ' This should never happen. Object size: %s', - len(payload) - ) - continue - - inventoryHash = calculateInventoryHash(payload) - objectType = 3 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, tag) - logger.info( - 'sending inv (within sendBroadcast function)' - ' for object: %s', - hexlify(inventoryHash) - ) - queues.invQueue.put((streamNumber, inventoryHash)) - - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Broadcast sent on %1" - ).arg(l10n.formatTimestamp())) - )) - - # Update the status of the message in the 'sent' table to have - # a 'broadcastsent' status - sqlExecute( - 'UPDATE sent SET msgid=?, status=?, lastactiontime=?' - ' WHERE ackdata=?', - inventoryHash, 'broadcastsent', int(time.time()), ackdata - ) - - def sendMsg(self): - # Reset just in case - sqlExecute( - '''UPDATE sent SET status='msgqueued' ''' - ''' WHERE status IN ('doingpubkeypow', 'doingmsgpow')''') - queryreturn = sqlQuery( - '''SELECT toaddress, fromaddress, subject, message, ''' - ''' ackdata, status, ttl, retrynumber, encodingtype FROM ''' - ''' sent WHERE (status='msgqueued' or status='forcepow') ''' - ''' and folder='sent' ''') - # while we have a msg that needs some work - for row in queryreturn: - toaddress, fromaddress, subject, message, \ - ackdata, status, TTL, retryNumber, encoding = row - # toStatus - _, toAddressVersionNumber, toStreamNumber, toRipe = \ - decodeAddress(toaddress) - # fromStatus, , ,fromRipe - _, fromAddressVersionNumber, fromStreamNumber, _ = \ - decodeAddress(fromaddress) - - # We may or may not already have the pubkey - # for this toAddress. Let's check. - if status == 'forcepow': - # if the status of this msg is 'forcepow' - # then clearly we have the pubkey already - # because the user could not have overridden the message - # about the POW being too difficult without knowing - # the required difficulty. - pass - elif status == 'doingmsgpow': - # We wouldn't have set the status to doingmsgpow - # if we didn't already have the pubkey so let's assume - # that we have it. - pass - # If we are sending a message to ourselves or a chan - # then we won't need an entry in the pubkeys table; - # we can calculate the needed pubkey using the private keys - # in our keys.dat file. - elif BMConfigParser().has_section(toaddress): - sqlExecute( - '''UPDATE sent SET status='doingmsgpow' ''' - ''' WHERE toaddress=? AND status='msgqueued' ''', - toaddress - ) - status = 'doingmsgpow' - elif status == 'msgqueued': - # Let's see if we already have the pubkey in our pubkeys table - queryreturn = sqlQuery( - '''SELECT address FROM pubkeys WHERE address=?''', - toaddress - ) - # If we have the needed pubkey in the pubkey table already, - if queryreturn != []: - # set the status of this msg to doingmsgpow - sqlExecute( - '''UPDATE sent SET status='doingmsgpow' ''' - ''' WHERE toaddress=? AND status='msgqueued' ''', - toaddress - ) - status = 'doingmsgpow' - # mark the pubkey as 'usedpersonally' so that - # we don't delete it later. If the pubkey version - # is >= 4 then usedpersonally will already be set - # to yes because we'll only ever have - # usedpersonally v4 pubkeys in the pubkeys table. - sqlExecute( - '''UPDATE pubkeys SET usedpersonally='yes' ''' - ''' WHERE address=?''', - toaddress - ) - # We don't have the needed pubkey in the pubkeys table already. - else: - if toAddressVersionNumber <= 3: - toTag = '' - else: - toTag = hashlib.sha512(hashlib.sha512( - encodeVarint(toAddressVersionNumber) + - encodeVarint(toStreamNumber) + toRipe - ).digest()).digest()[32:] - if toaddress in state.neededPubkeys or \ - toTag in state.neededPubkeys: - # We already sent a request for the pubkey - sqlExecute( - '''UPDATE sent SET status='awaitingpubkey', ''' - ''' sleeptill=? WHERE toaddress=? ''' - ''' AND status='msgqueued' ''', - int(time.time()) + 2.5 * 24 * 60 * 60, - toaddress - ) - queues.UISignalQueue.put(( - 'updateSentItemStatusByToAddress', ( - toaddress, - tr._translate( - "MainWindow", - "Encryption key was requested earlier.")) - )) - # on with the next msg on which we can do some work - continue - else: - # We have not yet sent a request for the pubkey - needToRequestPubkey = True - # If we are trying to send to address - # version >= 4 then the needed pubkey might be - # encrypted in the inventory. - # If we have it we'll need to decrypt it - # and put it in the pubkeys table. - - # The decryptAndCheckPubkeyPayload function - # expects that the shared.neededPubkeys dictionary - # already contains the toAddress and cryptor - # object associated with the tag for this toAddress. - if toAddressVersionNumber >= 4: - doubleHashOfToAddressData = hashlib.sha512( - hashlib.sha512(encodeVarint( - toAddressVersionNumber) + - encodeVarint(toStreamNumber) + - toRipe - ).digest() - ).digest() - # The first half of the sha512 hash. - privEncryptionKey = doubleHashOfToAddressData[:32] - # The second half of the sha512 hash. - tag = doubleHashOfToAddressData[32:] - state.neededPubkeys[tag] = ( - toaddress, - highlevelcrypto.makeCryptor( - hexlify(privEncryptionKey)) - ) - - for value in Inventory().by_type_and_tag(1, toTag): - # if valid, this function also puts it - # in the pubkeys table. - if shared.decryptAndCheckPubkeyPayload( - value.payload, toaddress - ) == 'successful': - needToRequestPubkey = False - sqlExecute( - '''UPDATE sent SET ''' - ''' status='doingmsgpow', ''' - ''' retrynumber=0 WHERE ''' - ''' toaddress=? AND ''' - ''' (status='msgqueued' or ''' - ''' status='awaitingpubkey' or ''' - ''' status='doingpubkeypow')''', - toaddress) - del state.neededPubkeys[tag] - break - # else: - # There was something wrong with this - # pubkey object even though it had - # the correct tag- almost certainly - # because of malicious behavior or - # a badly programmed client. If there are - # any other pubkeys in our inventory - # with the correct tag then we'll try - # to decrypt those. - if needToRequestPubkey: - sqlExecute( - '''UPDATE sent SET ''' - ''' status='doingpubkeypow' WHERE ''' - ''' toaddress=? AND status='msgqueued' ''', - toaddress - ) - queues.UISignalQueue.put(( - 'updateSentItemStatusByToAddress', ( - toaddress, - tr._translate( - "MainWindow", - "Sending a request for the" - " recipient\'s encryption key.")) - )) - self.requestPubKey(toaddress) - # on with the next msg on which we can do some work - continue - - # At this point we know that we have the necessary pubkey - # in the pubkeys table. - - TTL *= 2**retryNumber - if TTL > 28 * 24 * 60 * 60: - TTL = 28 * 24 * 60 * 60 - # add some randomness to the TTL - TTL = int(TTL + helper_random.randomrandrange(-300, 300)) - embeddedTime = int(time.time() + TTL) - - # if we aren't sending this to ourselves or a chan - if not BMConfigParser().has_section(toaddress): - shared.ackdataForWhichImWatching[ackdata] = 0 - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Looking up the receiver\'s public key")) - )) - logger.info('Sending a message.') - logger.debug( - 'First 150 characters of message: %s', - repr(message[:150]) - ) - - # Let us fetch the recipient's public key out of - # our database. If the required proof of work difficulty - # is too hard then we'll abort. - queryreturn = sqlQuery( - 'SELECT transmitdata FROM pubkeys WHERE address=?', - toaddress) - for row in queryreturn: - pubkeyPayload, = row - - # The pubkey message is stored with the following items - # all appended: - # -address version - # -stream number - # -behavior bitfield - # -pub signing key - # -pub encryption key - # -nonce trials per byte (if address version is >= 3) - # -length extra bytes (if address version is >= 3) - - # to bypass the address version whose length is definitely 1 - readPosition = 1 - _, streamNumberLength = decodeVarint( - pubkeyPayload[readPosition:readPosition + 10]) - readPosition += streamNumberLength - behaviorBitfield = pubkeyPayload[readPosition:readPosition + 4] - # Mobile users may ask us to include their address's - # RIPE hash on a message unencrypted. Before we actually - # do it the sending human must check a box - # in the settings menu to allow it. - - # if receiver is a mobile device who expects that their - # address RIPE is included unencrypted on the front of - # the message.. - if shared.isBitSetWithinBitfield(behaviorBitfield, 30): - # if we are Not willing to include the receiver's - # RIPE hash on the message.. - if not shared.BMConfigParser().safeGetBoolean( - 'bitmessagesettings', 'willinglysendtomobile' - ): - logger.info( - 'The receiver is a mobile user but the' - ' sender (you) has not selected that you' - ' are willing to send to mobiles. Aborting' - ' send.' - ) - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Problem: Destination is a mobile" - " device who requests that the" - " destination be included in the" - " message but this is disallowed in" - " your settings. %1" - ).arg(l10n.formatTimestamp())) - )) - # if the human changes their setting and then - # sends another message or restarts their client, - # this one will send at that time. - continue - readPosition += 4 # to bypass the bitfield of behaviors - # We don't use this key for anything here. - # pubSigningKeyBase256 = - # pubkeyPayload[readPosition:readPosition+64] - readPosition += 64 - pubEncryptionKeyBase256 = pubkeyPayload[ - readPosition:readPosition + 64] - readPosition += 64 - - # Let us fetch the amount of work required by the recipient. - if toAddressVersionNumber == 2: - requiredAverageProofOfWorkNonceTrialsPerByte = \ - defaults.networkDefaultProofOfWorkNonceTrialsPerByte - requiredPayloadLengthExtraBytes = \ - defaults.networkDefaultPayloadLengthExtraBytes - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Doing work necessary to send message.\n" - "There is no required difficulty for" - " version 2 addresses like this.")) - )) - elif toAddressVersionNumber >= 3: - requiredAverageProofOfWorkNonceTrialsPerByte, \ - varintLength = decodeVarint( - pubkeyPayload[readPosition:readPosition + 10]) - readPosition += varintLength - requiredPayloadLengthExtraBytes, varintLength = \ - decodeVarint( - pubkeyPayload[readPosition:readPosition + 10]) - readPosition += varintLength - # We still have to meet a minimum POW difficulty - # regardless of what they say is allowed in order - # to get our message to propagate through the network. - if requiredAverageProofOfWorkNonceTrialsPerByte < \ - defaults.networkDefaultProofOfWorkNonceTrialsPerByte: - requiredAverageProofOfWorkNonceTrialsPerByte = \ - defaults.networkDefaultProofOfWorkNonceTrialsPerByte - if requiredPayloadLengthExtraBytes < \ - defaults.networkDefaultPayloadLengthExtraBytes: - requiredPayloadLengthExtraBytes = \ - defaults.networkDefaultPayloadLengthExtraBytes - logger.debug( - 'Using averageProofOfWorkNonceTrialsPerByte: %s' - ' and payloadLengthExtraBytes: %s.', - requiredAverageProofOfWorkNonceTrialsPerByte, - requiredPayloadLengthExtraBytes - ) - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Doing work necessary to send message.\n" - "Receiver\'s required difficulty: %1" - " and %2" - ).arg(str(float( - requiredAverageProofOfWorkNonceTrialsPerByte) / - defaults.networkDefaultProofOfWorkNonceTrialsPerByte - )).arg(str(float( - requiredPayloadLengthExtraBytes) / - defaults.networkDefaultPayloadLengthExtraBytes - ))))) - if status != 'forcepow': - if (requiredAverageProofOfWorkNonceTrialsPerByte - > BMConfigParser().getint( - 'bitmessagesettings', - 'maxacceptablenoncetrialsperbyte' - ) and - BMConfigParser().getint( - 'bitmessagesettings', - 'maxacceptablenoncetrialsperbyte' - ) != 0) or ( - requiredPayloadLengthExtraBytes - > BMConfigParser().getint( - 'bitmessagesettings', - 'maxacceptablepayloadlengthextrabytes' - ) and - BMConfigParser().getint( - 'bitmessagesettings', - 'maxacceptablepayloadlengthextrabytes' - ) != 0): - # The demanded difficulty is more than - # we are willing to do. - sqlExecute( - '''UPDATE sent SET status='toodifficult' ''' - ''' WHERE ackdata=? ''', - ackdata) - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Problem: The work demanded by" - " the recipient (%1 and %2) is" - " more difficult than you are" - " willing to do. %3" - ).arg(str(float( - requiredAverageProofOfWorkNonceTrialsPerByte) - / defaults.networkDefaultProofOfWorkNonceTrialsPerByte - )).arg(str(float( - requiredPayloadLengthExtraBytes) - / defaults.networkDefaultPayloadLengthExtraBytes - )).arg(l10n.formatTimestamp())) - )) - continue - else: # if we are sending a message to ourselves or a chan.. - logger.info('Sending a message.') - logger.debug( - 'First 150 characters of message: %r', message[:150]) - behaviorBitfield = protocol.getBitfield(fromaddress) - - try: - privEncryptionKeyBase58 = BMConfigParser().get( - toaddress, 'privencryptionkey') - except Exception as err: - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Problem: You are trying to send a" - " message to yourself or a chan but your" - " encryption key could not be found in" - " the keys.dat file. Could not encrypt" - " message. %1" - ).arg(l10n.formatTimestamp())) - )) - logger.error( - 'Error within sendMsg. Could not read the keys' - ' from the keys.dat file for our own address. %s\n', - err) - continue - privEncryptionKeyHex = hexlify(shared.decodeWalletImportFormat( - privEncryptionKeyBase58)) - pubEncryptionKeyBase256 = unhexlify(highlevelcrypto.privToPub( - privEncryptionKeyHex))[1:] - requiredAverageProofOfWorkNonceTrialsPerByte = \ - defaults.networkDefaultProofOfWorkNonceTrialsPerByte - requiredPayloadLengthExtraBytes = \ - defaults.networkDefaultPayloadLengthExtraBytes - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Doing work necessary to send message.")) - )) - - # Now we can start to assemble our message. - payload = encodeVarint(fromAddressVersionNumber) - payload += encodeVarint(fromStreamNumber) - # Bitfield of features and behaviors - # that can be expected from me. (See - # https://bitmessage.org/wiki/Protocol_specification#Pubkey_bitfield_features) - payload += protocol.getBitfield(fromaddress) - - # We need to convert our private keys to public keys in order - # to include them. - try: - privSigningKeyHex, privEncryptionKeyHex, \ - pubSigningKey, pubEncryptionKey = self._getKeysForAddress( - fromaddress) - except: - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Error! Could not find sender address" - " (your address) in the keys.dat file.")) - )) - continue - - payload += pubSigningKey + pubEncryptionKey - - if fromAddressVersionNumber >= 3: - # If the receiver of our message is in our address book, - # subscriptions list, or whitelist then we will allow them to - # do the network-minimum proof of work. Let us check to see if - # the receiver is in any of those lists. - if shared.isAddressInMyAddressBookSubscriptionsListOrWhitelist( - toaddress): - payload += encodeVarint( - defaults.networkDefaultProofOfWorkNonceTrialsPerByte) - payload += encodeVarint( - defaults.networkDefaultPayloadLengthExtraBytes) - else: - payload += encodeVarint(BMConfigParser().getint( - fromaddress, 'noncetrialsperbyte')) - payload += encodeVarint(BMConfigParser().getint( - fromaddress, 'payloadlengthextrabytes')) - - # This hash will be checked by the receiver of the message - # to verify that toRipe belongs to them. This prevents - # a Surreptitious Forwarding Attack. - payload += toRipe - payload += encodeVarint(encoding) # message encoding type - encodedMessage = helper_msgcoding.MsgEncode( - {"subject": subject, "body": message}, encoding - ) - payload += encodeVarint(encodedMessage.length) - payload += encodedMessage.data - if BMConfigParser().has_section(toaddress): - logger.info( - 'Not bothering to include ackdata because we are' - ' sending to ourselves or a chan.' - ) - fullAckPayload = '' - elif not protocol.checkBitfield( - behaviorBitfield, protocol.BITFIELD_DOESACK): - logger.info( - 'Not bothering to include ackdata because' - ' the receiver said that they won\'t relay it anyway.' - ) - fullAckPayload = '' - else: - # The fullAckPayload is a normal msg protocol message - # with the proof of work already completed that the - # receiver of this message can easily send out. - fullAckPayload = self.generateFullAckMessage( - ackdata, toStreamNumber, TTL) - payload += encodeVarint(len(fullAckPayload)) - payload += fullAckPayload - dataToSign = pack('>Q', embeddedTime) + '\x00\x00\x00\x02' + \ - encodeVarint(1) + encodeVarint(toStreamNumber) + payload - signature = highlevelcrypto.sign(dataToSign, privSigningKeyHex) - payload += encodeVarint(len(signature)) - payload += signature - - # We have assembled the data that will be encrypted. - try: - encrypted = highlevelcrypto.encrypt( - payload, "04" + hexlify(pubEncryptionKeyBase256) - ) - except: - sqlExecute( - '''UPDATE sent SET status='badkey' WHERE ackdata=?''', - ackdata - ) - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Problem: The recipient\'s encryption key is" - " no good. Could not encrypt message. %1" - ).arg(l10n.formatTimestamp())) - )) - continue - - encryptedPayload = pack('>Q', embeddedTime) - encryptedPayload += '\x00\x00\x00\x02' # object type: msg - encryptedPayload += encodeVarint(1) # msg version - encryptedPayload += encodeVarint(toStreamNumber) + encrypted - target = 2 ** 64 / ( - requiredAverageProofOfWorkNonceTrialsPerByte * ( - len(encryptedPayload) + 8 + - requiredPayloadLengthExtraBytes + (( - TTL * ( - len(encryptedPayload) + 8 + - requiredPayloadLengthExtraBytes - )) / (2 ** 16)) - )) - logger.info( - '(For msg message) Doing proof of work. Total required' - ' difficulty: %f. Required small message difficulty: %f.', - float(requiredAverageProofOfWorkNonceTrialsPerByte) / - defaults.networkDefaultProofOfWorkNonceTrialsPerByte, - float(requiredPayloadLengthExtraBytes) / - defaults.networkDefaultPayloadLengthExtraBytes - ) - - powStartTime = time.time() - initialHash = hashlib.sha512(encryptedPayload).digest() - trialValue, nonce = proofofwork.run(target, initialHash) - logger.info( - '(For msg message) Found proof of work %s Nonce: %s', - trialValue, nonce - ) - try: - logger.info( - 'PoW took %.1f seconds, speed %s.', - time.time() - powStartTime, - sizeof_fmt(nonce / (time.time() - powStartTime)) - ) - except: - pass - - encryptedPayload = pack('>Q', nonce) + encryptedPayload - - # Sanity check. The encryptedPayload size should never be - # larger than 256 KiB. There should be checks elsewhere - # in the code to not let the user try to send a message - # this large until we implement message continuation. - if len(encryptedPayload) > 2 ** 18: # 256 KiB - logger.critical( - 'This msg object is too large to send. This should' - ' never happen. Object size: %i', - len(encryptedPayload) - ) - continue - - inventoryHash = calculateInventoryHash(encryptedPayload) - objectType = 2 - Inventory()[inventoryHash] = ( - objectType, toStreamNumber, encryptedPayload, embeddedTime, '') - if BMConfigParser().has_section(toaddress) or \ - not protocol.checkBitfield( - behaviorBitfield, protocol.BITFIELD_DOESACK): - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Message sent. Sent at %1" - ).arg(l10n.formatTimestamp())) - )) - else: - # not sending to a chan or one of my addresses - queues.UISignalQueue.put(( - 'updateSentItemStatusByAckdata', ( - ackdata, - tr._translate( - "MainWindow", - "Message sent. Waiting for acknowledgement." - " Sent on %1" - ).arg(l10n.formatTimestamp())) - )) - logger.info( - 'Broadcasting inv for my msg(within sendmsg function): %s', - hexlify(inventoryHash) - ) - queues.invQueue.put((toStreamNumber, inventoryHash)) - - # Update the sent message in the sent table with the - # necessary information. - if BMConfigParser().has_section(toaddress) or \ - not protocol.checkBitfield( - behaviorBitfield, protocol.BITFIELD_DOESACK): - newStatus = 'msgsentnoackexpected' - else: - newStatus = 'msgsent' - # wait 10% past expiration - sleepTill = int(time.time() + TTL * 1.1) - sqlExecute( - '''UPDATE sent SET msgid=?, status=?, retrynumber=?, ''' - ''' sleeptill=?, lastactiontime=? WHERE ackdata=?''', - inventoryHash, newStatus, retryNumber + 1, - sleepTill, int(time.time()), ackdata - ) - - # If we are sending to ourselves or a chan, let's put - # the message in our own inbox. - if BMConfigParser().has_section(toaddress): - # Used to detect and ignore duplicate messages in our inbox - sigHash = hashlib.sha512(hashlib.sha512( - signature).digest()).digest()[32:] - t = (inventoryHash, toaddress, fromaddress, subject, int( - time.time()), message, 'inbox', encoding, 0, sigHash) - helper_inbox.insert(t) - - queues.UISignalQueue.put(('displayNewInboxMessage', ( - inventoryHash, toaddress, fromaddress, subject, message))) - - # If we are behaving as an API then we might need to run an - # outside command to let some program know that a new message - # has arrived. - if BMConfigParser().safeGetBoolean( - 'bitmessagesettings', 'apienabled'): - try: - apiNotifyPath = BMConfigParser().get( - 'bitmessagesettings', 'apinotifypath') - except: - apiNotifyPath = '' - if apiNotifyPath != '': - call([apiNotifyPath, "newMessage"]) - - def requestPubKey(self, toAddress): - toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( - toAddress) - if toStatus != 'success': - logger.error( - 'Very abnormal error occurred in requestPubKey.' - ' toAddress is: %r. Please report this error to Atheros.', - toAddress - ) - return - - queryReturn = sqlQuery( - '''SELECT retrynumber FROM sent WHERE toaddress=? ''' - ''' AND (status='doingpubkeypow' OR status='awaitingpubkey') ''' - ''' LIMIT 1''', - toAddress - ) - if len(queryReturn) == 0: - logger.critical( - 'BUG: Why are we requesting the pubkey for %s' - ' if there are no messages in the sent folder' - ' to that address?', toAddress - ) - return - retryNumber = queryReturn[0][0] - - if addressVersionNumber <= 3: - state.neededPubkeys[toAddress] = 0 - elif addressVersionNumber >= 4: - # If the user just clicked 'send' then the tag - # (and other information) will already be in the - # neededPubkeys dictionary. But if we are recovering - # from a restart of the client then we have to put it in now. - - # Note that this is the first half of the sha512 hash. - privEncryptionKey = hashlib.sha512(hashlib.sha512( - encodeVarint(addressVersionNumber) + - encodeVarint(streamNumber) + ripe - ).digest()).digest()[:32] - # Note that this is the second half of the sha512 hash. - tag = hashlib.sha512(hashlib.sha512( - encodeVarint(addressVersionNumber) + - encodeVarint(streamNumber) + ripe - ).digest()).digest()[32:] - if tag not in state.neededPubkeys: - # We'll need this for when we receive a pubkey reply: - # it will be encrypted and we'll need to decrypt it. - state.neededPubkeys[tag] = ( - toAddress, - highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)) - ) - - # 2.5 days. This was chosen fairly arbitrarily. - TTL = 2.5 * 24 * 60 * 60 - TTL *= 2 ** retryNumber - if TTL > 28 * 24 * 60 * 60: - TTL = 28 * 24 * 60 * 60 - # add some randomness to the TTL - TTL = TTL + helper_random.randomrandrange(-300, 300) - embeddedTime = int(time.time() + TTL) - payload = pack('>Q', embeddedTime) - payload += '\x00\x00\x00\x00' # object type: getpubkey - payload += encodeVarint(addressVersionNumber) - payload += encodeVarint(streamNumber) - if addressVersionNumber <= 3: - payload += ripe - logger.info( - 'making request for pubkey with ripe: %s', hexlify(ripe)) - else: - payload += tag - logger.info( - 'making request for v4 pubkey with tag: %s', hexlify(tag)) - - # print 'trial value', trialValue - statusbar = 'Doing the computations necessary to request' +\ - ' the recipient\'s public key.' - queues.UISignalQueue.put(('updateStatusBar', statusbar)) - queues.UISignalQueue.put(( - 'updateSentItemStatusByToAddress', ( - toAddress, - tr._translate( - "MainWindow", - "Doing work necessary to request encryption key.")) - )) - - payload = self._doPOWDefaults(payload, TTL) - - inventoryHash = calculateInventoryHash(payload) - objectType = 1 - Inventory()[inventoryHash] = ( - objectType, streamNumber, payload, embeddedTime, '') - logger.info('sending inv (for the getpubkey message)') - queues.invQueue.put((streamNumber, inventoryHash)) - - # wait 10% past expiration - sleeptill = int(time.time() + TTL * 1.1) - sqlExecute( - '''UPDATE sent SET lastactiontime=?, ''' - ''' status='awaitingpubkey', retrynumber=?, sleeptill=? ''' - ''' WHERE toaddress=? AND (status='doingpubkeypow' OR ''' - ''' status='awaitingpubkey') ''', - int(time.time()), retryNumber + 1, sleeptill, toAddress) - - queues.UISignalQueue.put(( - 'updateStatusBar', - tr._translate( - "MainWindow", - "Broadcasting the public key request. This program will" - " auto-retry if they are offline.") - )) - queues.UISignalQueue.put(( - 'updateSentItemStatusByToAddress', ( - toAddress, - tr._translate( - "MainWindow", - "Sending public key request. Waiting for reply." - " Requested at %1" - ).arg(l10n.formatTimestamp())) - )) - - def generateFullAckMessage(self, ackdata, toStreamNumber, TTL): - # It might be perfectly fine to just use the same TTL for - # the ackdata that we use for the message. But I would rather - # it be more difficult for attackers to associate ackData with - # the associated msg object. However, users would want the TTL - # of the acknowledgement to be about the same as they set - # for the message itself. So let's set the TTL of the - # acknowledgement to be in one of three 'buckets': 1 hour, 7 - # days, or 28 days, whichever is relatively close to what the - # user specified. - if TTL < 24 * 60 * 60: # 1 day - TTL = 24 * 60 * 60 # 1 day - elif TTL < 7 * 24 * 60 * 60: # 1 week - TTL = 7 * 24 * 60 * 60 # 1 week - else: - TTL = 28 * 24 * 60 * 60 # 4 weeks - # Add some randomness to the TTL - TTL = int(TTL + helper_random.randomrandrange(-300, 300)) - embeddedTime = int(time.time() + TTL) - - # type/version/stream already included - payload = pack('>Q', (embeddedTime)) + ackdata - - payload = self._doPOWDefaults( - payload, TTL, log_prefix='(For ack message)', log_time=True) - - return protocol.CreatePacket('object', payload) diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 45c4d8c0..4fd65a86 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -14,6 +14,8 @@ import queues import state import tr#anslate import helper_random +import addresses +import binascii # This thread exists because SQLITE3 is so un-threadsafe that we must # submit queries to it and it puts results back in a different queue. They # won't let us just use locks. @@ -165,16 +167,55 @@ class sqlThread(threading.Thread): logger.debug('Vacuuming message.dat. You might notice that the file size gets much smaller.') self.cur.execute( ''' VACUUM ''') - # After code refactoring, the possible status values for sent messages - # have changed. - self.cur.execute( - '''update sent set status='doingmsgpow' where status='doingpow' ''') - self.cur.execute( - '''update sent set status='msgsent' where status='sentmessage' ''') - self.cur.execute( - '''update sent set status='doingpubkeypow' where status='findingpubkey' ''') - self.cur.execute( - '''update sent set status='broadcastqueued' where status='broadcastpending' ''') + # Retry number now has no meaning for getpubkey messages + + self.cur.execute(""" + UPDATE "sent" SET "retrynumber" = 0 + WHERE "status" IN ( + 'doingpubkeypow', 'awaitingpubkey', + 'findingpubkey' -- Old name + ); + """) + + # Reset temporary message status values + # TODO: stop writing them to the database, they should exist only in memory + + self.cur.execute(""" + UPDATE "sent" SET "status" = 'broadcastqueued' + WHERE "status" == 'doingbroadcastpow'; + """) + + self.cur.execute(""" + UPDATE "sent" SET "status" = 'msgqueued' + WHERE "status" IN ( + 'doingmsgpow', 'doingpubkeypow', 'awaitingpubkey', + 'doingpow', 'findingpubkey' -- Old names + ); + """) + + # Update status values inherited from old versions + + self.cur.execute("""UPDATE "sent" SET "status" = 'msgsent' WHERE "status" == 'sentmessage';""") + self.cur.execute("""UPDATE "sent" SET "status" = 'broadcastqueued' WHERE "status" == 'broadcastpending';""") + + # Add tags to all objects + + self.cur.execute("""SELECT "hash", "payload" FROM "inventory" WHERE "tag" == '';""") + updatingCursor = self.conn.cursor() + + for ID, payload in self.cur: + readPosition = 20 + + version, readLength = addresses.decodeVarint(payload[readPosition: readPosition + 9]) + readPosition += readLength + + stream, readLength = addresses.decodeVarint(payload[readPosition: readPosition + 9]) + readPosition += readLength + + tag = buffer(payload[readPosition: readPosition + 32]) # May be shorter than 32 bytes for getpubkeys + + updatingCursor.execute("""UPDATE "inventory" SET "tag" = ? WHERE "hash" == ?;""", (tag, ID)) + self.conn.commit() if not BMConfigParser().has_option('bitmessagesettings', 'sockslisten'): @@ -476,6 +517,24 @@ class sqlThread(threading.Thread): parameters = (int(time.time()),) self.cur.execute(item, parameters) + self.cur.execute("""SELECT "ackdata" FROM "sent" WHERE "status" == 'msgsent';""") + legacyAckData = [] + + for ackData, in self.cur: + logger.info("Watching for ack data: %s", binascii.hexlify(ackData)) + + if len(ackData) == 32: + legacyAckData.append(ackData) + else: + state.watchedAckData.add(ackData) + + for i in legacyAckData: + ackData = "\x00\x00\x00\x02\x01\x01" + i + + self.cur.execute("""UPDATE "sent" SET "ackdata" = ? WHERE "ackdata" == ?;""", (ackData, i)) + + state.watchedAckData.add(ackData) + state.sqlReady = True while True: diff --git a/src/helper_generic.py b/src/helper_generic.py index 5f76b484..9e9b21f2 100644 --- a/src/helper_generic.py +++ b/src/helper_generic.py @@ -18,18 +18,6 @@ import queues import shutdown from debug import logger - -def powQueueSize(): - curWorkerQueue = queues.workerQueue.qsize() - for thread in threading.enumerate(): - try: - if thread.name == "singleWorker": - curWorkerQueue += thread.busy - except Exception as err: - logger.info('Thread error %s', err) - return curWorkerQueue - - def convertIntToString(n): a = __builtins__.hex(n) if a[-1:] == 'L': diff --git a/src/network/bmobject.py b/src/network/bmobject.py index 2e7dd092..87d36d5d 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -47,6 +47,7 @@ class BMObject(object): self.inventoryHash = calculateInventoryHash(data) # copy to avoid memory issues self.data = bytearray(data) + # Doesn't matter if the payload is shorter, old version "getpubkey" objects must have 20-byte tags self.tag = self.data[payloadOffset:payloadOffset+32] def checkProofOfWorkSufficient(self): diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 1f1c67bd..5af47041 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -353,7 +353,6 @@ class BMProto(AdvancedDispatcher, ObjectTracker): try: self.object.checkObjectByType() - objectProcessorQueue.put((self.object.objectType, buffer(self.object.data))) except BMObjectInvalidError as e: BMProto.stopDownloadingObject(self.object.inventoryHash, True) else: @@ -369,6 +368,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.object.objectType, self.object.streamNumber, buffer(self.payload[objectOffset:]), self.object.expiresTime, buffer(self.object.tag)) self.handleReceivedObject(self.object.streamNumber, self.object.inventoryHash) invQueue.put((self.object.streamNumber, self.object.inventoryHash, self.destination)) + objectProcessorQueue.put((self.object.objectType, buffer(self.object.data))) return True def _decode_addr(self): diff --git a/src/protocol.py b/src/protocol.py index 913c8e5f..865c3f72 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -62,32 +62,20 @@ Header = Struct('!L12sL4s') VersionPacket = Struct('>LqQ20s4s36sH') -# Bitfield +def calculateDoubleHash(data): + return hashlib.sha512(hashlib.sha512(data).digest()).digest() +def calculateRipeHash(data): + return hashlib.new("ripemd160", hashlib.sha512(data).digest()).digest() -def getBitfield(address): - """Get a bitfield from an address""" - # bitfield of features supported by me (see the wiki). - bitfield = 0 - # send ack - if not BMConfigParser().safeGetBoolean(address, 'dontsendack'): - bitfield |= BITFIELD_DOESACK - return pack('>I', bitfield) - - -def checkBitfield(bitfieldBinary, flags): - """Check if a bitfield matches the given flags""" - bitfield, = unpack('>I', bitfieldBinary) - return (bitfield & flags) == flags - - -def isBitSetWithinBitfield(fourByteString, n): - """Check if a particular bit is set in a bitfeld""" - # Uses MSB 0 bit numbering across 4 bytes of data - n = 31 - n - x, = unpack('>L', fourByteString) - return x & 2**n != 0 +def calculateAddressTag(version, stream, ripe): + doubleHash = calculateDoubleHash( + encodeVarint(version) + + encodeVarint(stream) + + ripe + ) + return doubleHash[: 32], doubleHash[32: ] # ip addresses @@ -199,7 +187,6 @@ def checkSocksIP(host): state.socksIP = BMConfigParser().get("bitmessagesettings", "sockshostname") return state.socksIP == host - def isProofOfWorkSufficient(data, nonceTrialsPerByte=0, payloadLengthExtraBytes=0, @@ -320,122 +307,98 @@ def assembleErrorMessage(fatal=0, banTime=0, inventoryVector='', errorText=''): # Packet decoding +def decryptAndCheckV4Pubkey(payload, address, cryptor): + status, version, stream, ripe = decodeAddress(address) + + readPosition = 20 -def decryptAndCheckPubkeyPayload(data, address): - """ - Version 4 pubkeys are encrypted. This function is run when we already have the - address to which we want to try to send a message. The 'data' may come either - off of the wire or we might have had it already in our inventory when we tried - to send a msg to this particular address. - """ - # pylint: disable=unused-variable try: - status, addressVersion, streamNumber, ripe = decodeAddress(address) + embeddedVersion, readLength = decodeVarint(payload[readPosition: readPosition + 9]) + readPosition += readLength - readPosition = 20 # bypass the nonce, time, and object type - embeddedAddressVersion, varintLength = decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - embeddedStreamNumber, varintLength = decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - # We'll store the address version and stream number (and some more) in the pubkeys table. - storedData = data[20:readPosition] + embeddedStream, readLength = decodeVarint(payload[readPosition: readPosition + 9]) + readPosition += readLength + except: + return None - if addressVersion != embeddedAddressVersion: - logger.info('Pubkey decryption was UNsuccessful due to address version mismatch.') - return 'failed' - if streamNumber != embeddedStreamNumber: - logger.info('Pubkey decryption was UNsuccessful due to stream number mismatch.') - return 'failed' + if embeddedVersion != 4: + logger.info("Pubkey decryption failed due to address version mismatch") - tag = data[readPosition:readPosition + 32] - readPosition += 32 - # the time through the tag. More data is appended onto signedData below after the decryption. - signedData = data[8:readPosition] - encryptedData = data[readPosition:] + return None - # Let us try to decrypt the pubkey - toAddress, cryptorObject = state.neededPubkeys[tag] - if toAddress != address: - logger.critical( - 'decryptAndCheckPubkeyPayload failed due to toAddress mismatch.' - ' This is very peculiar. toAddress: %s, address %s', - toAddress, - address) - # the only way I can think that this could happen is if someone encodes their address data two different - # ways. That sort of address-malleability should have been caught by the UI or API and an error given to - # the user. - return 'failed' - try: - decryptedData = cryptorObject.decrypt(encryptedData) - except: - # Someone must have encrypted some data with a different key - # but tagged it with a tag for which we are watching. - logger.info('Pubkey decryption was unsuccessful.') - return 'failed' + if embeddedStream != stream: + logger.info("Pubkey decryption failed due to stream number mismatch") - readPosition = 0 - bitfieldBehaviors = decryptedData[readPosition:readPosition + 4] + return None + + result = payload[20: readPosition] + + tag = payload[readPosition: readPosition + 32] + readPosition += 32 + + if len(tag) < 32: + return None + + signedData = payload[8: readPosition] + ciphertext = payload[readPosition: ] + + try: + plaintext = cryptor.decrypt(ciphertext) + except: + logger.info("Pubkey decryption failed") + + return None + + readPosition = 0 + + try: + bitfield = unpack(">I", plaintext[readPosition: readPosition + 4]) readPosition += 4 - publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64] - readPosition += 64 - publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64] - readPosition += 64 - specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedNonceTrialsPerByteLength - specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedPayloadLengthExtraBytesLength - storedData += decryptedData[:readPosition] - signedData += decryptedData[:readPosition] - signatureLength, signatureLengthLength = decodeVarint( - decryptedData[readPosition:readPosition + 10]) - readPosition += signatureLengthLength - signature = decryptedData[readPosition:readPosition + signatureLength] + except: + return None - if highlevelcrypto.verify(signedData, signature, hexlify(publicSigningKey)): - logger.info('ECDSA verify passed (within decryptAndCheckPubkeyPayload)') - else: - logger.info('ECDSA verify failed (within decryptAndCheckPubkeyPayload)') - return 'failed' + publicSigningKey = "\x04" + plaintext[readPosition: readPosition + 64] + readPosition += 64 - sha = hashlib.new('sha512') - sha.update(publicSigningKey + publicEncryptionKey) - ripeHasher = hashlib.new('ripemd160') - ripeHasher.update(sha.digest()) - embeddedRipe = ripeHasher.digest() + publicEncryptionKey = "\x04" + plaintext[readPosition: readPosition + 64] + readPosition += 64 - if embeddedRipe != ripe: - # Although this pubkey object had the tag were were looking for and was - # encrypted with the correct encryption key, it doesn't contain the - # correct pubkeys. Someone is either being malicious or using buggy software. - logger.info('Pubkey decryption was UNsuccessful due to RIPE mismatch.') - return 'failed' + if len(publicSigningKey) != 65 or len(publicEncryptionKey) != 65: + return None - # Everything checked out. Insert it into the pubkeys table. + embeddedRipe = calculateRipeHash(publicSigningKey + publicEncryptionKey) - logger.info( - os.linesep.join([ - 'within decryptAndCheckPubkeyPayload,' - ' addressVersion: %s, streamNumber: %s' % addressVersion, streamNumber, - 'ripe %s' % hexlify(ripe), - 'publicSigningKey in hex: %s' % hexlify(publicSigningKey), - 'publicEncryptionKey in hex: %s' % hexlify(publicEncryptionKey), - ]) - ) + if embeddedRipe != ripe: + logger.info("Pubkey decryption failed due to RIPE mismatch") - t = (address, addressVersion, storedData, int(time.time()), 'yes') - sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) - return 'successful' - except varintDecodeError: - logger.info('Pubkey decryption was UNsuccessful due to a malformed varint.') - return 'failed' - except Exception: - logger.critical( - 'Pubkey decryption was UNsuccessful because of an unhandled exception! This is definitely a bug! \n%s', - traceback.format_exc()) - return 'failed' + return None + try: + byteDifficulty, readLength = decodeVarint(plaintext[readPosition: readPosition + 9]) + readPosition += readLength + + lengthExtension, readLength = decodeVarint(plaintext[readPosition: readPosition + 9]) + readPosition += readLength + except: + return None + + result += plaintext[: readPosition] + signedData += plaintext[: readPosition] + + signatureLength, readLength = decodeVarint(plaintext[readPosition: readPosition + 9]) + readPosition += readLength + + signature = plaintext[readPosition: readPosition + signatureLength] + + if len(signature) != signatureLength: + return None + + if not highlevelcrypto.verify(signedData, signature, hexlify(publicSigningKey)): + logger.info("Invalid signature on a pubkey") + + return None + + return result def checkAndShareObjectWithPeers(data): """ diff --git a/src/shared.py b/src/shared.py index caf24769..6fb40b79 100644 --- a/src/shared.py +++ b/src/shared.py @@ -68,7 +68,6 @@ alreadyAttemptedConnectionsListLock = threading.Lock() alreadyAttemptedConnectionsListResetTime = int(time.time()) # A list of the amounts of time it took to successfully decrypt msg messages successfullyDecryptMessageTimings = [] -ackdataForWhichImWatching = {} # used by API command clientStatus clientHasReceivedIncomingConnections = False numberOfMessagesProcessed = 0 @@ -287,150 +286,6 @@ def fixSensitiveFilePermissions(filename, hasEnabledKeys): logger.exception('Keyfile permissions could not be fixed.') raise - -def isBitSetWithinBitfield(fourByteString, n): - # Uses MSB 0 bit numbering across 4 bytes of data - n = 31 - n - x, = unpack('>L', fourByteString) - return x & 2**n != 0 - - -def decryptAndCheckPubkeyPayload(data, address): - """ - Version 4 pubkeys are encrypted. This function is run when we - already have the address to which we want to try to send a message. - The 'data' may come either off of the wire or we might have had it - already in our inventory when we tried to send a msg to this - particular address. - """ - try: - # status - _, addressVersion, streamNumber, ripe = decodeAddress(address) - - readPosition = 20 # bypass the nonce, time, and object type - embeddedAddressVersion, varintLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - embeddedStreamNumber, varintLength = \ - decodeVarint(data[readPosition:readPosition + 10]) - readPosition += varintLength - # We'll store the address version and stream number - # (and some more) in the pubkeys table. - storedData = data[20:readPosition] - - if addressVersion != embeddedAddressVersion: - logger.info( - 'Pubkey decryption was UNsuccessful' - ' due to address version mismatch.') - return 'failed' - if streamNumber != embeddedStreamNumber: - logger.info( - 'Pubkey decryption was UNsuccessful' - ' due to stream number mismatch.') - return 'failed' - - tag = data[readPosition:readPosition + 32] - readPosition += 32 - # the time through the tag. More data is appended onto - # signedData below after the decryption. - signedData = data[8:readPosition] - encryptedData = data[readPosition:] - - # Let us try to decrypt the pubkey - toAddress, cryptorObject = state.neededPubkeys[tag] - if toAddress != address: - logger.critical( - 'decryptAndCheckPubkeyPayload failed due to toAddress' - ' mismatch. This is very peculiar.' - ' toAddress: %s, address %s', - toAddress, address - ) - # the only way I can think that this could happen - # is if someone encodes their address data two different ways. - # That sort of address-malleability should have been caught - # by the UI or API and an error given to the user. - return 'failed' - try: - decryptedData = cryptorObject.decrypt(encryptedData) - except: - # Someone must have encrypted some data with a different key - # but tagged it with a tag for which we are watching. - logger.info('Pubkey decryption was unsuccessful.') - return 'failed' - - readPosition = 0 - # bitfieldBehaviors = decryptedData[readPosition:readPosition + 4] - readPosition += 4 - publicSigningKey = \ - '\x04' + decryptedData[readPosition:readPosition + 64] - readPosition += 64 - publicEncryptionKey = \ - '\x04' + decryptedData[readPosition:readPosition + 64] - readPosition += 64 - specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = \ - decodeVarint(decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedNonceTrialsPerByteLength - specifiedPayloadLengthExtraBytes, \ - specifiedPayloadLengthExtraBytesLength = \ - decodeVarint(decryptedData[readPosition:readPosition + 10]) - readPosition += specifiedPayloadLengthExtraBytesLength - storedData += decryptedData[:readPosition] - signedData += decryptedData[:readPosition] - signatureLength, signatureLengthLength = \ - decodeVarint(decryptedData[readPosition:readPosition + 10]) - readPosition += signatureLengthLength - signature = decryptedData[readPosition:readPosition + signatureLength] - - if not highlevelcrypto.verify( - signedData, signature, hexlify(publicSigningKey)): - logger.info( - 'ECDSA verify failed (within decryptAndCheckPubkeyPayload)') - return 'failed' - - logger.info( - 'ECDSA verify passed (within decryptAndCheckPubkeyPayload)') - - sha = hashlib.new('sha512') - sha.update(publicSigningKey + publicEncryptionKey) - ripeHasher = hashlib.new('ripemd160') - ripeHasher.update(sha.digest()) - embeddedRipe = ripeHasher.digest() - - if embeddedRipe != ripe: - # Although this pubkey object had the tag were were looking for - # and was encrypted with the correct encryption key, - # it doesn't contain the correct pubkeys. Someone is - # either being malicious or using buggy software. - logger.info( - 'Pubkey decryption was UNsuccessful due to RIPE mismatch.') - return 'failed' - - # Everything checked out. Insert it into the pubkeys table. - - logger.info( - 'within decryptAndCheckPubkeyPayload, ' - 'addressVersion: %s, streamNumber: %s\nripe %s\n' - 'publicSigningKey in hex: %s\npublicEncryptionKey in hex: %s', - addressVersion, streamNumber, hexlify(ripe), - hexlify(publicSigningKey), hexlify(publicEncryptionKey) - ) - - t = (address, addressVersion, storedData, int(time.time()), 'yes') - sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t) - return 'successful' - except varintDecodeError: - logger.info( - 'Pubkey decryption was UNsuccessful due to a malformed varint.') - return 'failed' - except Exception: - logger.critical( - 'Pubkey decryption was UNsuccessful because of' - ' an unhandled exception! This is definitely a bug! \n%s' % - traceback.format_exc() - ) - return 'failed' - - def checkAndShareObjectWithPeers(data): """ This function is called after either receiving an object diff --git a/src/singleworker.py b/src/singleworker.py new file mode 100644 index 00000000..9a061cec --- /dev/null +++ b/src/singleworker.py @@ -0,0 +1,942 @@ +import binascii +import collections +import hashlib +import os.path +import struct +import threading +import time + +import addresses +import bmconfigparser +import debug +import defaults +import helper_msgcoding +import helper_sql +import helper_random +import helper_threading +import highlevelcrypto +import inventory +import l10n +import paths +import protocol +import shared +import state +import tr +import queues +import workprover + +# Message status flow: +# +# +----------------------------------------------------------------------------------------+ +# v | +# +-> msgqueued -+---------------------------------------->+-+-> doingmsgpow -+-> msgsent -+-> ackreceived +# ^ | ^ | | +# | +-> awaitingpubkey -+-> doingpubkeypow -+ | | +-> msgsentnoackexpected +# | ^ v | | | | +# +--------------+-------------------+-------------------+ | | +-> badkey +# | | +# | +-> toodifficult --> forcepow -+ +# | | +# +--------------------------------+ + +# Broadcast status flow: +# +# broadcastqueued --> doingbroadcastpow --> broadcastsent + +# TODO: queued pubkey messages are not saved to the database, they disappear when the client is closed + +AddressProperties = collections.namedtuple("AddressProperties", [ + "version", "stream", "ripe", + "own", "chan", "bitfield", "byteDifficulty", "lengthExtension", + "secretSigningKey", "secretEncryptionKey", "publicSigningKey", "publicEncryptionKey" +]) + +def getMyAddressProperties(address, defaultDifficulty = False): + status, version, stream, ripe = addresses.decodeAddress(address) + + if defaultDifficulty: + byteDifficulty = defaults.networkDefaultProofOfWorkNonceTrialsPerByte + lengthExtension = defaults.networkDefaultPayloadLengthExtraBytes + else: + byteDifficulty = bmconfigparser.BMConfigParser().safeGetInt(address, "noncetrialsperbyte", None) + lengthExtension = bmconfigparser.BMConfigParser().safeGetInt(address, "payloadlengthextrabytes", None) + + chan = bmconfigparser.BMConfigParser().safeGetBoolean(address, "chan") + bitfield = 0 + + if not bmconfigparser.BMConfigParser().safeGetBoolean(address, "dontsendack"): + bitfield |= protocol.BITFIELD_DOESACK + + secretSigningKeyBase58 = bmconfigparser.BMConfigParser().get(address, "privsigningkey") + secretEncryptionKeyBase58 = bmconfigparser.BMConfigParser().get(address, "privencryptionkey") + + secretSigningKey = shared.decodeWalletImportFormat(secretSigningKeyBase58) + secretEncryptionKey = shared.decodeWalletImportFormat(secretEncryptionKeyBase58) + + publicSigningKey = binascii.unhexlify(highlevelcrypto.privToPub(binascii.hexlify(secretSigningKey))) + publicEncryptionKey = binascii.unhexlify(highlevelcrypto.privToPub(binascii.hexlify(secretEncryptionKey))) + + return AddressProperties( + version, stream, ripe, + True, chan, bitfield, byteDifficulty, lengthExtension, + secretSigningKey, secretEncryptionKey, publicSigningKey, publicEncryptionKey + ) + +def parsePubkeyMessage(encoded): + readPosition = 0 + + version, readLength = addresses.decodeVarint(encoded[readPosition: readPosition + 9]) + readPosition += readLength + + stream, readLength = addresses.decodeVarint(encoded[readPosition: readPosition + 9]) + readPosition += readLength + + bitfield, = struct.unpack(">I", encoded[readPosition: readPosition + 4]) + readPosition += 4 + + publicSigningKey = "\x04" + encoded[readPosition: readPosition + 64] + readPosition += 64 + + publicEncryptionKey = "\x04" + encoded[readPosition: readPosition + 64] + readPosition += 64 + + ripe = protocol.calculateRipeHash(publicSigningKey + publicEncryptionKey) + + if version < 3: + byteDifficulty = defaults.networkDefaultProofOfWorkNonceTrialsPerByte + lengthExtension = defaults.networkDefaultPayloadLengthExtraBytes + else: + byteDifficulty, readLength = addresses.decodeVarint(encoded[readPosition: readPosition + 9]) + readPosition += readLength + + lengthExtension, readLength = addresses.decodeVarint(encoded[readPosition: readPosition + 9]) + readPosition += readLength + + byteDifficulty = max(defaults.networkDefaultProofOfWorkNonceTrialsPerByte, byteDifficulty) + lengthExtension = max(defaults.networkDefaultPayloadLengthExtraBytes, lengthExtension) + + return AddressProperties( + version, stream, ripe, + False, False, bitfield, byteDifficulty, lengthExtension, + None, None, publicSigningKey, publicEncryptionKey + ) + +def getDestinationAddressProperties(address): + # Search own and chan addresses + + try: + return getMyAddressProperties(address, True) + except: + pass + + # Search the "pubkeys" table in the database + + status, version, stream, ripe = addresses.decodeAddress(address) + + if version == 4: + secretEncryptionKey, tag = protocol.calculateAddressTag(version, stream, ripe) + + cryptor = highlevelcrypto.makeCryptor(binascii.hexlify(secretEncryptionKey)) + + alreadyNeeded = tag in state.neededPubkeys + state.neededPubkeys[tag] = address, cryptor + else: + alreadyNeeded = address in state.neededPubkeys + state.neededPubkeys[address] = None + + helper_sql.sqlExecute("""UPDATE "pubkeys" SET "usedpersonally" = 'yes' WHERE "address" == ?;""", address) + encodedPubkeys = helper_sql.sqlQuery("""SELECT "transmitdata" FROM "pubkeys" WHERE "address" == ?;""", address) + + result = None + + if len(encodedPubkeys) != 0: + result = parsePubkeyMessage(encodedPubkeys[-1][0]) + + # Search the inventory for encrypted keys + + if result is None and version == 4: + for i in inventory.Inventory().by_type_and_tag(1, tag): + encodedPubkey = protocol.decryptAndCheckV4Pubkey(i.payload, address, cryptor) + + if encodedPubkey is None: + continue + + helper_sql.sqlExecute(""" + INSERT INTO "pubkeys" ("address", "addressversion", "transmitdata", "time", "usedpersonally") + VALUES (?, 4, ?, ?, 'yes'); + """, address, encodedPubkey, int(time.time())) + + result = parsePubkeyMessage(encodedPubkey) + + break + + if result is not None: + if version == 4: + state.neededPubkeys.pop(tag, None) + else: + state.neededPubkeys.pop(address, None) + + helper_sql.sqlExecute(""" + UPDATE "sent" SET "status" = 'msgqueued' + WHERE "status" IN ('doingpubkeypow', 'awaitingpubkey') AND "toaddress" == ? AND "folder" == 'sent'; + """, address) + + if alreadyNeeded: + queues.workerQueue.put(("sendmessage", )) + + queued = helper_sql.sqlQuery(""" + SELECT "ackdata" FROM "sent" + WHERE "status" == 'msgqueued' AND "toaddress" == ? AND "folder" == 'sent'; + """, address) + + for i, in queued: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + i, + tr._translate( + "MainWindow", + "Queued." + ) + ))) + + return result + + return None + +def randomizeTTL(TTL): + return TTL + helper_random.randomrandrange(-300, 300) + +def disseminateObject(nonce, expiryTime, headlessPayload, objectType, stream, tag): + payload = nonce + struct.pack(">Q", expiryTime) + headlessPayload + inventoryHash = protocol.calculateDoubleHash(payload)[: 32] + + inventory.Inventory()[inventoryHash] = objectType, stream, payload, expiryTime, buffer(tag) + queues.invQueue.put((stream, inventoryHash)) + + debug.logger.info("Broadcasting inventory object with hash: %s", binascii.hexlify(inventoryHash)) + + return inventoryHash, payload + +class singleWorker(threading.Thread, helper_threading.StoppableThread): + name = "singleWorker" + + def __init__(self): + super(self.__class__, self).__init__() + + self.initStop() + + def stopThread(self): + queues.workerQueue.put(("stopThread", "data")) + + super(self.__class__, self).stopThread() + + def run(self): + GPUVendor = bmconfigparser.BMConfigParser().safeGet("bitmessagesettings", "opencl") + + self.workProver = workprover.WorkProver( + os.path.join(paths.codePath(), "workprover"), + GPUVendor, + helper_random.randomBytes(32), + lambda status: queues.UISignalQueue.put(("updateWorkProverStatus", status)), + queues.workerQueue + ) + + self.workProver.start() + + parallelism = bmconfigparser.BMConfigParser().safeGetInt("bitmessagesettings", "maxcores") + + if parallelism is None: + parallelism = self.workProver.defaultParallelism + + if "gpu" in self.workProver.availableSolvers and GPUVendor is not None: + self.workProver.commandsQueue.put(("setSolver", "gpu", None)) + elif "fast" in self.workProver.availableSolvers: + self.workProver.commandsQueue.put(("setSolver", "fast", parallelism)) + elif "forking" in self.workProver.availableSolvers: + self.workProver.commandsQueue.put(("setSolver", "forking", parallelism)) + else: + self.workProver.commandsQueue.put(("setSolver", "dumb", None)) + + if "fast" not in self.workProver.availableSolvers: + queues.UISignalQueue.put(("updateStatusBar", ( + tr._translate( + "proofofwork", + "C PoW module unavailable. Please build it." + ), 1 + ))) + + self.startedWorks = {} + + # Give some time for the GUI to start + # TODO: use a condition variable + + self.stop.wait(10) + + queues.workerQueue.put(("sendmessage", )) + queues.workerQueue.put(("sendbroadcast", )) + + while state.shutdown == 0: + queueItem = queues.workerQueue.get() + command, arguments = queueItem[0], queueItem[1: ] + + if command == "sendmessage": + self.sendMessages() + elif command == "sendbroadcast": + self.sendBroadcasts() + elif command == "sendMyPubkey": + self.sendMyPubkey(*arguments) + elif command == "requestPubkey": + self.requestPubkey(*arguments) + elif command == "resetPoW": + pass + elif command == "taskDone": + self.workDone(*arguments) + elif command == "stopThread": + self.workProver.commandsQueue.put(("shutdown", )) + self.workProver.join() + + break + + debug.logger.info("Quitting...") + + def startWork(self, ID, headlessPayload, TTL, expiryTime, byteDifficulty, lengthExtension, logPrefix, callback): + debug.logger.info( + "%s Starting work %s, payload length = %s, TTL = %s", + logPrefix, ID, 8 + 8 + len(headlessPayload), TTL + ) + + self.startedWorks[ID] = callback + + self.workProver.commandsQueue.put(( + "addTask", ID, headlessPayload, TTL, expiryTime, + byteDifficulty, lengthExtension + )) + + def workDone(self, ID, nonce, expiryTime): + debug.logger.info("Found proof of work %s", ID) + + self.startedWorks[ID](nonce, expiryTime) + + del self.startedWorks[ID] + + def sendMyPubkey(self, address): + ID = "pubkey", address + + if ID in self.startedWorks: + return + + try: + addressProperties = getMyAddressProperties(address) + except Exception as exception: + debug.logger.error("Could not get the properties of a requested address %s\n", exception) + + return + + if addressProperties.chan: + debug.logger.info("This is a chan address. Not sending pubkey") + + return + + if addressProperties.version == 4: + secretEncryptionKey, tag = protocol.calculateAddressTag( + addressProperties.version, + addressProperties.stream, + addressProperties.ripe + ) + + publicEncryptionKey = highlevelcrypto.pointMult(secretEncryptionKey) + else: + tag = "" + + debug.logger.info("Sending pubkey of %s", address) + + TTL = randomizeTTL(28 * 24 * 60 * 60) + + if addressProperties.version > 2: + expiryTime = int(time.time() + TTL) + else: + expiryTime = None + + headlessPayload = struct.pack(">I", 1) + headlessPayload += addresses.encodeVarint(addressProperties.version) + headlessPayload += addresses.encodeVarint(addressProperties.stream) + + inventoryTagPosition = len(headlessPayload) + + headlessPayload += tag + + if addressProperties.version == 4: + plaintext = struct.pack(">I", addressProperties.bitfield) + plaintext += addressProperties.publicSigningKey[1: ] + plaintext += addressProperties.publicEncryptionKey[1: ] + plaintext += addresses.encodeVarint(addressProperties.byteDifficulty) + plaintext += addresses.encodeVarint(addressProperties.lengthExtension) + + signature = highlevelcrypto.sign( + struct.pack(">Q", expiryTime) + headlessPayload + plaintext, + binascii.hexlify(addressProperties.secretSigningKey) + ) + + plaintext += addresses.encodeVarint(len(signature)) + plaintext += signature + + headlessPayload += highlevelcrypto.encrypt(plaintext, binascii.hexlify(publicEncryptionKey)) + else: + headlessPayload += struct.pack(">I", addressProperties.bitfield) + headlessPayload += addressProperties.publicSigningKey[1: ] + headlessPayload += addressProperties.publicEncryptionKey[1: ] + + if addressProperties.version == 3: + headlessPayload += addresses.encodeVarint(addressProperties.byteDifficulty) + headlessPayload += addresses.encodeVarint(addressProperties.lengthExtension) + + signature = highlevelcrypto.sign( + struct.pack(">Q", expiryTime) + headlessPayload, + binascii.hexlify(addressProperties.secretSigningKey) + ) + + headlessPayload += addresses.encodeVarint(len(signature)) + headlessPayload += signature + + def workDone(nonce, expiryTime): + inventoryTag = headlessPayload[inventoryTagPosition: inventoryTagPosition + 32] + + disseminateObject(nonce, expiryTime, headlessPayload, 1, addressProperties.stream, inventoryTag) + + # TODO: not atomic with the addition to the inventory, the "lastpubkeysendtime" property should be removed + # Instead check if the pubkey is present in the inventory + + try: + bmconfigparser.BMConfigParser().set(address, "lastpubkeysendtime", str(int(time.time()))) + bmconfigparser.BMConfigParser().save() + except: + pass + + queues.UISignalQueue.put(("updateStatusBar", "")) + + self.startWork( + ID, headlessPayload, TTL, expiryTime, + defaults.networkDefaultProofOfWorkNonceTrialsPerByte, + defaults.networkDefaultPayloadLengthExtraBytes, + "(For pubkey version {} message)".format(addressProperties.version), + workDone + ) + + def processBroadcast(self, address, subject, body, ackData, TTL, encoding): + ID = "broadcast", ackData + + try: + addressProperties = getMyAddressProperties(address) + except: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Error! Could not find sender address (your address) in the keys.dat file." + ) + ))) + + return + + if addressProperties.version < 2: + debug.logger.error("Address version unsupported for broadcasts") + + return + + debug.logger.info("Sending broadcast from %s", address) + + if addressProperties.version == 4: + secretEncryptionKey, tag = protocol.calculateAddressTag( + addressProperties.version, + addressProperties.stream, + addressProperties.ripe + ) + else: + secretEncryptionKey = hashlib.sha512( + addresses.encodeVarint(addressProperties.version) + + addresses.encodeVarint(addressProperties.stream) + + addressProperties.ripe + ).digest()[: 32] + + tag = "" + + publicEncryptionKey = highlevelcrypto.pointMult(secretEncryptionKey) + + TTL = min(28 * 24 * 60 * 60, TTL) + TTL = max(60 * 60, TTL) + TTL = randomizeTTL(TTL) + expiryTime = int(time.time() + TTL) + + headlessPayload = struct.pack(">I", 3) + + if addressProperties.version == 4: + headlessPayload += addresses.encodeVarint(5) + else: + headlessPayload += addresses.encodeVarint(4) + + headlessPayload += addresses.encodeVarint(addressProperties.stream) + + inventoryTagPosition = len(headlessPayload) + + headlessPayload += tag + + plaintext = addresses.encodeVarint(addressProperties.version) + plaintext += addresses.encodeVarint(addressProperties.stream) + plaintext += struct.pack(">I", addressProperties.bitfield) + plaintext += addressProperties.publicSigningKey[1: ] + plaintext += addressProperties.publicEncryptionKey[1: ] + + if addressProperties.version >= 3: + plaintext += addresses.encodeVarint(addressProperties.byteDifficulty) + plaintext += addresses.encodeVarint(addressProperties.lengthExtension) + + encodedMessage = helper_msgcoding.MsgEncode({"subject": subject, "body": body}, encoding) + + plaintext += addresses.encodeVarint(encoding) + plaintext += addresses.encodeVarint(encodedMessage.length) + plaintext += encodedMessage.data + + signature = highlevelcrypto.sign( + struct.pack(">Q", expiryTime) + headlessPayload + plaintext, + binascii.hexlify(addressProperties.secretSigningKey) + ) + + plaintext += addresses.encodeVarint(len(signature)) + plaintext += signature + + headlessPayload += highlevelcrypto.encrypt(plaintext, binascii.hexlify(publicEncryptionKey)) + + if len(headlessPayload) > 2 ** 18 - (8 + 8): # 256 kiB + debug.logger.critical( + "This broadcast object is too large to send. This should never happen. Object size: %s", + len(headlessPayload) + ) + + return + + def workDone(nonce, expiryTime): + inventoryTag = headlessPayload[inventoryTagPosition: inventoryTagPosition + 32] + + # TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction + + inventoryHash, payload = disseminateObject( + nonce, expiryTime, headlessPayload, + 3, addressProperties.stream, inventoryTag + ) + + helper_sql.sqlExecute(""" + UPDATE "sent" SET "msgid" = ?, "status" = 'broadcastsent', "lastactiontime" = ? + WHERE "ackdata" == ?; + """, inventoryHash, int(time.time()), ackData) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate("MainWindow", "Broadcast sent on %1").arg(l10n.formatTimestamp()) + ))) + + # Add to own inbox + + if addressProperties.version == 4: + if tag in shared.MyECSubscriptionCryptorObjects: + queues.objectProcessorQueue.put((3, payload)) + else: + if addressProperties.ripe in shared.MyECSubscriptionCryptorObjects: + queues.objectProcessorQueue.put((3, payload)) + + helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'doingbroadcastpow' WHERE "ackdata" == ?;""", ackData) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Doing work necessary to send broadcast." + ) + ))) + + self.startWork( + ID, headlessPayload, TTL, expiryTime, + defaults.networkDefaultProofOfWorkNonceTrialsPerByte, + defaults.networkDefaultPayloadLengthExtraBytes, + "(For broadcast message)", + workDone + ) + + def sendBroadcasts(self): + queued = helper_sql.sqlQuery(""" + SELECT "fromaddress", "subject", "message", "ackdata", "ttl", "encodingtype" FROM "sent" + WHERE "status" == 'broadcastqueued' AND "folder" == 'sent'; + """) + + for i in queued: + # Must be in a separate function because of the nested callback + + self.processBroadcast(*i) + + def generateAckMessage(self, ackData, stream, TTL, callback): + ID = "ack", ackData + + # It might be perfectly fine to just use the same TTL for + # the ackdata that we use for the message. But I would rather + # it be more difficult for attackers to associate ackData with + # the associated msg object. However, users would want the TTL + # of the acknowledgement to be about the same as they set + # for the message itself. So let's set the TTL of the + # acknowledgement to be in one of three 'buckets': 1 hour, 7 + # days, or 28 days, whichever is relatively close to what the + # user specified. + + if TTL < 24 * 60 * 60: + TTL = 24 * 60 * 60 + elif TTL < 7 * 24 * 60 * 60: + TTL = 7 * 24 * 60 * 60 + else: + TTL = 28 * 24 * 60 * 60 + + TTL = randomizeTTL(TTL) + expiryTime = int(time.time() + TTL) + + def workDone(nonce, expiryTime): + payload = nonce + struct.pack(">Q", expiryTime) + ackData + + callback(protocol.CreatePacket("object", payload)) + + self.startWork( + ID, ackData, TTL, expiryTime, + defaults.networkDefaultProofOfWorkNonceTrialsPerByte, + defaults.networkDefaultPayloadLengthExtraBytes, + "(For ack message)", + workDone + ) + + def processMessage(self, status, destination, source, subject, body, ackData, TTL, retryNumber, encoding): + ID = "message", ackData + + helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'awaitingpubkey' WHERE "ackdata" == ?;""", ackData) + + destinationProperties = getDestinationAddressProperties(destination) + + if destinationProperties is None: + queues.workerQueue.put(("requestPubkey", destination)) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Waiting for their encryption key. Will request it again soon." + ) + ))) + + return + + try: + defaultDifficulty = shared.isAddressInMyAddressBookSubscriptionsListOrWhitelist(destination) + + if destinationProperties.own: + defaultDifficulty = True + + sourceProperties = getMyAddressProperties(source, defaultDifficulty) + except: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Error! Could not find sender address (your address) in the keys.dat file." + ) + ))) + + return + + relativeByteDifficulty = ( + float(destinationProperties.byteDifficulty) / + defaults.networkDefaultProofOfWorkNonceTrialsPerByte + ) + + relativeLengthExtension = ( + float(destinationProperties.lengthExtension) / + defaults.networkDefaultPayloadLengthExtraBytes + ) + + if status != "forcepow": + maximumByteDifficulty = bmconfigparser.BMConfigParser().getint( + "bitmessagesettings", "maxacceptablenoncetrialsperbyte" + ) + + maximumLengthExtension = bmconfigparser.BMConfigParser().getint( + "bitmessagesettings", "maxacceptablepayloadlengthextrabytes" + ) + + if ( + maximumByteDifficulty != 0 and destinationProperties.byteDifficulty > maximumLengthExtension or + maximumLengthExtension != 0 and destinationProperties.lengthExtension > maximumLengthExtension + ): + helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'toodifficult' WHERE "ackdata" == ?;""", ackData) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Problem: The work demanded by the recipient (%1 and %2) is " + "more difficult than you are willing to do. %3" + ).arg(str(relativeByteDifficulty)).arg(str(relativeLengthExtension)).arg(l10n.formatTimestamp()) + ))) + + return + + debug.logger.info("Sending message from %s to %s", source, destination) + + TTL *= 2 ** retryNumber + TTL = min(28 * 24 * 60 * 60, TTL) + TTL = max(60 * 60, TTL) + TTL = randomizeTTL(TTL) + expiryTime = int(time.time() + TTL) + + def ackMessageGenerated(ackMessage): + headlessPayload = struct.pack(">I", 2) + headlessPayload += addresses.encodeVarint(1) + headlessPayload += addresses.encodeVarint(destinationProperties.stream) + + plaintext = addresses.encodeVarint(sourceProperties.version) + plaintext += addresses.encodeVarint(sourceProperties.stream) + plaintext += struct.pack(">I", sourceProperties.bitfield) + plaintext += sourceProperties.publicSigningKey[1: ] + plaintext += sourceProperties.publicEncryptionKey[1: ] + + if sourceProperties.version >= 3: + plaintext += addresses.encodeVarint(sourceProperties.byteDifficulty) + plaintext += addresses.encodeVarint(sourceProperties.lengthExtension) + + plaintext += destinationProperties.ripe # To prevent resending a signed message to a different reciever + + encodedMessage = helper_msgcoding.MsgEncode({"subject": subject, "body": body}, encoding) + + plaintext += addresses.encodeVarint(encoding) + plaintext += addresses.encodeVarint(encodedMessage.length) + plaintext += encodedMessage.data + + if ackMessage is None: + plaintext += addresses.encodeVarint(0) + else: + plaintext += addresses.encodeVarint(len(ackMessage)) + plaintext += ackMessage + + signature = highlevelcrypto.sign( + struct.pack(">Q", expiryTime) + headlessPayload + plaintext, + binascii.hexlify(sourceProperties.secretSigningKey) + ) + + plaintext += addresses.encodeVarint(len(signature)) + plaintext += signature + + try: + ciphertext = highlevelcrypto.encrypt( + plaintext, + binascii.hexlify(destinationProperties.publicEncryptionKey) + ) + except: + helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'badkey' WHERE "ackdata" == ?;""", ackData) + + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Problem: The recipient's encryption key is no good. Could not encrypt message. %1" + ).arg(l10n.formatTimestamp()) + ))) + + return + + headlessPayload += ciphertext + inventoryTag = ciphertext[: 32] + + if len(headlessPayload) > 2 ** 18 - (8 + 8): # 256 kiB + debug.logger.critical( + "This message object is too large to send. This should never happen. Object size: %s", + len(headlessPayload) + ) + + return + + def workDone(nonce, expiryTime): + if ackMessage is not None: + state.watchedAckData.add(ackData) + + #TODO: adding to the inventory, adding to inbox and setting the sent status should be within a single SQL transaction + + inventoryHash, payload = disseminateObject( + nonce, expiryTime, headlessPayload, + 2, destinationProperties.stream, inventoryTag + ) + + if ackMessage is None: + newStatus = "msgsentnoackexpected" + else: + newStatus = "msgsent" + + sleepTill = int(time.time() + TTL * 1.1) + + helper_sql.sqlExecute(""" + UPDATE "sent" SET "msgid" = ?, "status" = ?, "retrynumber" = ?, + "sleeptill" = ?, "lastactiontime" = ? + WHERE "status" == 'doingmsgpow' AND "ackdata" == ?; + """, inventoryHash, newStatus, retryNumber + 1, sleepTill, int(time.time()), ackData) + + # Add to own inbox + + if destinationProperties.own: + queues.objectProcessorQueue.put((2, payload)) + + if ackMessage is None: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Message sent. Sent at %1" + ).arg(l10n.formatTimestamp()) + ))) + else: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Message sent. Waiting for acknowledgement. Sent on %1" + ).arg(l10n.formatTimestamp()) + ))) + + self.startWork( + ID, headlessPayload, TTL, expiryTime, + destinationProperties.byteDifficulty, + destinationProperties.lengthExtension, + "(For message)", + workDone + ) + + helper_sql.sqlExecute("""UPDATE "sent" SET "status" = 'doingmsgpow' WHERE "ackdata" == ?;""", ackData) + + if relativeByteDifficulty != 1 or relativeLengthExtension != 1: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Doing work necessary to send message.\nReceiver's required difficulty: %1 and %2" + ).arg(str(relativeByteDifficulty)).arg(str(relativeLengthExtension)) + ))) + else: + queues.UISignalQueue.put(("updateSentItemStatusByAckdata", ( + ackData, + tr._translate( + "MainWindow", + "Doing work necessary to send message." + ) + ))) + + if destinationProperties.own: + debug.logger.info("Not bothering to include ack data because we are sending to ourselves or a chan") + + ackMessageGenerated(None) + elif destinationProperties.bitfield & protocol.BITFIELD_DOESACK == 0: + debug.logger.info("Not bothering to include ack data because the receiver said that they won't relay it anyway") + + ackMessageGenerated(None) + else: + self.generateAckMessage(ackData, destinationProperties.stream, TTL, ackMessageGenerated) + + def sendMessages(self): + queued = helper_sql.sqlQuery(""" + SELECT "status", "toaddress", "fromaddress", "subject", "message", + "ackdata", "ttl", "retrynumber", "encodingtype" FROM "sent" + WHERE "status" IN ('msgqueued', 'forcepow') AND "folder" == 'sent'; + """) + + for i in queued: + # Must be in a separate function because of the nested callback + + self.processMessage(*i) + + def requestPubkey(self, address): + ID = "getpubkey", address + + if ID in self.startedWorks: + return + + status, version, stream, ripe = addresses.decodeAddress(address) + + # Check if a request is already in the inventory + + if version == 4: + secretEncryptionKey, tag = protocol.calculateAddressTag(version, stream, ripe) + else: + tag = ripe + + currentExpiryTime = None + + for i in inventory.Inventory().by_type_and_tag(0, tag): + if currentExpiryTime is None: + currentExpiryTime = i.expires + else: + currentExpiryTime = max(currentExpiryTime, i.expires) + + if currentExpiryTime is not None: + helper_sql.sqlExecute(""" + UPDATE "sent" SET "status" = 'awaitingpubkey', "sleeptill" = ? + WHERE "status" IN ('doingpubkeypow', 'awaitingpubkey') AND "toaddress" == ? AND "folder" == 'sent'; + """, currentExpiryTime, address) + + queues.UISignalQueue.put(("updateSentItemStatusByToAddress", ( + address, + tr._translate( + "MainWindow", + "Waiting for their encryption key. Will request it again soon." + ) + ))) + + return + + debug.logger.info("Making request for version %s pubkey with tag: %s", version, binascii.hexlify(tag)) + + TTL = randomizeTTL(28 * 24 * 60 * 60) + + headlessPayload = struct.pack(">I", 0) + headlessPayload += addresses.encodeVarint(version) + headlessPayload += addresses.encodeVarint(stream) + + headlessPayload += tag + + def workDone(nonce, expiryTime): + # TODO: adding to the inventory and setting the sent status should be within a single SQL transaction + + disseminateObject(nonce, expiryTime, headlessPayload, 0, stream, tag) + + sleepTill = int(time.time() + TTL * 1.1) + + helper_sql.sqlExecute(""" + UPDATE "sent" SET "status" = 'awaitingpubkey', "sleeptill" = ?, "lastactiontime" = ? + WHERE "status" IN ('doingpubkeypow', 'awaitingpubkey') AND "toaddress" == ? AND "folder" == 'sent'; + """, sleepTill, int(time.time()), address) + + queues.UISignalQueue.put(("updateSentItemStatusByToAddress", ( + address, + tr._translate( + "MainWindow", + "Sending public key request. Waiting for reply. Requested at %1" + ).arg(l10n.formatTimestamp()) + ))) + + helper_sql.sqlExecute(""" + UPDATE "sent" SET "status" = 'doingpubkeypow' + WHERE "status" == 'awaitingpubkey' AND "toaddress" == ? AND "folder" == 'sent'; + """, address) + + queues.UISignalQueue.put(("updateSentItemStatusByToAddress", ( + address, + tr._translate( + "MainWindow", + "Doing work necessary to request encryption key." + ) + ))) + + self.startWork( + ID, headlessPayload, TTL, None, + defaults.networkDefaultProofOfWorkNonceTrialsPerByte, + defaults.networkDefaultPayloadLengthExtraBytes, + "(For getpubkey message)".format(version), + workDone + ) diff --git a/src/state.py b/src/state.py index d0433059..79910837 100644 --- a/src/state.py +++ b/src/state.py @@ -1,6 +1,11 @@ import collections +# Single worker assumes, that object processor checks this dict only after a pubkey is added to the inventory or the "pubkeys" table +# TODO: add locking? + neededPubkeys = {} +watchedAckData = set() + streamsInWhichIAmParticipating = [] sendDataQueues = [] # each sendData thread puts its queue in this list. diff --git a/src/workprover/__init__.py b/src/workprover/__init__.py index 0762a94c..502c8e38 100644 --- a/src/workprover/__init__.py +++ b/src/workprover/__init__.py @@ -28,7 +28,9 @@ class Task(object): self.target = target class WorkProver(threading.Thread): - def __init__(self, codePath, GPUVendor, seed, statusUpdated): + # Seed must be 32 bytes + + def __init__(self, codePath, GPUVendor, seed, statusUpdated, resultsQueue): super(self.__class__, self).__init__() self.availableSolvers = { @@ -65,7 +67,11 @@ class WorkProver(threading.Thread): self.statusUpdated = statusUpdated self.commandsQueue = Queue.Queue() - self.resultsQueue = Queue.Queue() + + if resultsQueue is None: + self.resultsQueue = Queue.Queue() + else: + self.resultsQueue = resultsQueue self.solverName = None self.solver = None @@ -86,7 +92,7 @@ class WorkProver(threading.Thread): if self.solver is not None: status = self.solver.status - self.statusUpdated((self.solverName, status, self.speed)) + self.statusUpdated((self.solverName, status, self.speed, len(self.tasks))) def setSolver(self, name, configuration): if name is None and self.solverName is None: diff --git a/src/workprover/forkingsolver.py b/src/workprover/forkingsolver.py index 8813453c..5ab84da7 100644 --- a/src/workprover/forkingsolver.py +++ b/src/workprover/forkingsolver.py @@ -38,21 +38,24 @@ def threadFunction(local, remote, codePath, threadNumber): solver = dumbsolver.DumbSolver(codePath) while True: - received = local.recv() + try: + received = local.recv() - command = received[0] - arguments = received[1: ] + command = received[0] + arguments = received[1: ] - if command == "search": - initialHash, target, seed, timeout = arguments - appendedSeed = seed + struct.pack(">Q", threadNumber) + if command == "search": + initialHash, target, seed, timeout = arguments + appendedSeed = seed + struct.pack(">Q", threadNumber) - nonce, iterationsCount = solver.search(initialHash, target, appendedSeed, timeout) + nonce, iterationsCount = solver.search(initialHash, target, appendedSeed, timeout) - local.send(("done", nonce, iterationsCount)) - elif command == "shutdown": - local.close() + local.send(("done", nonce, iterationsCount)) + elif command == "shutdown": + local.close() + return + except (EOFError, IOError): return class ForkingSolver(object): diff --git a/src/workprover/test.py b/src/workprover/test.py index e57c54b5..6d36ade2 100755 --- a/src/workprover/test.py +++ b/src/workprover/test.py @@ -121,7 +121,7 @@ class TestGPUSolver(TestSolver): class TestWorkProver(unittest.TestCase): def setUp(self): - self.thread = __init__.WorkProver(codePath, None, seed, None) + self.thread = __init__.WorkProver(codePath, None, seed, None, None) self.thread.start() def checkTaskLinks(self):