You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
PyBitmessage/src/class_singleWorker.py

1419 lines
63 KiB
Python

9 years ago
from __future__ import division
import time
import threading
import hashlib
from struct import pack
# used when the API must execute an outside program
from subprocess import call # nosec
from binascii import hexlify, unhexlify
import tr
import l10n
import protocol
import queues
import state
import shared
import defaults
import highlevelcrypto
import proofofwork
import helper_inbox
import helper_random
import helper_msgcoding
from bmconfigparser import BMConfigParser
from debug import logger
from inventory import Inventory
from addresses import (
decodeAddress, encodeVarint, decodeVarint, calculateInventoryHash
)
# from helper_generic import addDataPadding
from helper_threading import StoppableThread
from helper_sql import sqlQuery, sqlExecute
# This thread, of which there is only one, does the heavy lifting:
# calculating POWs.
def sizeof_fmt(num, suffix='h/s'):
    """Format a rate with a decimal (SI) prefix, e.g. 1500 -> '1.5kh/s'.

    :param num: value to format (int or float, may be negative)
    :param suffix: unit text appended after the prefix
    :return: string with one decimal place, the value scaled by powers
        of 1000 until its magnitude drops below 1000 (values beyond
        the 'Z' range are reported with the 'Y' prefix)
    """
    value = float(num)
    # 'k', 'M', 'G', ... are decimal prefixes, so each step is a factor
    # of 1000 -- the same factor the threshold test uses.
    for unit in ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z'):
        if abs(value) < 1000.0:
            return "%3.1f%s%s" % (value, unit, suffix)
        value /= 1000.0
    return "%.1f%s%s" % (value, 'Y', suffix)
class singleWorker(threading.Thread, StoppableThread):
def __init__(self):
    """Set up the worker thread object and the proof-of-work backend.

    Names the thread "singleWorker", calls ``initStop()`` (inherited,
    not defined in this class) and then ``proofofwork.init()``.
    """
    worker_name = "singleWorker"
    threading.Thread.__init__(self, name=worker_name)
    self.initStop()
    proofofwork.init()
def stopThread(self):
    """Ask the worker loop to exit, then run the base class stopThread().

    A ``("stopThread", "data")`` command is put on ``queues.workerQueue``
    so that ``run()``, which blocks on that queue, receives it and
    returns.
    """
    try:
        queues.workerQueue.put(("stopThread", "data"))
    except Exception:
        # Best effort: failing to queue the command must not prevent the
        # base class stop logic below from running, but leave a trace.
        logger.warning(
            'Could not queue the stopThread command for the worker',
            exc_info=True)
    super(singleWorker, self).stopThread()
def run(self):
    """Prepare the bookkeeping structures, then serve workerQueue commands.

    Start-up phase:

    * wait until ``state.sqlReady`` is set (return early on shutdown);
    * fill ``state.neededPubkeys`` from sent messages whose status is
      'awaitingpubkey';
    * fill ``shared.ackdataForWhichImWatching`` from sent messages whose
      status is 'msgsent', upgrading legacy 32-byte ackdata (in memory
      and in the ``sent`` table) by prepending a constant header;
    * after a 10 second pause queue one 'sendmessage' and one
      'sendbroadcast' command so pending work is picked up.

    Main loop: take ``(command, data)`` tuples from
    ``queues.workerQueue`` and dispatch them until ``state.shutdown`` is
    set or a 'stopThread' command arrives. A failing command is logged
    and does not terminate the loop.
    """
    while not state.sqlReady and state.shutdown == 0:
        self.stop.wait(2)
    if state.shutdown > 0:
        return

    # Initialize the neededPubkeys dictionary.
    queryreturn = sqlQuery(
        '''SELECT DISTINCT toaddress FROM sent'''
        ''' WHERE (status='awaitingpubkey' AND folder='sent')''')
    for row in queryreturn:
        toAddress, = row
        # the first element (status) is not needed
        _, toAddressVersionNumber, toStreamNumber, toRipe = \
            decodeAddress(toAddress)
        if toAddressVersionNumber <= 3:
            state.neededPubkeys[toAddress] = 0
        elif toAddressVersionNumber >= 4:
            doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(
                encodeVarint(toAddressVersionNumber) +
                encodeVarint(toStreamNumber) + toRipe
            ).digest()).digest()
            # First half of the double hash: the decryption key;
            # second half: the tag under which the pubkey is filed.
            privEncryptionKey = doubleHashOfAddressData[:32]
            tag = doubleHashOfAddressData[32:]
            # We'll need this for when we receive a pubkey reply:
            # it will be encrypted and we'll need to decrypt it.
            state.neededPubkeys[tag] = (
                toAddress,
                highlevelcrypto.makeCryptor(
                    hexlify(privEncryptionKey))
            )

    # Initialize the shared.ackdataForWhichImWatching data structure
    queryreturn = sqlQuery(
        '''SELECT ackdata FROM sent WHERE status = 'msgsent' ''')
    for row in queryreturn:
        ackdata, = row
        logger.info('Watching for ackdata %s', hexlify(ackdata))
        shared.ackdataForWhichImWatching[ackdata] = 0

    # Fix legacy (headerless) watched ackdata to include header.
    # Iterate over a snapshot of the keys: entries are added and
    # removed inside the loop.
    for oldack in list(shared.ackdataForWhichImWatching.keys()):
        if len(oldack) == 32:
            # attach legacy header, always constant (msg/1/1)
            newack = '\x00\x00\x00\x02\x01\x01' + oldack
            shared.ackdataForWhichImWatching[newack] = 0
            sqlExecute(
                'UPDATE sent SET ackdata=? WHERE ackdata=?',
                newack, oldack
            )
            del shared.ackdataForWhichImWatching[oldack]

    # give some time for the GUI to start
    # before we start on existing POW tasks.
    self.stop.wait(10)

    if state.shutdown == 0:
        # just in case there are any pending tasks for msg
        # messages that have yet to be sent.
        queues.workerQueue.put(('sendmessage', ''))
        # just in case there are any tasks for Broadcasts
        # that have yet to be sent.
        queues.workerQueue.put(('sendbroadcast', ''))

    # command name -> callable taking the command's data argument
    handlers = {
        'sendmessage': lambda _data: self.sendMsg(),
        'sendbroadcast': lambda _data: self.sendBroadcast(),
        'doPOWForMyV2Pubkey':
            lambda data: self.doPOWForMyV2Pubkey(data),
        'sendOutOrStoreMyV3Pubkey':
            lambda data: self.sendOutOrStoreMyV3Pubkey(data),
        'sendOutOrStoreMyV4Pubkey':
            lambda data: self.sendOutOrStoreMyV4Pubkey(data),
        'resetPoW': lambda _data: proofofwork.resetPoW(),
    }

    while state.shutdown == 0:
        self.busy = 0
        command, data = queues.workerQueue.get()
        self.busy = 1
        if command == 'stopThread':
            self.busy = 0
            return
        handler = handlers.get(command)
        if handler is None:
            logger.error(
                'Probable programming error: The command sent'
                ' to the workerThread is weird. It is: %s\n',
                command
            )
        else:
            try:
                handler(data)
            except Exception:
                # One failing task must not kill the only worker
                # thread, but the failure should be visible.
                logger.warning(
                    'Worker command %s failed', command, exc_info=True)
        queues.workerQueue.task_done()
    logger.info("Quitting...")
def _getKeysForAddress(self, address):
    """Load the key material of *address* from the configuration.

    Reads the 'privsigningkey' and 'privencryptionkey' options of the
    section named *address*, decodes them with
    ``shared.decodeWalletImportFormat`` and derives the public keys.

    :return: tuple ``(privSigningKeyHex, privEncryptionKeyHex,
        pubSigningKey, pubEncryptionKey)``; the private keys are
        hex-encoded, the public keys are raw with their first byte
        removed
    """
    signingKeyWIF = BMConfigParser().get(address, 'privsigningkey')
    encryptionKeyWIF = BMConfigParser().get(address, 'privencryptionkey')

    fromWIF = shared.decodeWalletImportFormat
    signingHex = hexlify(fromWIF(signingKeyWIF))
    encryptionHex = hexlify(fromWIF(encryptionKeyWIF))

    rawSigningPub = unhexlify(highlevelcrypto.privToPub(signingHex))
    rawEncryptionPub = unhexlify(highlevelcrypto.privToPub(encryptionHex))

    # The \x04 on the beginning of the public keys are not sent.
    # This way there is only one acceptable way to encode
    # and send a public key.
    return (
        signingHex,
        encryptionHex,
        rawSigningPub[1:],
        rawEncryptionPub[1:],
    )
def _doPOWDefaults(self, payload, TTL, log_prefix='', log_time=False):
    """Do the proof of work for *payload* at the network default difficulty.

    :param payload: object data without the nonce
    :param TTL: time to live of the object in seconds; a longer TTL
        lowers the target and so requires more work
    :param log_prefix: text put in front of the log messages
    :param log_time: when true, also log how long the PoW took and the
        resulting speed
    :return: *payload* with the nonce, packed as an 8-byte big-endian
        unsigned integer, prepended
    """
    # Length the object is charged for; the "+ 8" presumably accounts
    # for the 8-byte nonce that is prepended below.
    payloadLength = (
        len(payload) + 8 + defaults.networkDefaultPayloadLengthExtraBytes)
    target = 2 ** 64 / (
        defaults.networkDefaultProofOfWorkNonceTrialsPerByte * (
            payloadLength + (TTL * payloadLength) / (2 ** 16)
        ))
    initialHash = hashlib.sha512(payload).digest()
    logger.info(
        '%s Doing proof of work... TTL set to %s', log_prefix, TTL)
    if log_time:
        start_time = time.time()
    trialValue, nonce = proofofwork.run(target, initialHash)
    logger.info(
        '%s Found proof of work %s Nonce: %s',
        log_prefix, trialValue, nonce
    )
    if log_time:
        delta = time.time() - start_time
        # A zero interval would make the speed undefined; skip the line.
        if delta != 0:
            logger.info(
                'PoW took %.1f seconds, speed %s.',
                delta, sizeof_fmt(nonce / delta)
            )
    return pack('>Q', nonce) + payload
# This function also broadcasts out the pubkey message
# once it is done with the POW
def doPOWForMyV2Pubkey(self, adressHash):
    """Build, do the PoW for and announce the pubkey object of my address.

    The address is looked up in ``shared.myAddressesByHash``. The
    finished object is stored in the inventory, its hash is put on
    ``queues.invQueue`` and 'lastpubkeysendtime' is updated in the
    configuration.

    :param adressHash: hash identifying one of my addresses
    """
    try:
        myAddress = shared.myAddressesByHash[adressHash]
    except KeyError:
        # The address has been deleted.
        logger.info(
            'Not sending a pubkey: the requested address is unknown.')
        return
    # status and hash from the decoded address are not needed
    _, addressVersionNumber, streamNumber, _ = decodeAddress(myAddress)

    # 28 days from now plus or minus five minutes
    TTL = int(28 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300))
    embeddedTime = int(time.time() + TTL)
    payload = pack('>Q', (embeddedTime))
    payload += '\x00\x00\x00\x01'  # object type: pubkey
    payload += encodeVarint(addressVersionNumber)  # Address version number
    payload += encodeVarint(streamNumber)
    # bitfield of features supported by me (see the wiki).
    payload += protocol.getBitfield(myAddress)

    try:
        # the private keys are not needed here
        _, _, pubSigningKey, pubEncryptionKey = \
            self._getKeysForAddress(myAddress)
    except Exception as err:
        logger.error(
            'Error within doPOWForMyV2Pubkey. Could not read'
            ' the keys from the keys.dat file for a requested'
            ' address. %s\n', err
        )
        return

    payload += pubSigningKey + pubEncryptionKey

    # Do the POW for this pubkey message
    payload = self._doPOWDefaults(
        payload, TTL, log_prefix='(For pubkey message)')

    inventoryHash = calculateInventoryHash(payload)
    objectType = 1
    Inventory()[inventoryHash] = (
        objectType, streamNumber, payload, embeddedTime, '')

    logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash))

    queues.invQueue.put((streamNumber, inventoryHash))
    queues.UISignalQueue.put(('updateStatusBar', ''))
    try:
        BMConfigParser().set(
            myAddress, 'lastpubkeysendtime', str(int(time.time())))
        BMConfigParser().save()
    except Exception as err:
        # Most likely the user deleted the address out of the keys.dat
        # file before this finished; the pubkey has been sent anyway.
        logger.warning(
            'Could not store lastpubkeysendtime for the address: %s', err)
# If this isn't a chan address, this function assembles the pubkey data,
# does the necessary POW and sends it out. If it *is* a chan address,
# nothing is sent: the function logs that fact and returns.
def sendOutOrStoreMyV3Pubkey(self, adressHash):
    """Build, sign, do the PoW for and announce my v3 pubkey object.

    Nothing is sent when the address is no longer present in
    ``shared.myAddressesByHash`` or when it is a chan address. On
    success the object is stored in the inventory, its hash is put on
    ``queues.invQueue`` and 'lastpubkeysendtime' is updated in the
    configuration.

    :param adressHash: hash identifying one of my addresses
    """
    try:
        myAddress = shared.myAddressesByHash[adressHash]
    except KeyError:
        # The address has been deleted.
        return
    if BMConfigParser().safeGetBoolean(myAddress, 'chan'):
        logger.info('This is a chan address. Not sending pubkey.')
        return
    # status and hash from the decoded address are not needed
    _, addressVersionNumber, streamNumber, _ = decodeAddress(myAddress)

    # 28 days from now plus or minus five minutes
    TTL = int(28 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300))
    embeddedTime = int(time.time() + TTL)

    # signedTimeForProtocolV2 = embeddedTime - TTL
    # According to the protocol specification, the expiresTime
    # along with the pubkey information is signed. But to be
    # backwards compatible during the upgrade period, we shall sign
    # not the expiresTime but rather the current time. There must be
    # precisely a 28 day difference between the two. After the upgrade
    # period we'll switch to signing the whole payload with the
    # expiresTime time.
    # NOTE(review): the code below signs the payload that starts with
    # embeddedTime, so the note above may be out of date -- confirm.

    payload = pack('>Q', (embeddedTime))
    payload += '\x00\x00\x00\x01'  # object type: pubkey
    payload += encodeVarint(addressVersionNumber)  # Address version number
    payload += encodeVarint(streamNumber)
    # bitfield of features supported by me (see the wiki).
    payload += protocol.getBitfield(myAddress)

    try:
        # the private encryption key is not needed here
        privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \
            self._getKeysForAddress(myAddress)
    except Exception as err:
        logger.error(
            'Error within sendOutOrStoreMyV3Pubkey. Could not read'
            ' the keys from the keys.dat file for a requested'
            ' address. %s\n', err
        )
        return

    payload += pubSigningKey + pubEncryptionKey

    payload += encodeVarint(BMConfigParser().getint(
        myAddress, 'noncetrialsperbyte'))
    payload += encodeVarint(BMConfigParser().getint(
        myAddress, 'payloadlengthextrabytes'))

    signature = highlevelcrypto.sign(payload, privSigningKeyHex)
    payload += encodeVarint(len(signature))
    payload += signature

    # Do the POW for this pubkey message
    payload = self._doPOWDefaults(
        payload, TTL, log_prefix='(For pubkey message)')

    inventoryHash = calculateInventoryHash(payload)
    objectType = 1
    Inventory()[inventoryHash] = (
        objectType, streamNumber, payload, embeddedTime, '')

    logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash))

    queues.invQueue.put((streamNumber, inventoryHash))
    queues.UISignalQueue.put(('updateStatusBar', ''))
    try:
        BMConfigParser().set(
            myAddress, 'lastpubkeysendtime', str(int(time.time())))
        BMConfigParser().save()
    except Exception as err:
        # Most likely the user deleted the address out of the keys.dat
        # file before this finished; the pubkey has been sent anyway.
        logger.warning(
            'Could not store lastpubkeysendtime for the address: %s', err)
# If this isn't a chan address, this function assembles
# the pubkey data, does the necessary POW and sends it out.
def sendOutOrStoreMyV4Pubkey(self, myAddress):
    """Build, sign, encrypt, do the PoW for and announce my v4 pubkey.

    Nothing is sent when *myAddress* has no section in the
    configuration or when it is a chan address. The key data is
    encrypted with a key derived from the double SHA-512 hash of the
    address data; the second half of that hash is sent unencrypted as
    the tag. On success the object is stored in the inventory, its
    hash is put on ``queues.invQueue`` and 'lastpubkeysendtime' is
    updated in the configuration.

    :param myAddress: one of my addresses (the configuration section
        name)
    """
    if not BMConfigParser().has_section(myAddress):
        # The address has been deleted.
        return
    if BMConfigParser().safeGetBoolean(myAddress, 'chan'):
        logger.info('This is a chan address. Not sending pubkey.')
        return
    _, addressVersionNumber, streamNumber, addressHash = decodeAddress(
        myAddress)

    # 28 days from now plus or minus five minutes
    TTL = int(28 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300))
    embeddedTime = int(time.time() + TTL)
    payload = pack('>Q', (embeddedTime))
    payload += '\x00\x00\x00\x01'  # object type: pubkey
    payload += encodeVarint(addressVersionNumber)  # Address version number
    payload += encodeVarint(streamNumber)
    dataToEncrypt = protocol.getBitfield(myAddress)

    try:
        # the private encryption key is not needed here
        privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \
            self._getKeysForAddress(myAddress)
    except Exception as err:
        logger.error(
            'Error within sendOutOrStoreMyV4Pubkey. Could not read'
            ' the keys from the keys.dat file for a requested'
            ' address. %s\n', err
        )
        return

    dataToEncrypt += pubSigningKey + pubEncryptionKey

    dataToEncrypt += encodeVarint(BMConfigParser().getint(
        myAddress, 'noncetrialsperbyte'))
    dataToEncrypt += encodeVarint(BMConfigParser().getint(
        myAddress, 'payloadlengthextrabytes'))

    # When we encrypt, we'll use a hash of the data
    # contained in an address as a decryption key. This way
    # in order to read the public keys in a pubkey message,
    # a node must know the address first. We'll also tag,
    # unencrypted, the pubkey with part of the hash so that nodes
    # know which pubkey object to try to decrypt
    # when they want to send a message.
    doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(
        encodeVarint(addressVersionNumber) +
        encodeVarint(streamNumber) + addressHash
    ).digest()).digest()
    privEncryptionKey = doubleHashOfAddressData[:32]
    tag = doubleHashOfAddressData[32:]
    payload += tag

    # The signature covers the clear-text header (including the tag)
    # and the data that is about to be encrypted.
    signature = highlevelcrypto.sign(
        payload + dataToEncrypt, privSigningKeyHex
    )
    dataToEncrypt += encodeVarint(len(signature))
    dataToEncrypt += signature

    # Public key matching the address-derived private key; distinct
    # from the address' own public encryption key used above.
    derivedPubKey = highlevelcrypto.pointMult(privEncryptionKey)
    payload += highlevelcrypto.encrypt(
        dataToEncrypt, hexlify(derivedPubKey))

    # Do the POW for this pubkey message
    payload = self._doPOWDefaults(
        payload, TTL, log_prefix='(For pubkey message)')

    inventoryHash = calculateInventoryHash(payload)
    objectType = 1
    Inventory()[inventoryHash] = (
        objectType, streamNumber, payload, embeddedTime, tag)

    logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash))

    queues.invQueue.put((streamNumber, inventoryHash))
    queues.UISignalQueue.put(('updateStatusBar', ''))
    try:
        BMConfigParser().set(
            myAddress, 'lastpubkeysendtime', str(int(time.time())))
        BMConfigParser().save()
    except Exception as err:
        logger.error(
            'Error: Couldn\'t add the lastpubkeysendtime'
            ' to the keys.dat file. Error message: %s', err
        )
def sendBroadcast(self):
    """Send every broadcast queued in the ``sent`` table.

    Rows left in status 'doingbroadcastpow' are first reset to
    'broadcastqueued'. For each queued row the broadcast object is
    assembled, signed, encrypted with a key derived from the sender's
    address, given a proof of work, stored in the inventory and
    announced on ``queues.invQueue``; the row is then marked
    'broadcastsent'. Progress is reported through
    ``queues.UISignalQueue``.
    """
    # Reset just in case
    sqlExecute(
        '''UPDATE sent SET status='broadcastqueued' '''
        '''WHERE status = 'doingbroadcastpow' ''')
    queryreturn = sqlQuery(
        '''SELECT fromaddress, subject, message, '''
        ''' ackdata, ttl, encodingtype FROM sent '''
        ''' WHERE status=? and folder='sent' ''', 'broadcastqueued')

    for row in queryreturn:
        fromaddress, subject, body, ackdata, TTL, encoding = row
        # the first element (status) is not needed
        _, addressVersionNumber, streamNumber, ripe = \
            decodeAddress(fromaddress)
        if addressVersionNumber <= 1:
            logger.error(
                'Error: In the singleWorker thread, the '
                ' sendBroadcast function doesn\'t understand'
                ' the address version.\n')
            # NOTE(review): this also abandons the remaining queued
            # broadcasts; `continue` may have been intended -- confirm.
            return
        # We need to convert our private keys to public keys in order
        # to include them.
        try:
            # the private encryption key is not needed here
            privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \
                self._getKeysForAddress(fromaddress)
        except Exception:
            logger.warning(
                'Could not read the keys of the broadcast sender'
                ' address', exc_info=True)
            queues.UISignalQueue.put((
                'updateSentItemStatusByAckdata', (
                    ackdata,
                    tr._translate(
                        "MainWindow",
                        "Error! Could not find sender address"
                        " (your address) in the keys.dat file."))
            ))
            continue

        sqlExecute(
            '''UPDATE sent SET status='doingbroadcastpow' '''
            ''' WHERE ackdata=? AND status='broadcastqueued' ''',
            ackdata)

        # Clamp the TTL to the range of one hour to 28 days
        if TTL > 28 * 24 * 60 * 60:
            TTL = 28 * 24 * 60 * 60
        if TTL < 60 * 60:
            TTL = 60 * 60
        # add some randomness to the TTL
        TTL = int(TTL + helper_random.randomrandrange(-300, 300))
        embeddedTime = int(time.time() + TTL)

        # Encrypt the broadcast with the information
        # contained in the broadcaster's address.
        # Anyone who knows the address can generate
        # the private encryption key to decrypt the broadcast.
        # This provides virtually no privacy; its purpose is to keep
        # questionable and illegal content from flowing through the
        # Internet connections and being stored on the disk of 3rd parties.
        addressData = (
            encodeVarint(addressVersionNumber) +
            encodeVarint(streamNumber) + ripe)
        if addressVersionNumber >= 4:
            # v4+ addresses: broadcast version 5, key and tag are the
            # two halves of the double hash of the address data
            broadcastVersion = 5
            doubleHashOfAddressData = hashlib.sha512(
                hashlib.sha512(addressData).digest()).digest()
            privEncryptionKey = doubleHashOfAddressData[:32]
            tag = doubleHashOfAddressData[32:]
        else:
            # older addresses: broadcast version 4, key from a single
            # hash of the address data, no tag
            broadcastVersion = 4
            privEncryptionKey = hashlib.sha512(addressData).digest()[:32]
            tag = ''

        payload = pack('>Q', embeddedTime)
        payload += '\x00\x00\x00\x03'  # object type: broadcast
        payload += encodeVarint(broadcastVersion)  # broadcast version
        payload += encodeVarint(streamNumber)
        payload += tag  # empty for broadcast version 4

        dataToEncrypt = encodeVarint(addressVersionNumber)
        dataToEncrypt += encodeVarint(streamNumber)
        # behavior bitfield
        dataToEncrypt += protocol.getBitfield(fromaddress)
        # The \x04 encoding byte of the keys was already removed by
        # _getKeysForAddress and is not sent.
        dataToEncrypt += pubSigningKey + pubEncryptionKey
        if addressVersionNumber >= 3:
            dataToEncrypt += encodeVarint(BMConfigParser().getint(
                fromaddress, 'noncetrialsperbyte'))
            dataToEncrypt += encodeVarint(BMConfigParser().getint(
                fromaddress, 'payloadlengthextrabytes'))
        # message encoding type
        dataToEncrypt += encodeVarint(encoding)
        encodedMessage = helper_msgcoding.MsgEncode(
            {"subject": subject, "body": body}, encoding)
        dataToEncrypt += encodeVarint(encodedMessage.length)
        dataToEncrypt += encodedMessage.data

        # The signature covers the clear-text header and the data that
        # is about to be encrypted.
        dataToSign = payload + dataToEncrypt
        signature = highlevelcrypto.sign(
            dataToSign, privSigningKeyHex)
        dataToEncrypt += encodeVarint(len(signature))
        dataToEncrypt += signature

        # Public key matching the address-derived private key; distinct
        # from the sender's own public encryption key used above.
        broadcastPubKey = highlevelcrypto.pointMult(privEncryptionKey)
        payload += highlevelcrypto.encrypt(
            dataToEncrypt, hexlify(broadcastPubKey))

        queues.UISignalQueue.put((
            'updateSentItemStatusByAckdata', (
                ackdata,
                tr._translate(
                    "MainWindow",
                    "Doing work necessary to send broadcast..."))
        ))
        payload = self._doPOWDefaults(
            payload, TTL, log_prefix='(For broadcast message)')

        # Sanity check. The payload size should never be larger
        # than 256 KiB. There should be checks elsewhere in the code
        # to not let the user try to send a message this large
        # until we implement message continuation.
        if len(payload) > 2 ** 18:  # 256 KiB
            logger.critical(
                'This broadcast object is too large to send.'
                ' This should never happen. Object size: %s',
                len(payload)
            )
            continue

        inventoryHash = calculateInventoryHash(payload)
        objectType = 3
        Inventory()[inventoryHash] = (
            objectType, streamNumber, payload, embeddedTime, tag)
        logger.info(
            'sending inv (within sendBroadcast function)'
            ' for object: %s',
            hexlify(inventoryHash)
        )
        queues.invQueue.put((streamNumber, inventoryHash))

        queues.UISignalQueue.put((
            'updateSentItemStatusByAckdata', (
                ackdata,
                tr._translate(
                    "MainWindow",
                    "Broadcast sent on %1"
                ).arg(l10n.formatTimestamp()))
        ))

        # Update the status of the message in the 'sent' table to have
        # a 'broadcastsent' status
        sqlExecute(
            'UPDATE sent SET msgid=?, status=?, lastactiontime=?'
            ' WHERE ackdata=?',
            inventoryHash, 'broadcastsent', int(time.time()), ackdata
        )
def sendMsg(self):
# Reset just in case
sqlExecute(
'''UPDATE sent SET status='msgqueued' '''
''' WHERE status IN ('doingpubkeypow', 'doingmsgpow')''')
queryreturn = sqlQuery(
'''SELECT toaddress, fromaddress, subject, message, '''
''' ackdata, status, ttl, retrynumber, encodingtype FROM '''
''' sent WHERE (status='msgqueued' or status='forcepow') '''
''' and folder='sent' ''')
# while we have a msg that needs some work
for row in queryreturn:
toaddress, fromaddress, subject, message, \
ackdata, status, TTL, retryNumber, encoding = row