Merge pull request #35 from Atheros1/master
Don't send inv messages if the remote node is already aware of the object
This commit is contained in:
commit
65ffaae4ed
|
@ -5,7 +5,7 @@
|
||||||
|
|
||||||
#Right now, PyBitmessage only support connecting to stream 1. It doesn't yet contain logic to expand into further streams.
|
#Right now, PyBitmessage only support connecting to stream 1. It doesn't yet contain logic to expand into further streams.
|
||||||
|
|
||||||
softwareVersion = '0.2.0'
|
softwareVersion = '0.2.1'
|
||||||
verbose = 2
|
verbose = 2
|
||||||
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 #Equals two days and 12 hours.
|
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 #Equals two days and 12 hours.
|
||||||
lengthOfTimeToLeaveObjectsInInventory = 237600 #Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice.
|
lengthOfTimeToLeaveObjectsInInventory = 237600 #Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice.
|
||||||
|
@ -126,14 +126,15 @@ class outgoingSynSender(QThread):
|
||||||
sock.connect((HOST, PORT))
|
sock.connect((HOST, PORT))
|
||||||
rd = receiveDataThread()
|
rd = receiveDataThread()
|
||||||
self.emit(SIGNAL("passObjectThrough(PyQt_PyObject)"),rd)
|
self.emit(SIGNAL("passObjectThrough(PyQt_PyObject)"),rd)
|
||||||
rd.setup(sock,HOST,PORT,self.streamNumber,self.selfInitiatedConnectionList)
|
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
|
||||||
|
rd.setup(sock,HOST,PORT,self.streamNumber,self.selfInitiatedConnectionList,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
rd.start()
|
rd.start()
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print self, 'connected to', HOST, 'during outgoing attempt.'
|
print self, 'connected to', HOST, 'during outgoing attempt.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
|
|
||||||
sd = sendDataThread()
|
sd = sendDataThread()
|
||||||
sd.setup(sock,HOST,PORT,self.streamNumber)
|
sd.setup(sock,HOST,PORT,self.streamNumber,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
sd.start()
|
sd.start()
|
||||||
sd.sendVersionMessage()
|
sd.sendVersionMessage()
|
||||||
|
|
||||||
|
@ -168,7 +169,7 @@ class outgoingSynSender(QThread):
|
||||||
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||||
except Exception, err:
|
except Exception, err:
|
||||||
print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err
|
print 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:', err
|
||||||
time.sleep(1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
#Only one singleListener thread will ever exist. It creates the receiveDataThread and sendDataThread for each incoming connection. Note that it cannot set the stream number because it is not known yet- the other node will have to tell us its stream number in a version message. If we don't care about their stream, we will close the connection (within the recversion function of the recieveData thread)
|
#Only one singleListener thread will ever exist. It creates the receiveDataThread and sendDataThread for each incoming connection. Note that it cannot set the stream number because it is not known yet- the other node will have to tell us its stream number in a version message. If we don't care about their stream, we will close the connection (within the recversion function of the recieveData thread)
|
||||||
class singleListener(QThread):
|
class singleListener(QThread):
|
||||||
|
@ -181,7 +182,7 @@ class singleListener(QThread):
|
||||||
while config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
while config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||||
time.sleep(300)
|
time.sleep(300)
|
||||||
|
|
||||||
print 'bitmessage listener running'
|
print 'Listening for incoming connections.'
|
||||||
HOST = '' # Symbolic name meaning all available interfaces
|
HOST = '' # Symbolic name meaning all available interfaces
|
||||||
PORT = config.getint('bitmessagesettings', 'port')
|
PORT = config.getint('bitmessagesettings', 'port')
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
@ -204,14 +205,15 @@ class singleListener(QThread):
|
||||||
a,(HOST,PORT) = sock.accept()"""
|
a,(HOST,PORT) = sock.accept()"""
|
||||||
rd = receiveDataThread()
|
rd = receiveDataThread()
|
||||||
self.emit(SIGNAL("passObjectThrough(PyQt_PyObject)"),rd)
|
self.emit(SIGNAL("passObjectThrough(PyQt_PyObject)"),rd)
|
||||||
rd.setup(a,HOST,PORT,-1,self.incomingConnectionList)
|
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
|
||||||
|
rd.setup(a,HOST,PORT,-1,self.incomingConnectionList,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print self, 'connected to', HOST,'during INCOMING request.'
|
print self, 'connected to', HOST,'during INCOMING request.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
rd.start()
|
rd.start()
|
||||||
|
|
||||||
sd = sendDataThread()
|
sd = sendDataThread()
|
||||||
sd.setup(a,HOST,PORT,-1)
|
sd.setup(a,HOST,PORT,-1,objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
sd.start()
|
sd.start()
|
||||||
|
|
||||||
|
|
||||||
|
@ -223,7 +225,7 @@ class receiveDataThread(QThread):
|
||||||
self.verackSent = False
|
self.verackSent = False
|
||||||
self.verackReceived = False
|
self.verackReceived = False
|
||||||
|
|
||||||
def setup(self,sock,HOST,port,streamNumber,selfInitiatedConnectionList):
|
def setup(self,sock,HOST,port,streamNumber,selfInitiatedConnectionList,objectsOfWhichThisRemoteNodeIsAlreadyAware):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.HOST = HOST
|
self.HOST = HOST
|
||||||
self.PORT = port
|
self.PORT = port
|
||||||
|
@ -241,7 +243,7 @@ class receiveDataThread(QThread):
|
||||||
else:
|
else:
|
||||||
self.initiatedConnection = True
|
self.initiatedConnection = True
|
||||||
self.ackDataThatWeHaveYetToSend = [] #When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer.
|
self.ackDataThatWeHaveYetToSend = [] #When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer.
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
||||||
|
@ -438,16 +440,25 @@ class receiveDataThread(QThread):
|
||||||
sqlSubmitQueue.put(t)
|
sqlSubmitQueue.put(t)
|
||||||
queryreturn = sqlReturnQueue.get()
|
queryreturn = sqlReturnQueue.get()
|
||||||
sqlLock.release()
|
sqlLock.release()
|
||||||
print 'sendBigInv pulled', len(queryreturn), 'items from SQL inventory.'
|
|
||||||
bigInvList = {}
|
bigInvList = {}
|
||||||
for row in queryreturn:
|
for row in queryreturn:
|
||||||
hash, = row
|
hash, = row
|
||||||
|
if hash not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
bigInvList[hash] = 0
|
bigInvList[hash] = 0
|
||||||
|
else:
|
||||||
|
printLock.acquire()
|
||||||
|
print 'Not including an object hash in a big inv message because the remote node is already aware of it.'#This line is here to check that this feature is working.
|
||||||
|
printLock.release()
|
||||||
#We also have messages in our inventory in memory (which is a python dictionary). Let's fetch those too.
|
#We also have messages in our inventory in memory (which is a python dictionary). Let's fetch those too.
|
||||||
for hash, storedValue in inventory.items():
|
for hash, storedValue in inventory.items():
|
||||||
|
if hash not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
objectType, streamNumber, payload, receivedTime = storedValue
|
objectType, streamNumber, payload, receivedTime = storedValue
|
||||||
if streamNumber == self.streamNumber and receivedTime > int(time.time())-maximumAgeOfObjectsThatIAdvertiseToOthers:
|
if streamNumber == self.streamNumber and receivedTime > int(time.time())-maximumAgeOfObjectsThatIAdvertiseToOthers:
|
||||||
bigInvList[hash] = 0
|
bigInvList[hash] = 0
|
||||||
|
else:
|
||||||
|
printLock.acquire()
|
||||||
|
print 'Not including an object hash in a big inv message because the remote node is already aware of it.'#This line is here to check that this feature is working.
|
||||||
|
printLock.release()
|
||||||
numberOfObjectsInInvMessage = 0
|
numberOfObjectsInInvMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
#Now let us start appending all of these hashes together. They will be sent out in a big inv message to our new peer.
|
#Now let us start appending all of these hashes together. They will be sent out in a big inv message to our new peer.
|
||||||
|
@ -468,7 +479,7 @@ class receiveDataThread(QThread):
|
||||||
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
headerData = headerData + pack('>L',len(payload))
|
headerData = headerData + pack('>L',len(payload))
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
||||||
print 'Sending huge inv message to just this one peer'
|
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
|
||||||
self.sock.send(headerData + payload)
|
self.sock.send(headerData + payload)
|
||||||
|
|
||||||
#We have received a broadcast message
|
#We have received a broadcast message
|
||||||
|
@ -489,6 +500,7 @@ class receiveDataThread(QThread):
|
||||||
return
|
return
|
||||||
inventoryLock.acquire()
|
inventoryLock.acquire()
|
||||||
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
|
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[inventoryHash] = 0
|
||||||
if inventoryHash in inventory:
|
if inventoryHash in inventory:
|
||||||
print 'We have already received this broadcast object. Ignoring.'
|
print 'We have already received this broadcast object. Ignoring.'
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
|
@ -691,6 +703,7 @@ class receiveDataThread(QThread):
|
||||||
return
|
return
|
||||||
readPosition += streamNumberAsClaimedByMsgLength
|
readPosition += streamNumberAsClaimedByMsgLength
|
||||||
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
|
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[inventoryHash] = 0
|
||||||
inventoryLock.acquire()
|
inventoryLock.acquire()
|
||||||
if inventoryHash in inventory:
|
if inventoryHash in inventory:
|
||||||
print 'We have already received this msg message. Ignoring.'
|
print 'We have already received this msg message. Ignoring.'
|
||||||
|
@ -896,7 +909,7 @@ class receiveDataThread(QThread):
|
||||||
sqlReturnQueue.get()
|
sqlReturnQueue.get()
|
||||||
sqlLock.release()
|
sqlLock.release()
|
||||||
self.emit(SIGNAL("displayNewMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),inventoryHash,toAddress,fromAddress,subject,body)
|
self.emit(SIGNAL("displayNewMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),inventoryHash,toAddress,fromAddress,subject,body)
|
||||||
#Now let's send the acknowledgement. We'll need to make sure that our client will properly process the ackData; if the packet is malformed, we could clear out self.data and an attacker could use that behavior to determine that we were capable of decoding this message.
|
#Now let's consider sending the acknowledgement. We'll need to make sure that our client will properly process the ackData; if the packet is malformed, we could clear out self.data and an attacker could use that behavior to determine that we were capable of decoding this message.
|
||||||
ackDataValidThusFar = True
|
ackDataValidThusFar = True
|
||||||
if len(ackData) < 24:
|
if len(ackData) < 24:
|
||||||
print 'The length of ackData is unreasonably short. Not sending ackData.'
|
print 'The length of ackData is unreasonably short. Not sending ackData.'
|
||||||
|
@ -911,9 +924,7 @@ class receiveDataThread(QThread):
|
||||||
ackDataValidThusFar = False
|
ackDataValidThusFar = False
|
||||||
if ackDataValidThusFar:
|
if ackDataValidThusFar:
|
||||||
print 'ackData is valid. Will process it.'
|
print 'ackData is valid. Will process it.'
|
||||||
#self.data = self.data[:self.payloadLength+24] + ackData + self.data[self.payloadLength+24:]
|
|
||||||
self.ackDataThatWeHaveYetToSend.append(ackData) #When we have processed all data, the processData function will pop the ackData out and process it as if it is a message received from our peer.
|
self.ackDataThatWeHaveYetToSend.append(ackData) #When we have processed all data, the processData function will pop the ackData out and process it as if it is a message received from our peer.
|
||||||
#print 'self.data after:', repr(self.data)
|
|
||||||
|
|
||||||
|
|
||||||
#This section is for my RSA keys (version 1 addresses). If we don't have any version 1 addresses, then it won't matter.
|
#This section is for my RSA keys (version 1 addresses). If we don't have any version 1 addresses, then it won't matter.
|
||||||
|
@ -1124,6 +1135,7 @@ class receiveDataThread(QThread):
|
||||||
return
|
return
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
|
inventoryHash = calculateInventoryHash(self.data[24:self.payloadLength+24])
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[inventoryHash] = 0
|
||||||
inventoryLock.acquire()
|
inventoryLock.acquire()
|
||||||
if inventoryHash in inventory:
|
if inventoryHash in inventory:
|
||||||
print 'We have already received this pubkey. Ignoring it.'
|
print 'We have already received this pubkey. Ignoring it.'
|
||||||
|
@ -1136,7 +1148,7 @@ class receiveDataThread(QThread):
|
||||||
|
|
||||||
readPosition = 24 #for the message header
|
readPosition = 24 #for the message header
|
||||||
readPosition += 8 #for the nonce
|
readPosition += 8 #for the nonce
|
||||||
embeddedTime = self.data[readPosition:readPosition+4]#We currently are not checking the embeddedTime for any sort of validity in pubkey messages.
|
embeddedTime, = unpack('>I',self.data[readPosition:readPosition+4])#We currently are not checking the embeddedTime for any sort of validity in pubkey messages.
|
||||||
readPosition += 4 #for the time
|
readPosition += 4 #for the time
|
||||||
addressVersion, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
|
addressVersion, varintLength = decodeVarint(self.data[readPosition:readPosition+10])
|
||||||
readPosition += varintLength
|
readPosition += varintLength
|
||||||
|
@ -1174,15 +1186,13 @@ class receiveDataThread(QThread):
|
||||||
print 'publicEncryptionKey length less than 64. Sanity check failed.'
|
print 'publicEncryptionKey length less than 64. Sanity check failed.'
|
||||||
return
|
return
|
||||||
sha = hashlib.new('sha512')
|
sha = hashlib.new('sha512')
|
||||||
print 'recpubkey hashing this data to make the ripe:', repr('\x04'+publicSigningKey+'\x04'+publicEncryptionKey)
|
|
||||||
sha.update('\x04'+publicSigningKey+'\x04'+publicEncryptionKey)
|
sha.update('\x04'+publicSigningKey+'\x04'+publicEncryptionKey)
|
||||||
ripeHasher = hashlib.new('ripemd160')
|
ripeHasher = hashlib.new('ripemd160')
|
||||||
ripeHasher.update(sha.digest())
|
ripeHasher.update(sha.digest())
|
||||||
ripe = ripeHasher.digest()
|
ripe = ripeHasher.digest()
|
||||||
|
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'within recpubkey, addressVersion', addressVersion
|
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
|
||||||
print 'streamNumber', streamNumber
|
|
||||||
print 'ripe', ripe.encode('hex')
|
print 'ripe', ripe.encode('hex')
|
||||||
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
|
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
|
||||||
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
|
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
|
||||||
|
@ -1253,7 +1263,8 @@ class receiveDataThread(QThread):
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
return
|
return
|
||||||
|
|
||||||
objectType = 'pubkeyrequest'
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[inventoryHash] = 0
|
||||||
|
objectType = 'getpubkey'
|
||||||
inventory[inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime)
|
inventory[inventoryHash] = (objectType, self.streamNumber, self.data[24:self.payloadLength+24], embeddedTime)
|
||||||
inventoryLock.release()
|
inventoryLock.release()
|
||||||
|
|
||||||
|
@ -1266,7 +1277,7 @@ class receiveDataThread(QThread):
|
||||||
return
|
return
|
||||||
|
|
||||||
#This getpubkey request is valid so far. Forward to peers.
|
#This getpubkey request is valid so far. Forward to peers.
|
||||||
broadcastToSendDataQueues((self.streamNumber,'send',self.data[:self.payloadLength+24]))
|
self.broadcastinv(inventoryHash)
|
||||||
if addressVersionNumber == 0:
|
if addressVersionNumber == 0:
|
||||||
print 'The addressVersionNumber of the pubkey request is zero. That doesn\'t make any sense. Ignoring it.'
|
print 'The addressVersionNumber of the pubkey request is zero. That doesn\'t make any sense. Ignoring it.'
|
||||||
return
|
return
|
||||||
|
@ -1348,6 +1359,7 @@ class receiveDataThread(QThread):
|
||||||
numberOfItemsInInv, lengthOfVarint = decodeVarint(self.data[24:34])
|
numberOfItemsInInv, lengthOfVarint = decodeVarint(self.data[24:34])
|
||||||
if numberOfItemsInInv == 1: #we'll just request this data from the person who advertised the object.
|
if numberOfItemsInInv == 1: #we'll just request this data from the person who advertised the object.
|
||||||
for i in range(numberOfItemsInInv):
|
for i in range(numberOfItemsInInv):
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
|
||||||
if self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)] in inventory:
|
if self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)] in inventory:
|
||||||
print 'Inventory (in memory) has inventory item already.'
|
print 'Inventory (in memory) has inventory item already.'
|
||||||
elif isInSqlInventory(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]):
|
elif isInSqlInventory(self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]):
|
||||||
|
@ -1358,6 +1370,7 @@ class receiveDataThread(QThread):
|
||||||
print 'inv message lists', numberOfItemsInInv, 'objects.'
|
print 'inv message lists', numberOfItemsInInv, 'objects.'
|
||||||
for i in range(numberOfItemsInInv): #upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
for i in range(numberOfItemsInInv): #upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
||||||
#print 'Adding object to self.objectsThatWeHaveYetToGet.'
|
#print 'Adding object to self.objectsThatWeHaveYetToGet.'
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
|
||||||
self.objectsThatWeHaveYetToGet[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
|
self.objectsThatWeHaveYetToGet[self.data[24+lengthOfVarint+(32*i):56+lengthOfVarint+(32*i)]] = 0
|
||||||
print 'length of objectsThatWeHaveYetToGet', len(self.objectsThatWeHaveYetToGet)
|
print 'length of objectsThatWeHaveYetToGet', len(self.objectsThatWeHaveYetToGet)
|
||||||
|
|
||||||
|
@ -1411,8 +1424,8 @@ class receiveDataThread(QThread):
|
||||||
headerData = headerData + pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
headerData = headerData + pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
||||||
self.sock.send(headerData + payload)
|
self.sock.send(headerData + payload)
|
||||||
elif objectType == 'pubkeyrequest':
|
elif objectType == 'getpubkey':
|
||||||
print 'sending pubkeyrequest'
|
print 'sending getpubkey'
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
||||||
headerData = headerData + 'getpubkey\x00\x00\x00'
|
headerData = headerData + 'getpubkey\x00\x00\x00'
|
||||||
headerData = headerData + pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
headerData = headerData + pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
||||||
|
@ -1444,15 +1457,10 @@ class receiveDataThread(QThread):
|
||||||
|
|
||||||
#Send an inv message with just one hash to all of our peers
|
#Send an inv message with just one hash to all of our peers
|
||||||
def broadcastinv(self,hash):
|
def broadcastinv(self,hash):
|
||||||
payload = '\x01' + hash
|
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
||||||
headerData = headerData + pack('>L',len(payload))
|
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'broadcasting inv with hash:', hash.encode('hex')
|
print 'broadcasting inv with hash:', hash.encode('hex')
|
||||||
printLock.release()
|
printLock.release()
|
||||||
broadcastToSendDataQueues((self.streamNumber, 'send', headerData + payload))
|
broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
|
||||||
|
|
||||||
|
|
||||||
#We have received an addr message.
|
#We have received an addr message.
|
||||||
|
@ -1557,7 +1565,7 @@ class receiveDataThread(QThread):
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
||||||
printLock.release()
|
printLock.release()
|
||||||
broadcastToSendDataQueues((self.streamNumber, 'send', datatosend))
|
broadcastToSendDataQueues((self.streamNumber, 'sendaddr', datatosend))
|
||||||
|
|
||||||
#Send a big addr message to our peer
|
#Send a big addr message to our peer
|
||||||
def sendaddr(self):
|
def sendaddr(self):
|
||||||
|
@ -1741,12 +1749,13 @@ class sendDataThread(QThread):
|
||||||
sendDataQueues.append(self.mailbox)
|
sendDataQueues.append(self.mailbox)
|
||||||
self.data = ''
|
self.data = ''
|
||||||
|
|
||||||
def setup(self,sock,HOST,PORT,streamNumber):
|
def setup(self,sock,HOST,PORT,streamNumber,objectsOfWhichThisRemoteNodeIsAlreadyAware):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.HOST = HOST
|
self.HOST = HOST
|
||||||
self.PORT = PORT
|
self.PORT = PORT
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.lastTimeISentData = int(time.time()) #If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
self.lastTimeISentData = int(time.time()) #If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
||||||
|
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'The streamNumber of this sendDataThread (ID:', id(self),') at setup() is', self.streamNumber
|
print 'The streamNumber of this sendDataThread (ID:', id(self),') at setup() is', self.streamNumber
|
||||||
printLock.release()
|
printLock.release()
|
||||||
|
@ -1815,10 +1824,9 @@ class sendDataThread(QThread):
|
||||||
print 'setting the stream number in the sendData thread (ID:',id(self), ') to', specifiedStreamNumber
|
print 'setting the stream number in the sendData thread (ID:',id(self), ') to', specifiedStreamNumber
|
||||||
printLock.release()
|
printLock.release()
|
||||||
self.streamNumber = specifiedStreamNumber
|
self.streamNumber = specifiedStreamNumber
|
||||||
elif command == 'send':
|
elif command == 'sendaddr':
|
||||||
try:
|
try:
|
||||||
#To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time unless we have a long list of messages in our queue to send.
|
#To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time unless we have a long list of messages in our queue to send.
|
||||||
if self.mailbox.qsize() < 20:
|
|
||||||
random.seed()
|
random.seed()
|
||||||
time.sleep(random.randrange(0, 10))
|
time.sleep(random.randrange(0, 10))
|
||||||
self.sock.sendall(data)
|
self.sock.sendall(data)
|
||||||
|
@ -1829,6 +1837,25 @@ class sendDataThread(QThread):
|
||||||
sendDataQueues.remove(self.mailbox)
|
sendDataQueues.remove(self.mailbox)
|
||||||
print 'sendDataThread thread', self, 'ending now'
|
print 'sendDataThread thread', self, 'ending now'
|
||||||
break
|
break
|
||||||
|
elif command == 'sendinv':
|
||||||
|
if data not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
|
payload = '\x01' + data
|
||||||
|
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
||||||
|
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
headerData = headerData + pack('>L',len(payload))
|
||||||
|
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
||||||
|
#To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time
|
||||||
|
random.seed()
|
||||||
|
time.sleep(random.randrange(0, 10))
|
||||||
|
try:
|
||||||
|
self.sock.sendall(headerData + payload)
|
||||||
|
self.lastTimeISentData = int(time.time())
|
||||||
|
except:
|
||||||
|
print 'self.sock.sendall failed'
|
||||||
|
self.sock.close()
|
||||||
|
sendDataQueues.remove(self.mailbox)
|
||||||
|
print 'sendDataThread thread', self, 'ending now'
|
||||||
|
break
|
||||||
elif command == 'pong':
|
elif command == 'pong':
|
||||||
if self.lastTimeISentData < (int(time.time()) - 298):
|
if self.lastTimeISentData < (int(time.time()) - 298):
|
||||||
#Send out a pong message to keep the connection alive.
|
#Send out a pong message to keep the connection alive.
|
||||||
|
@ -2223,15 +2250,10 @@ class singleWorker(QThread):
|
||||||
objectType = 'pubkey'
|
objectType = 'pubkey'
|
||||||
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
||||||
|
|
||||||
payload = '\x01' + inventoryHash
|
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
||||||
headerData = headerData + pack('>L',len(payload))
|
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
|
||||||
printLock.acquire()
|
printLock.acquire()
|
||||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||||
printLock.release()
|
printLock.release()
|
||||||
broadcastToSendDataQueues((streamNumber, 'send', headerData + payload))
|
broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash))
|
||||||
|
|
||||||
def sendBroadcast(self):
|
def sendBroadcast(self):
|
||||||
sqlLock.acquire()
|
sqlLock.acquire()
|
||||||
|
@ -2288,12 +2310,7 @@ class singleWorker(QThread):
|
||||||
objectType = 'broadcast'
|
objectType = 'broadcast'
|
||||||
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
||||||
print 'sending inv (within sendBroadcast function)'
|
print 'sending inv (within sendBroadcast function)'
|
||||||
payload = '\x01' + inventoryHash
|
broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash))
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
||||||
headerData = headerData + pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'send', headerData + payload))
|
|
||||||
|
|
||||||
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent at '+strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent at '+strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
||||||
|
|
||||||
|
@ -2355,12 +2372,7 @@ class singleWorker(QThread):
|
||||||
objectType = 'broadcast'
|
objectType = 'broadcast'
|
||||||
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
||||||
print 'sending inv (within sendBroadcast function)'
|
print 'sending inv (within sendBroadcast function)'
|
||||||
payload = '\x01' + inventoryHash
|
broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash))
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
||||||
headerData = headerData + pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'send', headerData + payload))
|
|
||||||
|
|
||||||
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent at '+strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent at '+strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
||||||
|
|
||||||
|
@ -2547,12 +2559,7 @@ class singleWorker(QThread):
|
||||||
inventory[inventoryHash] = (objectType, toStreamNumber, payload, int(time.time()))
|
inventory[inventoryHash] = (objectType, toStreamNumber, payload, int(time.time()))
|
||||||
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Message sent. Waiting on acknowledgement. Sent on ' + strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Message sent. Waiting on acknowledgement. Sent on ' + strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
||||||
print 'sending inv (within sendmsg function)'
|
print 'sending inv (within sendmsg function)'
|
||||||
payload = '\x01' + inventoryHash
|
broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash))
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
||||||
headerData += pack('>L',len(payload)) #payload length. Note that we add an extra 8 for the nonce.
|
|
||||||
headerData += hashlib.sha512(payload).digest()[:4]
|
|
||||||
broadcastToSendDataQueues((toStreamNumber, 'send', headerData + payload))
|
|
||||||
|
|
||||||
#Update the status of the message in the 'sent' table to have a 'sent' status
|
#Update the status of the message in the 'sent' table to have a 'sent' status
|
||||||
sqlLock.acquire()
|
sqlLock.acquire()
|
||||||
|
@ -2596,16 +2603,10 @@ class singleWorker(QThread):
|
||||||
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time()))
|
||||||
print 'sending inv (for the getpubkey message)'
|
print 'sending inv (for the getpubkey message)'
|
||||||
#payload = '\x01' + pack('>H',objectType) + hash
|
#payload = '\x01' + pack('>H',objectType) + hash
|
||||||
payload = '\x01' + inventoryHash
|
broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash))
|
||||||
headerData = '\xe9\xbe\xb4\xd9' #magic bits, slighly different from Bitcoin's magic bits.
|
|
||||||
headerData = headerData + 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
|
||||||
headerData = headerData + pack('>L',len(payload))
|
|
||||||
headerData = headerData + hashlib.sha512(payload).digest()[:4]
|
|
||||||
broadcastToSendDataQueues((streamNumber, 'send', headerData + payload))
|
|
||||||
|
|
||||||
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),'Broacasting the public key request. The recipient''s software must be on. This program will auto-retry if they are offline.')
|
self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),'Broacasting the public key request. The recipient''s software must be on. This program will auto-retry if they are offline.')
|
||||||
self.emit(SIGNAL("updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"),ripe,'Sending public key request. Waiting for reply. Requested at ' + strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
self.emit(SIGNAL("updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"),ripe,'Sending public key request. Waiting for reply. Requested at ' + strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))))
|
||||||
broadcastToSendDataQueues((streamNumber, 'send', headerData + payload))
|
|
||||||
|
|
||||||
def generateFullAckMessage(self,ackdata,toStreamNumber):
|
def generateFullAckMessage(self,ackdata,toStreamNumber):
|
||||||
nonce = 0
|
nonce = 0
|
||||||
|
|
Loading…
Reference in New Issue
Block a user