PyBitmessage/src/shared.py

873 lines
40 KiB
Python
Raw Normal View History

2015-01-21 18:38:25 +01:00
from __future__ import division
2014-08-27 09:14:32 +02:00
softwareVersion = '0.4.4'
verbose = 1
2014-08-27 09:14:32 +02:00
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # This is obsolete with the change to protocol v3 but the singleCleaner thread still hasn't been updated so we need this a little longer.
lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time.
maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours
2013-09-18 06:04:01 +02:00
useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages.
2013-05-02 17:53:54 +02:00
# Libraries.
import collections
import ConfigParser
import os
import pickle
import Queue
import random
import socket
import sys
import stat
import threading
import time
import shutil # used for moving the data folder and copying keys.dat
2014-09-16 19:04:56 +02:00
import datetime
from os import path, environ
from struct import Struct
2014-08-27 09:14:32 +02:00
import traceback
# Project imports.
from addresses import *
import highlevelcrypto
import shared
2014-08-27 09:14:32 +02:00
#import helper_startup
from helper_sql import *
2013-05-02 17:53:54 +02:00
config = ConfigParser.SafeConfigParser()
2013-05-02 17:53:54 +02:00
myECCryptorObjects = {}
MyECSubscriptionCryptorObjects = {}
myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself.
2013-09-15 03:06:26 +02:00
myAddressesByTag = {} # The key in this dictionary is the tag generated from the address.
2013-05-02 17:53:54 +02:00
broadcastSendersForWhichImWatching = {}
workerQueue = Queue.Queue()
UISignalQueue = Queue.Queue()
addressGeneratorQueue = Queue.Queue()
knownNodesLock = threading.Lock()
knownNodes = {}
sendDataQueues = [] #each sendData thread puts its queue in this list.
inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
inventoryLock = threading.Lock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
printLock = threading.Lock()
objectProcessorQueueSizeLock = threading.Lock()
objectProcessorQueueSize = 0 # in Bytes. We maintain this to prevent nodes from flooing us with objects which take up too much memory. If this gets too big we'll sleep before asking for further objects.
2013-05-02 17:53:54 +02:00
appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
2013-05-30 22:25:42 +02:00
shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
alreadyAttemptedConnectionsListLock = threading.Lock()
alreadyAttemptedConnectionsListResetTime = int(
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
numberOfObjectsThatWeHaveYetToGetPerPeer = {}
neededPubkeys = {}
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
'>Q', random.randrange(1, 18446744073709551615))
successfullyDecryptMessageTimings = [
] # A list of the amounts of time it took to successfully decrypt msg messages
apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread.
ackdataForWhichImWatching = {}
clientHasReceivedIncomingConnections = False #used by API command clientStatus
numberOfMessagesProcessed = 0
numberOfBroadcastsProcessed = 0
numberOfPubkeysProcessed = 0
numberOfInventoryLookupsPerformed = 0
numberOfBytesReceived = 0 # Used for the 'network status' page
numberOfBytesSent = 0 # Used for the 'network status' page
numberOfBytesReceivedLastSecond = 0 # used for the bandwidth rate limit
numberOfBytesSentLastSecond = 0 # used for the bandwidth rate limit
lastTimeWeResetBytesReceived = 0 # used for the bandwidth rate limit
lastTimeWeResetBytesSent = 0 # used for the bandwidth rate limit
sendDataLock = threading.Lock() # used for the bandwidth rate limit
receiveDataLock = threading.Lock() # used for the bandwidth rate limit
2013-09-05 02:14:25 +02:00
daemon = False
2013-09-07 00:55:12 +02:00
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
maximumLengthOfTimeToBotherResendingMessages = 0
objectProcessorQueue = Queue.Queue(
) # receiveDataThreads dump objects they hear on the network into this queue to be processed.
streamsInWhichIAmParticipating = {}
2013-05-02 17:53:54 +02:00
2013-05-02 21:59:10 +02:00
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
2014-08-27 09:14:32 +02:00
networkDefaultProofOfWorkNonceTrialsPerByte = 1000 #The amount of work that should be performed (and demanded) per byte of the payload.
networkDefaultPayloadLengthExtraBytes = 1000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target.
2013-05-02 21:59:10 +02:00
# Remember here the RPC port read from namecoin.conf so we can restore to
# it as default whenever the user changes the "method" selection for
# namecoin integration to "namecoind".
namecoinDefaultRpcPort = "8336"
# When using py2exe or py2app, the variable frozen is added to the sys
# namespace. This can be used to setup a different code path for
# binary distributions vs source distributions.
frozen = getattr(sys,'frozen', None)
# If the trustedpeer option is specified in keys.dat then this will
# contain a Peer which will be connected to instead of using the
# addresses advertised by other peers. The client will only connect to
# this peer and the timing attack mitigation will be disabled in order
# to download data faster. The expected use case is where the user has
# a fast connection to a trusted server where they run a BitMessage
# daemon permanently. If they then run a second instance of the client
# on a local machine periodically when they want to check for messages
# it will sync with the network a lot faster without compromising
# security.
trustedPeer = None
#Compiled struct for packing/unpacking headers
#New code should use CreatePacket instead of Header.pack
Header = Struct('!L12sL4s')
#Create a packet
def CreatePacket(command, payload=''):
payload_length = len(payload)
2014-08-27 09:14:32 +02:00
checksum = hashlib.sha512(payload).digest()[0:4]
b = bytearray(Header.size + payload_length)
Header.pack_into(b, 0, 0xE9BEB4D9, command, payload_length, checksum)
b[Header.size:] = payload
return bytes(b)
def isInSqlInventory(hash):
queryreturn = sqlQuery('''select hash from inventory where hash=?''', hash)
return queryreturn != []
def encodeHost(host):
if host.find(':') == -1:
return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(host)
else:
return socket.inet_pton(socket.AF_INET6, host)
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
payload = ''
2014-08-27 09:14:32 +02:00
payload += pack('>L', 3) # protocol version.
payload += pack('>q', 1) # bitflags of the services I offer.
payload += pack('>q', int(time.time()))
payload += pack(
'>q', 1) # boolservices of remote connection; ignored by the remote host.
payload += encodeHost(remoteHost)
payload += pack('>H', remotePort) # remote IPv6 and port
payload += pack('>q', 1) # bitflags of the services I offer.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
payload += pack('>H', shared.config.getint(
'bitmessagesettings', 'port'))
random.seed()
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
2013-08-07 21:15:49 +02:00
userAgent = '/PyBitmessage:' + shared.softwareVersion + '/'
payload += encodeVarint(len(userAgent))
payload += userAgent
payload += encodeVarint(
1) # The number of streams about which I care. PyBitmessage currently only supports 1 per connection.
payload += encodeVarint(myStreamNumber)
return CreatePacket('version', payload)
def assembleErrorMessage(fatal=0, banTime=0, inventoryVector='', errorText=''):
payload = encodeVarint(fatal)
payload += encodeVarint(banTime)
payload += encodeVarint(len(inventoryVector))
payload += inventoryVector
payload += encodeVarint(len(errorText))
payload += errorText
return CreatePacket('error', payload)
2013-05-02 17:53:54 +02:00
def lookupAppdataFolder():
APPNAME = "PyBitmessage"
if "BITMESSAGE_HOME" in environ:
dataFolder = environ["BITMESSAGE_HOME"]
if dataFolder[-1] not in [os.path.sep, os.path.altsep]:
dataFolder += os.path.sep
elif sys.platform == 'darwin':
2013-05-02 17:53:54 +02:00
if "HOME" in environ:
dataFolder = path.join(os.environ["HOME"], "Library/Application Support/", APPNAME) + '/'
2013-05-02 17:53:54 +02:00
else:
stringToLog = 'Could not find home folder, please report this message and your OS X version to the BitMessage Github.'
if 'logger' in globals():
logger.critical(stringToLog)
else:
print stringToLog
2013-05-02 17:53:54 +02:00
sys.exit()
elif 'win32' in sys.platform or 'win64' in sys.platform:
2013-11-29 21:42:15 +01:00
dataFolder = path.join(environ['APPDATA'].decode(sys.getfilesystemencoding(), 'ignore'), APPNAME) + path.sep
2013-05-02 17:53:54 +02:00
else:
2013-06-26 11:11:32 +02:00
from shutil import move
try:
dataFolder = path.join(environ["XDG_CONFIG_HOME"], APPNAME)
except KeyError:
dataFolder = path.join(environ["HOME"], ".config", APPNAME)
2013-06-26 11:11:32 +02:00
# Migrate existing data to the proper location if this is an existing install
try:
move(path.join(environ["HOME"], ".%s" % APPNAME), dataFolder)
stringToLog = "Moving data folder to %s" % (dataFolder)
if 'logger' in globals():
logger.info(stringToLog)
else:
print stringToLog
except IOError:
2013-07-12 00:58:10 +02:00
# Old directory may not exist.
pass
dataFolder = dataFolder + '/'
return dataFolder
2013-05-02 17:53:54 +02:00
def isAddressInMyAddressBook(address):
queryreturn = sqlQuery(
'''select address from addressbook where address=?''',
address)
2013-05-02 17:53:54 +02:00
return queryreturn != []
#At this point we should really just have a isAddressInMy(book, address)...
def isAddressInMySubscriptionsList(address):
queryreturn = sqlQuery(
'''select * from subscriptions where address=?''',
str(address))
return queryreturn != []
2013-06-14 04:03:03 +02:00
2013-05-02 17:53:54 +02:00
def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address):
if isAddressInMyAddressBook(address):
return True
queryreturn = sqlQuery('''SELECT address FROM whitelist where address=? and enabled = '1' ''', address)
2013-05-02 17:53:54 +02:00
if queryreturn <> []:
return True
queryreturn = sqlQuery(
'''select address from subscriptions where address=? and enabled = '1' ''',
address)
2013-05-02 17:53:54 +02:00
if queryreturn <> []:
return True
return False
def safeConfigGetBoolean(section,field):
try:
return config.getboolean(section,field)
2013-07-22 07:10:22 +02:00
except Exception, err:
return False
2013-05-02 17:53:54 +02:00
def decodeWalletImportFormat(WIFstring):
fullString = arithmetic.changebase(WIFstring,58,256)
privkey = fullString[:-4]
if fullString[-4:] != hashlib.sha256(hashlib.sha256(privkey).digest()).digest()[:4]:
logger.critical('Major problem! When trying to decode one of your private keys, the checksum '
'failed. Here are the first 6 characters of the PRIVATE key: %s' % str(WIFstring)[:6])
os._exit(0)
2013-05-02 17:53:54 +02:00
return ""
else:
#checksum passed
if privkey[0] == '\x80':
return privkey[1:]
else:
logger.critical('Major problem! When trying to decode one of your private keys, the '
2013-07-10 10:43:18 +02:00
'checksum passed but the key doesn\'t begin with hex 80. Here is the '
'PRIVATE key: %s' % str(WIFstring))
os._exit(0)
2013-05-02 17:53:54 +02:00
return ""
def reloadMyAddressHashes():
logger.debug('reloading keys from keys.dat file')
2013-05-02 17:53:54 +02:00
myECCryptorObjects.clear()
myAddressesByHash.clear()
2013-09-15 03:06:26 +02:00
myAddressesByTag.clear()
2013-05-02 17:53:54 +02:00
#myPrivateKeys.clear()
keyfileSecure = checkSensitiveFilePermissions(appdata + 'keys.dat')
2013-05-02 17:53:54 +02:00
configSections = config.sections()
hasEnabledKeys = False
2013-05-02 17:53:54 +02:00
for addressInKeysFile in configSections:
if addressInKeysFile <> 'bitmessagesettings':
isEnabled = config.getboolean(addressInKeysFile, 'enabled')
if isEnabled:
2013-06-27 12:44:49 +02:00
hasEnabledKeys = True
2013-05-02 17:53:54 +02:00
status,addressVersionNumber,streamNumber,hash = decodeAddress(addressInKeysFile)
if addressVersionNumber == 2 or addressVersionNumber == 3 or addressVersionNumber == 4:
2013-06-27 12:44:49 +02:00
# Returns a simple 32 bytes of information encoded in 64 Hex characters,
# or null if there was an error.
privEncryptionKey = decodeWalletImportFormat(
config.get(addressInKeysFile, 'privencryptionkey')).encode('hex')
2013-05-02 17:53:54 +02:00
if len(privEncryptionKey) == 64:#It is 32 bytes encoded as 64 hex characters
myECCryptorObjects[hash] = highlevelcrypto.makeCryptor(privEncryptionKey)
myAddressesByHash[hash] = addressInKeysFile
2013-09-15 03:06:26 +02:00
tag = hashlib.sha512(hashlib.sha512(encodeVarint(
addressVersionNumber) + encodeVarint(streamNumber) + hash).digest()).digest()[32:]
myAddressesByTag[tag] = addressInKeysFile
2013-06-27 12:44:49 +02:00
2013-05-02 17:53:54 +02:00
else:
logger.error('Error in reloadMyAddressHashes: Can\'t handle address versions other than 2, 3, or 4.\n')
2013-06-27 12:44:49 +02:00
if not keyfileSecure:
fixSensitiveFilePermissions(appdata + 'keys.dat', hasEnabledKeys)
2013-05-02 17:53:54 +02:00
def reloadBroadcastSendersForWhichImWatching():
broadcastSendersForWhichImWatching.clear()
MyECSubscriptionCryptorObjects.clear()
queryreturn = sqlQuery('SELECT address FROM subscriptions where enabled=1')
2013-09-15 03:06:26 +02:00
logger.debug('reloading subscriptions...')
2013-05-02 17:53:54 +02:00
for row in queryreturn:
address, = row
status,addressVersionNumber,streamNumber,hash = decodeAddress(address)
if addressVersionNumber == 2:
broadcastSendersForWhichImWatching[hash] = 0
#Now, for all addresses, even version 2 addresses, we should create Cryptor objects in a dictionary which we will use to attempt to decrypt encrypted broadcast messages.
2013-09-15 03:06:26 +02:00
if addressVersionNumber <= 3:
privEncryptionKey = hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+hash).digest()[:32]
MyECSubscriptionCryptorObjects[hash] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))
else:
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
addressVersionNumber) + encodeVarint(streamNumber) + hash).digest()).digest()
tag = doubleHashOfAddressData[32:]
privEncryptionKey = doubleHashOfAddressData[:32]
MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))
2013-05-02 17:53:54 +02:00
2014-08-27 09:14:32 +02:00
def isProofOfWorkSufficient(data,
nonceTrialsPerByte=0,
payloadLengthExtraBytes=0):
if nonceTrialsPerByte < networkDefaultProofOfWorkNonceTrialsPerByte:
nonceTrialsPerByte = networkDefaultProofOfWorkNonceTrialsPerByte
if payloadLengthExtraBytes < networkDefaultPayloadLengthExtraBytes:
payloadLengthExtraBytes = networkDefaultPayloadLengthExtraBytes
2014-08-27 09:14:32 +02:00
endOfLifeTime, = unpack('>Q', data[8:16])
TTL = endOfLifeTime - int(time.time())
if TTL < 300:
TTL = 300
POW, = unpack('>Q', hashlib.sha512(hashlib.sha512(data[
:8] + hashlib.sha512(data[8:]).digest()).digest()).digest()[0:8])
2014-08-27 09:14:32 +02:00
return POW <= 2 ** 64 / (nonceTrialsPerByte*(len(data) + payloadLengthExtraBytes + ((TTL*(len(data)+payloadLengthExtraBytes))/(2 ** 16))))
2013-05-02 17:53:54 +02:00
def doCleanShutdown():
2013-05-30 22:25:42 +02:00
global shutdown
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
broadcastToSendDataQueues((0, 'shutdown', 'no data'))
with shared.objectProcessorQueueSizeLock:
data = 'no data'
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put(('checkShutdownVariable',data))
2013-05-02 17:53:54 +02:00
knownNodesLock.acquire()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
output = open(appdata + 'knownnodes.dat', 'wb')
logger.info('finished opening knownnodes.dat. Now pickle.dump')
2013-05-02 17:53:54 +02:00
pickle.dump(knownNodes, output)
logger.info('Completed pickle.dump. Closing output...')
2013-05-02 17:53:54 +02:00
output.close()
knownNodesLock.release()
logger.info('Finished closing knownnodes.dat output file.')
2013-05-02 17:53:54 +02:00
UISignalQueue.put(('updateStatusBar','Done saving the knownNodes list of peers to disk.'))
logger.info('Flushing inventory in memory out to disk...')
UISignalQueue.put((
'updateStatusBar',
'Flushing inventory in memory out to disk. This should normally only take a second...'))
2013-05-02 17:53:54 +02:00
flushInventory()
# Verify that the objectProcessor has finished exiting. It should have incremented the
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
while shutdown == 1:
time.sleep(.1)
# This one last useless query will guarantee that the previous flush committed and that the
# objectProcessorThread committed before we close the program.
sqlQuery('SELECT address FROM subscriptions')
logger.info('Finished flushing inventory.')
sqlStoredProcedure('exit')
# Wait long enough to guarantee that any running proof of work worker threads will check the
# shutdown variable and exit. If the main thread closes before they do then they won't stop.
time.sleep(.25)
2013-05-02 17:53:54 +02:00
if safeConfigGetBoolean('bitmessagesettings','daemon'):
logger.info('Clean shutdown complete.')
2013-05-02 17:53:54 +02:00
os._exit(0)
# If you want to command all of the sendDataThreads to do something, like shutdown or send some data, this
# function puts your data into the queues for each of the sendDataThreads. The sendDataThreads are
# responsible for putting their queue into (and out of) the sendDataQueues list.
2013-05-02 17:53:54 +02:00
def broadcastToSendDataQueues(data):
# logger.debug('running broadcastToSendDataQueues')
2013-05-02 17:53:54 +02:00
for q in sendDataQueues:
2013-09-07 03:47:54 +02:00
q.put(data)
2013-05-02 17:53:54 +02:00
def flushInventory():
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
with SqlBulkExecute() as sql:
for hash, storedValue in inventory.items():
2014-08-27 09:14:32 +02:00
objectType, streamNumber, payload, expiresTime, tag = storedValue
2013-09-15 03:06:26 +02:00
sql.execute('''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
2014-08-27 09:14:32 +02:00
hash,objectType,streamNumber,payload,expiresTime,tag)
del inventory[hash]
def fixPotentiallyInvalidUTF8Data(text):
try:
unicode(text,'utf-8')
return text
except:
output = 'Part of the message is corrupt. The message cannot be displayed the normal way.\n\n' + repr(text)
2013-06-14 04:03:03 +02:00
return output
2013-06-27 12:44:49 +02:00
# Checks sensitive file permissions for inappropriate umask during keys.dat creation.
# (Or unwise subsequent chmod.)
2013-07-10 10:43:18 +02:00
#
2013-06-27 12:44:49 +02:00
# Returns true iff file appears to have appropriate permissions.
def checkSensitiveFilePermissions(filename):
if sys.platform == 'win32':
# TODO: This might deserve extra checks by someone familiar with
# Windows systems.
2013-06-27 12:44:49 +02:00
return True
2013-11-29 01:20:16 +01:00
elif sys.platform[:7] == 'freebsd':
# FreeBSD file systems are the same as major Linux file systems
present_permissions = os.stat(filename)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
return present_permissions & disallowed_permissions == 0
else:
try:
# Skip known problems for non-Win32 filesystems without POSIX permissions.
import subprocess
fstype = subprocess.check_output('stat -f -c "%%T" %s' % (filename),
shell=True,
stderr=subprocess.STDOUT)
if 'fuseblk' in fstype:
logger.info('Skipping file permissions check for %s. Filesystem fuseblk detected.',
filename)
return True
except:
# Swallow exception here, but we might run into trouble later!
logger.error('Could not determine filesystem type. %s', filename)
2013-06-27 12:44:49 +02:00
present_permissions = os.stat(filename)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
return present_permissions & disallowed_permissions == 0
# Fixes permissions on a sensitive file.
2013-06-27 12:44:49 +02:00
def fixSensitiveFilePermissions(filename, hasEnabledKeys):
if hasEnabledKeys:
logger.warning('Keyfile had insecure permissions, and there were enabled keys. '
'The truly paranoid should stop using them immediately.')
2013-06-27 12:44:49 +02:00
else:
logger.warning('Keyfile had insecure permissions, but there were no enabled keys.')
2013-06-27 12:44:49 +02:00
try:
present_permissions = os.stat(filename)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
allowed_permissions = ((1<<32)-1) ^ disallowed_permissions
new_permissions = (
allowed_permissions & present_permissions)
os.chmod(filename, new_permissions)
logger.info('Keyfile permissions automatically fixed.')
2013-06-27 12:44:49 +02:00
except Exception, e:
logger.exception('Keyfile permissions could not be fixed.')
2013-06-27 12:44:49 +02:00
raise
def isBitSetWithinBitfield(fourByteString, n):
# Uses MSB 0 bit numbering across 4 bytes of data
n = 31 - n
x, = unpack('>L', fourByteString)
return x & 2**n != 0
2014-08-27 09:14:32 +02:00
def decryptAndCheckPubkeyPayload(data, address):
"""
2014-12-25 09:57:34 +01:00
Version 4 pubkeys are encrypted. This function is run when we already have the
address to which we want to try to send a message. The 'data' may come either
off of the wire or we might have had it already in our inventory when we tried
to send a msg to this particular address.
2014-08-27 09:14:32 +02:00
"""
2013-09-18 06:04:01 +02:00
try:
2014-08-27 09:14:32 +02:00
status, addressVersion, streamNumber, ripe = decodeAddress(address)
readPosition = 20 # bypass the nonce, time, and object type
embeddedAddressVersion, varintLength = decodeVarint(data[readPosition:readPosition + 10])
readPosition += varintLength
embeddedStreamNumber, varintLength = decodeVarint(data[readPosition:readPosition + 10])
readPosition += varintLength
2014-12-25 09:57:34 +01:00
storedData = data[20:readPosition] # We'll store the address version and stream number (and some more) in the pubkeys table.
2014-08-27 09:14:32 +02:00
if addressVersion != embeddedAddressVersion:
logger.info('Pubkey decryption was UNsuccessful due to address version mismatch.')
2013-09-18 06:04:01 +02:00
return 'failed'
2014-08-27 09:14:32 +02:00
if streamNumber != embeddedStreamNumber:
logger.info('Pubkey decryption was UNsuccessful due to stream number mismatch.')
return 'failed'
tag = data[readPosition:readPosition + 32]
readPosition += 32
2014-12-25 09:57:34 +01:00
signedData = data[8:readPosition] # the time through the tag. More data is appended onto signedData below after the decryption.
2014-08-27 09:14:32 +02:00
encryptedData = data[readPosition:]
# Let us try to decrypt the pubkey
toAddress, cryptorObject = shared.neededPubkeys[tag]
if toAddress != address:
logger.critical('decryptAndCheckPubkeyPayload failed due to toAddress mismatch. This is very peculiar. toAddress: %s, address %s' % (toAddress, address))
# the only way I can think that this could happen is if someone encodes their address data two different ways.
2014-12-25 09:57:34 +01:00
# That sort of address-malleability should have been caught by the UI or API and an error given to the user.
2014-08-27 09:14:32 +02:00
return 'failed'
try:
decryptedData = cryptorObject.decrypt(encryptedData)
except:
# Someone must have encrypted some data with a different key
# but tagged it with a tag for which we are watching.
logger.info('Pubkey decryption was unsuccessful.')
return 'failed'
readPosition = 0
bitfieldBehaviors = decryptedData[readPosition:readPosition + 4]
readPosition += 4
publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64]
readPosition += 64
publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64]
readPosition += 64
specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += specifiedNonceTrialsPerByteLength
specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += specifiedPayloadLengthExtraBytesLength
2014-12-25 09:57:34 +01:00
storedData += decryptedData[:readPosition]
signedData += decryptedData[:readPosition]
2014-08-27 09:14:32 +02:00
signatureLength, signatureLengthLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += signatureLengthLength
signature = decryptedData[readPosition:readPosition + signatureLength]
2014-12-25 09:57:34 +01:00
if highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')):
logger.info('ECDSA verify passed (within decryptAndCheckPubkeyPayload)')
2014-08-27 09:14:32 +02:00
else:
2014-12-25 09:57:34 +01:00
logger.info('ECDSA verify failed (within decryptAndCheckPubkeyPayload)')
return 'failed'
2014-08-27 09:14:32 +02:00
sha = hashlib.new('sha512')
sha.update(publicSigningKey + publicEncryptionKey)
ripeHasher = hashlib.new('ripemd160')
ripeHasher.update(sha.digest())
embeddedRipe = ripeHasher.digest()
if embeddedRipe != ripe:
# Although this pubkey object had the tag were were looking for and was
# encrypted with the correct encryption key, it doesn't contain the
2014-12-25 09:57:34 +01:00
# correct pubkeys. Someone is either being malicious or using buggy software.
2014-08-27 09:14:32 +02:00
logger.info('Pubkey decryption was UNsuccessful due to RIPE mismatch.')
return 'failed'
# Everything checked out. Insert it into the pubkeys table.
logger.info('within decryptAndCheckPubkeyPayload, addressVersion: %s, streamNumber: %s \n\
ripe %s\n\
publicSigningKey in hex: %s\n\
publicEncryptionKey in hex: %s' % (addressVersion,
streamNumber,
ripe.encode('hex'),
publicSigningKey.encode('hex'),
publicEncryptionKey.encode('hex')
)
)
2015-03-09 07:35:32 +01:00
t = (address, addressVersion, storedData, int(time.time()), 'yes')
2014-08-27 09:14:32 +02:00
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
return 'successful'
except varintDecodeError as e:
logger.info('Pubkey decryption was UNsuccessful due to a malformed varint.')
2013-09-18 06:04:01 +02:00
return 'failed'
2014-08-27 09:14:32 +02:00
except Exception as e:
logger.critical('Pubkey decryption was UNsuccessful because of an unhandled exception! This is definitely a bug! \n%s' % traceback.format_exc())
2013-09-18 06:04:01 +02:00
return 'failed'
Peer = collections.namedtuple('Peer', ['host', 'port'])
2014-08-27 09:14:32 +02:00
def checkAndShareObjectWithPeers(data):
"""
This function is called after either receiving an object off of the wire
2014-08-27 09:14:32 +02:00
or after receiving one as ackdata.
Returns the length of time that we should reserve to process this message
if we are receiving it off of the wire.
"""
if len(data) > 2 ** 18:
logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(data))
return 0
# Let us check to make sure that the proof of work is sufficient.
if not isProofOfWorkSufficient(data):
2014-08-27 09:14:32 +02:00
logger.info('Proof of work is insufficient.')
return 0
endOfLifeTime, = unpack('>Q', data[8:16])
if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: # The TTL may not be larger than 28 days + 3 hours of wiggle room
logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % endOfLifeTime)
return 0
if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much.
logger.info('This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s' % endOfLifeTime)
return 0
intObjectType, = unpack('>I', data[16:20])
try:
if intObjectType == 0:
_checkAndShareGetpubkeyWithPeers(data)
return 0.1
elif intObjectType == 1:
_checkAndSharePubkeyWithPeers(data)
return 0.1
elif intObjectType == 2:
_checkAndShareMsgWithPeers(data)
return 0.6
elif intObjectType == 3:
_checkAndShareBroadcastWithPeers(data)
return 0.6
else:
_checkAndShareUndefinedObjectWithPeers(data)
return 0.6
except varintDecodeError as e:
logger.debug("There was a problem with a varint while checking to see whether it was appropriate to share an object with peers. Some details: %s" % e)
except Exception as e:
logger.critical('There was a problem while checking to see whether it was appropriate to share an object with peers. This is definitely a bug! \n%s' % traceback.format_exc())
return 0
2014-08-27 09:14:32 +02:00
def _checkAndShareUndefinedObjectWithPeers(data):
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass nonce, time, and object type
objectVersion, objectVersionLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += objectVersionLength
streamNumber, streamNumberLength = decodeVarint(
data[readPosition:readPosition + 9])
if not streamNumber in streamsInWhichIAmParticipating:
logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber)
return
2014-08-27 09:14:32 +02:00
inventoryHash = calculateInventoryHash(data)
shared.numberOfInventoryLookupsPerformed += 1
inventoryLock.acquire()
if inventoryHash in inventory:
logger.debug('We have already received this undefined object. Ignoring.')
inventoryLock.release()
return
elif isInSqlInventory(inventoryHash):
logger.debug('We have already received this undefined object (it is stored on disk in the SQL inventory). Ignoring it.')
inventoryLock.release()
return
2014-08-27 09:14:32 +02:00
objectType, = unpack('>I', data[16:20])
inventory[inventoryHash] = (
objectType, streamNumber, data, embeddedTime,'')
inventorySets[streamNumber].add(inventoryHash)
inventoryLock.release()
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
def _checkAndShareMsgWithPeers(data):
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass nonce, time, and object type
objectVersion, objectVersionLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += objectVersionLength
2014-08-27 09:14:32 +02:00
streamNumber, streamNumberLength = decodeVarint(
data[readPosition:readPosition + 9])
2014-08-27 09:14:32 +02:00
if not streamNumber in streamsInWhichIAmParticipating:
logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber)
return
2014-08-27 09:14:32 +02:00
readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data)
shared.numberOfInventoryLookupsPerformed += 1
inventoryLock.acquire()
if inventoryHash in inventory:
logger.debug('We have already received this msg message. Ignoring.')
inventoryLock.release()
return
elif isInSqlInventory(inventoryHash):
logger.debug('We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.')
inventoryLock.release()
return
# This msg message is valid. Let's let our peers know about it.
2014-08-27 09:14:32 +02:00
objectType = 2
inventory[inventoryHash] = (
2014-08-27 09:14:32 +02:00
objectType, streamNumber, data, embeddedTime,'')
inventorySets[streamNumber].add(inventoryHash)
inventoryLock.release()
logger.debug('advertising inv with hash: %s' % inventoryHash.encode('hex'))
2014-08-27 09:14:32 +02:00
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's enqueue it to be processed ourselves.
# If we already have too much data in the queue to be processed, just sleep for now.
while shared.objectProcessorQueueSize > 120000000:
time.sleep(2)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data))
2014-08-27 09:14:32 +02:00
def _checkAndShareGetpubkeyWithPeers(data):
if len(data) < 42:
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')