@ -1,40 +1,41 @@
"""
src / class_singleWorker . py
== == == == == == == == == == == == =
"""
# pylint: disable=protected-access,too-many-branches,too-many-statements,no-self-use,too-many-lines,too-many-locals
from __future__ import division
import time
import threading
import hashlib
import threading
import time
from binascii import hexlify , unhexlify
from struct import pack
# used when the API must execute an outside program
from subprocess import call # nosec
from binascii import hexlify , unhexlify
import tr
import defaults
import helper_inbox
import helper_msgcoding
import helper_random
import highlevelcrypto
import l10n
import proofofwork
import protocol
import queues
import state
import shared
import defaults
import highlevelcrypto
import proofofwork
import helper_inbox
import helper_random
import helper_msgcoding
import state
import tr
from addresses import calculateInventoryHash , decodeAddress , decodeVarint , encodeVarint
from bmconfigparser import BMConfigParser
from debug import logger
from inventory import Inventory
from addresses import (
decodeAddress , encodeVarint , decodeVarint , calculateInventoryHash
)
# from helper_generic import addDataPadding
from helper_sql import sqlExecute , sqlQuery
from helper_threading import StoppableThread
from helper_sql import sqlQuery , sqlExecute
from inventory import Inventory
# This thread, of which there is only one, does the heavy lifting:
# calculating POWs.
def sizeof_fmt ( num , suffix = ' h/s ' ) :
""" Format hashes per seconds nicely (SI prefix) """
for unit in [ ' ' , ' k ' , ' M ' , ' G ' , ' T ' , ' P ' , ' E ' , ' Z ' ] :
if abs ( num ) < 1000.0 :
return " %3.1f %s %s " % ( num , unit , suffix )
@ -43,14 +44,16 @@ def sizeof_fmt(num, suffix='h/s'):
class singleWorker ( threading . Thread , StoppableThread ) :
""" Thread for performing PoW """
def __init__ ( self ) :
# QThread.__init__(self, parent)
threading . Thread . __init__ ( self , name = " singleWorker " )
self . initStop ( )
proofofwork . init ( )
def stopThread ( self ) :
""" Signal through the queue that the thread should be stopped """
try :
queues . workerQueue . put ( ( " stopThread " , " data " ) )
except :
@ -58,6 +61,7 @@ class singleWorker(threading.Thread, StoppableThread):
super ( singleWorker , self ) . stopThread ( )
def run ( self ) :
# pylint: disable=attribute-defined-outside-init
while not state . sqlReady and state . shutdown == 0 :
self . stop . wait ( 2 )
@ -96,12 +100,12 @@ class singleWorker(threading.Thread, StoppableThread):
''' SELECT ackdata FROM sent WHERE status = ' msgsent ' ''' )
for row in queryreturn :
ackdata , = row
logger . info ( ' Watching for ackdata ' + hexlify ( ackdata ) )
logger . info ( ' Watching for ackdata %s ' , hexlify ( ackdata ) )
shared . ackdataForWhichImWatching [ ackdata ] = 0
# Fix legacy (headerless) watched ackdata to include header
for oldack in shared . ackdataForWhichImWatching . keys ( ) :
if ( len ( oldack ) == 32 ) :
for oldack in shared . ackdataForWhichImWatching :
if len ( oldack ) == 32 :
# attach legacy header, always constant (msg/1/1)
newack = ' \x00 \x00 \x00 \x02 \x01 \x01 ' + oldack
shared . ackdataForWhichImWatching [ newack ] = 0
@ -226,19 +230,9 @@ class singleWorker(threading.Thread, StoppableThread):
# inventoryHash = calculateInventoryHash(payload)
return payload
# This function also broadcasts out the pubkey message
# once it is done with the POW
def doPOWForMyV2Pubkey ( self , adressHash ) :
""" This function also broadcasts out the pubkey message once it is done with the POW """
# Look up my stream number based on my address hash
""" configSections = shared.config.addresses()
for addressInKeysFile in configSections :
if addressInKeysFile != ' bitmessagesettings ' :
status , addressVersionNumber , streamNumber , \
hashFromThisParticularAddress = \
decodeAddress ( addressInKeysFile )
if hash == hashFromThisParticularAddress :
myAddress = addressInKeysFile
break """
myAddress = shared . myAddressesByHash [ adressHash ]
# status
_ , addressVersionNumber , streamNumber , adressHash = decodeAddress ( myAddress )
@ -289,11 +283,12 @@ class singleWorker(threading.Thread, StoppableThread):
# before this finished.
pass
# If this isn't a chan address, this function assembles the pubkey data,
# does the necessary POW and sends it out. If it *is* a chan then it
# assembles the pubkey and stores is in the pubkey table so that we can
# send messages to "ourselves".
def sendOutOrStoreMyV3Pubkey ( self , adressHash ) :
"""
If this isn ' t a chan address, this function assembles the pubkey data, does the necessary POW and sends it out.
If it * is * a chan then it assembles the pubkey and stores is in the pubkey table so that we can send messages
to " ourselves " .
"""
try :
myAddress = shared . myAddressesByHash [ adressHash ]
except :
@ -357,7 +352,7 @@ class singleWorker(threading.Thread, StoppableThread):
Inventory ( ) [ inventoryHash ] = (
objectType , streamNumber , payload , embeddedTime , ' ' )
logger . info ( ' broadcasting inv with hash: ' + hexlify ( inventoryHash ) )
logger . info ( ' broadcasting inv with hash: %s ' , hexlify ( inventoryHash ) )
queues . invQueue . put ( ( streamNumber , inventoryHash ) )
queues . UISignalQueue . put ( ( ' updateStatusBar ' , ' ' ) )
@ -370,9 +365,12 @@ class singleWorker(threading.Thread, StoppableThread):
# before this finished.
pass
# If this isn't a chan address, this function assembles
# the pubkey data, does the necessary POW and sends it out.
def sendOutOrStoreMyV4Pubkey ( self , myAddress ) :
"""
It doesn ' t send directly anymore. It put is to a queue for another thread to send at an appropriate time,
whereas in the past it directly appended it to the outgoing buffer , I think . Same with all the other methods in
this class .
"""
if not BMConfigParser ( ) . has_section ( myAddress ) :
# The address has been deleted.
return
@ -444,7 +442,7 @@ class singleWorker(threading.Thread, StoppableThread):
doubleHashOfAddressData [ 32 : ]
)
logger . info ( ' broadcasting inv with hash: ' + hexlify ( inventoryHash ) )
logger . info ( ' broadcasting inv with hash: %s ' , hexlify ( inventoryHash ) )
queues . invQueue . put ( ( streamNumber , inventoryHash ) )
queues . UISignalQueue . put ( ( ' updateStatusBar ' , ' ' ) )
@ -459,6 +457,7 @@ class singleWorker(threading.Thread, StoppableThread):
)
def sendBroadcast ( self ) :
""" Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue) """
# Reset just in case
sqlExecute (
''' UPDATE sent SET status= ' broadcastqueued ' '''
@ -627,6 +626,8 @@ class singleWorker(threading.Thread, StoppableThread):
)
def sendMsg ( self ) :
""" Send a message-type object (assemble the object, perform PoW and put it to the inv announcement queue) """
# pylint: disable=too-many-nested-blocks
# Reset just in case
sqlExecute (
''' UPDATE sent SET status= ' msgqueued ' '''
@ -740,10 +741,8 @@ class singleWorker(threading.Thread, StoppableThread):
# object associated with the tag for this toAddress.
if toAddressVersionNumber > = 4 :
doubleHashOfToAddressData = hashlib . sha512 (
hashlib . sha512 ( encodeVarint (
toAddressVersionNumber ) +
encodeVarint ( toStreamNumber ) +
toRipe
hashlib . sha512 (
encodeVarint ( toAddressVersionNumber ) + encodeVarint ( toStreamNumber ) + toRipe
) . digest ( )
) . digest ( )
# The first half of the sha512 hash.
@ -834,7 +833,7 @@ class singleWorker(threading.Thread, StoppableThread):
queryreturn = sqlQuery (
' SELECT transmitdata FROM pubkeys WHERE address=? ' ,
toaddress )
for row in queryreturn :
for row in queryreturn : # pylint: disable=redefined-outer-name
pubkeyPayload , = row
# The pubkey message is stored with the following items
@ -939,40 +938,43 @@ class singleWorker(threading.Thread, StoppableThread):
requiredAverageProofOfWorkNonceTrialsPerByte ,
requiredPayloadLengthExtraBytes
)
queues . UISignalQueue . put ( (
' updateSentItemStatusByAckdata ' , (
ackdata ,
tr . _translate (
" MainWindow " ,
" Doing work necessary to send message. \n "
" Receiver \' s required difficulty: % 1 "
" and % 2 "
) . arg ( str ( float (
requiredAverageProofOfWorkNonceTrialsPerByte ) /
defaults . networkDefaultProofOfWorkNonceTrialsPerByte
) ) . arg ( str ( float (
requiredPayloadLengthExtraBytes ) /
defaults . networkDefaultPayloadLengthExtraBytes
) ) ) ) )
queues . UISignalQueue . put (
(
' updateSentItemStatusByAckdata ' ,
(
ackdata ,
tr . _translate (
" MainWindow " ,
" Doing work necessary to send message. \n "
" Receiver \' s required difficulty: % 1 "
" and % 2 "
) . arg (
str (
float ( requiredAverageProofOfWorkNonceTrialsPerByte ) /
defaults . networkDefaultProofOfWorkNonceTrialsPerByte
)
) . arg (
str (
float ( requiredPayloadLengthExtraBytes ) /
defaults . networkDefaultPayloadLengthExtraBytes
)
)
)
)
)
if status != ' forcepow ' :
if ( requiredAverageProofOfWorkNonceTrialsPerByte
> BMConfigParser ( ) . getint (
' bitmessagesettings ' ,
' maxacceptablenoncetrialsperbyte '
) and
BMConfigParser ( ) . getint (
' bitmessagesettings ' ,
' maxacceptablenoncetrialsperbyte '
) != 0 ) or (
requiredPayloadLengthExtraBytes
> BMConfigParser ( ) . getint (
' bitmessagesettings ' ,
' maxacceptablepayloadlengthextrabytes '
) and
BMConfigParser ( ) . getint (
' bitmessagesettings ' ,
' maxacceptablepayloadlengthextrabytes '
) != 0 ) :
maxacceptablenoncetrialsperbyte = BMConfigParser ( ) . getint (
' bitmessagesettings ' , ' maxacceptablenoncetrialsperbyte ' )
maxacceptablepayloadlengthextrabytes = BMConfigParser ( ) . getint (
' bitmessagesettings ' , ' maxacceptablepayloadlengthextrabytes ' )
cond1 = maxacceptablenoncetrialsperbyte and \
requiredAverageProofOfWorkNonceTrialsPerByte > maxacceptablenoncetrialsperbyte
cond2 = maxacceptablepayloadlengthextrabytes and \
requiredPayloadLengthExtraBytes > maxacceptablepayloadlengthextrabytes
if cond1 or cond2 :
# The demanded difficulty is more than
# we are willing to do.
sqlExecute (
@ -988,19 +990,15 @@ class singleWorker(threading.Thread, StoppableThread):
" the recipient ( % 1 and % 2) is "
" more difficult than you are "
" willing to do. % 3 "
) . arg ( str ( float (
requiredAverageProofOfWorkNonceTrialsPerByte )
/ defaults . networkDefaultProofOfWorkNonceTrialsPerByte
) ) . arg ( str ( float (
requiredPayloadLengthExtraBytes )
/ defaults . networkDefaultPayloadLengthExtraBytes
) ) . arg ( l10n . formatTimestamp ( ) ) )
) )
) . arg ( str ( float ( requiredAverageProofOfWorkNonceTrialsPerByte ) /
defaults . networkDefaultProofOfWorkNonceTrialsPerByte ) ) . arg (
str ( float ( requiredPayloadLengthExtraBytes ) /
defaults . networkDefaultPayloadLengthExtraBytes ) ) . arg (
l10n . formatTimestamp ( ) ) ) ) )
continue
else : # if we are sending a message to ourselves or a chan..
logger . info ( ' Sending a message. ' )
logger . debug (
' First 150 characters of message: %r ' , message [ : 150 ] )
logger . debug ( ' First 150 characters of message: %r ' , message [ : 150 ] )
behaviorBitfield = protocol . getBitfield ( fromaddress )
try :
@ -1199,16 +1197,14 @@ class singleWorker(threading.Thread, StoppableThread):
Inventory ( ) [ inventoryHash ] = (
objectType , toStreamNumber , encryptedPayload , embeddedTime , ' ' )
if BMConfigParser ( ) . has_section ( toaddress ) or \
not protocol . checkBitfield (
behaviorBitfield , protocol . BITFIELD_DOESACK ) :
not protocol . checkBitfield ( behaviorBitfield , protocol . BITFIELD_DOESACK ) :
queues . UISignalQueue . put ( (
' updateSentItemStatusByAckdata ' , (
ackdata ,
tr . _translate (
" MainWindow " ,
" Message sent. Sent at % 1 "
) . arg ( l10n . formatTimestamp ( ) ) )
) )
) . arg ( l10n . formatTimestamp ( ) ) ) ) )
else :
# not sending to a chan or one of my addresses
queues . UISignalQueue . put ( (
@ -1229,8 +1225,7 @@ class singleWorker(threading.Thread, StoppableThread):
# Update the sent message in the sent table with the
# necessary information.
if BMConfigParser ( ) . has_section ( toaddress ) or \
not protocol . checkBitfield (
behaviorBitfield , protocol . BITFIELD_DOESACK ) :
not protocol . checkBitfield ( behaviorBitfield , protocol . BITFIELD_DOESACK ) :
newStatus = ' msgsentnoackexpected '
else :
newStatus = ' msgsent '
@ -1270,6 +1265,7 @@ class singleWorker(threading.Thread, StoppableThread):
call ( [ apiNotifyPath , " newMessage " ] )
def requestPubKey ( self , toAddress ) :
""" Send a getpubkey object """
toStatus , addressVersionNumber , streamNumber , ripe = decodeAddress (
toAddress )
if toStatus != ' success ' :
@ -1286,7 +1282,7 @@ class singleWorker(threading.Thread, StoppableThread):
''' LIMIT 1 ''' ,
toAddress
)
if len ( queryReturn ) == 0 :
if not queryReturn :
logger . critical (
' BUG: Why are we requesting the pubkey for %s '
' if there are no messages in the sent folder '
@ -1389,16 +1385,14 @@ class singleWorker(threading.Thread, StoppableThread):
) . arg ( l10n . formatTimestamp ( ) ) )
) )
def generateFullAckMessage ( self , ackdata , toStreamNumber , TTL ) :
# It might be perfectly fine to just use the same TTL for
# the ackdata that we use for the message. But I would rather
# it be more difficult for attackers to associate ackData with
# the associated msg object. However, users would want the TTL
# of the acknowledgement to be about the same as they set
# for the message itself. So let's set the TTL of the
# acknowledgement to be in one of three 'buckets': 1 hour, 7
# days, or 28 days, whichever is relatively close to what the
# user specified.
def generateFullAckMessage ( self , ackdata , _ , TTL ) :
"""
It might be perfectly fine to just use the same TTL for the ackdata that we use for the message . But I would
rather it be more difficult for attackers to associate ackData with the associated msg object . However , users
would want the TTL of the acknowledgement to be about the same as they set for the message itself . So let ' s set
the TTL of the acknowledgement to be in one of three ' buckets ' : 1 hour , 7 days , or 28 days , whichever is
relatively close to what the user specified .
"""
if TTL < 24 * 60 * 60 : # 1 day
TTL = 24 * 60 * 60 # 1 day
elif TTL < 7 * 24 * 60 * 60 : # 1 week