Merge pull request #494 from Atheros1/master
Smarter advertisement of object hashes and peers
This commit is contained in:
commit
33ea666df7
|
@ -46,6 +46,11 @@ if sys.platform == 'darwin':
|
|||
|
||||
def connectToStream(streamNumber):
|
||||
selfInitiatedConnections[streamNumber] = {}
|
||||
shared.inventorySets[streamNumber] = set()
|
||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
||||
for row in queryData:
|
||||
shared.inventorySets[streamNumber].add(row[0])
|
||||
|
||||
if sys.platform[0:3] == 'win':
|
||||
maximumNumberOfHalfOpenConnections = 9
|
||||
else:
|
||||
|
@ -708,10 +713,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
|||
objectType = 'msg'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, toStreamNumber, encryptedPayload, int(time.time()))
|
||||
shared.inventorySets[toStreamNumber].add(inventoryHash)
|
||||
with shared.printLock:
|
||||
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
|
||||
shared.broadcastToSendDataQueues((
|
||||
toStreamNumber, 'sendinv', inventoryHash))
|
||||
toStreamNumber, 'advertiseobject', inventoryHash))
|
||||
elif method == 'disseminatePubkey':
|
||||
# The device issuing this command to PyBitmessage supplies a pubkey object to be
|
||||
# disseminated to the rest of the Bitmessage network. PyBitmessage accepts this
|
||||
|
@ -744,10 +750,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
|||
objectType = 'pubkey'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, pubkeyStreamNumber, payload, int(time.time()))
|
||||
shared.inventorySets[pubkeyStreamNumber].add(inventoryHash)
|
||||
with shared.printLock:
|
||||
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'sendinv', inventoryHash))
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
elif method == 'getMessageDataByDestinationHash':
|
||||
# Method will eventually be used by a particular Android app to
|
||||
# select relevant messages. Do not yet add this to the api
|
||||
|
|
45
src/class_objectHashHolder.py
Normal file
45
src/class_objectHashHolder.py
Normal file
|
@ -0,0 +1,45 @@
|
|||
# objectHashHolder is a timer-driven thread. One objectHashHolder thread is used
|
||||
# by each sendDataThread. The sendDataThread uses it whenever it needs to
|
||||
# advertise an object to peers in an inv message, or advertise a peer to other
|
||||
# peers in an addr message. Instead of sending them out immediately, it must
|
||||
# wait a random number of seconds for each connection so that different peers
|
||||
# get different objects at different times. Thus an attacker who is
|
||||
# connecting to many network nodes who receives a message first from Alice
|
||||
# cannot be sure if Alice is the node who originated the message.
|
||||
|
||||
import random
|
||||
import time
|
||||
import threading
|
||||
|
||||
class objectHashHolder(threading.Thread):
|
||||
def __init__(self, sendDataThreadMailbox):
|
||||
threading.Thread.__init__(self)
|
||||
self.shutdown = False
|
||||
self.sendDataThreadMailbox = sendDataThreadMailbox # This queue is used to submit data back to our associated sendDataThread.
|
||||
self.collectionOfHashLists = {}
|
||||
self.collectionOfPeerLists = {}
|
||||
for i in range(10):
|
||||
self.collectionOfHashLists[i] = []
|
||||
self.collectionOfPeerLists[i] = []
|
||||
|
||||
def run(self):
|
||||
iterator = 0
|
||||
while not self.shutdown:
|
||||
if len(self.collectionOfHashLists[iterator]) > 0:
|
||||
self.sendDataThreadMailbox.put((0, 'sendinv', self.collectionOfHashLists[iterator]))
|
||||
self.collectionOfHashLists[iterator] = []
|
||||
if len(self.collectionOfPeerLists[iterator]) > 0:
|
||||
self.sendDataThreadMailbox.put((0, 'sendaddr', self.collectionOfPeerLists[iterator]))
|
||||
self.collectionOfPeerLists[iterator] = []
|
||||
iterator += 1
|
||||
iterator %= 10
|
||||
time.sleep(1)
|
||||
|
||||
def holdHash(self,hash):
|
||||
self.collectionOfHashLists[random.randrange(0, 10)].append(hash)
|
||||
|
||||
def holdPeer(self,peerDetails):
|
||||
self.collectionOfPeerLists[random.randrange(0, 10)].append(peerDetails)
|
||||
|
||||
def close(self):
|
||||
self.shutdown = True
|
|
@ -5,7 +5,6 @@ import threading
|
|||
import shared
|
||||
import hashlib
|
||||
import socket
|
||||
import pickle
|
||||
import random
|
||||
from struct import unpack, pack
|
||||
import sys
|
||||
|
@ -91,7 +90,6 @@ class receiveDataThread(threading.Thread):
|
|||
del self.selfInitiatedConnections[self.streamNumber][self]
|
||||
with shared.printLock:
|
||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||
|
||||
except:
|
||||
pass
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||
|
@ -175,7 +173,6 @@ class receiveDataThread(threading.Thread):
|
|||
if self.data == '':
|
||||
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||
shared.numberOfInventoryLookupsPerformed += 1
|
||||
random.seed()
|
||||
objectHash, = random.sample(
|
||||
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||
if objectHash in shared.inventory:
|
||||
|
@ -264,16 +261,18 @@ class receiveDataThread(threading.Thread):
|
|||
self.sock.settimeout(
|
||||
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||
remoteNodeSeenTime = shared.knownNodes[
|
||||
self.streamNumber][self.peer]
|
||||
with shared.printLock:
|
||||
print 'Connection fully established with', self.peer
|
||||
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
||||
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
||||
print 'broadcasting addr from within connectionFullyEstablished function.'
|
||||
|
||||
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
||||
self.peer.port)]) # This lets all of our peers know about this new node.
|
||||
#self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.peer.host,
|
||||
# self.remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
||||
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
|
||||
shared.broadcastToSendDataQueues((
|
||||
self.streamNumber, 'advertisepeer', dataToSend))
|
||||
|
||||
self.sendaddr() # This is one large addr message to this one peer.
|
||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||
with shared.printLock:
|
||||
|
@ -298,6 +297,7 @@ class receiveDataThread(threading.Thread):
|
|||
bigInvList[hash] = 0
|
||||
# We also have messages in our inventory in memory (which is a python
|
||||
# dictionary). Let's fetch those too.
|
||||
with shared.inventoryLock:
|
||||
for hash, storedValue in shared.inventory.items():
|
||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||
objectType, streamNumber, payload, receivedTime = storedValue
|
||||
|
@ -392,6 +392,7 @@ class receiveDataThread(threading.Thread):
|
|||
objectType = 'broadcast'
|
||||
shared.inventory[self.inventoryHash] = (
|
||||
objectType, self.streamNumber, data, embeddedTime)
|
||||
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
|
||||
shared.inventoryLock.release()
|
||||
self.broadcastinv(self.inventoryHash)
|
||||
shared.numberOfBroadcastsProcessed += 1
|
||||
|
@ -755,6 +756,7 @@ class receiveDataThread(threading.Thread):
|
|||
objectType = 'msg'
|
||||
shared.inventory[self.inventoryHash] = (
|
||||
objectType, self.streamNumber, data, embeddedTime)
|
||||
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
|
||||
shared.inventoryLock.release()
|
||||
self.broadcastinv(self.inventoryHash)
|
||||
shared.numberOfMessagesProcessed += 1
|
||||
|
@ -1153,6 +1155,7 @@ class receiveDataThread(threading.Thread):
|
|||
objectType = 'pubkey'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, self.streamNumber, data, embeddedTime)
|
||||
shared.inventorySets[self.streamNumber].add(inventoryHash)
|
||||
shared.inventoryLock.release()
|
||||
self.broadcastinv(inventoryHash)
|
||||
shared.numberOfPubkeysProcessed += 1
|
||||
|
@ -1348,6 +1351,7 @@ class receiveDataThread(threading.Thread):
|
|||
objectType = 'getpubkey'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, self.streamNumber, data, embeddedTime)
|
||||
shared.inventorySets[self.streamNumber].add(inventoryHash)
|
||||
shared.inventoryLock.release()
|
||||
# This getpubkey request is valid so far. Forward to peers.
|
||||
self.broadcastinv(inventoryHash)
|
||||
|
@ -1442,18 +1446,10 @@ class receiveDataThread(threading.Thread):
|
|||
# 'set' of objects we are aware of and a set of objects in this inv
|
||||
# message so that we can diff one from the other cheaply.
|
||||
startTime = time.time()
|
||||
currentInventoryList = set()
|
||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''',
|
||||
self.streamNumber)
|
||||
for row in queryData:
|
||||
currentInventoryList.add(row[0])
|
||||
with shared.inventoryLock:
|
||||
for objectHash, value in shared.inventory.items():
|
||||
currentInventoryList.add(objectHash)
|
||||
advertisedSet = set()
|
||||
for i in range(numberOfItemsInInv):
|
||||
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
||||
objectsNewToMe = advertisedSet - currentInventoryList
|
||||
objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber]
|
||||
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
||||
for item in objectsNewToMe:
|
||||
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
||||
|
@ -1500,11 +1496,14 @@ class receiveDataThread(threading.Thread):
|
|||
print 'received getdata request for item:', hash.encode('hex')
|
||||
|
||||
shared.numberOfInventoryLookupsPerformed += 1
|
||||
shared.inventoryLock.acquire()
|
||||
if hash in shared.inventory:
|
||||
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
||||
hash]
|
||||
shared.inventoryLock.release()
|
||||
self.sendData(objectType, payload)
|
||||
else:
|
||||
shared.inventoryLock.release()
|
||||
queryreturn = sqlQuery(
|
||||
'''select objecttype, payload from inventory where hash=?''',
|
||||
hash)
|
||||
|
@ -1552,16 +1551,16 @@ class receiveDataThread(threading.Thread):
|
|||
print 'sock.sendall error:', err
|
||||
|
||||
|
||||
# Send an inv message with just one hash to all of our peers
|
||||
# Advertise this object to all of our peers
|
||||
def broadcastinv(self, hash):
|
||||
with shared.printLock:
|
||||
print 'broadcasting inv with hash:', hash.encode('hex')
|
||||
|
||||
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
|
||||
shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash))
|
||||
|
||||
# We have received an addr message.
|
||||
def recaddr(self, data):
|
||||
listOfAddressDetailsToBroadcastToPeers = []
|
||||
#listOfAddressDetailsToBroadcastToPeers = []
|
||||
numberOfAddressesIncluded = 0
|
||||
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
||||
data[:10])
|
||||
|
@ -1570,116 +1569,12 @@ class receiveDataThread(threading.Thread):
|
|||
with shared.printLock:
|
||||
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
||||
|
||||
|
||||
if self.remoteProtocolVersion == 1:
|
||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||
return
|
||||
if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
|
||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||
return
|
||||
|
||||
needToWriteKnownNodesToDisk = False
|
||||
for i in range(0, numberOfAddressesIncluded):
|
||||
try:
|
||||
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||
with shared.printLock:
|
||||
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
|
||||
|
||||
continue
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
||||
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
|
||||
try:
|
||||
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
|
||||
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
if recaddrStream == 0:
|
||||
continue
|
||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
||||
continue
|
||||
try:
|
||||
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
|
||||
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
|
||||
try:
|
||||
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
|
||||
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||
|
||||
break # giving up on unpacking any more. We should still be connected however.
|
||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
||||
# recaddrPort, ', i', i
|
||||
hostFromAddrMessage = socket.inet_ntoa(data[
|
||||
28 + lengthOfNumberOfAddresses + (34 * i):32 + lengthOfNumberOfAddresses + (34 * i)])
|
||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
||||
if data[28 + lengthOfNumberOfAddresses + (34 * i)] == '\x7F':
|
||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
||||
continue
|
||||
if helper_generic.isHostInPrivateIPRange(hostFromAddrMessage):
|
||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
||||
continue
|
||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I', data[lengthOfNumberOfAddresses + (
|
||||
34 * i):4 + lengthOfNumberOfAddresses + (34 * i)]) # This is the 'time' value in the received addr message.
|
||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream] = {}
|
||||
shared.knownNodesLock.release()
|
||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
shared.knownNodesLock.release()
|
||||
needToWriteKnownNodesToDisk = True
|
||||
hostDetails = (
|
||||
timeSomeoneElseReceivedMessageFromThisNode,
|
||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
||||
listOfAddressDetailsToBroadcastToPeers.append(
|
||||
hostDetails)
|
||||
else:
|
||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
shared.knownNodesLock.release()
|
||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
||||
shared.knownNodesLock.acquire()
|
||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||
pickle.dump(shared.knownNodes, output)
|
||||
output.close()
|
||||
shared.knownNodesLock.release()
|
||||
self.broadcastaddr(
|
||||
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
|
||||
with shared.printLock:
|
||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||
|
||||
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
|
||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||
return
|
||||
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||
return
|
||||
|
||||
needToWriteKnownNodesToDisk = False
|
||||
for i in range(0, numberOfAddressesIncluded):
|
||||
try:
|
||||
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||
|
@ -1756,12 +1651,13 @@ class receiveDataThread(threading.Thread):
|
|||
with shared.printLock:
|
||||
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||
|
||||
needToWriteKnownNodesToDisk = True
|
||||
shared.needToWriteKnownNodesToDisk = True
|
||||
hostDetails = (
|
||||
timeSomeoneElseReceivedMessageFromThisNode,
|
||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
||||
listOfAddressDetailsToBroadcastToPeers.append(
|
||||
hostDetails)
|
||||
#listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
|
||||
shared.broadcastToSendDataQueues((
|
||||
self.streamNumber, 'advertisepeer', hostDetails))
|
||||
else:
|
||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
||||
|
@ -1769,20 +1665,9 @@ class receiveDataThread(threading.Thread):
|
|||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
shared.knownNodesLock.release()
|
||||
if needToWriteKnownNodesToDisk: # Runs if any nodes were new to us. Also, share those nodes with our peers.
|
||||
shared.knownNodesLock.acquire()
|
||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||
try:
|
||||
pickle.dump(shared.knownNodes, output)
|
||||
output.close()
|
||||
except Exception as err:
|
||||
if "Errno 28" in str(err):
|
||||
logger.fatal('(while receiveDataThread needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
|
||||
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
|
||||
if shared.daemon:
|
||||
os._exit(0)
|
||||
shared.knownNodesLock.release()
|
||||
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
||||
|
||||
#if listOfAddressDetailsToBroadcastToPeers != []:
|
||||
# self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
||||
with shared.printLock:
|
||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||
|
||||
|
@ -1790,7 +1675,7 @@ class receiveDataThread(threading.Thread):
|
|||
# Function runs when we want to broadcast an addr message to all of our
|
||||
# peers. Runs when we learn of nodes that we didn't previously know about
|
||||
# and want to share them with our peers.
|
||||
def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
|
||||
"""def broadcastaddr(self, listOfAddressDetailsToBroadcastToPeers):
|
||||
numberOfAddressesInAddrMessage = len(
|
||||
listOfAddressDetailsToBroadcastToPeers)
|
||||
payload = ''
|
||||
|
@ -1816,7 +1701,7 @@ class receiveDataThread(threading.Thread):
|
|||
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
||||
|
||||
shared.broadcastToSendDataQueues((
|
||||
self.streamNumber, 'sendaddr', datatosend))
|
||||
self.streamNumber, 'sendaddr', datatosend))"""
|
||||
|
||||
# Send a big addr message to our peer
|
||||
def sendaddr(self):
|
||||
|
@ -1831,7 +1716,6 @@ class receiveDataThread(threading.Thread):
|
|||
shared.knownNodesLock.acquire()
|
||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||
for i in range(500):
|
||||
random.seed()
|
||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||
continue
|
||||
|
@ -1839,7 +1723,6 @@ class receiveDataThread(threading.Thread):
|
|||
self.streamNumber][peer]
|
||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||
for i in range(250):
|
||||
random.seed()
|
||||
peer, = random.sample(shared.knownNodes[
|
||||
self.streamNumber * 2], 1)
|
||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||
|
@ -1848,7 +1731,6 @@ class receiveDataThread(threading.Thread):
|
|||
self.streamNumber * 2][peer]
|
||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||
for i in range(250):
|
||||
random.seed()
|
||||
peer, = random.sample(shared.knownNodes[
|
||||
(self.streamNumber * 2) + 1], 1)
|
||||
if helper_generic.isHostInPrivateIPRange(peer.host):
|
||||
|
@ -1967,10 +1849,8 @@ class receiveDataThread(threading.Thread):
|
|||
self.peer, self.remoteProtocolVersion)))
|
||||
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||
pickle.dump(shared.knownNodes, output)
|
||||
output.close()
|
||||
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||
shared.needToWriteKnownNodesToDisk = True
|
||||
shared.knownNodesLock.release()
|
||||
|
||||
self.sendverack()
|
||||
|
|
|
@ -8,7 +8,8 @@ import random
|
|||
import sys
|
||||
import socket
|
||||
|
||||
#import bitmessagemain
|
||||
from class_objectHashHolder import *
|
||||
from addresses import *
|
||||
|
||||
# Every connection to a peer has a sendDataThread (and also a
|
||||
# receiveDataThread).
|
||||
|
@ -22,6 +23,9 @@ class sendDataThread(threading.Thread):
|
|||
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
||||
|
||||
self.data = ''
|
||||
self.objectHashHolderInstance = objectHashHolder(self.mailbox)
|
||||
self.objectHashHolderInstance.start()
|
||||
|
||||
|
||||
def setup(
|
||||
self,
|
||||
|
@ -98,15 +102,31 @@ class sendDataThread(threading.Thread):
|
|||
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||
|
||||
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||
elif command == 'advertisepeer':
|
||||
self.objectHashHolderInstance.holdPeer(data)
|
||||
elif command == 'sendaddr':
|
||||
numberOfAddressesInAddrMessage = len(
|
||||
data)
|
||||
payload = ''
|
||||
for hostDetails in data:
|
||||
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||
payload += pack(
|
||||
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
||||
payload += pack('>I', streamNumber)
|
||||
payload += pack(
|
||||
'>q', services) # service bit flags offered by this node
|
||||
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
|
||||
socket.inet_aton(host)
|
||||
payload += pack('>H', port)
|
||||
|
||||
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||
datatosend = '\xE9\xBE\xB4\xD9addr\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
datatosend = datatosend + pack('>L', len(payload)) # payload length
|
||||
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
||||
datatosend = datatosend + payload
|
||||
|
||||
try:
|
||||
# To prevent some network analysis, 'leak' the data out
|
||||
# to our peer after waiting a random amount of time
|
||||
# unless we have a long list of messages in our queue
|
||||
# to send.
|
||||
random.seed()
|
||||
time.sleep(random.randrange(0, 10))
|
||||
self.sock.sendall(data)
|
||||
self.sock.sendall(datatosend)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
except:
|
||||
print 'sendaddr: self.sock.sendall failed'
|
||||
|
@ -118,17 +138,19 @@ class sendDataThread(threading.Thread):
|
|||
shared.sendDataQueues.remove(self.mailbox)
|
||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
||||
break
|
||||
elif command == 'advertiseobject':
|
||||
self.objectHashHolderInstance.holdHash(data)
|
||||
elif command == 'sendinv':
|
||||
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||
payload = '\x01' + data
|
||||
payload = ''
|
||||
for hash in data:
|
||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||
payload += hash
|
||||
if payload != '':
|
||||
payload = encodeVarint(len(payload)/32) + payload
|
||||
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
||||
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||
headerData += pack('>L', len(payload))
|
||||
headerData += hashlib.sha512(payload).digest()[:4]
|
||||
# To prevent some network analysis, 'leak' the data out
|
||||
# to our peer after waiting a random amount of time
|
||||
random.seed()
|
||||
time.sleep(random.randrange(0, 10))
|
||||
try:
|
||||
self.sock.sendall(headerData + payload)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
|
@ -167,4 +189,4 @@ class sendDataThread(threading.Thread):
|
|||
with shared.printLock:
|
||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||
|
||||
|
||||
self.objectHashHolderInstance.close()
|
||||
|
|
|
@ -2,11 +2,16 @@ import threading
|
|||
import shared
|
||||
import time
|
||||
import sys
|
||||
import pickle
|
||||
|
||||
import tr#anslate
|
||||
from helper_sql import *
|
||||
from debug import logger
|
||||
|
||||
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
|
||||
It cleans these data structures in memory:
|
||||
inventory (moves data to the on-disk sql database)
|
||||
inventorySets (clears then reloads data out of sql database)
|
||||
|
||||
It cleans these tables on the disk:
|
||||
inventory (clears data more than 2 days and 12 hours old)
|
||||
|
@ -31,6 +36,7 @@ class singleCleaner(threading.Thread):
|
|||
shared.UISignalQueue.put((
|
||||
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
|
||||
|
||||
with shared.inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
|
||||
with SqlBulkExecute() as sql:
|
||||
for hash, storedValue in shared.inventory.items():
|
||||
objectType, streamNumber, payload, receivedTime = storedValue
|
||||
|
@ -109,4 +115,33 @@ class singleCleaner(threading.Thread):
|
|||
shared.workerQueue.put(('sendmessage', ''))
|
||||
shared.UISignalQueue.put((
|
||||
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...'))
|
||||
|
||||
# Let's also clear and reload shared.inventorySets to keep it from
|
||||
# taking up an unnecessary amount of memory.
|
||||
for streamNumber in shared.inventorySets:
|
||||
shared.inventorySets[streamNumber] = set()
|
||||
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
|
||||
for row in queryData:
|
||||
shared.inventorySets[streamNumber].add(row[0])
|
||||
with shared.inventoryLock:
|
||||
for hash, storedValue in shared.inventory.items():
|
||||
objectType, streamNumber, payload, receivedTime = storedValue
|
||||
if streamNumber in shared.inventorySets:
|
||||
shared.inventorySets[streamNumber].add(hash)
|
||||
|
||||
# Let us write out the knowNodes to disk if there is anything new to write out.
|
||||
if shared.needToWriteKnownNodesToDisk:
|
||||
shared.knownNodesLock.acquire()
|
||||
output = open(shared.appdata + 'knownnodes.dat', 'wb')
|
||||
try:
|
||||
pickle.dump(shared.knownNodes, output)
|
||||
output.close()
|
||||
except Exception as err:
|
||||
if "Errno 28" in str(err):
|
||||
logger.fatal('(while receiveDataThread shared.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
|
||||
shared.UISignalQueue.put(('alert', (tr.translateText("MainWindow", "Disk full"), tr.translateText("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True)))
|
||||
if shared.daemon:
|
||||
os._exit(0)
|
||||
shared.knownNodesLock.release()
|
||||
shared.needToWriteKnownNodesToDisk = False
|
||||
time.sleep(300)
|
||||
|
|
|
@ -151,12 +151,13 @@ class singleWorker(threading.Thread):
|
|||
objectType = 'pubkey'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime)
|
||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||
|
||||
with shared.printLock:
|
||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'sendinv', inventoryHash))
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||
shared.config.set(
|
||||
myAddress, 'lastpubkeysendtime', str(int(time.time())))
|
||||
|
@ -224,12 +225,13 @@ class singleWorker(threading.Thread):
|
|||
objectType = 'pubkey'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, embeddedTime)
|
||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||
|
||||
with shared.printLock:
|
||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'sendinv', inventoryHash))
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||
# If this is a chan address then we won't send out the pubkey over the
|
||||
# network but rather will only store it in our pubkeys table so that
|
||||
|
@ -327,10 +329,11 @@ class singleWorker(threading.Thread):
|
|||
objectType = 'broadcast'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, int(time.time()))
|
||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||
with shared.printLock:
|
||||
print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex')
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'sendinv', inventoryHash))
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
|
||||
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
|
||||
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
||||
|
@ -650,6 +653,7 @@ class singleWorker(threading.Thread):
|
|||
objectType = 'msg'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, toStreamNumber, encryptedPayload, int(time.time()))
|
||||
shared.inventorySets[toStreamNumber].add(inventoryHash)
|
||||
if shared.safeConfigGetBoolean(toaddress, 'chan'):
|
||||
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode(
|
||||
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
||||
|
@ -659,7 +663,7 @@ class singleWorker(threading.Thread):
|
|||
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
|
||||
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex')
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'sendinv', inventoryHash))
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
|
||||
# Update the status of the message in the 'sent' table to have a
|
||||
# 'msgsent' status or 'msgsentnoackexpected' status.
|
||||
|
@ -706,9 +710,10 @@ class singleWorker(threading.Thread):
|
|||
objectType = 'getpubkey'
|
||||
shared.inventory[inventoryHash] = (
|
||||
objectType, streamNumber, payload, int(time.time()))
|
||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||
print 'sending inv (for the getpubkey message)'
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'sendinv', inventoryHash))
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
|
||||
sqlExecute(
|
||||
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',
|
||||
|
|
|
@ -68,6 +68,8 @@ numberOfBroadcastsProcessed = 0
|
|||
numberOfPubkeysProcessed = 0
|
||||
numberOfInventoryLookupsPerformed = 0
|
||||
daemon = False
|
||||
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
|
||||
needToWriteKnownNodesToDisk = False # If True, the singleCleaner will write it to disk eventually.
|
||||
|
||||
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
|
||||
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
|
||||
|
@ -303,7 +305,7 @@ def doCleanShutdown():
|
|||
def broadcastToSendDataQueues(data):
|
||||
# logger.debug('running broadcastToSendDataQueues')
|
||||
for q in sendDataQueues:
|
||||
q.put((data))
|
||||
q.put(data)
|
||||
|
||||
def flushInventory():
|
||||
#Note that the singleCleanerThread clears out the inventory dictionary from time to time, although it only clears things that have been in the dictionary for a long time. This clears the inventory dictionary Now.
|
||||
|
|
Loading…
Reference in New Issue
Block a user