initial testing inv refactorization

master
Jonathan Warren 9 years ago
parent 477568f501
commit a9b15f83ba
  1. 11
      src/bitmessagemain.py
  2. 18
      src/class_receiveDataThread.py
  3. 25
      src/class_sendDataThread.py
  4. 15
      src/class_singleWorker.py
  5. 1
      src/shared.py

@ -46,6 +46,11 @@ if sys.platform == 'darwin':
def connectToStream(streamNumber):
selfInitiatedConnections[streamNumber] = {}
shared.inventorySets[streamNumber] = set()
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''', streamNumber)
for row in queryData:
shared.inventorySets[streamNumber].add(row[0])
if sys.platform[0:3] == 'win':
maximumNumberOfHalfOpenConnections = 9
else:
@ -705,10 +710,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType = 'msg'
shared.inventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, int(time.time()))
shared.inventorySets[toStreamNumber].add(inventoryHash)
with shared.printLock:
print 'Broadcasting inv for msg(API disseminatePreEncryptedMsg command):', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
toStreamNumber, 'sendinv', inventoryHash))
toStreamNumber, 'advertiseobject', inventoryHash))
elif method == 'disseminatePubkey':
# The device issuing this command to PyBitmessage supplies a pubkey object to be
# disseminated to the rest of the Bitmessage network. PyBitmessage accepts this
@ -741,10 +747,11 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
objectType = 'pubkey'
shared.inventory[inventoryHash] = (
objectType, pubkeyStreamNumber, payload, int(time.time()))
shared.inventorySets[pubkeyStreamNumber].add(inventoryHash)
with shared.printLock:
print 'broadcasting inv within API command disseminatePubkey with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
streamNumber, 'advertiseobject', inventoryHash))
elif method == 'getMessageDataByDestinationHash':
# Method will eventually be used by a particular Android app to
# select relevant messages. Do not yet add this to the api

@ -392,6 +392,7 @@ class receiveDataThread(threading.Thread):
objectType = 'broadcast'
shared.inventory[self.inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
shared.inventoryLock.release()
self.broadcastinv(self.inventoryHash)
shared.numberOfBroadcastsProcessed += 1
@ -755,6 +756,7 @@ class receiveDataThread(threading.Thread):
objectType = 'msg'
shared.inventory[self.inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(self.inventoryHash)
shared.inventoryLock.release()
self.broadcastinv(self.inventoryHash)
shared.numberOfMessagesProcessed += 1
@ -1153,6 +1155,7 @@ class receiveDataThread(threading.Thread):
objectType = 'pubkey'
shared.inventory[inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(inventoryHash)
shared.inventoryLock.release()
self.broadcastinv(inventoryHash)
shared.numberOfPubkeysProcessed += 1
@ -1348,6 +1351,7 @@ class receiveDataThread(threading.Thread):
objectType = 'getpubkey'
shared.inventory[inventoryHash] = (
objectType, self.streamNumber, data, embeddedTime)
shared.inventorySets[self.streamNumber].add(inventoryHash)
shared.inventoryLock.release()
# This getpubkey request is valid so far. Forward to peers.
self.broadcastinv(inventoryHash)
@ -1442,18 +1446,10 @@ class receiveDataThread(threading.Thread):
# 'set' of objects we are aware of and a set of objects in this inv
# message so that we can diff one from the other cheaply.
startTime = time.time()
currentInventoryList = set()
queryData = sqlQuery('''SELECT hash FROM inventory WHERE streamnumber=?''',
self.streamNumber)
for row in queryData:
currentInventoryList.add(row[0])
with shared.inventoryLock:
for objectHash, value in shared.inventory.items():
currentInventoryList.add(objectHash)
advertisedSet = set()
for i in range(numberOfItemsInInv):
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
objectsNewToMe = advertisedSet - currentInventoryList
objectsNewToMe = advertisedSet - shared.inventorySets[self.streamNumber]
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe:
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
@ -1552,12 +1548,12 @@ class receiveDataThread(threading.Thread):
print 'sock.sendall error:', err
# Send an inv message with just one hash to all of our peers
# Advertise this object to all of our peers
def broadcastinv(self, hash):
with shared.printLock:
print 'broadcasting inv with hash:', hash.encode('hex')
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
shared.broadcastToSendDataQueues((self.streamNumber, 'advertiseobject', hash))
# We have received an addr message.
def recaddr(self, data):

@ -8,7 +8,8 @@ import random
import sys
import socket
#import bitmessagemain
from class_objectHashHolder import *
from addresses import *
# Every connection to a peer has a sendDataThread (and also a
# receiveDataThread).
@ -22,6 +23,9 @@ class sendDataThread(threading.Thread):
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
self.data = ''
self.objectHashHolderInstance = objectHashHolder(self.mailbox)
self.objectHashHolderInstance.start()
def setup(
self,
@ -118,17 +122,20 @@ class sendDataThread(threading.Thread):
shared.sendDataQueues.remove(self.mailbox)
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
break
elif command == 'advertiseobject':
self.objectHashHolderInstance.holdHash(data)
elif command == 'sendinv':
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
payload = '\x01' + data
payload = ''
for hash in data:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
payload += hash
if payload != '':
print 'within sendinv, payload contains', len(payload)/32, 'hashes.'
payload = encodeVarint(len(payload)/32) + payload
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
headerData += pack('>L', len(payload))
headerData += hashlib.sha512(payload).digest()[:4]
# To prevent some network analysis, 'leak' the data out
# to our peer after waiting a random amount of time
random.seed()
time.sleep(random.randrange(0, 10))
try:
self.sock.sendall(headerData + payload)
self.lastTimeISentData = int(time.time())
@ -142,6 +149,8 @@ class sendDataThread(threading.Thread):
shared.sendDataQueues.remove(self.mailbox)
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
break
else:
print '(within sendinv) payload was empty. Not sending anything' #testing.
elif command == 'pong':
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
if self.lastTimeISentData < (int(time.time()) - 298):
@ -167,4 +176,4 @@ class sendDataThread(threading.Thread):
with shared.printLock:
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
self.objectHashHolderInstance.close()

@ -151,12 +151,13 @@ class singleWorker(threading.Thread):
objectType = 'pubkey'
shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime)
shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock:
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
streamNumber, 'advertiseobject', inventoryHash))
shared.UISignalQueue.put(('updateStatusBar', ''))
shared.config.set(
myAddress, 'lastpubkeysendtime', str(int(time.time())))
@ -224,12 +225,13 @@ class singleWorker(threading.Thread):
objectType = 'pubkey'
shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime)
shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock:
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
streamNumber, 'advertiseobject', inventoryHash))
shared.UISignalQueue.put(('updateStatusBar', ''))
# If this is a chan address then we won't send out the pubkey over the
# network but rather will only store it in our pubkeys table so that
@ -327,10 +329,11 @@ class singleWorker(threading.Thread):
objectType = 'broadcast'
shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, int(time.time()))
shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock:
print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
streamNumber, 'advertiseobject', inventoryHash))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
@ -650,6 +653,7 @@ class singleWorker(threading.Thread):
objectType = 'msg'
shared.inventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, int(time.time()))
shared.inventorySets[toStreamNumber].add(inventoryHash)
if shared.safeConfigGetBoolean(toaddress, 'chan'):
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
@ -659,7 +663,7 @@ class singleWorker(threading.Thread):
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
streamNumber, 'advertiseobject', inventoryHash))
# Update the status of the message in the 'sent' table to have a
# 'msgsent' status or 'msgsentnoackexpected' status.
@ -706,9 +710,10 @@ class singleWorker(threading.Thread):
objectType = 'getpubkey'
shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, int(time.time()))
shared.inventorySets[streamNumber].add(inventoryHash)
print 'sending inv (for the getpubkey message)'
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
streamNumber, 'advertiseobject', inventoryHash))
sqlExecute(
'''UPDATE sent SET status='awaitingpubkey' WHERE toaddress=? AND status='doingpubkeypow' ''',

@ -68,6 +68,7 @@ numberOfBroadcastsProcessed = 0
numberOfPubkeysProcessed = 0
numberOfInventoryLookupsPerformed = 0
daemon = False
inventorySets = {} # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.

Loading…
Cancel
Save