Smarter advertisement of object hashes and peers #494

Merged
Atheros1 merged 5 commits from master into master 2013-09-10 02:29:20 +02:00
7 changed files with 268 additions and 272 deletions

View File

@ -46,6 +46,11 @@ if sys.platform == 'darwin':
def connectToStream(streamNumber): def connectToStream(streamNumber):
selfInitiatedConnections[streamNumber] = {} selfInitiatedConnections[streamNumber] = {}
shared.inventorySets[streamNumber] = set()
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
for row in queryData:
shared.inventorySets[streamNumber].add(row[0])
if sys.platform[0:3] == 'win': if sys.platform[0:3] == 'win':
maximumNumberOfHalfOpenConnections = 9 maximumNumberOfHalfOpenConnections = 9
else: else:
@ -705,10 +710,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType = 'msg' objectType = 'msg'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, int(time.time())) objectType, toStreamNumber, encryptedPayload, int(time.time()))
shared.inventorySets[toStreamNumber].add(inventoryHash)
with shared.printLock: with shared.printLock:
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex') print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
toStreamNumber, 'sendinv', inventoryHash)) toStreamNumber, 'advertiseobject', inventoryHash))
elif method == 'disseminatePubkey': elif method == 'disseminatePubkey':
# The device issuing this command to PyBitmessage supplies a pubkey object to be # The device issuing this command to PyBitmessage supplies a pubkey object to be
# disseminated to the rest of the Bitmessage network. PyBitmessage accepts this # disseminated to the rest of the Bitmessage network. PyBitmessage accepts this
@ -741,10 +747,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType = 'pubkey' objectType = 'pubkey'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, pubkeyStreamNumber, payload, int(time.time())) objectType, pubkeyStreamNumber, payload, int(time.time()))
shared.inventorySets[pubkeyStreamNumber].add(inventoryHash)
with shared.printLock: with shared.printLock:
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex') print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
elif method == 'getMessageDataByDestinationHash': elif method == 'getMessageDataByDestinationHash':
# Method will eventually be used by a particular Android app to # Method will eventually be used by a particular Android app to
# select relevant messages. Do not yet add this to the api # select relevant messages. Do not yet add this to the api

View File

@ -0,0 +1,45 @@
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
# by each sendDataThread. The sendDataThread uses it whenever it needs to
# advertise an object to peers in an inv message, or advertise a peer to other
# peers in an addr message. Instead of sending them out immediately, it must
# wait a random number of seconds for each connection so that different peers
# get different objects at different times. Thus an attacker who is
# connecting to many network nodes who receives a message first from Alice
# cannot be sure if Alice is the node who originated the message.
import random
import time
import threading
class objectHashHolder(threading.Thread):
def __init__(self, sendDataThreadMailbox):
threading.Thread.__init__(self)
self.shutdown = False
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
self.collectionOfHashLists = {}
self.collectionOfPeerLists = {}
for i in range(10):
self.collectionOfHashLists[i] = []
self.collectionOfPeerLists[i] = []
def run(self):
iterator = 0
while not self.shutdown:
if len(self.collectionOfHashLists[iterator]) > 0:
self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfHashLists[iterator]))
self.collectionOfHashLists[iterator] = []
if len(self.collectionOfPeerLists[iterator]) > 0:
self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator]))
self.collectionOfPeerLists[iterator] = []
iterator += 1
iterator %= 10
time.sleep(1)
def holdHash(self,hash):
self.collectionOfHashLists[random.randrange(0, 10)].append(hash)
def holdPeer(self,peerDetails):
self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails)
def close(self):
self.shutdown = True

View File

@ -5,7 +5,6 @@ import threading
import shared import shared
import hashlib import hashlib
import socket import socket
import pickle
import random import random
from struct import unpack, pack from struct import unpack, pack
import sys import sys
@ -91,7 +90,6 @@ class receiveDataThread(threading.Thread):
del self.selfInitiatedConnections[self.streamNumber][self] del self.selfInitiatedConnections[self.streamNumber][self]
with shared.printLock: with shared.printLock:
print 'removed self (a receiveDataThread) from selfInitiatedConnections' print 'removed self (a receiveDataThread) from selfInitiatedConnections'
except: except:
pass pass
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
@ -175,7 +173,6 @@ class receiveDataThread(threading.Thread):
if self.data == '': if self.data == '':
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
shared.numberOfInventoryLookupsPerformed += 1 shared.numberOfInventoryLookupsPerformed += 1
random.seed()
objectHash, = random.sample( objectHash, = random.sample(
self.objectsThatWeHaveYetToGetFromThisPeer, 1) self.objectsThatWeHaveYetToGetFromThisPeer, 1)
if objectHash in shared.inventory: if objectHash in shared.inventory:
@ -264,16 +261,18 @@ class receiveDataThread(threading.Thread):
self.sock.settimeout( self.sock.settimeout(
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately. 600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
remoteNodeSeenTime = shared.knownNodes[
self.streamNumber][self.peer]
with shared.printLock: with shared.printLock:
print 'Connection fully established with', self.peer print 'Connection fully established with', self.peer
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList) print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues) print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
print 'broadcasting addr from within connectionFullyEstablished function.' print 'broadcasting addr from within connectionFullyEstablished function.'
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host, #self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
self.peer.port)]) # This lets all of our peers know about this new node. # self.remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
shared.broadcastToSendDataQueues((
self.streamNumber, 'advertisepeer', dataToSend))
self.sendaddr() # This is one large addr message to this one peer. self.sendaddr() # This is one large addr message to this one peer.
if not self.initiatedConnection and len(shared.connectedHostsList) > 200: if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
with shared.printLock: with shared.printLock:
@ -298,6 +297,7 @@ class receiveDataThread(threading.Thread):
bigInvList[hash] = 0 bigInvList[hash] = 0
# We also have messages in our inventory in memory (which is a python # We also have messages in our inventory in memory (which is a python
# dictionary). Let's fetch those too. # dictionary). Let's fetch those too.
with shared.inventoryLock:
for hash, storedValue in shared.inventory.items(): for hash, storedValue in shared.inventory.items():
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
objectType, streamNumber, payload, receivedTime = storedValue objectType, streamNumber, payload, receivedTime = storedValue
@ -392,6 +392,7 @@ class receiveDataThread(threading.Thread):
objectType = 'broadcast' objectType = 'broadcast'
shared.inventory[self.inventoryHash] = ( shared.inventory[self.inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime) objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
shared.inventoryLock.release() shared.inventoryLock.release()
self.broadcastinv(self.inventoryHash) self.broadcastinv(self.inventoryHash)
shared.numberOfBroadcastsProcessed += 1 shared.numberOfBroadcastsProcessed += 1
@ -755,6 +756,7 @@ class receiveDataThread(threading.Thread):
objectType = 'msg' objectType = 'msg'
shared.inventory[self.inventoryHash] = ( shared.inventory[self.inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime) objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
shared.inventoryLock.release() shared.inventoryLock.release()
self.broadcastinv(self.inventoryHash) self.broadcastinv(self.inventoryHash)
shared.numberOfMessagesProcessed += 1 shared.numberOfMessagesProcessed += 1
@ -1153,6 +1155,7 @@ class receiveDataThread(threading.Thread):
objectType = 'pubkey' objectType = 'pubkey'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime) objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(inventoryHash)
shared.inventoryLock.release() shared.inventoryLock.release()
self.broadcastinv(inventoryHash) self.broadcastinv(inventoryHash)
shared.numberOfPubkeysProcessed += 1 shared.numberOfPubkeysProcessed += 1
@ -1348,6 +1351,7 @@ class receiveDataThread(threading.Thread):
objectType = 'getpubkey' objectType = 'getpubkey'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime) objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(inventoryHash)
shared.inventoryLock.release() shared.inventoryLock.release()
# This getpubkey request is valid so far. Forward to peers. # This getpubkey request is valid so far. Forward to peers.
self.broadcastinv(inventoryHash) self.broadcastinv(inventoryHash)
@ -1442,18 +1446,10 @@ class receiveDataThread(threading.Thread):
# 'set' of objects we are aware of and a set of objects in this inv # 'set' of objects we are aware of and a set of objects in this inv
# message so that we can diff one from the other cheaply. # message so that we can diff one from the other cheaply.
startTime = time.time() startTime = time.time()
currentInventoryList = set()
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''',
self.streamNumber)
for row in queryData:
currentInventoryList.add(row[0])
with shared.inventoryLock:
for objectHash, value in shared.inventory.items():
currentInventoryList.add(objectHash)
advertisedSet = set() advertisedSet = set()
for i in range(numberOfItemsInInv): for i in range(numberOfItemsInInv):
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
objectsNewToMe = advertisedSet - currentInventoryList objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber]
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe: for item in objectsNewToMe:
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
@ -1500,11 +1496,14 @@ class receiveDataThread(threading.Thread):
print 'received getdata request for item:', hash.encode('hex') print 'received getdata request for item:', hash.encode('hex')
shared.numberOfInventoryLookupsPerformed += 1 shared.numberOfInventoryLookupsPerformed += 1
shared.inventoryLock.acquire()
if hash in shared.inventory: if hash in shared.inventory:
objectType, streamNumber, payload, receivedTime = shared.inventory[ objectType, streamNumber, payload, receivedTime = shared.inventory[
hash] hash]
shared.inventoryLock.release()
self.sendData(objectType, payload) self.sendData(objectType, payload)
else: else:
shared.inventoryLock.release()
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''select objecttype, payload from inventory where hash=?''', '''select objecttype, payload from inventory where hash=?''',
hash) hash)
@ -1552,16 +1551,16 @@ class receiveDataThread(threading.Thread):
print 'sock.sendall error:', err print 'sock.sendall error:', err
# Send an inv message with just one hash to all of our peers # Advertise this object to all of our peers
def broadcastinv(self, hash): def broadcastinv(self, hash):
with shared.printLock: with shared.printLock:
print 'broadcasting inv with hash:', hash.encode('hex') print 'broadcasting inv with hash:', hash.encode('hex')
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash)) shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash))
# We have received an addr message. # We have received an addr message.
def recaddr(self, data): def recaddr(self, data):
listOfAddressDetailsToBroadcastToPeers = [] #listOfAddressDetailsToBroadcastToPeers = []
numberOfAddressesIncluded = 0 numberOfAddressesIncluded = 0
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint( numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
data[:10]) data[:10])
@ -1570,116 +1569,12 @@ class receiveDataThread(threading.Thread):
with shared.printLock: with shared.printLock:
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
if self.remoteProtocolVersion == 1:
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
return
if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
print 'addr message does not contain the correct amount of data. Ignoring.'
return
needToWriteKnownNodesToDisk = False
for i in range(0, numberOfAddressesIncluded):
try:
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
with shared.printLock:
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
continue
except Exception as err:
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
except Exception as err:
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0:
continue
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
continue
try:
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
except Exception as err:
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
except Exception as err:
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
# print 'Within recaddr(): IP', recaddrIP, ', Port',
# recaddrPort, ', i', i
hostFromAddrMessage = socket.inet_ntoa(data[
28 + lengthOfNumberOfAddresses + (34 * i):32 + lengthOfNumberOfAddresses + (34 * i)])
# print 'hostFromAddrMessage', hostFromAddrMessage
if data[28 + lengthOfNumberOfAddresses + (34 * i)] == '\x7F':
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
continue
if helper_generic.isHostInPrivateIPRange(hostFromAddrMessage):
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I', data[lengthOfNumberOfAddresses + (
34 * i):4 + lengthOfNumberOfAddresses + (34 * i)]) # This is the 'time' value in the received addr message.
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream] = {}
shared.knownNodesLock.release()
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
shared.knownNodesLock.release()
needToWriteKnownNodesToDisk = True
hostDetails = (
timeSomeoneElseReceivedMessageFromThisNode,
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
listOfAddressDetailsToBroadcastToPeers.append(
hostDetails)
else:
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
shared.knownNodesLock.release()
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
shared.knownNodesLock.acquire()
output = open(shared.appdata + 'knownnodes.dat', 'wb')
pickle.dump(shared.knownNodes, output)
output.close()
shared.knownNodesLock.release()
self.broadcastaddr(
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
with shared.printLock:
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
return return
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded): if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
print 'addr message does not contain the correct amount of data. Ignoring.' print 'addr message does not contain the correct amount of data. Ignoring.'
return return
needToWriteKnownNodesToDisk = False
for i in range(0, numberOfAddressesIncluded): for i in range(0, numberOfAddressesIncluded):
try: try:
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
@ -1756,12 +1651,13 @@ class receiveDataThread(threading.Thread):
with shared.printLock: with shared.printLock:
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
needToWriteKnownNodesToDisk = True shared.needToWriteKnownNodesToDisk = True
hostDetails = ( hostDetails = (
timeSomeoneElseReceivedMessageFromThisNode, timeSomeoneElseReceivedMessageFromThisNode,
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
listOfAddressDetailsToBroadcastToPeers.append( #listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
hostDetails) shared.broadcastToSendDataQueues((
self.streamNumber, 'advertisepeer', hostDetails))
else: else:
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
@ -1769,20 +1665,9 @@ class receiveDataThread(threading.Thread):
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
shared.knownNodesLock.release() shared.knownNodesLock.release()
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
shared.knownNodesLock.acquire() #if listOfAddressDetailsToBroadcastToPeers != []:
output = open(shared.appdata + 'knownnodes.dat', 'wb') # self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
try:
pickle.dump(shared.knownNodes, output)
output.close()
except Exception as err:
if "Errno 28" in str(err):
logger.fatal('(while receiveDataThread needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
if shared.daemon:
os._exit(0)
shared.knownNodesLock.release()
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
with shared.printLock: with shared.printLock:
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
@ -1790,7 +1675,7 @@ class receiveDataThread(threading.Thread):
# Function runs when we want to broadcast an addr message to all of our # Function runs when we want to broadcast an addr message to all of our
# peers. Runs when we learn of nodes that we didn't previously know about # peers. Runs when we learn of nodes that we didn't previously know about
# and want to share them with our peers. # and want to share them with our peers.
def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers): """def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
numberOfAddressesInAddrMessage = len( numberOfAddressesInAddrMessage = len(
listOfAddressDetailsToBroadcastToPeers) listOfAddressDetailsToBroadcastToPeers)
payload = '' payload = ''
@ -1816,7 +1701,7 @@ class receiveDataThread(threading.Thread):
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
self.streamNumber, 'sendaddr', datatosend)) self.streamNumber, 'sendaddr', datatosend))"""
# Send a big addr message to our peer # Send a big addr message to our peer
def sendaddr(self): def sendaddr(self):
@ -1831,7 +1716,6 @@ class receiveDataThread(threading.Thread):
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
if len(shared.knownNodes[self.streamNumber]) > 0: if len(shared.knownNodes[self.streamNumber]) > 0:
for i in range(500): for i in range(500):
random.seed()
peer, = random.sample(shared.knownNodes[self.streamNumber], 1) peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
if helper_generic.isHostInPrivateIPRange(peer.host): if helper_generic.isHostInPrivateIPRange(peer.host):
continue continue
@ -1839,7 +1723,6 @@ class receiveDataThread(threading.Thread):
self.streamNumber][peer] self.streamNumber][peer]
if len(shared.knownNodes[self.streamNumber * 2]) > 0: if len(shared.knownNodes[self.streamNumber * 2]) > 0:
for i in range(250): for i in range(250):
random.seed()
peer, = random.sample(shared.knownNodes[ peer, = random.sample(shared.knownNodes[
self.streamNumber * 2], 1) self.streamNumber * 2], 1)
if helper_generic.isHostInPrivateIPRange(peer.host): if helper_generic.isHostInPrivateIPRange(peer.host):
@ -1848,7 +1731,6 @@ class receiveDataThread(threading.Thread):
self.streamNumber * 2][peer] self.streamNumber * 2][peer]
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0: if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
for i in range(250): for i in range(250):
random.seed()
peer, = random.sample(shared.knownNodes[ peer, = random.sample(shared.knownNodes[
(self.streamNumber * 2) + 1], 1) (self.streamNumber * 2) + 1], 1)
if helper_generic.isHostInPrivateIPRange(peer.host): if helper_generic.isHostInPrivateIPRange(peer.host):
@ -1967,10 +1849,8 @@ class receiveDataThread(threading.Thread):
self.peer, self.remoteProtocolVersion))) self.peer, self.remoteProtocolVersion)))
shared.knownNodesLock.acquire() shared.knownNodesLock.acquire()
shared.knownNodes[self.streamNumber][self.peer] = int(time.time()) shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
output = open(shared.appdata + 'knownnodes.dat', 'wb') shared.needToWriteKnownNodesToDisk = True
pickle.dump(shared.knownNodes, output)
output.close()
shared.knownNodesLock.release() shared.knownNodesLock.release()
self.sendverack() self.sendverack()

View File

@ -8,7 +8,8 @@ import random
import sys import sys
import socket import socket
#import bitmessagemain from class_objectHashHolder import *
from addresses import *
# Every connection to a peer has a sendDataThread (and also a # Every connection to a peer has a sendDataThread (and also a
# receiveDataThread). # receiveDataThread).
@ -22,6 +23,9 @@ class sendDataThread(threading.Thread):
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues) print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
self.data = '' self.data = ''
self.objectHashHolderInstance = objectHashHolder(self.mailbox)
self.objectHashHolderInstance.start()
def setup( def setup(
self, self,
@ -98,15 +102,31 @@ class sendDataThread(threading.Thread):
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
self.remoteProtocolVersion = specifiedRemoteProtocolVersion self.remoteProtocolVersion = specifiedRemoteProtocolVersion
elif command == 'advertisepeer':
self.objectHashHolderInstance.holdPeer(data)
elif command == 'sendaddr': elif command == 'sendaddr':
numberOfAddressesInAddrMessage = len(
data)
payload = ''
for hostDetails in data:
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
payload += pack(
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
payload += pack('>I', streamNumber)
payload += pack(
'>q', services) # service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(host)
payload += pack('>H', port)
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
datatosend = '\xE9\xBE\xB4\xD9addr\x00\x00\x00\x00\x00\x00\x00\x00'
datatosend = datatosend + pack('>L', len(payload)) # payload length
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
datatosend = datatosend + payload
try: try:
# To prevent some network analysis, 'leak' the data out self.sock.sendall(datatosend)
# to our peer after waiting a random amount of time
# unless we have a long list of messages in our queue
# to send.
random.seed()
time.sleep(random.randrange(0, 10))
self.sock.sendall(data)
self.lastTimeISentData = int(time.time()) self.lastTimeISentData = int(time.time())
except: except:
print 'sendaddr: self.sock.sendall failed' print 'sendaddr: self.sock.sendall failed'
@ -118,17 +138,19 @@ class sendDataThread(threading.Thread):
shared.sendDataQueues.remove(self.mailbox) shared.sendDataQueues.remove(self.mailbox)
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
break break
elif command == 'advertiseobject':
self.objectHashHolderInstance.holdHash(data)
elif command == 'sendinv': elif command == 'sendinv':
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: payload = ''
payload = '\x01' + data for hash in data:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
payload += hash
if payload != '':
payload = encodeVarint(len(payload)/32) + payload
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
headerData += pack('>L', len(payload)) headerData += pack('>L', len(payload))
headerData += hashlib.sha512(payload).digest()[:4] headerData += hashlib.sha512(payload).digest()[:4]
# To prevent some network analysis, 'leak' the data out
# to our peer after waiting a random amount of time
random.seed()
time.sleep(random.randrange(0, 10))
try: try:
self.sock.sendall(headerData + payload) self.sock.sendall(headerData + payload)
self.lastTimeISentData = int(time.time()) self.lastTimeISentData = int(time.time())
@ -167,4 +189,4 @@ class sendDataThread(threading.Thread):
with shared.printLock: with shared.printLock:
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
self.objectHashHolderInstance.close()

View File

@ -2,11 +2,16 @@ import threading
import shared import shared
import time import time
import sys import sys
import pickle
import tr#anslate
from helper_sql import * from helper_sql import *
from debug import logger
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
It cleans these data structures in memory: It cleans these data structures in memory:
inventory (moves data to the on-disk sql database) inventory (moves data to the on-disk sql database)
inventorySets (clears then reloads data out of sql database)
It cleans these tables on the disk: It cleans these tables on the disk:
inventory (clears data more than 2 days and 12 hours old) inventory (clears data more than 2 days and 12 hours old)
@ -31,6 +36,7 @@ class singleCleaner(threading.Thread):
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute() as sql: with SqlBulkExecute() as sql:
for hash, storedValue in shared.inventory.items(): for hash, storedValue in shared.inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue objectType, streamNumber, payload, receivedTime = storedValue
@ -109,4 +115,33 @@ class singleCleaner(threading.Thread):
shared.workerQueue.put(('sendmessage', '')) shared.workerQueue.put(('sendmessage', ''))
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) 'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
# Let's also clear and reload shared.inventorySets to keep it from
# taking up an unnecessary amount of memory.
for streamNumber in shared.inventorySets:
shared.inventorySets[streamNumber] = set()
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
for row in queryData:
shared.inventorySets[streamNumber].add(row[0])
with shared.inventoryLock:
for hash, storedValue in shared.inventory.items():
objectType, streamNumber, payload, receivedTime = storedValue
if streamNumber in shared.inventorySets:
shared.inventorySets[streamNumber].add(hash)
# Let us write out the knowNodes to disk if there is anything new to write out.
if shared.needToWriteKnownNodesToDisk:
shared.knownNodesLock.acquire()
output = open(shared.appdata + 'knownnodes.dat', 'wb')
try:
pickle.dump(shared.knownNodes, output)
output.close()
except Exception as err:
if "Errno 28" in str(err):
logger.fatal('(while receiveDataThread shared.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
if shared.daemon:
os._exit(0)
shared.knownNodesLock.release()
shared.needToWriteKnownNodesToDisk = False
time.sleep(300) time.sleep(300)

View File

@ -151,12 +151,13 @@ class singleWorker(threading.Thread):
objectType = 'pubkey' objectType = 'pubkey'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime) objectType, streamNumber, payload, embeddedTime)
shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: with shared.printLock:
print 'broadcasting inv with hash:', inventoryHash.encode('hex') print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
shared.UISignalQueue.put(('updateStatusBar', '')) shared.UISignalQueue.put(('updateStatusBar', ''))
shared.config.set( shared.config.set(
myAddress, 'lastpubkeysendtime', str(int(time.time()))) myAddress, 'lastpubkeysendtime', str(int(time.time())))
@ -224,12 +225,13 @@ class singleWorker(threading.Thread):
objectType = 'pubkey' objectType = 'pubkey'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime) objectType, streamNumber, payload, embeddedTime)
shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: with shared.printLock:
print 'broadcasting inv with hash:', inventoryHash.encode('hex') print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
shared.UISignalQueue.put(('updateStatusBar', '')) shared.UISignalQueue.put(('updateStatusBar', ''))
# If this is a chan address then we won't send out the pubkey over the # If this is a chan address then we won't send out the pubkey over the
# network but rather will only store it in our pubkeys table so that # network but rather will only store it in our pubkeys table so that
@ -327,10 +329,11 @@ class singleWorker(threading.Thread):
objectType = 'broadcast' objectType = 'broadcast'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, int(time.time())) objectType, streamNumber, payload, int(time.time()))
shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: with shared.printLock:
print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex') print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
@ -650,6 +653,7 @@ class singleWorker(threading.Thread):
objectType = 'msg' objectType = 'msg'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, int(time.time())) objectType, toStreamNumber, encryptedPayload, int(time.time()))
shared.inventorySets[toStreamNumber].add(inventoryHash)
if shared.safeConfigGetBoolean(toaddress, 'chan'): if shared.safeConfigGetBoolean(toaddress, 'chan'):
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
@ -659,7 +663,7 @@ class singleWorker(threading.Thread):
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex') print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
# Update the status of the message in the 'sent' table to have a # Update the status of the message in the 'sent' table to have a
# 'msgsent' status or 'msgsentnoackexpected' status. # 'msgsent' status or 'msgsentnoackexpected' status.
@ -706,9 +710,10 @@ class singleWorker(threading.Thread):
objectType = 'getpubkey' objectType = 'getpubkey'
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, int(time.time())) objectType, streamNumber, payload, int(time.time()))
shared.inventorySets[streamNumber].add(inventoryHash)
print 'sending inv (for the getpubkey message)' print 'sending inv (for the getpubkey message)'
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
sqlExecute( sqlExecute(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''', '''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',

View File

@ -68,6 +68,8 @@ numberOfBroadcastsProcessed = 0
numberOfPubkeysProcessed = 0 numberOfPubkeysProcessed = 0
numberOfInventoryLookupsPerformed = 0 numberOfInventoryLookupsPerformed = 0
daemon = False daemon = False
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work. networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
@ -303,7 +305,7 @@ def doCleanShutdown():
def broadcastToSendDataQueues(data): def broadcastToSendDataQueues(data):
# logger.debug('running broadcastToSendDataQueues') # logger.debug('running broadcastToSendDataQueues')
for q in sendDataQueues: for q in sendDataQueues:
q.put((data)) q.put(data)
def flushInventory(): def flushInventory():
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now. #Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.