Smarter advertisement of object hashes and peers #494
|
@ -46,6 +46,11 @@ if sys.platform == 'darwin':
|
||||||
|
|
||||||
def connectToStream(streamNumber):
|
def connectToStream(streamNumber):
|
||||||
selfInitiatedConnections[streamNumber] = {}
|
selfInitiatedConnections[streamNumber] = {}
|
||||||
|
shared.inventorySets[streamNumber] = set()
|
||||||
|
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
||||||
|
for row in queryData:
|
||||||
|
shared.inventorySets[streamNumber].add(row[0])
|
||||||
|
|
||||||
if sys.platform[0:3] == 'win':
|
if sys.platform[0:3] == 'win':
|
||||||
maximumNumberOfHalfOpenConnections = 9
|
maximumNumberOfHalfOpenConnections = 9
|
||||||
else:
|
else:
|
||||||
|
@ -705,10 +710,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
objectType = 'msg'
|
objectType = 'msg'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, toStreamNumber, encryptedPayload, int(time.time()))
|
objectType, toStreamNumber, encryptedPayload, int(time.time()))
|
||||||
|
shared.inventorySets[toStreamNumber].add(inventoryHash)
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
|
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
toStreamNumber, 'sendinv', inventoryHash))
|
toStreamNumber, 'advertiseobject', inventoryHash))
|
||||||
elif method == 'disseminatePubkey':
|
elif method == 'disseminatePubkey':
|
||||||
# The device issuing this command to PyBitmessage supplies a pubkey object to be
|
# The device issuing this command to PyBitmessage supplies a pubkey object to be
|
||||||
# disseminated to the rest of the Bitmessage network. PyBitmessage accepts this
|
# disseminated to the rest of the Bitmessage network. PyBitmessage accepts this
|
||||||
|
@ -741,10 +747,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
objectType = 'pubkey'
|
objectType = 'pubkey'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, pubkeyStreamNumber, payload, int(time.time()))
|
objectType, pubkeyStreamNumber, payload, int(time.time()))
|
||||||
|
shared.inventorySets[pubkeyStreamNumber].add(inventoryHash)
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
elif method == 'getMessageDataByDestinationHash':
|
elif method == 'getMessageDataByDestinationHash':
|
||||||
# Method will eventually be used by a particular Android app to
|
# Method will eventually be used by a particular Android app to
|
||||||
# select relevant messages. Do not yet add this to the api
|
# select relevant messages. Do not yet add this to the api
|
||||||
|
|
45
src/class_objectHashHolder.py
Normal file
45
src/class_objectHashHolder.py
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
|
||||||
|
# by each sendDataThread. The sendDataThread uses it whenever it needs to
|
||||||
|
# advertise an object to peers in an inv message, or advertise a peer to other
|
||||||
|
# peers in an addr message. Instead of sending them out immediately, it must
|
||||||
|
# wait a random number of seconds for each connection so that different peers
|
||||||
|
# get different objects at different times. Thus an attacker who is
|
||||||
|
# connecting to many network nodes who receives a message first from Alice
|
||||||
|
# cannot be sure if Alice is the node who originated the message.
|
||||||
|
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
class objectHashHolder(threading.Thread):
|
||||||
|
def __init__(self, sendDataThreadMailbox):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.shutdown = False
|
||||||
|
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
|
||||||
|
self.collectionOfHashLists = {}
|
||||||
|
self.collectionOfPeerLists = {}
|
||||||
|
for i in range(10):
|
||||||
|
self.collectionOfHashLists[i] = []
|
||||||
|
self.collectionOfPeerLists[i] = []
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
iterator = 0
|
||||||
|
while not self.shutdown:
|
||||||
|
if len(self.collectionOfHashLists[iterator]) > 0:
|
||||||
|
self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfHashLists[iterator]))
|
||||||
|
self.collectionOfHashLists[iterator] = []
|
||||||
|
if len(self.collectionOfPeerLists[iterator]) > 0:
|
||||||
|
self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator]))
|
||||||
|
self.collectionOfPeerLists[iterator] = []
|
||||||
|
iterator += 1
|
||||||
|
iterator %= 10
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
def holdHash(self,hash):
|
||||||
|
self.collectionOfHashLists[random.randrange(0, 10)].append(hash)
|
||||||
|
|
||||||
|
def holdPeer(self,peerDetails):
|
||||||
|
self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.shutdown = True
|
|
@ -5,7 +5,6 @@ import threading
|
||||||
import shared
|
import shared
|
||||||
import hashlib
|
import hashlib
|
||||||
import socket
|
import socket
|
||||||
import pickle
|
|
||||||
import random
|
import random
|
||||||
from struct import unpack, pack
|
from struct import unpack, pack
|
||||||
import sys
|
import sys
|
||||||
|
@ -91,7 +90,6 @@ class receiveDataThread(threading.Thread):
|
||||||
del self.selfInitiatedConnections[self.streamNumber][self]
|
del self.selfInitiatedConnections[self.streamNumber][self]
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||||
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
|
@ -175,7 +173,6 @@ class receiveDataThread(threading.Thread):
|
||||||
if self.data == '':
|
if self.data == '':
|
||||||
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||||
shared.numberOfInventoryLookupsPerformed += 1
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
random.seed()
|
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||||
if objectHash in shared.inventory:
|
if objectHash in shared.inventory:
|
||||||
|
@ -264,16 +261,18 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sock.settimeout(
|
self.sock.settimeout(
|
||||||
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
remoteNodeSeenTime = shared.knownNodes[
|
|
||||||
self.streamNumber][self.peer]
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Connection fully established with', self.peer
|
print 'Connection fully established with', self.peer
|
||||||
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
||||||
print 'broadcasting addr from within connectionFullyEstablished function.'
|
print 'broadcasting addr from within connectionFullyEstablished function.'
|
||||||
|
|
||||||
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
#self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
||||||
self.peer.port)]) # This lets all of our peers know about this new node.
|
# self.remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
||||||
|
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
|
||||||
|
shared.broadcastToSendDataQueues((
|
||||||
|
self.streamNumber, 'advertisepeer', dataToSend))
|
||||||
|
|
||||||
self.sendaddr() # This is one large addr message to this one peer.
|
self.sendaddr() # This is one large addr message to this one peer.
|
||||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
@ -298,11 +297,12 @@ class receiveDataThread(threading.Thread):
|
||||||
bigInvList[hash] = 0
|
bigInvList[hash] = 0
|
||||||
# We also have messages in our inventory in memory (which is a python
|
# We also have messages in our inventory in memory (which is a python
|
||||||
# dictionary). Let's fetch those too.
|
# dictionary). Let's fetch those too.
|
||||||
for hash, storedValue in shared.inventory.items():
|
with shared.inventoryLock:
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
for hash, storedValue in shared.inventory.items():
|
||||||
objectType, streamNumber, payload, receivedTime = storedValue
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers:
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
bigInvList[hash] = 0
|
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers:
|
||||||
|
bigInvList[hash] = 0
|
||||||
numberOfObjectsInInvMessage = 0
|
numberOfObjectsInInvMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
# Now let us start appending all of these hashes together. They will be
|
# Now let us start appending all of these hashes together. They will be
|
||||||
|
@ -392,6 +392,7 @@ class receiveDataThread(threading.Thread):
|
||||||
objectType = 'broadcast'
|
objectType = 'broadcast'
|
||||||
shared.inventory[self.inventoryHash] = (
|
shared.inventory[self.inventoryHash] = (
|
||||||
objectType, self.streamNumber, data, embeddedTime)
|
objectType, self.streamNumber, data, embeddedTime)
|
||||||
|
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
self.broadcastinv(self.inventoryHash)
|
self.broadcastinv(self.inventoryHash)
|
||||||
shared.numberOfBroadcastsProcessed += 1
|
shared.numberOfBroadcastsProcessed += 1
|
||||||
|
@ -755,6 +756,7 @@ class receiveDataThread(threading.Thread):
|
||||||
objectType = 'msg'
|
objectType = 'msg'
|
||||||
shared.inventory[self.inventoryHash] = (
|
shared.inventory[self.inventoryHash] = (
|
||||||
objectType, self.streamNumber, data, embeddedTime)
|
objectType, self.streamNumber, data, embeddedTime)
|
||||||
|
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
self.broadcastinv(self.inventoryHash)
|
self.broadcastinv(self.inventoryHash)
|
||||||
shared.numberOfMessagesProcessed += 1
|
shared.numberOfMessagesProcessed += 1
|
||||||
|
@ -1153,6 +1155,7 @@ class receiveDataThread(threading.Thread):
|
||||||
objectType = 'pubkey'
|
objectType = 'pubkey'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, self.streamNumber, data, embeddedTime)
|
objectType, self.streamNumber, data, embeddedTime)
|
||||||
|
shared.inventorySets[self.streamNumber].add(inventoryHash)
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
self.broadcastinv(inventoryHash)
|
self.broadcastinv(inventoryHash)
|
||||||
shared.numberOfPubkeysProcessed += 1
|
shared.numberOfPubkeysProcessed += 1
|
||||||
|
@ -1348,6 +1351,7 @@ class receiveDataThread(threading.Thread):
|
||||||
objectType = 'getpubkey'
|
objectType = 'getpubkey'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, self.streamNumber, data, embeddedTime)
|
objectType, self.streamNumber, data, embeddedTime)
|
||||||
|
shared.inventorySets[self.streamNumber].add(inventoryHash)
|
||||||
shared.inventoryLock.release()
|
shared.inventoryLock.release()
|
||||||
# This getpubkey request is valid so far. Forward to peers.
|
# This getpubkey request is valid so far. Forward to peers.
|
||||||
self.broadcastinv(inventoryHash)
|
self.broadcastinv(inventoryHash)
|
||||||
|
@ -1442,18 +1446,10 @@ class receiveDataThread(threading.Thread):
|
||||||
# 'set' of objects we are aware of and a set of objects in this inv
|
# 'set' of objects we are aware of and a set of objects in this inv
|
||||||
# message so that we can diff one from the other cheaply.
|
# message so that we can diff one from the other cheaply.
|
||||||
startTime = time.time()
|
startTime = time.time()
|
||||||
currentInventoryList = set()
|
|
||||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''',
|
|
||||||
self.streamNumber)
|
|
||||||
for row in queryData:
|
|
||||||
currentInventoryList.add(row[0])
|
|
||||||
with shared.inventoryLock:
|
|
||||||
for objectHash, value in shared.inventory.items():
|
|
||||||
currentInventoryList.add(objectHash)
|
|
||||||
advertisedSet = set()
|
advertisedSet = set()
|
||||||
for i in range(numberOfItemsInInv):
|
for i in range(numberOfItemsInInv):
|
||||||
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
||||||
objectsNewToMe = advertisedSet - currentInventoryList
|
objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber]
|
||||||
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
||||||
for item in objectsNewToMe:
|
for item in objectsNewToMe:
|
||||||
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
||||||
|
@ -1500,11 +1496,14 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'received getdata request for item:', hash.encode('hex')
|
print 'received getdata request for item:', hash.encode('hex')
|
||||||
|
|
||||||
shared.numberOfInventoryLookupsPerformed += 1
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
|
shared.inventoryLock.acquire()
|
||||||
if hash in shared.inventory:
|
if hash in shared.inventory:
|
||||||
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
||||||
hash]
|
hash]
|
||||||
|
shared.inventoryLock.release()
|
||||||
self.sendData(objectType, payload)
|
self.sendData(objectType, payload)
|
||||||
else:
|
else:
|
||||||
|
shared.inventoryLock.release()
|
||||||
queryreturn = sqlQuery(
|
queryreturn = sqlQuery(
|
||||||
'''select objecttype, payload from inventory where hash=?''',
|
'''select objecttype, payload from inventory where hash=?''',
|
||||||
hash)
|
hash)
|
||||||
|
@ -1552,16 +1551,16 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
|
|
||||||
|
|
||||||
# Send an inv message with just one hash to all of our peers
|
# Advertise this object to all of our peers
|
||||||
def broadcastinv(self, hash):
|
def broadcastinv(self, hash):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'broadcasting inv with hash:', hash.encode('hex')
|
print 'broadcasting inv with hash:', hash.encode('hex')
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
|
shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash))
|
||||||
|
|
||||||
# We have received an addr message.
|
# We have received an addr message.
|
||||||
def recaddr(self, data):
|
def recaddr(self, data):
|
||||||
listOfAddressDetailsToBroadcastToPeers = []
|
#listOfAddressDetailsToBroadcastToPeers = []
|
||||||
numberOfAddressesIncluded = 0
|
numberOfAddressesIncluded = 0
|
||||||
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
||||||
data[:10])
|
data[:10])
|
||||||
|
@ -1570,227 +1569,113 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
||||||
|
|
||||||
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||||
|
return
|
||||||
|
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
||||||
|
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||||
|
return
|
||||||
|
|
||||||
if self.remoteProtocolVersion == 1:
|
for i in range(0, numberOfAddressesIncluded):
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
try:
|
||||||
return
|
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
|
|
||||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
|
||||||
return
|
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = False
|
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
|
||||||
try:
|
|
||||||
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
|
||||||
with shared.printLock:
|
|
||||||
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
|
|
||||||
|
|
||||||
continue
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
|
|
||||||
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
if recaddrStream == 0:
|
|
||||||
continue
|
continue
|
||||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
except Exception as err:
|
||||||
continue
|
with shared.printLock:
|
||||||
try:
|
sys.stderr.write(
|
||||||
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
|
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
||||||
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
|
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
||||||
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
|
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
if recaddrStream == 0:
|
||||||
# recaddrPort, ', i', i
|
continue
|
||||||
hostFromAddrMessage = socket.inet_ntoa(data[
|
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
||||||
28 + lengthOfNumberOfAddresses + (34 * i):32 + lengthOfNumberOfAddresses + (34 * i)])
|
continue
|
||||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
try:
|
||||||
if data[28 + lengthOfNumberOfAddresses + (34 * i)] == '\x7F':
|
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
continue
|
except Exception as err:
|
||||||
if helper_generic.isHostInPrivateIPRange(hostFromAddrMessage):
|
with shared.printLock:
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
sys.stderr.write(
|
||||||
continue
|
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I', data[lengthOfNumberOfAddresses + (
|
|
||||||
34 * i):4 + lengthOfNumberOfAddresses + (34 * i)]) # This is the 'time' value in the received addr message.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
|
||||||
shared.knownNodesLock.acquire()
|
try:
|
||||||
shared.knownNodes[recaddrStream] = {}
|
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
||||||
shared.knownNodesLock.release()
|
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
except Exception as err:
|
||||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
with shared.printLock:
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
sys.stderr.write(
|
||||||
shared.knownNodesLock.acquire()
|
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
|
||||||
shared.knownNodesLock.release()
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
needToWriteKnownNodesToDisk = True
|
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
||||||
hostDetails = (
|
# recaddrPort, ', i', i
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
hostFromAddrMessage = socket.inet_ntoa(data[
|
||||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
listOfAddressDetailsToBroadcastToPeers.append(
|
# print 'hostFromAddrMessage', hostFromAddrMessage
|
||||||
hostDetails)
|
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F':
|
||||||
else:
|
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
||||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
continue
|
||||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A':
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
shared.knownNodesLock.acquire()
|
continue
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8':
|
||||||
shared.knownNodesLock.release()
|
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
continue
|
||||||
|
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
||||||
|
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
||||||
|
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
shared.knownNodes[recaddrStream] = {}
|
||||||
pickle.dump(shared.knownNodes, output)
|
|
||||||
output.close()
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
self.broadcastaddr(
|
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
||||||
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
|
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||||
with shared.printLock:
|
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
|
||||||
|
|
||||||
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
|
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
|
||||||
return
|
|
||||||
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
|
||||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
|
||||||
return
|
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = False
|
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
|
||||||
try:
|
|
||||||
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
|
||||||
with shared.printLock:
|
|
||||||
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
|
|
||||||
continue
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
if recaddrStream == 0:
|
|
||||||
continue
|
|
||||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
|
||||||
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
|
||||||
# recaddrPort, ', i', i
|
|
||||||
hostFromAddrMessage = socket.inet_ntoa(data[
|
|
||||||
32 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
|
||||||
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x7F':
|
|
||||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
|
||||||
continue
|
|
||||||
if data[32 + lengthOfNumberOfAddresses + (38 * i)] == '\x0A':
|
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
|
||||||
continue
|
|
||||||
if data[32 + lengthOfNumberOfAddresses + (38 * i):34 + lengthOfNumberOfAddresses + (38 * i)] == '\xC0A8':
|
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
|
||||||
continue
|
|
||||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
|
||||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[recaddrStream] = {}
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
||||||
|
timeSomeoneElseReceivedMessageFromThisNode)
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
with shared.printLock:
|
||||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
|
||||||
shared.knownNodesLock.acquire()
|
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
|
||||||
timeSomeoneElseReceivedMessageFromThisNode)
|
|
||||||
shared.knownNodesLock.release()
|
|
||||||
with shared.printLock:
|
|
||||||
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
|
||||||
|
|
||||||
needToWriteKnownNodesToDisk = True
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
hostDetails = (
|
hostDetails = (
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
timeSomeoneElseReceivedMessageFromThisNode,
|
||||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
||||||
listOfAddressDetailsToBroadcastToPeers.append(
|
#listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
|
||||||
hostDetails)
|
shared.broadcastToSendDataQueues((
|
||||||
else:
|
self.streamNumber, 'advertisepeer', hostDetails))
|
||||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
else:
|
||||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||||
shared.knownNodesLock.acquire()
|
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
shared.knownNodesLock.release()
|
||||||
shared.knownNodesLock.acquire()
|
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
#if listOfAddressDetailsToBroadcastToPeers != []:
|
||||||
try:
|
# self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
||||||
pickle.dump(shared.knownNodes, output)
|
with shared.printLock:
|
||||||
output.close()
|
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
except Exception as err:
|
|
||||||
if "Errno 28" in str(err):
|
|
||||||
logger.fatal('(while receiveDataThread needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
|
|
||||||
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
|
|
||||||
if shared.daemon:
|
|
||||||
os._exit(0)
|
|
||||||
shared.knownNodesLock.release()
|
|
||||||
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
|
||||||
with shared.printLock:
|
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
|
||||||
|
|
||||||
|
|
||||||
# Function runs when we want to broadcast an addr message to all of our
|
# Function runs when we want to broadcast an addr message to all of our
|
||||||
# peers. Runs when we learn of nodes that we didn't previously know about
|
# peers. Runs when we learn of nodes that we didn't previously know about
|
||||||
# and want to share them with our peers.
|
# and want to share them with our peers.
|
||||||
def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
|
"""def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
|
||||||
numberOfAddressesInAddrMessage = len(
|
numberOfAddressesInAddrMessage = len(
|
||||||
listOfAddressDetailsToBroadcastToPeers)
|
listOfAddressDetailsToBroadcastToPeers)
|
||||||
payload = ''
|
payload = ''
|
||||||
|
@ -1816,7 +1701,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
self.streamNumber, 'sendaddr', datatosend))
|
self.streamNumber, 'sendaddr', datatosend))"""
|
||||||
|
|
||||||
# Send a big addr message to our peer
|
# Send a big addr message to our peer
|
||||||
def sendaddr(self):
|
def sendaddr(self):
|
||||||
|
@ -1831,7 +1716,6 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||||
for i in range(500):
|
for i in range(500):
|
||||||
random.seed()
|
|
||||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
|
@ -1839,7 +1723,6 @@ class receiveDataThread(threading.Thread):
|
||||||
self.streamNumber][peer]
|
self.streamNumber][peer]
|
||||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
random.seed()
|
|
||||||
peer, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
self.streamNumber * 2], 1)
|
self.streamNumber * 2], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
|
@ -1848,7 +1731,6 @@ class receiveDataThread(threading.Thread):
|
||||||
self.streamNumber * 2][peer]
|
self.streamNumber * 2][peer]
|
||||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
random.seed()
|
|
||||||
peer, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
(self.streamNumber * 2) + 1], 1)
|
(self.streamNumber * 2) + 1], 1)
|
||||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||||
|
@ -1967,10 +1849,8 @@ class receiveDataThread(threading.Thread):
|
||||||
self.peer, self.remoteProtocolVersion)))
|
self.peer, self.remoteProtocolVersion)))
|
||||||
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
pickle.dump(shared.knownNodes, output)
|
|
||||||
output.close()
|
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
|
|
||||||
self.sendverack()
|
self.sendverack()
|
||||||
|
|
|
@ -8,7 +8,8 @@ import random
|
||||||
import sys
|
import sys
|
||||||
import socket
|
import socket
|
||||||
|
|
||||||
#import bitmessagemain
|
from class_objectHashHolder import *
|
||||||
|
from addresses import *
|
||||||
|
|
||||||
# Every connection to a peer has a sendDataThread (and also a
|
# Every connection to a peer has a sendDataThread (and also a
|
||||||
# receiveDataThread).
|
# receiveDataThread).
|
||||||
|
@ -22,6 +23,9 @@ class sendDataThread(threading.Thread):
|
||||||
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
||||||
|
|
||||||
self.data = ''
|
self.data = ''
|
||||||
|
self.objectHashHolderInstance = objectHashHolder(self.mailbox)
|
||||||
|
self.objectHashHolderInstance.start()
|
||||||
|
|
||||||
|
|
||||||
def setup(
|
def setup(
|
||||||
self,
|
self,
|
||||||
|
@ -98,15 +102,31 @@ class sendDataThread(threading.Thread):
|
||||||
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||||
|
|
||||||
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||||
|
elif command == 'advertisepeer':
|
||||||
|
self.objectHashHolderInstance.holdPeer(data)
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
|
numberOfAddressesInAddrMessage = len(
|
||||||
|
data)
|
||||||
|
payload = ''
|
||||||
|
for hostDetails in data:
|
||||||
|
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||||
|
payload += pack(
|
||||||
|
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
||||||
|
payload += pack('>I', streamNumber)
|
||||||
|
payload += pack(
|
||||||
|
'>q', services) # service bit flags offered by this node
|
||||||
|
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||||
|
socket.inet_aton(host)
|
||||||
|
payload += pack('>H', port)
|
||||||
|
|
||||||
|
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||||
|
datatosend = '\xE9\xBE\xB4\xD9addr\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
datatosend = datatosend + pack('>L', len(payload)) # payload length
|
||||||
|
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
||||||
|
datatosend = datatosend + payload
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# To prevent some network analysis, 'leak' the data out
|
self.sock.sendall(datatosend)
|
||||||
# to our peer after waiting a random amount of time
|
|
||||||
# unless we have a long list of messages in our queue
|
|
||||||
# to send.
|
|
||||||
random.seed()
|
|
||||||
time.sleep(random.randrange(0, 10))
|
|
||||||
self.sock.sendall(data)
|
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'sendaddr: self.sock.sendall failed'
|
print 'sendaddr: self.sock.sendall failed'
|
||||||
|
@ -118,17 +138,19 @@ class sendDataThread(threading.Thread):
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.mailbox)
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
|
elif command == 'advertiseobject':
|
||||||
|
self.objectHashHolderInstance.holdHash(data)
|
||||||
elif command == 'sendinv':
|
elif command == 'sendinv':
|
||||||
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
payload = ''
|
||||||
payload = '\x01' + data
|
for hash in data:
|
||||||
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
|
payload += hash
|
||||||
|
if payload != '':
|
||||||
|
payload = encodeVarint(len(payload)/32) + payload
|
||||||
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
||||||
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
headerData += pack('>L', len(payload))
|
headerData += pack('>L', len(payload))
|
||||||
headerData += hashlib.sha512(payload).digest()[:4]
|
headerData += hashlib.sha512(payload).digest()[:4]
|
||||||
# To prevent some network analysis, 'leak' the data out
|
|
||||||
# to our peer after waiting a random amount of time
|
|
||||||
random.seed()
|
|
||||||
time.sleep(random.randrange(0, 10))
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(headerData + payload)
|
self.sock.sendall(headerData + payload)
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
|
@ -167,4 +189,4 @@ class sendDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||||
|
|
||||||
|
self.objectHashHolderInstance.close()
|
||||||
|
|
|
@ -2,11 +2,16 @@ import threading
|
||||||
import shared
|
import shared
|
||||||
import time
|
import time
|
||||||
import sys
|
import sys
|
||||||
|
import pickle
|
||||||
|
|
||||||
|
import tr#anslate
|
||||||
from helper_sql import *
|
from helper_sql import *
|
||||||
|
from debug import logger
|
||||||
|
|
||||||
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
|
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
|
||||||
It cleans these data structures in memory:
|
It cleans these data structures in memory:
|
||||||
inventory (moves data to the on-disk sql database)
|
inventory (moves data to the on-disk sql database)
|
||||||
|
inventorySets (clears then reloads data out of sql database)
|
||||||
|
|
||||||
It cleans these tables on the disk:
|
It cleans these tables on the disk:
|
||||||
inventory (clears data more than 2 days and 12 hours old)
|
inventory (clears data more than 2 days and 12 hours old)
|
||||||
|
@ -31,19 +36,20 @@ class singleCleaner(threading.Thread):
|
||||||
shared.UISignalQueue.put((
|
shared.UISignalQueue.put((
|
||||||
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
||||||
|
|
||||||
with SqlBulkExecute() as sql:
|
with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
|
||||||
for hash, storedValue in shared.inventory.items():
|
with SqlBulkExecute() as sql:
|
||||||
objectType, streamNumber, payload, receivedTime = storedValue
|
for hash, storedValue in shared.inventory.items():
|
||||||
if int(time.time()) - 3600 > receivedTime:
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
sql.execute(
|
if int(time.time()) - 3600 > receivedTime:
|
||||||
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
|
sql.execute(
|
||||||
hash,
|
'''INSERT INTO inventory VALUES (?,?,?,?,?,?)''',
|
||||||
objectType,
|
hash,
|
||||||
streamNumber,
|
objectType,
|
||||||
payload,
|
streamNumber,
|
||||||
receivedTime,
|
payload,
|
||||||
'')
|
receivedTime,
|
||||||
del shared.inventory[hash]
|
'')
|
||||||
|
del shared.inventory[hash]
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
|
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
|
||||||
|
@ -109,4 +115,33 @@ class singleCleaner(threading.Thread):
|
||||||
shared.workerQueue.put(('sendmessage', ''))
|
shared.workerQueue.put(('sendmessage', ''))
|
||||||
shared.UISignalQueue.put((
|
shared.UISignalQueue.put((
|
||||||
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
|
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
|
||||||
|
|
||||||
|
# Let's also clear and reload shared.inventorySets to keep it from
|
||||||
|
# taking up an unnecessary amount of memory.
|
||||||
|
for streamNumber in shared.inventorySets:
|
||||||
|
shared.inventorySets[streamNumber] = set()
|
||||||
|
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
||||||
|
for row in queryData:
|
||||||
|
shared.inventorySets[streamNumber].add(row[0])
|
||||||
|
with shared.inventoryLock:
|
||||||
|
for hash, storedValue in shared.inventory.items():
|
||||||
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
|
if streamNumber in shared.inventorySets:
|
||||||
|
shared.inventorySets[streamNumber].add(hash)
|
||||||
|
|
||||||
|
# Let us write out the knowNodes to disk if there is anything new to write out.
|
||||||
|
if shared.needToWriteKnownNodesToDisk:
|
||||||
|
shared.knownNodesLock.acquire()
|
||||||
|
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||||
|
try:
|
||||||
|
pickle.dump(shared.knownNodes, output)
|
||||||
|
output.close()
|
||||||
|
except Exception as err:
|
||||||
|
if "Errno 28" in str(err):
|
||||||
|
logger.fatal('(while receiveDataThread shared.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
|
||||||
|
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
|
||||||
|
if shared.daemon:
|
||||||
|
os._exit(0)
|
||||||
|
shared.knownNodesLock.release()
|
||||||
|
shared.needToWriteKnownNodesToDisk = False
|
||||||
time.sleep(300)
|
time.sleep(300)
|
||||||
|
|
|
@ -151,12 +151,13 @@ class singleWorker(threading.Thread):
|
||||||
objectType = 'pubkey'
|
objectType = 'pubkey'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime)
|
objectType, streamNumber, payload, embeddedTime)
|
||||||
|
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||||
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
shared.config.set(
|
shared.config.set(
|
||||||
myAddress, 'lastpubkeysendtime', str(int(time.time())))
|
myAddress, 'lastpubkeysendtime', str(int(time.time())))
|
||||||
|
@ -224,12 +225,13 @@ class singleWorker(threading.Thread):
|
||||||
objectType = 'pubkey'
|
objectType = 'pubkey'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime)
|
objectType, streamNumber, payload, embeddedTime)
|
||||||
|
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||||
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
# If this is a chan address then we won't send out the pubkey over the
|
# If this is a chan address then we won't send out the pubkey over the
|
||||||
# network but rather will only store it in our pubkeys table so that
|
# network but rather will only store it in our pubkeys table so that
|
||||||
|
@ -327,10 +329,11 @@ class singleWorker(threading.Thread):
|
||||||
objectType = 'broadcast'
|
objectType = 'broadcast'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, int(time.time()))
|
objectType, streamNumber, payload, int(time.time()))
|
||||||
|
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex')
|
print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
|
||||||
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
|
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
|
||||||
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
||||||
|
@ -650,6 +653,7 @@ class singleWorker(threading.Thread):
|
||||||
objectType = 'msg'
|
objectType = 'msg'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, toStreamNumber, encryptedPayload, int(time.time()))
|
objectType, toStreamNumber, encryptedPayload, int(time.time()))
|
||||||
|
shared.inventorySets[toStreamNumber].add(inventoryHash)
|
||||||
if shared.safeConfigGetBoolean(toaddress, 'chan'):
|
if shared.safeConfigGetBoolean(toaddress, 'chan'):
|
||||||
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode(
|
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode(
|
||||||
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
||||||
|
@ -659,7 +663,7 @@ class singleWorker(threading.Thread):
|
||||||
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
||||||
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex')
|
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex')
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
|
||||||
# Update the status of the message in the 'sent' table to have a
|
# Update the status of the message in the 'sent' table to have a
|
||||||
# 'msgsent' status or 'msgsentnoackexpected' status.
|
# 'msgsent' status or 'msgsentnoackexpected' status.
|
||||||
|
@ -706,9 +710,10 @@ class singleWorker(threading.Thread):
|
||||||
objectType = 'getpubkey'
|
objectType = 'getpubkey'
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, int(time.time()))
|
objectType, streamNumber, payload, int(time.time()))
|
||||||
|
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||||
print 'sending inv (for the getpubkey message)'
|
print 'sending inv (for the getpubkey message)'
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'advertiseobject', inventoryHash))
|
||||||
|
|
||||||
sqlExecute(
|
sqlExecute(
|
||||||
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',
|
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',
|
||||||
|
|
|
@ -68,6 +68,8 @@ numberOfBroadcastsProcessed = 0
|
||||||
numberOfPubkeysProcessed = 0
|
numberOfPubkeysProcessed = 0
|
||||||
numberOfInventoryLookupsPerformed = 0
|
numberOfInventoryLookupsPerformed = 0
|
||||||
daemon = False
|
daemon = False
|
||||||
|
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
||||||
|
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
||||||
|
|
||||||
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
||||||
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
||||||
|
@ -303,7 +305,7 @@ def doCleanShutdown():
|
||||||
def broadcastToSendDataQueues(data):
|
def broadcastToSendDataQueues(data):
|
||||||
# logger.debug('running broadcastToSendDataQueues')
|
# logger.debug('running broadcastToSendDataQueues')
|
||||||
for q in sendDataQueues:
|
for q in sendDataQueues:
|
||||||
q.put((data))
|
q.put(data)
|
||||||
|
|
||||||
def flushInventory():
|
def flushInventory():
|
||||||
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
|
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
|
||||||
|
|
Reference in New Issue
Block a user