|
|
|
@ -62,35 +62,35 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
logger.info('ID of the receiveDataThread is %s. The size of the shared.connectedHostsList is now %s'%(id(self),len(shared.connectedHostsList)))
|
|
|
|
|
logger.debug('ID of the receiveDataThread is %s. The size of the shared.connectedHostsList is now %s'%(id(self),len(shared.connectedHostsList)))
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
dataLen = len(self.data)
|
|
|
|
|
try:
|
|
|
|
|
self.data += self.sock.recv(4096)
|
|
|
|
|
except socket.timeout:
|
|
|
|
|
logger.info('Timeout occurred waiting for data from %s. Closing receiveData thread. (ID: %s)'%(self.peer,id(self)))
|
|
|
|
|
logger.debug('Timeout occurred waiting for data from %s. Closing receiveData thread. (ID: %s)'%(self.peer,id(self)))
|
|
|
|
|
break
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('sock.recv error. Closing receiveData thread (HOST:%s ID:%s ).%s'%(self.peer,id(self),err))
|
|
|
|
|
logger.debug('sock.recv error. Closing receiveData thread (HOST:%s ID:%s ).%s'%(self.peer,id(self),err))
|
|
|
|
|
break
|
|
|
|
|
# print 'Received', repr(self.data)
|
|
|
|
|
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
|
|
|
|
logger.info('Connection to %s closed. Closing receiveData thread. (ID:%s)'%(self.peer,id(self)))
|
|
|
|
|
logger.debug('Connection to %s closed. Closing receiveData thread. (ID:%s)'%(self.peer,id(self)))
|
|
|
|
|
break
|
|
|
|
|
else:
|
|
|
|
|
self.processData()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
del self.selfInitiatedConnections[self.streamNumber][self]
|
|
|
|
|
logger.info('removed self (a receiveDataThread) from selfInitiatedConnections')
|
|
|
|
|
logger.debug('removed self (a receiveDataThread) from selfInitiatedConnections')
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
|
|
|
|
try:
|
|
|
|
|
del shared.connectedHostsList[self.peer.host]
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('Could not delete %s from shared.connectedHostsList. %s'%(self.peer.host, err))
|
|
|
|
|
logger.debug('Could not delete %s from shared.connectedHostsList. %s'%(self.peer.host, err))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
|
|
|
@ -98,7 +98,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
|
|
|
|
logger.info('The size of the connectedHostsList is now: %s'%len(shared.connectedHostsList))
|
|
|
|
|
logger.debug('The size of the connectedHostsList is now: %s'%len(shared.connectedHostsList))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def processData(self):
|
|
|
|
@ -110,7 +110,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
return
|
|
|
|
|
if self.data[0:4] != '\xe9\xbe\xb4\xd9':
|
|
|
|
|
if shared.verbose >= 1:
|
|
|
|
|
logger.info('The magic bytes were not correct. First 40 bytes of data: %s'%repr(self.data[0:40]))
|
|
|
|
|
logger.debug('The magic bytes were not correct. First 40 bytes of data: %s'%repr(self.data[0:40]))
|
|
|
|
|
|
|
|
|
|
self.data = ""
|
|
|
|
|
return
|
|
|
|
@ -118,7 +118,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
if len(self.data) < self.payloadLength + 24: # check if the whole message has arrived yet.
|
|
|
|
|
return
|
|
|
|
|
if self.data[20:24] != hashlib.sha512(self.data[24:self.payloadLength + 24]).digest()[0:4]: # test the checksum in the message. If it is correct...
|
|
|
|
|
logger.info('Checksum incorrect. Clearing this message.')
|
|
|
|
|
logger.debug('Checksum incorrect. Clearing this message.')
|
|
|
|
|
self.data = self.data[self.payloadLength + 24:]
|
|
|
|
|
self.processData()
|
|
|
|
|
return
|
|
|
|
@ -131,7 +131,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
shared.knownNodesLock.release()
|
|
|
|
|
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
|
|
|
|
|
remoteCommand = self.data[4:16]
|
|
|
|
|
logger.info('remoteCommand %s from %s'%(repr(remoteCommand.replace('\x00', '')),self.peer))
|
|
|
|
|
logger.debug('remoteCommand %s from %s'%(repr(remoteCommand.replace('\x00', '')),self.peer))
|
|
|
|
|
|
|
|
|
|
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
|
|
|
|
self.recversion(self.data[24:self.payloadLength + 24])
|
|
|
|
@ -166,13 +166,13 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
objectHash, = random.sample(
|
|
|
|
|
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
|
|
|
|
if objectHash in shared.inventory:
|
|
|
|
|
logger.info('Inventory (in memory) already has object listed in inv message.')
|
|
|
|
|
logger.debug('Inventory (in memory) already has object listed in inv message.')
|
|
|
|
|
|
|
|
|
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
|
|
|
|
objectHash]
|
|
|
|
|
elif shared.isInSqlInventory(objectHash):
|
|
|
|
|
if shared.verbose >= 3:
|
|
|
|
|
logger.info('Inventory (SQL on disk) already has object listed in inv message.')
|
|
|
|
|
logger.debug('Inventory (SQL on disk) already has object listed in inv message.')
|
|
|
|
|
|
|
|
|
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
|
|
|
|
objectHash]
|
|
|
|
@ -181,7 +181,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
|
|
|
|
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
|
|
|
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
|
|
|
|
logger.info('(concerning %s) number of objectsThatWeHaveYetToGetFromThisPeer is now %s'%(self.peer,len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
logger.debug('(concerning %s) number of objectsThatWeHaveYetToGetFromThisPeer is now %s'%(self.peer,len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
|
|
|
@ -190,7 +190,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
pass
|
|
|
|
|
break
|
|
|
|
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
|
|
|
|
logger.info('(concerning %s) number of objectsThatWeHaveYetToGetFromThisPeer is now %s'%(self.peer, len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
logger.debug('(concerning %s) number of objectsThatWeHaveYetToGetFromThisPeer is now %s'%(self.peer, len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
|
|
|
@ -198,7 +198,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
except:
|
|
|
|
|
pass
|
|
|
|
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
|
|
|
|
logger.info('(concerning %s) number of objectsThatWeHaveYetToGetFromThisPeer is now %s'%(self.peer,len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
logger.debug('(concerning %s) number of objectsThatWeHaveYetToGetFromThisPeer is now %s'%(self.peer,len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
|
|
|
|
|
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len(
|
|
|
|
|
self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
|
|
|
@ -221,7 +221,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
return POW <= 2 ** 64 / ((len(data) + payloadLengthExtraBytes) * (nonceTrialsPerByte))
|
|
|
|
|
|
|
|
|
|
def sendpong(self):
|
|
|
|
|
logger.info('Sending pong')
|
|
|
|
|
logger.debug('Sending pong')
|
|
|
|
|
try:
|
|
|
|
|
self.sock.sendall(
|
|
|
|
|
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
|
|
|
@ -231,7 +231,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def recverack(self):
|
|
|
|
|
logger.info('verack received')
|
|
|
|
|
logger.debug('verack received')
|
|
|
|
|
self.verackReceived = True
|
|
|
|
|
if self.verackSent:
|
|
|
|
|
# We have thus both sent and received a verack.
|
|
|
|
@ -247,9 +247,9 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
|
|
|
|
|
|
|
|
|
logger.info('Connection fully established with %s'%str(self.peer))
|
|
|
|
|
logger.info('The size of the connectedHostsList is now %s'%len(shared.connectedHostsList))
|
|
|
|
|
logger.info('The length of sendDataQueues is now: %s'%len(shared.sendDataQueues))
|
|
|
|
|
logger.info('broadcasting addr from within connectionFullyEstablished function.')
|
|
|
|
|
logger.debug('The size of the connectedHostsList is now %s'%len(shared.connectedHostsList))
|
|
|
|
|
logger.debug('The length of sendDataQueues is now: %s'%len(shared.sendDataQueues))
|
|
|
|
|
logger.debug('broadcasting addr from within connectionFullyEstablished function.')
|
|
|
|
|
|
|
|
|
|
#self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
|
|
|
|
# self.remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
|
|
|
@ -259,7 +259,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
self.sendaddr() # This is one large addr message to this one peer.
|
|
|
|
|
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
|
|
|
|
logger.info('We are connected to too many people. Closing connection.')
|
|
|
|
|
logger.debug('We are connected to too many people. Closing connection.')
|
|
|
|
|
|
|
|
|
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
|
|
|
|
return
|
|
|
|
@ -310,7 +310,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
|
|
|
headerData += pack('>L', len(payload))
|
|
|
|
|
headerData += hashlib.sha512(payload).digest()[:4]
|
|
|
|
|
logger.info('Sending huge inv message with %s objects to just this one peer'%numberOfObjects)
|
|
|
|
|
logger.debug('Sending huge inv message with %s objects to just this one peer'%numberOfObjects)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.sock.sendall(headerData + payload)
|
|
|
|
@ -338,13 +338,13 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition += 4
|
|
|
|
|
|
|
|
|
|
if embeddedTime > (int(time.time()) + 10800): # prevent funny business
|
|
|
|
|
logger.info('The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.')
|
|
|
|
|
logger.debug('The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
if embeddedTime < (int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept):
|
|
|
|
|
logger.info('The embedded time in this broadcast message is too old. Ignoring message.')
|
|
|
|
|
logger.debug('The embedded time in this broadcast message is too old. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
if len(data) < 180:
|
|
|
|
|
logger.info('The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.')
|
|
|
|
|
logger.debug('The payload length of this broadcast packet is unreasonably low. Someone is probably trying funny business. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
# Let us check to make sure the stream number is correct (thus
|
|
|
|
|
# preventing an individual from sending broadcasts out on the wrong
|
|
|
|
@ -355,18 +355,18 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
streamNumber, streamNumberLength = decodeVarint(data[
|
|
|
|
|
readPosition + broadcastVersionLength:readPosition + broadcastVersionLength + 10])
|
|
|
|
|
if streamNumber != self.streamNumber:
|
|
|
|
|
logger.info('The stream number encoded in this broadcast message (%s) does not match the stream number on which it was received. Ignoring it.'%streamNumber)
|
|
|
|
|
logger.debug('The stream number encoded in this broadcast message (%s) does not match the stream number on which it was received. Ignoring it.'%streamNumber)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
shared.numberOfInventoryLookupsPerformed += 1
|
|
|
|
|
shared.inventoryLock.acquire()
|
|
|
|
|
self.inventoryHash = calculateInventoryHash(data)
|
|
|
|
|
if self.inventoryHash in shared.inventory:
|
|
|
|
|
logger.info('We have already received this broadcast object. Ignoring.')
|
|
|
|
|
logger.debug('We have already received this broadcast object. Ignoring.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
elif shared.isInSqlInventory(self.inventoryHash):
|
|
|
|
|
logger.info('We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
logger.debug('We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
# It is valid so far. Let's let our peers know about it.
|
|
|
|
@ -399,10 +399,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
|
|
|
|
(time.time() - self.messageProcessingStartTime)
|
|
|
|
|
if sleepTime > 0 and doTimingAttackMitigation:
|
|
|
|
|
logger.info('Timing attack mitigation: Sleeping for %s seconds.'%sleepTime)
|
|
|
|
|
logger.debug('Timing attack mitigation: Sleeping for %s seconds.'%sleepTime)
|
|
|
|
|
|
|
|
|
|
time.sleep(sleepTime)
|
|
|
|
|
logger.info('Total message processing time: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Total message processing time: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# A broadcast message has a valid time and POW and requires processing.
|
|
|
|
@ -412,7 +412,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
data[readPosition:readPosition + 9])
|
|
|
|
|
readPosition += broadcastVersionLength
|
|
|
|
|
if broadcastVersion < 1 or broadcastVersion > 3:
|
|
|
|
|
logger.info('Cannot decode incoming broadcast versions higher than 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
|
|
|
|
|
logger.debug('Cannot decode incoming broadcast versions higher than 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
|
|
|
|
|
return
|
|
|
|
|
if broadcastVersion == 1:
|
|
|
|
|
beginningOfPubkeyPosition = readPosition # used when we add the pubkey to our pubkey table
|
|
|
|
@ -440,7 +440,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sendersHash = data[readPosition:readPosition + 20]
|
|
|
|
|
if sendersHash not in shared.broadcastSendersForWhichImWatching:
|
|
|
|
|
# Display timing data
|
|
|
|
|
logger.info('Time spent deciding that we are not interested in this v1 broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Time spent deciding that we are not interested in this v1 broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
return
|
|
|
|
|
# At this point, this message claims to be from sendersHash and
|
|
|
|
|
# we are interested in it. We still have to hash the public key
|
|
|
|
@ -472,17 +472,17 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
signature = data[readPosition:readPosition + signatureLength]
|
|
|
|
|
try:
|
|
|
|
|
if not highlevelcrypto.verify(data[12:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')):
|
|
|
|
|
logger.info('ECDSA verify failed')
|
|
|
|
|
logger.debug('ECDSA verify failed')
|
|
|
|
|
return
|
|
|
|
|
logger.info('ECDSA verify passed')
|
|
|
|
|
logger.debug('ECDSA verify passed')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('ECDSA verify failed %s'%err)
|
|
|
|
|
logger.debug('ECDSA verify failed %s'%err)
|
|
|
|
|
return
|
|
|
|
|
# verify passed
|
|
|
|
|
fromAddress = encodeAddress(
|
|
|
|
|
sendersAddressVersion, sendersStream, ripe.digest())
|
|
|
|
|
|
|
|
|
|
logger.info('fromAddress: %s'%fromAddress)
|
|
|
|
|
logger.debug('fromAddress: %s'%fromAddress)
|
|
|
|
|
|
|
|
|
|
# Let's store the public key in case we want to reply to this person.
|
|
|
|
|
# We don't have the correct nonce or time (which would let us
|
|
|
|
@ -509,7 +509,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
body = message
|
|
|
|
|
subject = ''
|
|
|
|
|
elif messageEncodingType == 0:
|
|
|
|
|
logger.info('messageEncodingType == 0. Doing nothing with the message.')
|
|
|
|
|
logger.debug('messageEncodingType == 0. Doing nothing with the message.')
|
|
|
|
|
else:
|
|
|
|
|
body = 'Unknown encoding type.\n\n' + repr(message)
|
|
|
|
|
subject = ''
|
|
|
|
@ -537,7 +537,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
call([apiNotifyPath, "newBroadcast"])
|
|
|
|
|
|
|
|
|
|
# Display timing data
|
|
|
|
|
logger.info('Time spent processing this interesting broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Time spent processing this interesting broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
if broadcastVersion == 2:
|
|
|
|
|
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
|
|
|
|
@ -549,14 +549,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
decryptedData = cryptorObject.decrypt(data[readPosition:])
|
|
|
|
|
toRipe = key # This is the RIPE hash of the sender's pubkey. We need this below to compare to the RIPE hash of the sender's address to verify that it was encrypted by with their key rather than some other key.
|
|
|
|
|
initialDecryptionSuccessful = True
|
|
|
|
|
logger.info('EC decryption successful using key associated with ripe hash: %s'% key.encode('hex'))
|
|
|
|
|
logger.debug('EC decryption successful using key associated with ripe hash: %s'% key.encode('hex'))
|
|
|
|
|
break
|
|
|
|
|
except Exception as err:
|
|
|
|
|
pass
|
|
|
|
|
# print 'cryptorObject.decrypt Exception:', err
|
|
|
|
|
if not initialDecryptionSuccessful:
|
|
|
|
|
# This is not a broadcast I am interested in.
|
|
|
|
|
logger.info('Length of time program spent failing to decrypt this v2 broadcast: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Length of time program spent failing to decrypt this v2 broadcast: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
return
|
|
|
|
|
# At this point this is a broadcast I have decrypted and thus am
|
|
|
|
|
# interested in.
|
|
|
|
@ -566,7 +566,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 9])
|
|
|
|
|
if sendersAddressVersion < 2 or sendersAddressVersion > 3:
|
|
|
|
|
logger.info('Cannot decode senderAddressVersion other than 2 or 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
|
|
|
|
|
logger.debug('Cannot decode senderAddressVersion other than 2 or 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
|
|
|
|
|
return
|
|
|
|
|
readPosition += sendersAddressVersionLength
|
|
|
|
|
sendersStream, sendersStreamLength = decodeVarint(
|
|
|
|
@ -587,11 +587,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
logger.info('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s'%requiredAverageProofOfWorkNonceTrialsPerByte)
|
|
|
|
|
logger.debug('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s'%requiredAverageProofOfWorkNonceTrialsPerByte)
|
|
|
|
|
requiredPayloadLengthExtraBytes, varintLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
logger.info('sender\'s requiredPayloadLengthExtraBytes is %s'%requiredPayloadLengthExtraBytes)
|
|
|
|
|
logger.debug('sender\'s requiredPayloadLengthExtraBytes is %s'%requiredPayloadLengthExtraBytes)
|
|
|
|
|
endOfPubkeyPosition = readPosition
|
|
|
|
|
|
|
|
|
|
sha = hashlib.new('sha512')
|
|
|
|
@ -600,7 +600,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
ripe.update(sha.digest())
|
|
|
|
|
|
|
|
|
|
if toRipe != ripe.digest():
|
|
|
|
|
logger.info('The encryption key used to encrypt this message doesn\'t match the keys inbedded in the message itself. Ignoring message.')
|
|
|
|
|
logger.debug('The encryption key used to encrypt this message doesn\'t match the keys inbedded in the message itself. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
messageEncodingType, messageEncodingTypeLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 9])
|
|
|
|
@ -620,11 +620,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition:readPosition + signatureLength]
|
|
|
|
|
try:
|
|
|
|
|
if not highlevelcrypto.verify(decryptedData[:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')):
|
|
|
|
|
logger.info('ECDSA verify failed')
|
|
|
|
|
logger.debug('ECDSA verify failed')
|
|
|
|
|
return
|
|
|
|
|
logger.info('ECDSA verify passed')
|
|
|
|
|
logger.debug('ECDSA verify passed')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('ECDSA verify failed %s'%err)
|
|
|
|
|
logger.debug('ECDSA verify failed %s'%err)
|
|
|
|
|
return
|
|
|
|
|
# verify passed
|
|
|
|
|
|
|
|
|
@ -644,7 +644,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
fromAddress = encodeAddress(
|
|
|
|
|
sendersAddressVersion, sendersStream, ripe.digest())
|
|
|
|
|
logger.info('fromAddress: %s'%fromAddress)
|
|
|
|
|
logger.debug('fromAddress: %s'%fromAddress)
|
|
|
|
|
|
|
|
|
|
if messageEncodingType == 2:
|
|
|
|
|
subject, body = self.decodeType2Message(message)
|
|
|
|
@ -652,7 +652,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
body = message
|
|
|
|
|
subject = ''
|
|
|
|
|
elif messageEncodingType == 0:
|
|
|
|
|
logger.info('messageEncodingType == 0. Doing nothing with the message.')
|
|
|
|
|
logger.debug('messageEncodingType == 0. Doing nothing with the message.')
|
|
|
|
|
else:
|
|
|
|
|
body = 'Unknown encoding type.\n\n' + repr(message)
|
|
|
|
|
subject = ''
|
|
|
|
@ -680,7 +680,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
call([apiNotifyPath, "newBroadcast"])
|
|
|
|
|
|
|
|
|
|
# Display timing data
|
|
|
|
|
logger.info('Time spent processing this interesting broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Time spent processing this interesting broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
if broadcastVersion == 3:
|
|
|
|
|
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
|
|
|
|
@ -689,15 +689,15 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
embeddedTag = data[readPosition:readPosition+32]
|
|
|
|
|
readPosition += 32
|
|
|
|
|
if embeddedTag not in shared.MyECSubscriptionCryptorObjects:
|
|
|
|
|
logger.info('We\'re not interested in this broadcast.')
|
|
|
|
|
logger.debug('We\'re not interested in this broadcast.')
|
|
|
|
|
return
|
|
|
|
|
# We are interested in this broadcast because of its tag.
|
|
|
|
|
cryptorObject = shared.MyECSubscriptionCryptorObjects[embeddedTag]
|
|
|
|
|
try:
|
|
|
|
|
decryptedData = cryptorObject.decrypt(data[readPosition:])
|
|
|
|
|
logger.info('EC decryption successful')
|
|
|
|
|
logger.debug('EC decryption successful')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('Broadcast version 3 decryption Unsuccessful.')
|
|
|
|
|
logger.debug('Broadcast version 3 decryption Unsuccessful.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
signedBroadcastVersion, readPosition = decodeVarint(
|
|
|
|
@ -706,13 +706,13 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sendersAddressVersion, sendersAddressVersionLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 9])
|
|
|
|
|
if sendersAddressVersion < 4:
|
|
|
|
|
logger.info('Cannot decode senderAddressVersion less than 4 for broadcast version number 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
|
|
|
|
|
logger.debug('Cannot decode senderAddressVersion less than 4 for broadcast version number 3. Assuming the sender isn\'t being silly, you should upgrade Bitmessage because this message shall be ignored.')
|
|
|
|
|
return
|
|
|
|
|
readPosition += sendersAddressVersionLength
|
|
|
|
|
sendersStream, sendersStreamLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 9])
|
|
|
|
|
if sendersStream != cleartextStreamNumber:
|
|
|
|
|
logger.info('The stream number outside of the encryption on which the POW was completed doesn\'t match the stream number inside the encryption. Ignoring broadcast.')
|
|
|
|
|
logger.debug('The stream number outside of the encryption on which the POW was completed doesn\'t match the stream number inside the encryption. Ignoring broadcast.')
|
|
|
|
|
return
|
|
|
|
|
readPosition += sendersStreamLength
|
|
|
|
|
behaviorBitfield = decryptedData[readPosition:readPosition + 4]
|
|
|
|
@ -727,11 +727,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
logger.info('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s'% requiredAverageProofOfWorkNonceTrialsPerByte)
|
|
|
|
|
logger.debug('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s'% requiredAverageProofOfWorkNonceTrialsPerByte)
|
|
|
|
|
requiredPayloadLengthExtraBytes, varintLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
logger.info('sender\'s requiredPayloadLengthExtraBytes is %s'%requiredPayloadLengthExtraBytes)
|
|
|
|
|
logger.debug('sender\'s requiredPayloadLengthExtraBytes is %s'%requiredPayloadLengthExtraBytes)
|
|
|
|
|
endOfPubkeyPosition = readPosition
|
|
|
|
|
|
|
|
|
|
sha = hashlib.new('sha512')
|
|
|
|
@ -743,7 +743,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
calculatedTag = hashlib.sha512(hashlib.sha512(encodeVarint(
|
|
|
|
|
sendersAddressVersion) + encodeVarint(sendersStream) + calculatedRipe).digest()).digest()[32:]
|
|
|
|
|
if calculatedTag != embeddedTag:
|
|
|
|
|
logger.info('The tag and encryption key used to encrypt this message doesn\'t match the keys inbedded in the message itself. Ignoring message.')
|
|
|
|
|
logger.debug('The tag and encryption key used to encrypt this message doesn\'t match the keys inbedded in the message itself. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
messageEncodingType, messageEncodingTypeLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 9])
|
|
|
|
@ -763,17 +763,17 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition:readPosition + signatureLength]
|
|
|
|
|
try:
|
|
|
|
|
if not highlevelcrypto.verify(decryptedData[:readPositionAtBottomOfMessage], signature, sendersPubSigningKey.encode('hex')):
|
|
|
|
|
logger.info('ECDSA verify failed')
|
|
|
|
|
logger.debug('ECDSA verify failed')
|
|
|
|
|
return
|
|
|
|
|
logger.info('ECDSA verify passed')
|
|
|
|
|
logger.debug('ECDSA verify passed')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('ECDSA verify failed %s'%err)
|
|
|
|
|
logger.debug('ECDSA verify failed %s'%err)
|
|
|
|
|
return
|
|
|
|
|
# verify passed
|
|
|
|
|
|
|
|
|
|
fromAddress = encodeAddress(
|
|
|
|
|
sendersAddressVersion, sendersStream, calculatedRipe)
|
|
|
|
|
logger.info('fromAddress: %s'%fromAddress)
|
|
|
|
|
logger.debug('fromAddress: %s'%fromAddress)
|
|
|
|
|
|
|
|
|
|
# Let's store the public key in case we want to reply to this person.
|
|
|
|
|
sqlExecute(
|
|
|
|
@ -794,7 +794,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
body = message
|
|
|
|
|
subject = ''
|
|
|
|
|
elif messageEncodingType == 0:
|
|
|
|
|
logger.info('messageEncodingType == 0. Doing nothing with the message.')
|
|
|
|
|
logger.debug('messageEncodingType == 0. Doing nothing with the message.')
|
|
|
|
|
else:
|
|
|
|
|
body = 'Unknown encoding type.\n\n' + repr(message)
|
|
|
|
|
subject = ''
|
|
|
|
@ -822,14 +822,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
call([apiNotifyPath, "newBroadcast"])
|
|
|
|
|
|
|
|
|
|
# Display timing data
|
|
|
|
|
logger.info('Time spent processing this interesting broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Time spent processing this interesting broadcast: %s'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
# We have received a msg message.
|
|
|
|
|
def recmsg(self, data):
|
|
|
|
|
self.messageProcessingStartTime = time.time()
|
|
|
|
|
# First we must check to make sure the proof of work is sufficient.
|
|
|
|
|
if not self.isProofOfWorkSufficient(data):
|
|
|
|
|
logger.info('Proof of work in msg message insufficient.')
|
|
|
|
|
logger.debug('Proof of work in msg message insufficient.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
readPosition = 8
|
|
|
|
@ -844,26 +844,26 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition += 4
|
|
|
|
|
|
|
|
|
|
if embeddedTime > int(time.time()) + 10800:
|
|
|
|
|
logger.info('The time in the msg message is too new. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
logger.debug('The time in the msg message is too new. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
return
|
|
|
|
|
if embeddedTime < int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept:
|
|
|
|
|
logger.info('The time in the msg message is too old. Ignoring it. Time:'%embeddedTime)
|
|
|
|
|
logger.debug('The time in the msg message is too old. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
return
|
|
|
|
|
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(
|
|
|
|
|
data[readPosition:readPosition + 9])
|
|
|
|
|
if streamNumberAsClaimedByMsg != self.streamNumber:
|
|
|
|
|
logger.info('The stream number encoded in this msg ( %s ) message does not match the stream number on which it was received. Ignoring it.'%streamNumberAsClaimedByMsg)
|
|
|
|
|
logger.debug('The stream number encoded in this msg ( %s ) message does not match the stream number on which it was received. Ignoring it.'%streamNumberAsClaimedByMsg)
|
|
|
|
|
return
|
|
|
|
|
readPosition += streamNumberAsClaimedByMsgLength
|
|
|
|
|
self.inventoryHash = calculateInventoryHash(data)
|
|
|
|
|
shared.numberOfInventoryLookupsPerformed += 1
|
|
|
|
|
shared.inventoryLock.acquire()
|
|
|
|
|
if self.inventoryHash in shared.inventory:
|
|
|
|
|
logger.info('We have already received this msg message. Ignoring.')
|
|
|
|
|
logger.debug('We have already received this msg message. Ignoring.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
elif shared.isInSqlInventory(self.inventoryHash):
|
|
|
|
|
logger.info('We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
logger.debug('We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
# This msg message is valid. Let's let our peers know about it.
|
|
|
|
@ -895,10 +895,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
|
|
|
|
(time.time() - self.messageProcessingStartTime)
|
|
|
|
|
if sleepTime > 0 and doTimingAttackMitigation:
|
|
|
|
|
logger.info('Timing attack mitigation: Sleeping for %s seconds.'%sleepTime)
|
|
|
|
|
logger.debug('Timing attack mitigation: Sleeping for %s seconds.'%sleepTime)
|
|
|
|
|
|
|
|
|
|
time.sleep(sleepTime)
|
|
|
|
|
logger.info('Total message processing time: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Total message processing time: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# A msg message has a valid time and POW and requires processing. The
|
|
|
|
@ -916,7 +916,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8')))))
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
logger.info('This was NOT an acknowledgement bound for me.')
|
|
|
|
|
logger.debug('This was NOT an acknowledgement bound for me.')
|
|
|
|
|
# print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -928,14 +928,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
encryptedData[readPosition:])
|
|
|
|
|
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
|
|
|
|
|
initialDecryptionSuccessful = True
|
|
|
|
|
logger.info('EC decryption successful using key associated with ripe hash: %s'%key.encode('hex'))
|
|
|
|
|
logger.debug('EC decryption successful using key associated with ripe hash: %s'%key.encode('hex'))
|
|
|
|
|
break
|
|
|
|
|
except Exception as err:
|
|
|
|
|
pass
|
|
|
|
|
# print 'cryptorObject.decrypt Exception:', err
|
|
|
|
|
if not initialDecryptionSuccessful:
|
|
|
|
|
# This is not a message bound for me.
|
|
|
|
|
logger.info('Length of time program spent failing to decrypt this message: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
logger.debug('Length of time program spent failing to decrypt this message: %s seconds.'%(time.time() - self.messageProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
# This is a message bound for me.
|
|
|
|
@ -946,24 +946,24 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += messageVersionLength
|
|
|
|
|
if messageVersion != 1:
|
|
|
|
|
logger.info('Cannot understand message versions other than one. Ignoring message.')
|
|
|
|
|
logger.debug('Cannot understand message versions other than one. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
sendersAddressVersionNumber, sendersAddressVersionNumberLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += sendersAddressVersionNumberLength
|
|
|
|
|
if sendersAddressVersionNumber == 0:
|
|
|
|
|
logger.info('Cannot understand sendersAddressVersionNumber = 0. Ignoring message.')
|
|
|
|
|
logger.debug('Cannot understand sendersAddressVersionNumber = 0. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
if sendersAddressVersionNumber > 4:
|
|
|
|
|
logger.info('Sender\'s address version number %s not yet supported. Ignoring message.'%sendersAddressVersionNumber)
|
|
|
|
|
logger.debug('Sender\'s address version number %s not yet supported. Ignoring message.'%sendersAddressVersionNumber)
|
|
|
|
|
return
|
|
|
|
|
if len(decryptedData) < 170:
|
|
|
|
|
logger.info('Length of the unencrypted data is unreasonably short. Sanity check failed. Ignoring message.')
|
|
|
|
|
logger.debug('Length of the unencrypted data is unreasonably short. Sanity check failed. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
sendersStreamNumber, sendersStreamNumberLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
if sendersStreamNumber == 0:
|
|
|
|
|
logger.info('sender\'s stream number is 0. Ignoring message.')
|
|
|
|
|
logger.debug('sender\'s stream number is 0. Ignoring message.')
|
|
|
|
|
return
|
|
|
|
|
readPosition += sendersStreamNumberLength
|
|
|
|
|
behaviorBitfield = decryptedData[readPosition:readPosition + 4]
|
|
|
|
@ -978,17 +978,17 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
logger.info('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s'%requiredAverageProofOfWorkNonceTrialsPerByte)
|
|
|
|
|
logger.debug('sender\'s requiredAverageProofOfWorkNonceTrialsPerByte is %s'%requiredAverageProofOfWorkNonceTrialsPerByte)
|
|
|
|
|
requiredPayloadLengthExtraBytes, varintLength = decodeVarint(
|
|
|
|
|
decryptedData[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
logger.info('sender\'s requiredPayloadLengthExtraBytes is %s'%requiredPayloadLengthExtraBytes)
|
|
|
|
|
logger.debug('sender\'s requiredPayloadLengthExtraBytes is %s'%requiredPayloadLengthExtraBytes)
|
|
|
|
|
endOfThePublicKeyPosition = readPosition # needed for when we store the pubkey in our database of pubkeys for later use.
|
|
|
|
|
if toRipe != decryptedData[readPosition:readPosition + 20]:
|
|
|
|
|
logger.info('The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.')
|
|
|
|
|
logger.info('See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html')
|
|
|
|
|
logger.info('your toRipe: %s'%toRipe.encode('hex'))
|
|
|
|
|
logger.info('embedded destination toRipe: %s'%decryptedData[readPosition:readPosition + 20].encode('hex'))
|
|
|
|
|
logger.debug('The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.')
|
|
|
|
|
logger.debug('See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html')
|
|
|
|
|
logger.debug('your toRipe: %s'%toRipe.encode('hex'))
|
|
|
|
|
logger.debug('embedded destination toRipe: %s'%decryptedData[readPosition:readPosition + 20].encode('hex'))
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
readPosition += 20
|
|
|
|
@ -1014,13 +1014,13 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition:readPosition + signatureLength]
|
|
|
|
|
try:
|
|
|
|
|
if not highlevelcrypto.verify(decryptedData[:positionOfBottomOfAckData], signature, pubSigningKey.encode('hex')):
|
|
|
|
|
logger.info('ECDSA verify failed')
|
|
|
|
|
logger.debug('ECDSA verify failed')
|
|
|
|
|
return
|
|
|
|
|
logger.info('ECDSA verify passed')
|
|
|
|
|
logger.debug('ECDSA verify passed')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('ECDSA verify failed %s'%err)
|
|
|
|
|
logger.debug('ECDSA verify failed %s'%err)
|
|
|
|
|
return
|
|
|
|
|
logger.info('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.'%(helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey),helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey)))
|
|
|
|
|
logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.'%(helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey),helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey)))
|
|
|
|
|
|
|
|
|
|
# calculate the fromRipe.
|
|
|
|
|
sha = hashlib.new('sha512')
|
|
|
|
@ -1065,7 +1065,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
requiredPayloadLengthExtraBytes = shared.config.getint(
|
|
|
|
|
toAddress, 'payloadlengthextrabytes')
|
|
|
|
|
if not self.isProofOfWorkSufficient(encryptedData, requiredNonceTrialsPerByte, requiredPayloadLengthExtraBytes):
|
|
|
|
|
logger.info('Proof of work in msg message insufficient only because it does not meet our higher requirement.')
|
|
|
|
|
logger.debug('Proof of work in msg message insufficient only because it does not meet our higher requirement.')
|
|
|
|
|
return
|
|
|
|
|
blockMessage = False # Gets set to True if the user shouldn't see the message according to black or white lists.
|
|
|
|
|
if shared.config.get('bitmessagesettings', 'blackwhitelist') == 'black': # If we are using a blacklist
|
|
|
|
@ -1073,7 +1073,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
'''SELECT label FROM blacklist where address=? and enabled='1' ''',
|
|
|
|
|
fromAddress)
|
|
|
|
|
if queryreturn != []:
|
|
|
|
|
logger.info('Message ignored because address is in blacklist.')
|
|
|
|
|
logger.debug('Message ignored because address is in blacklist.')
|
|
|
|
|
|
|
|
|
|
blockMessage = True
|
|
|
|
|
else: # We're using a whitelist
|
|
|
|
@ -1081,11 +1081,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
'''SELECT label FROM whitelist where address=? and enabled='1' ''',
|
|
|
|
|
fromAddress)
|
|
|
|
|
if queryreturn == []:
|
|
|
|
|
logger.info('Message ignored because address not in whitelist.')
|
|
|
|
|
logger.debug('Message ignored because address not in whitelist.')
|
|
|
|
|
blockMessage = True
|
|
|
|
|
if not blockMessage:
|
|
|
|
|
logger.info('fromAddress: %s'%fromAddress)
|
|
|
|
|
logger.info('First 150 characters of message: %s'%repr(message[:150]))
|
|
|
|
|
logger.debug('First 150 characters of message: %s'%repr(message[:150]))
|
|
|
|
|
|
|
|
|
|
toLabel = shared.config.get(toAddress, 'label')
|
|
|
|
|
if toLabel == '':
|
|
|
|
@ -1097,7 +1097,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
body = message
|
|
|
|
|
subject = ''
|
|
|
|
|
elif messageEncodingType == 0:
|
|
|
|
|
logger.info('messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them.')
|
|
|
|
|
logger.debug('messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them.')
|
|
|
|
|
else:
|
|
|
|
|
body = 'Unknown encoding type.\n\n' + repr(message)
|
|
|
|
|
subject = ''
|
|
|
|
@ -1150,7 +1150,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
shared.workerQueue.put(('sendbroadcast', ''))
|
|
|
|
|
|
|
|
|
|
if self.isAckDataValid(ackData):
|
|
|
|
|
logger.info('ackData is valid. Will process it.')
|
|
|
|
|
logger.debug('ackData is valid. Will process it.')
|
|
|
|
|
self.ackDataThatWeHaveYetToSend.append(
|
|
|
|
|
ackData) # When we have processed all data, the processData function will pop the ackData out and process it as if it is a message received from our peer.
|
|
|
|
|
# Display timing data
|
|
|
|
@ -1161,8 +1161,8 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sum = 0
|
|
|
|
|
for item in shared.successfullyDecryptMessageTimings:
|
|
|
|
|
sum += item
|
|
|
|
|
logger.info('Time to decrypt this message successfully: %s'%timeRequiredToAttemptToDecryptMessage)
|
|
|
|
|
logger.info('Average time for all message decryption successes since startup: %s'%(sum / len(shared.successfullyDecryptMessageTimings)))
|
|
|
|
|
logger.debug('Time to decrypt this message successfully: %s'%timeRequiredToAttemptToDecryptMessage)
|
|
|
|
|
logger.debug('Average time for all message decryption successes since startup: %s'%(sum / len(shared.successfullyDecryptMessageTimings)))
|
|
|
|
|
|
|
|
|
|
def decodeType2Message(self, message):
|
|
|
|
|
bodyPositionIndex = string.find(message, '\nBody:')
|
|
|
|
@ -1182,14 +1182,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
def isAckDataValid(self, ackData):
|
|
|
|
|
if len(ackData) < 24:
|
|
|
|
|
logger.info('The length of ackData is unreasonably short. Not sending ackData.')
|
|
|
|
|
logger.debug('The length of ackData is unreasonably short. Not sending ackData.')
|
|
|
|
|
return False
|
|
|
|
|
if ackData[0:4] != '\xe9\xbe\xb4\xd9':
|
|
|
|
|
logger.info('Ackdata magic bytes were wrong. Not sending ackData.')
|
|
|
|
|
logger.debug('Ackdata magic bytes were wrong. Not sending ackData.')
|
|
|
|
|
return False
|
|
|
|
|
ackDataPayloadLength, = unpack('>L', ackData[16:20])
|
|
|
|
|
if len(ackData) - 24 != ackDataPayloadLength:
|
|
|
|
|
logger.info('ackData payload length doesn\'t match the payload length specified in the header. Not sending ackdata.')
|
|
|
|
|
logger.debug('ackData payload length doesn\'t match the payload length specified in the header. Not sending ackdata.')
|
|
|
|
|
return False
|
|
|
|
|
if ackData[4:16] != 'getpubkey\x00\x00\x00' and ackData[4:16] != 'pubkey\x00\x00\x00\x00\x00\x00' and ackData[4:16] != 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' and ackData[4:16] != 'broadcast\x00\x00\x00':
|
|
|
|
|
return False
|
|
|
|
@ -1211,14 +1211,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
# For address versions <= 3, we wait on a key with the correct ripe hash
|
|
|
|
|
if ripe != None:
|
|
|
|
|
if ripe in shared.neededPubkeys:
|
|
|
|
|
logger.info('We have been awaiting the arrival of this pubkey.')
|
|
|
|
|
logger.debug('We have been awaiting the arrival of this pubkey.')
|
|
|
|
|
del shared.neededPubkeys[ripe]
|
|
|
|
|
sqlExecute(
|
|
|
|
|
'''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''',
|
|
|
|
|
ripe)
|
|
|
|
|
shared.workerQueue.put(('sendmessage', ''))
|
|
|
|
|
else:
|
|
|
|
|
logger.info('We don\'t need this pub key. We didn\'t ask for it. Pubkey hash: %s'%ripe.encode('hex'))
|
|
|
|
|
logger.debug('We don\'t need this pub key. We didn\'t ask for it. Pubkey hash: %s'%ripe.encode('hex'))
|
|
|
|
|
# For address versions >= 4, we wait on a pubkey with the correct tag.
|
|
|
|
|
# Let us create the tag from the address and see if we were waiting
|
|
|
|
|
# for it.
|
|
|
|
@ -1227,7 +1227,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
tag = hashlib.sha512(hashlib.sha512(encodeVarint(
|
|
|
|
|
addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()[32:]
|
|
|
|
|
if tag in shared.neededPubkeys:
|
|
|
|
|
logger.info('We have been awaiting the arrival of this pubkey.')
|
|
|
|
|
logger.debug('We have been awaiting the arrival of this pubkey.')
|
|
|
|
|
del shared.neededPubkeys[tag]
|
|
|
|
|
sqlExecute(
|
|
|
|
|
'''UPDATE sent SET status='doingmsgpow' WHERE toripe=? AND (status='awaitingpubkey' or status='doingpubkeypow') and folder='sent' ''',
|
|
|
|
@ -1241,7 +1241,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
return
|
|
|
|
|
# We must check to make sure the proof of work is sufficient.
|
|
|
|
|
if not self.isProofOfWorkSufficient(data):
|
|
|
|
|
logger.info('Proof of work in pubkey message insufficient.')
|
|
|
|
|
logger.debug('Proof of work in pubkey message insufficient.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
readPosition = 8 # for the nonce
|
|
|
|
@ -1256,11 +1256,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition += 4
|
|
|
|
|
|
|
|
|
|
if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys:
|
|
|
|
|
logger.info('The embedded time in this pubkey message is too old. Ignoring. Embedded time is: %s'%embeddedTime)
|
|
|
|
|
logger.debug('The embedded time in this pubkey message is too old. Ignoring. Embedded time is: %s'%str(embeddedTime))
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
if embeddedTime > int(time.time()) + 10800:
|
|
|
|
|
logger.info('The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.')
|
|
|
|
|
logger.debug('The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.')
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
addressVersion, varintLength = decodeVarint(
|
|
|
|
@ -1270,11 +1270,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
data[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
if self.streamNumber != streamNumber:
|
|
|
|
|
logger.info('stream number embedded in this pubkey doesn\'t match our stream number. Ignoring.')
|
|
|
|
|
logger.debug('stream number embedded in this pubkey doesn\'t match our stream number. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
if addressVersion >= 4:
|
|
|
|
|
tag = data[readPosition:readPosition + 32]
|
|
|
|
|
logger.info('tag in received pubkey is: %s'%tag.encode('hex'))
|
|
|
|
|
logger.debug('tag in received pubkey is: %s'%tag.encode('hex'))
|
|
|
|
|
else:
|
|
|
|
|
tag = ''
|
|
|
|
|
|
|
|
|
@ -1282,11 +1282,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
inventoryHash = calculateInventoryHash(data)
|
|
|
|
|
shared.inventoryLock.acquire()
|
|
|
|
|
if inventoryHash in shared.inventory:
|
|
|
|
|
logger.info('We have already received this pubkey. Ignoring it.')
|
|
|
|
|
logger.debug('We have already received this pubkey. Ignoring it.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
elif shared.isInSqlInventory(inventoryHash):
|
|
|
|
|
logger.info('We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
logger.debug('We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
objectType = 'pubkey'
|
|
|
|
@ -1305,10 +1305,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
|
|
|
|
(time.time() - self.pubkeyProcessingStartTime)
|
|
|
|
|
if sleepTime > 0 and doTimingAttackMitigation:
|
|
|
|
|
logger.info('Timing attack mitigation: Sleeping for %s seconds.'%sleepTime)
|
|
|
|
|
logger.debug('Timing attack mitigation: Sleeping for %s seconds.'%sleepTime)
|
|
|
|
|
|
|
|
|
|
time.sleep(sleepTime)
|
|
|
|
|
logger.info('Total pubkey processing time: %s seconds.'%(time.time() - self.pubkeyProcessingStartTime))
|
|
|
|
|
logger.debug('Total pubkey processing time: %s seconds.'%(time.time() - self.pubkeyProcessingStartTime))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def processpubkey(self, data):
|
|
|
|
@ -1330,14 +1330,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
data[readPosition:readPosition + 10])
|
|
|
|
|
readPosition += varintLength
|
|
|
|
|
if addressVersion == 0:
|
|
|
|
|
logger.info('(Within processpubkey) addressVersion of 0 doesn\'t make sense.')
|
|
|
|
|
logger.debug('(Within processpubkey) addressVersion of 0 doesn\'t make sense.')
|
|
|
|
|
return
|
|
|
|
|
if addressVersion > 4 or addressVersion == 1:
|
|
|
|
|
logger.info('This version of Bitmessage cannot handle version %s addresses.'%addressVersion)
|
|
|
|
|
logger.debug('This version of Bitmessage cannot handle version %s addresses.'%addressVersion)
|
|
|
|
|
return
|
|
|
|
|
if addressVersion == 2:
|
|
|
|
|
if len(data) < 146: # sanity check. This is the minimum possible length.
|
|
|
|
|
logger.info('(within processpubkey) payloadLength less than 146. Sanity check failed.')
|
|
|
|
|
logger.debug('(within processpubkey) payloadLength less than 146. Sanity check failed.')
|
|
|
|
|
return
|
|
|
|
|
bitfieldBehaviors = data[readPosition:readPosition + 4]
|
|
|
|
|
readPosition += 4
|
|
|
|
@ -1348,7 +1348,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition += 64
|
|
|
|
|
publicEncryptionKey = data[readPosition:readPosition + 64]
|
|
|
|
|
if len(publicEncryptionKey) < 64:
|
|
|
|
|
logger.info('publicEncryptionKey length less than 64. Sanity check failed.')
|
|
|
|
|
logger.debug('publicEncryptionKey length less than 64. Sanity check failed.')
|
|
|
|
|
return
|
|
|
|
|
sha = hashlib.new('sha512')
|
|
|
|
|
sha.update(
|
|
|
|
@ -1358,19 +1358,19 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
ripe = ripeHasher.digest()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.info('within recpubkey, addressVersion: %s streamNumber: %s'%(addressVersion, streamNumber))
|
|
|
|
|
logger.info('ripe %s'%ripe.encode('hex'))
|
|
|
|
|
logger.info('publicSigningKey in hex: %s'%publicSigningKey.encode('hex'))
|
|
|
|
|
logger.info('publicEncryptionKey in hex: %s'%publicEncryptionKey.encode('hex'))
|
|
|
|
|
logger.debug('within recpubkey, addressVersion: %s streamNumber: %s'%(addressVersion, streamNumber))
|
|
|
|
|
logger.debug('ripe %s'%ripe.encode('hex'))
|
|
|
|
|
logger.debug('publicSigningKey in hex: %s'%publicSigningKey.encode('hex'))
|
|
|
|
|
logger.debug('publicEncryptionKey in hex: %s'%publicEncryptionKey.encode('hex'))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
queryreturn = sqlQuery(
|
|
|
|
|
'''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion)
|
|
|
|
|
if queryreturn != []: # if this pubkey is already in our database and if we have used it personally:
|
|
|
|
|
logger.info('We HAVE used this pubkey personally. Updating time.')
|
|
|
|
|
logger.debug('We HAVE used this pubkey personally. Updating time.')
|
|
|
|
|
t = (ripe, addressVersion, data, embeddedTime, 'yes')
|
|
|
|
|
else:
|
|
|
|
|
logger.info('We have NOT used this pubkey personally. Inserting in database.')
|
|
|
|
|
logger.debug('We have NOT used this pubkey personally. Inserting in database.')
|
|
|
|
|
t = (ripe, addressVersion, data, embeddedTime, 'no')
|
|
|
|
|
# This will also update the embeddedTime.
|
|
|
|
|
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
|
|
|
|
@ -1378,7 +1378,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
self.possibleNewPubkey(ripe = ripe)
|
|
|
|
|
if addressVersion == 3:
|
|
|
|
|
if len(data) < 170: # sanity check.
|
|
|
|
|
logger.info('(within processpubkey) payloadLength less than 170. Sanity check failed.')
|
|
|
|
|
logger.debug('(within processpubkey) payloadLength less than 170. Sanity check failed.')
|
|
|
|
|
return
|
|
|
|
|
bitfieldBehaviors = data[readPosition:readPosition + 4]
|
|
|
|
|
readPosition += 4
|
|
|
|
@ -1402,11 +1402,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
signature = data[readPosition:readPosition + signatureLength]
|
|
|
|
|
try:
|
|
|
|
|
if not highlevelcrypto.verify(data[8:endOfSignedDataPosition], signature, publicSigningKey.encode('hex')):
|
|
|
|
|
logger.info('ECDSA verify failed (within processpubkey)')
|
|
|
|
|
logger.debug('ECDSA verify failed (within processpubkey)')
|
|
|
|
|
return
|
|
|
|
|
logger.info('ECDSA verify passed (within processpubkey)')
|
|
|
|
|
logger.debug('ECDSA verify passed (within processpubkey)')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('ECDSA verify failed (within processpubkey) %s'%err)
|
|
|
|
|
logger.debug('ECDSA verify failed (within processpubkey) %s'%err)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
sha = hashlib.new('sha512')
|
|
|
|
@ -1416,17 +1416,17 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
ripe = ripeHasher.digest()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.info('within recpubkey, addressVersion: %s streamNumber: %s'%(addressVersion, streamNumber))
|
|
|
|
|
logger.info('ripe %s'%ripe.encode('hex'))
|
|
|
|
|
logger.info('publicSigningKey in hex: %s'%publicSigningKey.encode('hex'))
|
|
|
|
|
logger.info('publicEncryptionKey in hex: %s'%publicEncryptionKey.encode('hex'))
|
|
|
|
|
logger.debug('within recpubkey, addressVersion: %s streamNumber: %s'%(addressVersion, streamNumber))
|
|
|
|
|
logger.debug('ripe %s'%ripe.encode('hex'))
|
|
|
|
|
logger.debug('publicSigningKey in hex: %s'%publicSigningKey.encode('hex'))
|
|
|
|
|
logger.debug('publicEncryptionKey in hex: %s'%publicEncryptionKey.encode('hex'))
|
|
|
|
|
|
|
|
|
|
queryreturn = sqlQuery('''SELECT usedpersonally FROM pubkeys WHERE hash=? AND addressversion=? AND usedpersonally='yes' ''', ripe, addressVersion)
|
|
|
|
|
if queryreturn != []: # if this pubkey is already in our database and if we have used it personally:
|
|
|
|
|
logger.info('We HAVE used this pubkey personally. Updating time.')
|
|
|
|
|
logger.debug('We HAVE used this pubkey personally. Updating time.')
|
|
|
|
|
t = (ripe, addressVersion, data, embeddedTime, 'yes')
|
|
|
|
|
else:
|
|
|
|
|
logger.info('We have NOT used this pubkey personally. Inserting in database.')
|
|
|
|
|
logger.debug('We have NOT used this pubkey personally. Inserting in database.')
|
|
|
|
|
t = (ripe, addressVersion, data, embeddedTime, 'no')
|
|
|
|
|
# This will also update the embeddedTime.
|
|
|
|
|
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
|
|
|
|
@ -1434,14 +1434,14 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
if addressVersion == 4:
|
|
|
|
|
if len(data) < 350: # sanity check.
|
|
|
|
|
logger.info('(within processpubkey) payloadLength less than 350. Sanity check failed.')
|
|
|
|
|
logger.debug('(within processpubkey) payloadLength less than 350. Sanity check failed.')
|
|
|
|
|
return
|
|
|
|
|
signedData = data[8:readPosition] # Used only for v4 or higher pubkeys
|
|
|
|
|
tag = data[readPosition:readPosition + 32]
|
|
|
|
|
readPosition += 32
|
|
|
|
|
encryptedData = data[readPosition:]
|
|
|
|
|
if tag not in shared.neededPubkeys:
|
|
|
|
|
logger.info('We don\'t need this v4 pubkey. We didn\'t ask for it.')
|
|
|
|
|
logger.debug('We don\'t need this v4 pubkey. We didn\'t ask for it.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# Let us try to decrypt the pubkey
|
|
|
|
@ -1451,7 +1451,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
except:
|
|
|
|
|
# Someone must have encrypted some data with a different key
|
|
|
|
|
# but tagged it with a tag for which we are watching.
|
|
|
|
|
logger.info('Pubkey decryption was unsuccessful.')
|
|
|
|
|
logger.debug('Pubkey decryption was unsuccessful.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -1478,11 +1478,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
signature = decryptedData[readPosition:readPosition + signatureLength]
|
|
|
|
|
try:
|
|
|
|
|
if not highlevelcrypto.verify(signedData, signature, publicSigningKey.encode('hex')):
|
|
|
|
|
logger.info('ECDSA verify failed (within processpubkey)')
|
|
|
|
|
logger.debug('ECDSA verify failed (within processpubkey)')
|
|
|
|
|
return
|
|
|
|
|
logger.info('ECDSA verify passed (within processpubkey)')
|
|
|
|
|
logger.debug('ECDSA verify passed (within processpubkey)')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
logger.info('ECDSA verify failed (within processpubkey) %s'%err)
|
|
|
|
|
logger.debug('ECDSA verify failed (within processpubkey) %s'%err)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
sha = hashlib.new('sha512')
|
|
|
|
@ -1494,16 +1494,16 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
# We need to make sure that the tag on the outside of the encryption
|
|
|
|
|
# is the one generated from hashing these particular keys.
|
|
|
|
|
if tag != hashlib.sha512(hashlib.sha512(encodeVarint(addressVersion) + encodeVarint(streamNumber) + ripe).digest()).digest()[32:]:
|
|
|
|
|
logger.info('Someone was trying to act malicious: tag doesn\'t match the keys in this pubkey message. Ignoring it.')
|
|
|
|
|
logger.debug('Someone was trying to act malicious: tag doesn\'t match the keys in this pubkey message. Ignoring it.')
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
logger.info('Tag successfully matches keys in pubkey message') # testing. Will remove soon.
|
|
|
|
|
logger.debug('Tag successfully matches keys in pubkey message') # testing. Will remove soon.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
logger.info('within recpubkey, addressVersion: %s streamNumber: %s'%(addressVersion, streamNumber))
|
|
|
|
|
logger.info('ripe %s'%ripe.encode('hex'))
|
|
|
|
|
logger.info('publicSigningKey in hex: %s'%publicSigningKey.encode('hex'))
|
|
|
|
|
logger.info('publicEncryptionKey in hex: %s'%publicEncryptionKey.encode('hex'))
|
|
|
|
|
logger.debug('within recpubkey, addressVersion: %s streamNumber: %s'%(addressVersion, streamNumber))
|
|
|
|
|
logger.debug('ripe %s'%ripe.encode('hex'))
|
|
|
|
|
logger.debug('publicSigningKey in hex: %s'%publicSigningKey.encode('hex'))
|
|
|
|
|
logger.debug('publicEncryptionKey in hex: %s'%publicEncryptionKey.encode('hex'))
|
|
|
|
|
|
|
|
|
|
t = (ripe, addressVersion, signedData, embeddedTime, 'yes')
|
|
|
|
|
sqlExecute('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''', *t)
|
|
|
|
@ -1518,10 +1518,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
# We have received a getpubkey message
|
|
|
|
|
def recgetpubkey(self, data):
|
|
|
|
|
if not self.isProofOfWorkSufficient(data):
|
|
|
|
|
logger.info('Proof of work in getpubkey message insufficient.')
|
|
|
|
|
logger.debug('Proof of work in getpubkey message insufficient.')
|
|
|
|
|
return
|
|
|
|
|
if len(data) < 34:
|
|
|
|
|
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')
|
|
|
|
|
logger.debug('getpubkey message doesn\'t contain enough data. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
readPosition = 8 # bypass the nonce
|
|
|
|
|
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])
|
|
|
|
@ -1535,10 +1535,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition += 4
|
|
|
|
|
|
|
|
|
|
if embeddedTime > int(time.time()) + 10800:
|
|
|
|
|
logger.info('The time in this getpubkey message is too new. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
logger.debug('The time in this getpubkey message is too new. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
return
|
|
|
|
|
if embeddedTime < int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept:
|
|
|
|
|
logger.info('The time in this getpubkey message is too old. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
logger.debug('The time in this getpubkey message is too old. Ignoring it. Time: %s'%embeddedTime)
|
|
|
|
|
return
|
|
|
|
|
requestedAddressVersionNumber, addressVersionLength = decodeVarint(
|
|
|
|
|
data[readPosition:readPosition + 10])
|
|
|
|
@ -1546,7 +1546,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
streamNumber, streamNumberLength = decodeVarint(
|
|
|
|
|
data[readPosition:readPosition + 10])
|
|
|
|
|
if streamNumber != self.streamNumber:
|
|
|
|
|
logger.info('The streamNumber %s doesn\'t match our stream number: %s'%(streamNumber, self.streamNumber))
|
|
|
|
|
logger.debug('The streamNumber %s doesn\'t match our stream number: %s'%(streamNumber, self.streamNumber))
|
|
|
|
|
return
|
|
|
|
|
readPosition += streamNumberLength
|
|
|
|
|
|
|
|
|
@ -1554,11 +1554,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
inventoryHash = calculateInventoryHash(data)
|
|
|
|
|
shared.inventoryLock.acquire()
|
|
|
|
|
if inventoryHash in shared.inventory:
|
|
|
|
|
logger.info('We have already received this getpubkey request. Ignoring it.')
|
|
|
|
|
logger.debug('We have already received this getpubkey request. Ignoring it.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
elif shared.isInSqlInventory(inventoryHash):
|
|
|
|
|
logger.info('We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
logger.debug('We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.')
|
|
|
|
|
shared.inventoryLock.release()
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
@ -1571,43 +1571,43 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
self.broadcastinv(inventoryHash)
|
|
|
|
|
|
|
|
|
|
if requestedAddressVersionNumber == 0:
|
|
|
|
|
logger.info('The requestedAddressVersionNumber of the pubkey request is zero. That doesn\'t make any sense. Ignoring it.')
|
|
|
|
|
logger.debug('The requestedAddressVersionNumber of the pubkey request is zero. That doesn\'t make any sense. Ignoring it.')
|
|
|
|
|
return
|
|
|
|
|
elif requestedAddressVersionNumber == 1:
|
|
|
|
|
logger.info('The requestedAddressVersionNumber of the pubkey request is 1 which isn\'t supported anymore. Ignoring it.')
|
|
|
|
|
logger.debug('The requestedAddressVersionNumber of the pubkey request is 1 which isn\'t supported anymore. Ignoring it.')
|
|
|
|
|
return
|
|
|
|
|
elif requestedAddressVersionNumber > 4:
|
|
|
|
|
logger.info('The requestedAddressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.')
|
|
|
|
|
logger.debug('The requestedAddressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
myAddress = ''
|
|
|
|
|
if requestedAddressVersionNumber <= 3 :
|
|
|
|
|
requestedHash = data[readPosition:readPosition + 20]
|
|
|
|
|
if len(requestedHash) != 20:
|
|
|
|
|
logger.info('The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.')
|
|
|
|
|
logger.debug('The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
logger.info('the hash requested in this getpubkey request is: %s'%requestedHash.encode('hex'))
|
|
|
|
|
logger.debug('the hash requested in this getpubkey request is: %s'%requestedHash.encode('hex'))
|
|
|
|
|
if requestedHash in shared.myAddressesByHash: # if this address hash is one of mine
|
|
|
|
|
myAddress = shared.myAddressesByHash[requestedHash]
|
|
|
|
|
elif requestedAddressVersionNumber >= 4:
|
|
|
|
|
requestedTag = data[readPosition:readPosition + 32]
|
|
|
|
|
if len(requestedTag) != 32:
|
|
|
|
|
logger.info('The length of the requested tag is not 32 bytes. Something is wrong. Ignoring.')
|
|
|
|
|
logger.debug('The length of the requested tag is not 32 bytes. Something is wrong. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
logger.info('the tag requested in this getpubkey request is: %s'%requestedTag.encode('hex'))
|
|
|
|
|
logger.debug('the tag requested in this getpubkey request is: %s'%requestedTag.encode('hex'))
|
|
|
|
|
if requestedTag in shared.myAddressesByTag:
|
|
|
|
|
|
|
|
|
|
myAddress = shared.myAddressesByTag[requestedTag]
|
|
|
|
|
|
|
|
|
|
if myAddress == '':
|
|
|
|
|
logger.info('This getpubkey request is not for any of my keys.')
|
|
|
|
|
logger.debug('This getpubkey request is not for any of my keys.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if decodeAddress(myAddress)[1] != requestedAddressVersionNumber:
|
|
|
|
|
logger.info('(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. They shouldn\'t have done that. Ignoring.\n')
|
|
|
|
|
logger.debug('(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. They shouldn\'t have done that. Ignoring.\n')
|
|
|
|
|
return
|
|
|
|
|
if shared.safeConfigGetBoolean(myAddress, 'chan'):
|
|
|
|
|
logger.info('Ignoring getpubkey request because it is for one of my chan addresses. The other party should already have the pubkey.')
|
|
|
|
|
logger.debug('Ignoring getpubkey request because it is for one of my chan addresses. The other party should already have the pubkey.')
|
|
|
|
|
return
|
|
|
|
|
try:
|
|
|
|
|
lastPubkeySendTime = int(shared.config.get(
|
|
|
|
@ -1615,10 +1615,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
except:
|
|
|
|
|
lastPubkeySendTime = 0
|
|
|
|
|
if lastPubkeySendTime > time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was more recent than 28 days ago...
|
|
|
|
|
logger.info('Found getpubkey-requested-item in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is: %s'%lastPubkeySendTime)
|
|
|
|
|
logger.debug('Found getpubkey-requested-item in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is: %s'%lastPubkeySendTime)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
logger.info('Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.')
|
|
|
|
|
logger.debug('Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.')
|
|
|
|
|
if requestedAddressVersionNumber == 2:
|
|
|
|
|
shared.workerQueue.put((
|
|
|
|
|
'doPOWForMyV2Pubkey', requestedHash))
|
|
|
|
@ -1636,28 +1636,28 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0:
|
|
|
|
|
for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items():
|
|
|
|
|
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value
|
|
|
|
|
logger.info('number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: %s'%len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer))
|
|
|
|
|
logger.info('totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = %s'%totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers)
|
|
|
|
|
logger.debug('number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: %s'%len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer))
|
|
|
|
|
logger.debug('totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = %s'%totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers)
|
|
|
|
|
|
|
|
|
|
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
|
|
|
|
|
if numberOfItemsInInv > 50000:
|
|
|
|
|
logger.error('Too many items in inv message!')
|
|
|
|
|
return
|
|
|
|
|
if len(data) < lengthOfVarint + (numberOfItemsInInv * 32):
|
|
|
|
|
logger.info('inv message doesn\'t contain enough data. Ignoring.')
|
|
|
|
|
logger.debug('inv message doesn\'t contain enough data. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
|
|
|
|
|
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
|
|
|
|
logger.info('We already have %s items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'%totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers)
|
|
|
|
|
logger.debug('We already have %s items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'%totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers)
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
|
|
|
|
|
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
|
|
|
|
shared.numberOfInventoryLookupsPerformed += 1
|
|
|
|
|
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
|
|
|
|
logger.info('Inventory (in memory) has inventory item already.')
|
|
|
|
|
logger.debug('Inventory (in memory) has inventory item already.')
|
|
|
|
|
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
|
|
|
|
logger.info('Inventory (SQL on disk) has inventory item already.')
|
|
|
|
|
logger.debug('Inventory (SQL on disk) has inventory item already.')
|
|
|
|
|
else:
|
|
|
|
|
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
|
|
|
|
|
else:
|
|
|
|
@ -1669,10 +1669,10 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
for i in range(numberOfItemsInInv):
|
|
|
|
|
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
|
|
|
|
objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber]
|
|
|
|
|
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
|
|
|
|
logger.debug('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
|
|
|
|
for item in objectsNewToMe:
|
|
|
|
|
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
|
|
|
|
logger.info('We already have %s items yet to retrieve from peers and over %s from this node in particular. Ignoring the rest of this inv message.'%(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers,len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
logger.debug('We already have %s items yet to retrieve from peers and over %s from this node in particular. Ignoring the rest of this inv message.'%(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers,len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
|
|
|
|
break
|
|
|
|
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
|
|
|
|
|
self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
|
|
|
@ -1683,7 +1683,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
# Send a getdata message to our peer to request the object with the given
|
|
|
|
|
# hash
|
|
|
|
|
def sendgetdata(self, hash):
|
|
|
|
|
logger.info('sending getdata to retrieve object with hash: %s'%hash.encode('hex'))
|
|
|
|
|
logger.debug('sending getdata to retrieve object with hash: %s'%hash.encode('hex'))
|
|
|
|
|
|
|
|
|
|
payload = '\x01' + hash
|
|
|
|
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
|
|
|
@ -1695,7 +1695,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
self.sock.sendall(headerData + payload)
|
|
|
|
|
except Exception as err:
|
|
|
|
|
# if not 'Bad file descriptor' in err:
|
|
|
|
|
logger.info('sock.sendall error: %s'%err)
|
|
|
|
|
logger.debug('sock.sendall error: %s'%err)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# We have received a getdata request from our peer
|
|
|
|
@ -1703,12 +1703,12 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
numberOfRequestedInventoryItems, lengthOfVarint = decodeVarint(
|
|
|
|
|
data[:10])
|
|
|
|
|
if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems):
|
|
|
|
|
logger.info('getdata message does not contain enough data. Ignoring.')
|
|
|
|
|
logger.debug('getdata message does not contain enough data. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
for i in xrange(numberOfRequestedInventoryItems):
|
|
|
|
|
hash = data[lengthOfVarint + (
|
|
|
|
|
i * 32):32 + lengthOfVarint + (i * 32)]
|
|
|
|
|
logger.info('received getdata request for item: %s'%hash.encode('hex'))
|
|
|
|
|
logger.debug('received getdata request for item: %s'%hash.encode('hex'))
|
|
|
|
|
|
|
|
|
|
shared.numberOfInventoryLookupsPerformed += 1
|
|
|
|
|
shared.inventoryLock.acquire()
|
|
|
|
@ -1727,29 +1727,29 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
objectType, payload = row
|
|
|
|
|
self.sendData(objectType, payload)
|
|
|
|
|
else:
|
|
|
|
|
logger.info('Someone asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. That shouldn\'t have happened.')
|
|
|
|
|
logger.debug('Someone asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. That shouldn\'t have happened.')
|
|
|
|
|
|
|
|
|
|
# Our peer has requested (in a getdata message) that we send an object.
|
|
|
|
|
def sendData(self, objectType, payload):
|
|
|
|
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
|
|
|
|
if objectType == 'pubkey':
|
|
|
|
|
logger.info('sending pubkey')
|
|
|
|
|
logger.debug('sending pubkey')
|
|
|
|
|
|
|
|
|
|
headerData += 'pubkey\x00\x00\x00\x00\x00\x00'
|
|
|
|
|
elif objectType == 'getpubkey' or objectType == 'pubkeyrequest':
|
|
|
|
|
logger.info('sending getpubkey')
|
|
|
|
|
logger.debug('sending getpubkey')
|
|
|
|
|
|
|
|
|
|
headerData += 'getpubkey\x00\x00\x00'
|
|
|
|
|
elif objectType == 'msg':
|
|
|
|
|
logger.info('sending msg')
|
|
|
|
|
logger.debug('sending msg')
|
|
|
|
|
|
|
|
|
|
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
|
|
|
elif objectType == 'broadcast':
|
|
|
|
|
logger.info('sending broadcast')
|
|
|
|
|
logger.debug('sending broadcast')
|
|
|
|
|
|
|
|
|
|
headerData += 'broadcast\x00\x00\x00'
|
|
|
|
|
else:
|
|
|
|
|
logger.info('Error: sendData has been asked to send a strange objectType: %s\n' % str(objectType))
|
|
|
|
|
logger.debug('Error: sendData has been asked to send a strange objectType: %s\n' % str(objectType))
|
|
|
|
|
return
|
|
|
|
|
headerData += pack('>L', len(payload)) # payload length.
|
|
|
|
|
headerData += hashlib.sha512(payload).digest()[:4]
|
|
|
|
@ -1757,12 +1757,12 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
self.sock.sendall(headerData + payload)
|
|
|
|
|
except Exception as err:
|
|
|
|
|
# if not 'Bad file descriptor' in err:
|
|
|
|
|
logger.info('sock.sendall error: %s'%err)
|
|
|
|
|
logger.debug('sock.sendall error: %s'%err)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Advertise this object to all of our peers
|
|
|
|
|
def broadcastinv(self, hash):
|
|
|
|
|
logger.info('broadcasting inv with hash: %s'%hash.encode('hex'))
|
|
|
|
|
logger.debug('broadcasting inv with hash: %s'%hash.encode('hex'))
|
|
|
|
|
|
|
|
|
|
shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash))
|
|
|
|
|
|
|
|
|
@ -1774,18 +1774,18 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
data[:10])
|
|
|
|
|
|
|
|
|
|
if shared.verbose >= 1:
|
|
|
|
|
logger.info('addr message contains %s IP addresses.'%numberOfAddressesIncluded)
|
|
|
|
|
logger.debug('addr message contains %s IP addresses.'%numberOfAddressesIncluded)
|
|
|
|
|
|
|
|
|
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
|
|
|
|
return
|
|
|
|
|
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
|
|
|
|
logger.info('addr message does not contain the correct amount of data. Ignoring.')
|
|
|
|
|
logger.debug('addr message does not contain the correct amount of data. Ignoring.')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
for i in range(0, numberOfAddressesIncluded):
|
|
|
|
|
try:
|
|
|
|
|
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
|
|
|
|
logger.info('Skipping IPv6 address. %s'%(repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])))
|
|
|
|
|
logger.debug('Skipping IPv6 address. %s'%(repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])))
|
|
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
except Exception as err:
|
|
|
|
@ -1824,13 +1824,13 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)])
|
|
|
|
|
# print 'hostFromAddrMessage', hostFromAddrMessage
|
|
|
|
|
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F':
|
|
|
|
|
logger.info('Ignoring IP address in loopback range: %s'%hostFromAddrMessage)
|
|
|
|
|
logger.debug('Ignoring IP address in loopback range: %s'%hostFromAddrMessage)
|
|
|
|
|
continue
|
|
|
|
|
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A':
|
|
|
|
|
logger.info('Ignoring IP address in private range: %s'%hostFromAddrMessage)
|
|
|
|
|
logger.debug('Ignoring IP address in private range: %s'%hostFromAddrMessage)
|
|
|
|
|
continue
|
|
|
|
|
if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8':
|
|
|
|
|
logger.info('Ignoring IP address in private range: %s'%hostFromAddrMessage)
|
|
|
|
|
logger.debug('Ignoring IP address in private range: %s'%hostFromAddrMessage)
|
|
|
|
|
continue
|
|
|
|
|
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
|
|
|
|
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
|
|
|
@ -1845,7 +1845,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
|
|
|
|
timeSomeoneElseReceivedMessageFromThisNode)
|
|
|
|
|
shared.knownNodesLock.release()
|
|
|
|
|
logger.info('added new node %s to knownNodes in stream %s'%(peerFromAddrMessage, recaddrStream))
|
|
|
|
|
logger.debug('added new node %s to knownNodes in stream %s'%(peerFromAddrMessage, recaddrStream))
|
|
|
|
|
|
|
|
|
|
shared.needToWriteKnownNodesToDisk = True
|
|
|
|
|
hostDetails = (
|
|
|
|
@ -1864,7 +1864,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
#if listOfAddressDetailsToBroadcastToPeers != []:
|
|
|
|
|
# self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
|
|
|
|
logger.info('knownNodes currently has %s nodes for this stream.'%len(shared.knownNodes[self.streamNumber]))
|
|
|
|
|
logger.debug('knownNodes currently has %s nodes for this stream.'%len(shared.knownNodes[self.streamNumber]))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Function runs when we want to broadcast an addr message to all of our
|
|
|
|
@ -1981,11 +1981,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
try:
|
|
|
|
|
self.sock.sendall(datatosend)
|
|
|
|
|
if shared.verbose >= 1:
|
|
|
|
|
logger.info('Sending addr with %s entries.'%numberOfAddressesInAddrMessage)
|
|
|
|
|
logger.debug('Sending addr with %s entries.'%numberOfAddressesInAddrMessage)
|
|
|
|
|
|
|
|
|
|
except Exception as err:
|
|
|
|
|
# if not 'Bad file descriptor' in err:
|
|
|
|
|
logger.info('sock.sendall error: %s'%err)
|
|
|
|
|
logger.debug('sock.sendall error: %s'%err)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# We have received a version message
|
|
|
|
@ -1997,7 +1997,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
|
|
|
|
if self.remoteProtocolVersion <= 1:
|
|
|
|
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
|
|
|
|
logger.info('Closing connection to old protocol version 1 node: %s'%self.peer)
|
|
|
|
|
logger.debug('Closing connection to old protocol version 1 node: %s'%self.peer)
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
|
|
|
@ -2015,11 +2015,11 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
|
|
|
|
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
|
|
|
|
data[readPosition:])
|
|
|
|
|
logger.info('Remote node useragent: %s stream number: %s'%(useragent,self.streamNumber))
|
|
|
|
|
logger.debug('Remote node useragent: %s stream number: %s'%(useragent,self.streamNumber))
|
|
|
|
|
|
|
|
|
|
if self.streamNumber != 1:
|
|
|
|
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
|
|
|
|
logger.info('Closed connection to %s because they are interested in stream %s.'%(self.peer, self.streamNumber))
|
|
|
|
|
logger.debug('Closed connection to %s because they are interested in stream %s.'%(self.peer, self.streamNumber))
|
|
|
|
|
return
|
|
|
|
|
shared.connectedHostsList[
|
|
|
|
|
self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
|
|
|
@ -2030,7 +2030,7 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
0, 'setStreamNumber', (self.peer, self.streamNumber)))
|
|
|
|
|
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
|
|
|
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
|
|
|
|
logger.info('Closing connection to myself: %s'%self.peer)
|
|
|
|
|
logger.debug('Closing connection to myself: %s'%self.peer)
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', (
|
|
|
|
@ -2047,29 +2047,26 @@ class receiveDataThread(threading.Thread):
|
|
|
|
|
|
|
|
|
|
# Sends a version message
|
|
|
|
|
def sendversion(self):
|
|
|
|
|
logger.info('Sending version message')
|
|
|
|
|
logger.debug('Sending version message')
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.sock.sendall(shared.assembleVersionMessage(
|
|
|
|
|
self.peer.host, self.peer.port, self.streamNumber))
|
|
|
|
|
except Exception as err:
|
|
|
|
|
# if not 'Bad file descriptor' in err:
|
|
|
|
|
logger.info('sock.sendall error: %s'%err)
|
|
|
|
|
logger.debug('sock.sendall error: %s'%err)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sends a verack message
|
|
|
|
|
def sendverack(self):
|
|
|
|
|
logger.info('Sending verack')
|
|
|
|
|
logger.debug('Sending verack')
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.sock.sendall(
|
|
|
|
|
'\xE9\xBE\xB4\xD9\x76\x65\x72\x61\x63\x6B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
|
|
|
|
except Exception as err:
|
|
|
|
|
# if not 'Bad file descriptor' in err:
|
|
|
|
|
logger.info('sock.sendall error: %s'%err)
|
|
|
|
|
|
|
|
|
|
# cf
|
|
|
|
|
# 83
|
|
|
|
|
logger.debug('sock.sendall error: %s'%err)
|
|
|
|
|
# e1
|
|
|
|
|
# 35
|
|
|
|
|
self.verackSent = True
|
|
|
|
|