PyBitmessage/src/shared.py

671 lines
31 KiB
Python
Raw Normal View History

2015-08-22 10:48:49 +02:00
from __future__ import division
2015-01-21 18:38:25 +01:00
verbose = 1
2014-08-27 09:14:32 +02:00
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # This is obsolete with the change to protocol v3 but the singleCleaner thread still hasn't been updated so we need this a little longer.
lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time.
maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours
2013-09-18 06:04:01 +02:00
useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages.
2013-05-02 17:53:54 +02:00
# Libraries.
import collections
import os
import pickle
import Queue
from multiprocessing import active_children, Queue as mpQueue, Lock as mpLock
import sys
import stat
import threading
import time
import shutil # used for moving the data folder and copying keys.dat
2014-09-16 19:04:56 +02:00
import datetime
from os import path, environ
2014-08-27 09:14:32 +02:00
import traceback
2016-03-23 23:26:57 +01:00
from binascii import hexlify
# Project imports.
from addresses import *
from class_objectProcessorQueue import ObjectProcessorQueue
from configparser import BMConfigParser
import highlevelcrypto
import shared
2014-08-27 09:14:32 +02:00
#import helper_startup
from helper_sql import *
from helper_threading import *
from inventory import Inventory
import protocol
import state
2013-05-02 17:53:54 +02:00
myECCryptorObjects = {}
MyECSubscriptionCryptorObjects = {}
myAddressesByHash = {} #The key in this dictionary is the RIPE hash which is encoded in an address and value is the address itself.
2013-09-15 03:06:26 +02:00
myAddressesByTag = {} # The key in this dictionary is the tag generated from the address.
2013-05-02 17:53:54 +02:00
broadcastSendersForWhichImWatching = {}
workerQueue = Queue.Queue()
UISignalQueue = Queue.Queue()
parserInputQueue = mpQueue()
parserOutputQueue = mpQueue()
parserProcess = None
parserLock = mpLock()
2013-05-02 17:53:54 +02:00
addressGeneratorQueue = Queue.Queue()
knownNodesLock = threading.Lock()
knownNodes = {}
printLock = threading.Lock()
statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
2013-05-30 22:25:42 +02:00
shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
thisapp = None # singleton lock instance
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
alreadyAttemptedConnectionsListLock = threading.Lock()
alreadyAttemptedConnectionsListResetTime = int(
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
numberOfObjectsThatWeHaveYetToGetPerPeer = {}
successfullyDecryptMessageTimings = [
] # A list of the amounts of time it took to successfully decrypt msg messages
apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread.
ackdataForWhichImWatching = {}
clientHasReceivedIncomingConnections = False #used by API command clientStatus
numberOfMessagesProcessed = 0
numberOfBroadcastsProcessed = 0
numberOfPubkeysProcessed = 0
numberOfBytesReceived = 0 # Used for the 'network status' page
numberOfBytesSent = 0 # Used for the 'network status' page
numberOfBytesReceivedLastSecond = 0 # used for the bandwidth rate limit
numberOfBytesSentLastSecond = 0 # used for the bandwidth rate limit
lastTimeWeResetBytesReceived = 0 # used for the bandwidth rate limit
lastTimeWeResetBytesSent = 0 # used for the bandwidth rate limit
sendDataLock = threading.Lock() # used for the bandwidth rate limit
receiveDataLock = threading.Lock() # used for the bandwidth rate limit
2013-09-05 02:14:25 +02:00
daemon = False
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
maximumLengthOfTimeToBotherResendingMessages = 0
objectProcessorQueue = ObjectProcessorQueue() # receiveDataThreads dump objects they hear on the network into this queue to be processed.
timeOffsetWrongCount = 0
2013-05-02 17:53:54 +02:00
# sanity check, prevent doing ridiculous PoW
# 20 million PoWs equals approximately 2 days on dev's dual R9 290
ridiculousDifficulty = 20000000
# Remember here the RPC port read from namecoin.conf so we can restore to
# it as default whenever the user changes the "method" selection for
# namecoin integration to "namecoind".
namecoinDefaultRpcPort = "8336"
# If the trustedpeer option is specified in keys.dat then this will
# contain a Peer which will be connected to instead of using the
# addresses advertised by other peers. The client will only connect to
# this peer and the timing attack mitigation will be disabled in order
# to download data faster. The expected use case is where the user has
# a fast connection to a trusted server where they run a BitMessage
# daemon permanently. If they then run a second instance of the client
# on a local machine periodically when they want to check for messages
# it will sync with the network a lot faster without compromising
# security.
trustedPeer = None
2013-05-02 17:53:54 +02:00
def isAddressInMyAddressBook(address):
queryreturn = sqlQuery(
'''select address from addressbook where address=?''',
address)
2013-05-02 17:53:54 +02:00
return queryreturn != []
#At this point we should really just have a isAddressInMy(book, address)...
def isAddressInMySubscriptionsList(address):
queryreturn = sqlQuery(
'''select * from subscriptions where address=?''',
str(address))
return queryreturn != []
2013-06-14 04:03:03 +02:00
2013-05-02 17:53:54 +02:00
def isAddressInMyAddressBookSubscriptionsListOrWhitelist(address):
if isAddressInMyAddressBook(address):
return True
queryreturn = sqlQuery('''SELECT address FROM whitelist where address=? and enabled = '1' ''', address)
2013-05-02 17:53:54 +02:00
if queryreturn <> []:
return True
queryreturn = sqlQuery(
'''select address from subscriptions where address=? and enabled = '1' ''',
address)
2013-05-02 17:53:54 +02:00
if queryreturn <> []:
return True
return False
def decodeWalletImportFormat(WIFstring):
fullString = arithmetic.changebase(WIFstring,58,256)
privkey = fullString[:-4]
if fullString[-4:] != hashlib.sha256(hashlib.sha256(privkey).digest()).digest()[:4]:
logger.critical('Major problem! When trying to decode one of your private keys, the checksum '
'failed. Here are the first 6 characters of the PRIVATE key: %s' % str(WIFstring)[:6])
os._exit(0)
2013-05-02 17:53:54 +02:00
return ""
else:
#checksum passed
if privkey[0] == '\x80':
return privkey[1:]
else:
logger.critical('Major problem! When trying to decode one of your private keys, the '
2013-07-10 10:43:18 +02:00
'checksum passed but the key doesn\'t begin with hex 80. Here is the '
'PRIVATE key: %s' % str(WIFstring))
os._exit(0)
2013-05-02 17:53:54 +02:00
return ""
def reloadMyAddressHashes():
logger.debug('reloading keys from keys.dat file')
2013-05-02 17:53:54 +02:00
myECCryptorObjects.clear()
myAddressesByHash.clear()
2013-09-15 03:06:26 +02:00
myAddressesByTag.clear()
2013-05-02 17:53:54 +02:00
#myPrivateKeys.clear()
keyfileSecure = checkSensitiveFilePermissions(state.appdata + 'keys.dat')
configSections = BMConfigParser().sections()
hasEnabledKeys = False
2013-05-02 17:53:54 +02:00
for addressInKeysFile in configSections:
if addressInKeysFile <> 'bitmessagesettings':
isEnabled = BMConfigParser().getboolean(addressInKeysFile, 'enabled')
2013-05-02 17:53:54 +02:00
if isEnabled:
2013-06-27 12:44:49 +02:00
hasEnabledKeys = True
2013-05-02 17:53:54 +02:00
status,addressVersionNumber,streamNumber,hash = decodeAddress(addressInKeysFile)
if addressVersionNumber == 2 or addressVersionNumber == 3 or addressVersionNumber == 4:
2013-06-27 12:44:49 +02:00
# Returns a simple 32 bytes of information encoded in 64 Hex characters,
# or null if there was an error.
2016-03-23 23:26:57 +01:00
privEncryptionKey = hexlify(decodeWalletImportFormat(
BMConfigParser().get(addressInKeysFile, 'privencryptionkey')))
2013-06-27 12:44:49 +02:00
2013-05-02 17:53:54 +02:00
if len(privEncryptionKey) == 64:#It is 32 bytes encoded as 64 hex characters
myECCryptorObjects[hash] = highlevelcrypto.makeCryptor(privEncryptionKey)
myAddressesByHash[hash] = addressInKeysFile
2013-09-15 03:06:26 +02:00
tag = hashlib.sha512(hashlib.sha512(encodeVarint(
addressVersionNumber) + encodeVarint(streamNumber) + hash).digest()).digest()[32:]
myAddressesByTag[tag] = addressInKeysFile
2013-06-27 12:44:49 +02:00
2013-05-02 17:53:54 +02:00
else:
logger.error('Error in reloadMyAddressHashes: Can\'t handle address versions other than 2, 3, or 4.\n')
2013-06-27 12:44:49 +02:00
if not keyfileSecure:
fixSensitiveFilePermissions(state.appdata + 'keys.dat', hasEnabledKeys)
2013-05-02 17:53:54 +02:00
def reloadBroadcastSendersForWhichImWatching():
broadcastSendersForWhichImWatching.clear()
MyECSubscriptionCryptorObjects.clear()
queryreturn = sqlQuery('SELECT address FROM subscriptions where enabled=1')
2013-09-15 03:06:26 +02:00
logger.debug('reloading subscriptions...')
2013-05-02 17:53:54 +02:00
for row in queryreturn:
address, = row
status,addressVersionNumber,streamNumber,hash = decodeAddress(address)
if addressVersionNumber == 2:
broadcastSendersForWhichImWatching[hash] = 0
#Now, for all addresses, even version 2 addresses, we should create Cryptor objects in a dictionary which we will use to attempt to decrypt encrypted broadcast messages.
2013-09-15 03:06:26 +02:00
if addressVersionNumber <= 3:
privEncryptionKey = hashlib.sha512(encodeVarint(addressVersionNumber)+encodeVarint(streamNumber)+hash).digest()[:32]
2016-03-23 23:26:57 +01:00
MyECSubscriptionCryptorObjects[hash] = highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
2013-09-15 03:06:26 +02:00
else:
doubleHashOfAddressData = hashlib.sha512(hashlib.sha512(encodeVarint(
addressVersionNumber) + encodeVarint(streamNumber) + hash).digest()).digest()
tag = doubleHashOfAddressData[32:]
privEncryptionKey = doubleHashOfAddressData[:32]
2016-03-23 23:26:57 +01:00
MyECSubscriptionCryptorObjects[tag] = highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))
2013-05-02 17:53:54 +02:00
def doCleanShutdown():
global shutdown, thisapp
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
try:
parserInputQueue.put(None, False)
except Queue.Full:
pass
protocol.broadcastToSendDataQueues((0, 'shutdown', 'no data'))
objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
for thread in threading.enumerate():
if thread.isAlive() and isinstance(thread, StoppableThread):
thread.stopThread()
2013-05-02 17:53:54 +02:00
knownNodesLock.acquire()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
output = open(state.appdata + 'knownnodes.dat', 'wb')
logger.info('finished opening knownnodes.dat. Now pickle.dump')
2013-05-02 17:53:54 +02:00
pickle.dump(knownNodes, output)
logger.info('Completed pickle.dump. Closing output...')
2013-05-02 17:53:54 +02:00
output.close()
knownNodesLock.release()
logger.info('Finished closing knownnodes.dat output file.')
2013-05-02 17:53:54 +02:00
UISignalQueue.put(('updateStatusBar','Done saving the knownNodes list of peers to disk.'))
logger.info('Flushing inventory in memory out to disk...')
UISignalQueue.put((
'updateStatusBar',
'Flushing inventory in memory out to disk. This should normally only take a second...'))
Inventory().flush()
2016-03-18 02:01:59 +01:00
# Verify that the objectProcessor has finished exiting. It should have incremented the
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
while shutdown == 1:
time.sleep(.1)
# This one last useless query will guarantee that the previous flush committed and that the
# objectProcessorThread committed before we close the program.
sqlQuery('SELECT address FROM subscriptions')
logger.info('Finished flushing inventory.')
sqlStoredProcedure('exit')
# Wait long enough to guarantee that any running proof of work worker threads will check the
# shutdown variable and exit. If the main thread closes before they do then they won't stop.
time.sleep(.25)
from class_outgoingSynSender import outgoingSynSender
for thread in threading.enumerate():
if thread is not threading.currentThread() and isinstance(thread, StoppableThread) and not isinstance(thread, outgoingSynSender):
logger.debug("Waiting for thread %s", thread.name)
thread.join()
2013-05-02 17:53:54 +02:00
if BMConfigParser().safeGetBoolean('bitmessagesettings','daemon'):
logger.info('Clean shutdown complete.')
thisapp.cleanup()
2013-05-02 17:53:54 +02:00
os._exit(0)
else:
logger.info('Core shutdown complete.')
2013-05-02 17:53:54 +02:00
def fixPotentiallyInvalidUTF8Data(text):
try:
unicode(text,'utf-8')
return text
except:
output = 'Part of the message is corrupt. The message cannot be displayed the normal way.\n\n' + repr(text)
2013-06-14 04:03:03 +02:00
return output
2013-06-27 12:44:49 +02:00
# Checks sensitive file permissions for inappropriate umask during keys.dat creation.
# (Or unwise subsequent chmod.)
2013-07-10 10:43:18 +02:00
#
2013-06-27 12:44:49 +02:00
# Returns true iff file appears to have appropriate permissions.
def checkSensitiveFilePermissions(filename):
if sys.platform == 'win32':
# TODO: This might deserve extra checks by someone familiar with
# Windows systems.
2013-06-27 12:44:49 +02:00
return True
2013-11-29 01:20:16 +01:00
elif sys.platform[:7] == 'freebsd':
# FreeBSD file systems are the same as major Linux file systems
present_permissions = os.stat(filename)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
return present_permissions & disallowed_permissions == 0
else:
try:
# Skip known problems for non-Win32 filesystems without POSIX permissions.
import subprocess
fstype = subprocess.check_output('stat -f -c "%%T" %s' % (filename),
shell=True,
stderr=subprocess.STDOUT)
if 'fuseblk' in fstype:
logger.info('Skipping file permissions check for %s. Filesystem fuseblk detected.',
filename)
return True
except:
# Swallow exception here, but we might run into trouble later!
logger.error('Could not determine filesystem type. %s', filename)
2013-06-27 12:44:49 +02:00
present_permissions = os.stat(filename)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
return present_permissions & disallowed_permissions == 0
# Fixes permissions on a sensitive file.
2013-06-27 12:44:49 +02:00
def fixSensitiveFilePermissions(filename, hasEnabledKeys):
if hasEnabledKeys:
logger.warning('Keyfile had insecure permissions, and there were enabled keys. '
'The truly paranoid should stop using them immediately.')
2013-06-27 12:44:49 +02:00
else:
logger.warning('Keyfile had insecure permissions, but there were no enabled keys.')
2013-06-27 12:44:49 +02:00
try:
present_permissions = os.stat(filename)[0]
disallowed_permissions = stat.S_IRWXG | stat.S_IRWXO
allowed_permissions = ((1<<32)-1) ^ disallowed_permissions
new_permissions = (
allowed_permissions & present_permissions)
os.chmod(filename, new_permissions)
logger.info('Keyfile permissions automatically fixed.')
2013-06-27 12:44:49 +02:00
except Exception, e:
logger.exception('Keyfile permissions could not be fixed.')
2013-06-27 12:44:49 +02:00
raise
def isBitSetWithinBitfield(fourByteString, n):
# Uses MSB 0 bit numbering across 4 bytes of data
n = 31 - n
x, = unpack('>L', fourByteString)
return x & 2**n != 0
2014-08-27 09:14:32 +02:00
def decryptAndCheckPubkeyPayload(data, address):
"""
2014-12-25 09:57:34 +01:00
Version 4 pubkeys are encrypted. This function is run when we already have the
address to which we want to try to send a message. The 'data' may come either
off of the wire or we might have had it already in our inventory when we tried
to send a msg to this particular address.
2014-08-27 09:14:32 +02:00
"""
2013-09-18 06:04:01 +02:00
try:
2014-08-27 09:14:32 +02:00
status, addressVersion, streamNumber, ripe = decodeAddress(address)
readPosition = 20 # bypass the nonce, time, and object type
embeddedAddressVersion, varintLength = decodeVarint(data[readPosition:readPosition + 10])
readPosition += varintLength
embeddedStreamNumber, varintLength = decodeVarint(data[readPosition:readPosition + 10])
readPosition += varintLength
2014-12-25 09:57:34 +01:00
storedData = data[20:readPosition] # We'll store the address version and stream number (and some more) in the pubkeys table.
2014-08-27 09:14:32 +02:00
if addressVersion != embeddedAddressVersion:
logger.info('Pubkey decryption was UNsuccessful due to address version mismatch.')
2013-09-18 06:04:01 +02:00
return 'failed'
2014-08-27 09:14:32 +02:00
if streamNumber != embeddedStreamNumber:
logger.info('Pubkey decryption was UNsuccessful due to stream number mismatch.')
return 'failed'
tag = data[readPosition:readPosition + 32]
readPosition += 32
2014-12-25 09:57:34 +01:00
signedData = data[8:readPosition] # the time through the tag. More data is appended onto signedData below after the decryption.
2014-08-27 09:14:32 +02:00
encryptedData = data[readPosition:]
# Let us try to decrypt the pubkey
toAddress, cryptorObject = shared.neededPubkeys[tag]
if toAddress != address:
logger.critical('decryptAndCheckPubkeyPayload failed due to toAddress mismatch. This is very peculiar. toAddress: %s, address %s' % (toAddress, address))
# the only way I can think that this could happen is if someone encodes their address data two different ways.
2014-12-25 09:57:34 +01:00
# That sort of address-malleability should have been caught by the UI or API and an error given to the user.
2014-08-27 09:14:32 +02:00
return 'failed'
try:
decryptedData = cryptorObject.decrypt(encryptedData)
except:
# Someone must have encrypted some data with a different key
# but tagged it with a tag for which we are watching.
logger.info('Pubkey decryption was unsuccessful.')
return 'failed'
readPosition = 0
bitfieldBehaviors = decryptedData[readPosition:readPosition + 4]
readPosition += 4
publicSigningKey = '\x04' + decryptedData[readPosition:readPosition + 64]
readPosition += 64
publicEncryptionKey = '\x04' + decryptedData[readPosition:readPosition + 64]
readPosition += 64
specifiedNonceTrialsPerByte, specifiedNonceTrialsPerByteLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += specifiedNonceTrialsPerByteLength
specifiedPayloadLengthExtraBytes, specifiedPayloadLengthExtraBytesLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += specifiedPayloadLengthExtraBytesLength
2014-12-25 09:57:34 +01:00
storedData += decryptedData[:readPosition]
signedData += decryptedData[:readPosition]
2014-08-27 09:14:32 +02:00
signatureLength, signatureLengthLength = decodeVarint(
decryptedData[readPosition:readPosition + 10])
readPosition += signatureLengthLength
signature = decryptedData[readPosition:readPosition + signatureLength]
2016-03-23 23:26:57 +01:00
if highlevelcrypto.verify(signedData, signature, hexlify(publicSigningKey)):
2014-12-25 09:57:34 +01:00
logger.info('ECDSA verify passed (within decryptAndCheckPubkeyPayload)')
2014-08-27 09:14:32 +02:00
else:
2014-12-25 09:57:34 +01:00
logger.info('ECDSA verify failed (within decryptAndCheckPubkeyPayload)')
return 'failed'
2014-08-27 09:14:32 +02:00
sha = hashlib.new('sha512')
sha.update(publicSigningKey + publicEncryptionKey)
ripeHasher = hashlib.new('ripemd160')
ripeHasher.update(sha.digest())
embeddedRipe = ripeHasher.digest()
if embeddedRipe != ripe:
# Although this pubkey object had the tag were were looking for and was
# encrypted with the correct encryption key, it doesn't contain the
2014-12-25 09:57:34 +01:00
# correct pubkeys. Someone is either being malicious or using buggy software.
2014-08-27 09:14:32 +02:00
logger.info('Pubkey decryption was UNsuccessful due to RIPE mismatch.')
return 'failed'
# Everything checked out. Insert it into the pubkeys table.
logger.info('within decryptAndCheckPubkeyPayload, addressVersion: %s, streamNumber: %s \n\
ripe %s\n\
publicSigningKey in hex: %s\n\
publicEncryptionKey in hex: %s' % (addressVersion,
streamNumber,
2016-03-23 23:26:57 +01:00
hexlify(ripe),
hexlify(publicSigningKey),
hexlify(publicEncryptionKey)
2014-08-27 09:14:32 +02:00
)
)
2015-03-09 07:35:32 +01:00
t = (address, addressVersion, storedData, int(time.time()), 'yes')
2014-08-27 09:14:32 +02:00
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
return 'successful'
except varintDecodeError as e:
logger.info('Pubkey decryption was UNsuccessful due to a malformed varint.')
2013-09-18 06:04:01 +02:00
return 'failed'
2014-08-27 09:14:32 +02:00
except Exception as e:
logger.critical('Pubkey decryption was UNsuccessful because of an unhandled exception! This is definitely a bug! \n%s' % traceback.format_exc())
2013-09-18 06:04:01 +02:00
return 'failed'
Peer = collections.namedtuple('Peer', ['host', 'port'])
2014-08-27 09:14:32 +02:00
def checkAndShareObjectWithPeers(data):
"""
This function is called after either receiving an object off of the wire
2014-08-27 09:14:32 +02:00
or after receiving one as ackdata.
Returns the length of time that we should reserve to process this message
if we are receiving it off of the wire.
"""
if len(data) > 2 ** 18:
logger.info('The payload length of this object is too large (%s bytes). Ignoring it.' % len(data))
return 0
# Let us check to make sure that the proof of work is sufficient.
if not protocol.isProofOfWorkSufficient(data):
2014-08-27 09:14:32 +02:00
logger.info('Proof of work is insufficient.')
return 0
endOfLifeTime, = unpack('>Q', data[8:16])
if endOfLifeTime - int(time.time()) > 28 * 24 * 60 * 60 + 10800: # The TTL may not be larger than 28 days + 3 hours of wiggle room
logger.info('This object\'s End of Life time is too far in the future. Ignoring it. Time is %s' % endOfLifeTime)
return 0
if endOfLifeTime - int(time.time()) < - 3600: # The EOL time was more than an hour ago. That's too much.
logger.info('This object\'s End of Life time was more than an hour ago. Ignoring the object. Time is %s' % endOfLifeTime)
return 0
intObjectType, = unpack('>I', data[16:20])
try:
if intObjectType == 0:
_checkAndShareGetpubkeyWithPeers(data)
return 0.1
elif intObjectType == 1:
_checkAndSharePubkeyWithPeers(data)
return 0.1
elif intObjectType == 2:
_checkAndShareMsgWithPeers(data)
return 0.6
elif intObjectType == 3:
_checkAndShareBroadcastWithPeers(data)
return 0.6
else:
_checkAndShareUndefinedObjectWithPeers(data)
return 0.6
except varintDecodeError as e:
logger.debug("There was a problem with a varint while checking to see whether it was appropriate to share an object with peers. Some details: %s" % e)
except Exception as e:
logger.critical('There was a problem while checking to see whether it was appropriate to share an object with peers. This is definitely a bug! \n%s' % traceback.format_exc())
return 0
2014-08-27 09:14:32 +02:00
def _checkAndShareUndefinedObjectWithPeers(data):
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass nonce, time, and object type
objectVersion, objectVersionLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += objectVersionLength
streamNumber, streamNumberLength = decodeVarint(
data[readPosition:readPosition + 9])
if not streamNumber in state.streamsInWhichIAmParticipating:
2014-08-27 09:14:32 +02:00
logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber)
return
2014-08-27 09:14:32 +02:00
inventoryHash = calculateInventoryHash(data)
if inventoryHash in Inventory():
2014-08-27 09:14:32 +02:00
logger.debug('We have already received this undefined object. Ignoring.')
return
objectType, = unpack('>I', data[16:20])
Inventory()[inventoryHash] = (
2014-08-27 09:14:32 +02:00
objectType, streamNumber, data, embeddedTime,'')
2016-03-23 23:26:57 +01:00
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
2014-08-27 09:14:32 +02:00
def _checkAndShareMsgWithPeers(data):
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass nonce, time, and object type
objectVersion, objectVersionLength = decodeVarint(
data[readPosition:readPosition + 9])
readPosition += objectVersionLength
2014-08-27 09:14:32 +02:00
streamNumber, streamNumberLength = decodeVarint(
data[readPosition:readPosition + 9])
if not streamNumber in state.streamsInWhichIAmParticipating:
2014-08-27 09:14:32 +02:00
logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber)
return
2014-08-27 09:14:32 +02:00
readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data)
if inventoryHash in Inventory():
logger.debug('We have already received this msg message. Ignoring.')
return
# This msg message is valid. Let's let our peers know about it.
2014-08-27 09:14:32 +02:00
objectType = 2
Inventory()[inventoryHash] = (
2014-08-27 09:14:32 +02:00
objectType, streamNumber, data, embeddedTime,'')
2016-03-23 23:26:57 +01:00
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's enqueue it to be processed ourselves.
objectProcessorQueue.put((objectType,data))
2014-08-27 09:14:32 +02:00
def _checkAndShareGetpubkeyWithPeers(data):
if len(data) < 42:
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')
return
2014-08-27 09:14:32 +02:00
if len(data) > 200:
logger.info('getpubkey is abnormally long. Sanity check failed. Ignoring object.')
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass the nonce, time, and object type
requestedAddressVersionNumber, addressVersionLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += addressVersionLength
streamNumber, streamNumberLength = decodeVarint(
data[readPosition:readPosition + 10])
if not streamNumber in state.streamsInWhichIAmParticipating:
logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber)
return
readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data)
if inventoryHash in Inventory():
logger.debug('We have already received this getpubkey request. Ignoring it.')
return
2014-08-27 09:14:32 +02:00
objectType = 0
Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime,'')
# This getpubkey request is valid. Forward to peers.
2016-03-23 23:26:57 +01:00
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's queue it to be processed ourselves.
objectProcessorQueue.put((objectType,data))
2014-08-27 09:14:32 +02:00
def _checkAndSharePubkeyWithPeers(data):
if len(data) < 146 or len(data) > 440: # sanity check
return
2014-08-27 09:14:32 +02:00
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass the nonce, time, and object type
addressVersion, varintLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += varintLength
streamNumber, varintLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += varintLength
if not streamNumber in state.streamsInWhichIAmParticipating:
logger.debug('The streamNumber %s isn\'t one we are interested in.' % streamNumber)
return
if addressVersion >= 4:
tag = data[readPosition:readPosition + 32]
2016-03-23 23:26:57 +01:00
logger.debug('tag in received pubkey is: %s' % hexlify(tag))
else:
tag = ''
inventoryHash = calculateInventoryHash(data)
if inventoryHash in Inventory():
logger.debug('We have already received this pubkey. Ignoring it.')
return
2014-08-27 09:14:32 +02:00
objectType = 1
Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime, tag)
# This object is valid. Forward it to peers.
2016-03-23 23:26:57 +01:00
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
protocol.broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
# Now let's queue it to be processed ourselves.
objectProcessorQueue.put((objectType,data))
2014-08-27 09:14:32 +02:00
def _checkAndShareBroadcastWithPeers(data):
if len(data) < 180:
logger.debug('The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.')
return
2014-08-27 09:14:32 +02:00
embeddedTime, = unpack('>Q', data[8:16])
readPosition = 20 # bypass the nonce, time, and object type
broadcastVersion, broadcastVersionLength = decodeVarint(
data[readPosition:readPosition + 10])
readPosition += broadcastVersionLength
if broadcastVersion >= 2:
streamNumber, streamNumberLength = decodeVarint(data[readPosition:readPosition + 10])
readPosition += streamNumberLength