preparation for switch from 32 bit time to 64 bit time in protocol

This commit is contained in:
Jonathan Warren 2013-04-17 14:24:16 -04:00
parent 3976c5a8d2
commit 11407f3f44

View File

@ -511,7 +511,16 @@ class receiveDataThread(QThread):
if not self.isProofOfWorkSufficient(data): if not self.isProofOfWorkSufficient(data):
print 'Proof of work in broadcast message insufficient.' print 'Proof of work in broadcast message insufficient.'
return return
embeddedTime, = unpack('>I',data[8:12]) readPosition = 8 #bypass the nonce
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
readPosition += 8
else:
readPosition += 4
if embeddedTime > (int(time.time())+10800): #prevent funny business if embeddedTime > (int(time.time())+10800): #prevent funny business
print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.' print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.'
return return
@ -539,7 +548,7 @@ class receiveDataThread(QThread):
self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()")) self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()"))
self.processbroadcast(data)#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it. self.processbroadcast(readPosition,data)#When this function returns, we will have either successfully processed this broadcast because we are interested in it, ignored it because we aren't interested in it, or found problem with the broadcast that warranted ignoring it.
# Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are mostly the same values used for msg messages although broadcast messages are processed faster. # Let us now set lengthOfTimeWeShouldUseToProcessThisMessage. If we haven't used the specified amount of time, we shall sleep. These values are mostly the same values used for msg messages although broadcast messages are processed faster.
if len(data) > 100000000: #Size is greater than 100 megabytes if len(data) > 100000000: #Size is greater than 100 megabytes
@ -563,8 +572,7 @@ class receiveDataThread(QThread):
printLock.release() printLock.release()
#A broadcast message has a valid time and POW and requires processing. The recbroadcast function calls this one. #A broadcast message has a valid time and POW and requires processing. The recbroadcast function calls this one.
def processbroadcast(self,data): def processbroadcast(self,readPosition,data):
readPosition = 12
broadcastVersion, broadcastVersionLength = decodeVarint(data[readPosition:readPosition+9]) broadcastVersion, broadcastVersionLength = decodeVarint(data[readPosition:readPosition+9])
if broadcastVersion <> 1: if broadcastVersion <> 1:
#Cannot decode incoming broadcast versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored. #Cannot decode incoming broadcast versions higher than 1. Assuming the sender isn\' being silly, you should upgrade Bitmessage because this message shall be ignored.
@ -693,13 +701,20 @@ class receiveDataThread(QThread):
readPosition = 8 readPosition = 8
embeddedTime, = unpack('>I',data[readPosition:readPosition+4]) embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
readPosition += 8
else:
readPosition += 4
if embeddedTime > int(time.time())+10800: if embeddedTime > int(time.time())+10800:
print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime
return return
if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept: if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept:
print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime
return return
readPosition += 4
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(data[readPosition:readPosition+9]) streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(data[readPosition:readPosition+9])
if streamNumberAsClaimedByMsg != self.streamNumber: if streamNumberAsClaimedByMsg != self.streamNumber:
print 'The stream number encoded in this msg (' + str(streamNumberAsClaimedByMsg) + ') message does not match the stream number on which it was received. Ignoring it.' print 'The stream number encoded in this msg (' + str(streamNumberAsClaimedByMsg) + ') message does not match the stream number on which it was received. Ignoring it.'
@ -1024,6 +1039,14 @@ class receiveDataThread(QThread):
readPosition = 8 #for the nonce readPosition = 8 #for the nonce
embeddedTime, = unpack('>I',data[readPosition:readPosition+4]) embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
readPosition += 8
else:
readPosition += 4
if embeddedTime < int(time.time())-lengthOfTimeToHoldOnToAllPubkeys: if embeddedTime < int(time.time())-lengthOfTimeToHoldOnToAllPubkeys:
printLock.acquire() printLock.acquire()
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
@ -1034,7 +1057,6 @@ class receiveDataThread(QThread):
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.' print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
printLock.release() printLock.release()
return return
readPosition += 4 #for the time
addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10]) addressVersion, varintLength = decodeVarint(data[readPosition:readPosition+10])
readPosition += varintLength readPosition += varintLength
streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10]) streamNumber, varintLength = decodeVarint(data[readPosition:readPosition+10])
@ -1154,19 +1176,29 @@ class receiveDataThread(QThread):
if len(data) < 34: if len(data) < 34:
print 'getpubkey message doesn\'t contain enough data. Ignoring.' print 'getpubkey message doesn\'t contain enough data. Ignoring.'
return return
embeddedTime, = unpack('>I',data[8:12]) readPosition = 8 #bypass the nonce
embeddedTime, = unpack('>I',data[readPosition:readPosition+4])
#This section is used for the transition from 32 bit time to 64 bit time in the protocol.
if embeddedTime == 0:
embeddedTime, = unpack('>Q',data[readPosition:readPosition+8])
readPosition += 8
else:
readPosition += 4
if embeddedTime > int(time.time())+10800: if embeddedTime > int(time.time())+10800:
print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime
return return
if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept: if embeddedTime < int(time.time())-maximumAgeOfAnObjectThatIAmWillingToAccept:
print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime
return return
addressVersionNumber, addressVersionLength = decodeVarint(data[readPosition:readPosition+10])
addressVersionNumber, addressVersionLength = decodeVarint(data[12:22]) readPosition += addressVersionLength
streamNumber, streamNumberLength = decodeVarint(data[12+addressVersionLength:22+addressVersionLength]) streamNumber, streamNumberLength = decodeVarint(data[readPosition:readPosition+10])
if streamNumber <> self.streamNumber: if streamNumber <> self.streamNumber:
print 'The streamNumber', streamNumber, 'doesn\'t match our stream number:', self.streamNumber print 'The streamNumber', streamNumber, 'doesn\'t match our stream number:', self.streamNumber
return return
readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data) inventoryHash = calculateInventoryHash(data)
inventoryLock.acquire() inventoryLock.acquire()
@ -1195,7 +1227,7 @@ class receiveDataThread(QThread):
print 'The addressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.' print 'The addressVersionNumber of the pubkey request is too high. Can\'t understand. Ignoring it.'
return return
requestedHash = data[12+addressVersionLength+streamNumberLength:32+addressVersionLength+streamNumberLength] requestedHash = data[readPosition:readPosition+20]
if len(requestedHash) != 20: if len(requestedHash) != 20:
print 'The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.' print 'The length of the requested hash is not 20 bytes. Something is wrong. Ignoring.'
return return
@ -1349,98 +1381,192 @@ class receiveDataThread(QThread):
printLock.acquire() printLock.acquire()
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
printLock.release() printLock.release()
#print 'lengthOfNumberOfAddresses', lengthOfNumberOfAddresses
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: if self.remoteProtocolVersion == 1:
return print 'self.remoteProtocolVersion == 1'
if len(data) < lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded): if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
print 'addr message does not contain enough data. Ignoring.' return
return if len(data) != lengthOfNumberOfAddresses + (34 * numberOfAddressesIncluded):
print 'addr message does not contain the correct amount of data. Ignoring.'
return
needToWriteKnownNodesToDisk = False needToWriteKnownNodesToDisk = False
for i in range(0,numberOfAddressesIncluded): for i in range(0,numberOfAddressesIncluded):
try: try:
if data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': if data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
printLock.acquire()
print 'Skipping IPv6 address.', repr(data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)])
printLock.release()
continue
except Exception, err:
printLock.acquire() printLock.acquire()
print 'Skipping IPv6 address.', repr(data[16+lengthOfNumberOfAddresses+(34*i):28+lengthOfNumberOfAddresses+(34*i)]) sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
printLock.release() printLock.release()
break #giving up on unpacking any more. We should still be connected however.
try:
recaddrStream, = unpack('>I',data[4+lengthOfNumberOfAddresses+(34*i):8+lengthOfNumberOfAddresses+(34*i)])
except Exception, err:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0:
continue continue
except Exception, err: if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): #if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
printLock.acquire() continue
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) try:
printLock.release() recaddrServices, = unpack('>Q',data[8+lengthOfNumberOfAddresses+(34*i):16+lengthOfNumberOfAddresses+(34*i)])
break #giving up on unpacking any more. We should still be connected however. except Exception, err:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
try: try:
recaddrStream, = unpack('>I',data[4+lengthOfNumberOfAddresses+(34*i):8+lengthOfNumberOfAddresses+(34*i)]) recaddrPort, = unpack('>H',data[32+lengthOfNumberOfAddresses+(34*i):34+lengthOfNumberOfAddresses+(34*i)])
except Exception, err: except Exception, err:
printLock.acquire() printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
printLock.release() printLock.release()
break #giving up on unpacking any more. We should still be connected however. break #giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0: #print 'Within recaddr(): IP', recaddrIP, ', Port', recaddrPort, ', i', i
continue hostFromAddrMessage = socket.inet_ntoa(data[28+lengthOfNumberOfAddresses+(34*i):32+lengthOfNumberOfAddresses+(34*i)])
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): #if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business. #print 'hostFromAddrMessage', hostFromAddrMessage
continue if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x7F':
try: print 'Ignoring IP address in loopback range:', hostFromAddrMessage
recaddrServices, = unpack('>Q',data[8+lengthOfNumberOfAddresses+(34*i):16+lengthOfNumberOfAddresses+(34*i)]) continue
except Exception, err: if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x0A':
printLock.acquire() print 'Ignoring IP address in private range:', hostFromAddrMessage
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) continue
printLock.release() if data[28+lengthOfNumberOfAddresses+(34*i):30+lengthOfNumberOfAddresses+(34*i)] == '\xC0A8':
break #giving up on unpacking any more. We should still be connected however. print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
try: timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message.
recaddrPort, = unpack('>H',data[32+lengthOfNumberOfAddresses+(34*i):34+lengthOfNumberOfAddresses+(34*i)]) if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
except Exception, err: knownNodesLock.acquire()
printLock.acquire() knownNodes[recaddrStream] = {}
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) knownNodesLock.release()
printLock.release() if hostFromAddrMessage not in knownNodes[recaddrStream]:
break #giving up on unpacking any more. We should still be connected however. if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
#print 'Within recaddr(): IP', recaddrIP, ', Port', recaddrPort, ', i', i knownNodesLock.acquire()
hostFromAddrMessage = socket.inet_ntoa(data[28+lengthOfNumberOfAddresses+(34*i):32+lengthOfNumberOfAddresses+(34*i)]) knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
#print 'hostFromAddrMessage', hostFromAddrMessage knownNodesLock.release()
if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x7F': print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
print 'Ignoring IP address in loopback range:', hostFromAddrMessage needToWriteKnownNodesToDisk = True
continue hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
if data[28+lengthOfNumberOfAddresses+(34*i)] == '\x0A': listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
print 'Ignoring IP address in private range:', hostFromAddrMessage else:
continue PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
if data[28+lengthOfNumberOfAddresses+(34*i):30+lengthOfNumberOfAddresses+(34*i)] == '\xC0A8': if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
print 'Ignoring IP address in private range:', hostFromAddrMessage knownNodesLock.acquire()
continue knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode)
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>I',data[lengthOfNumberOfAddresses+(34*i):4+lengthOfNumberOfAddresses+(34*i)]) #This is the 'time' value in the received addr message. knownNodesLock.release()
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. if PORT != recaddrPort:
print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.'
if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers.
output = open(appdata + 'knownnodes.dat', 'wb')
knownNodesLock.acquire() knownNodesLock.acquire()
knownNodes[recaddrStream] = {} pickle.dump(knownNodes, output)
knownNodesLock.release() knownNodesLock.release()
if hostFromAddrMessage not in knownNodes[recaddrStream]: output.close()
if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) #no longer broadcast
printLock.acquire()
print 'knownNodes currently has', len(knownNodes[self.streamNumber]), 'nodes for this stream.'
printLock.release()
elif self.remoteProtocolVersion == 2:
print 'self.remoteProtocolVersion == 2'
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
return
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
print 'addr message does not contain the correct amount of data. Ignoring.'
return
needToWriteKnownNodesToDisk = False
for i in range(0,numberOfAddressesIncluded):
try:
if data[20+lengthOfNumberOfAddresses+(38*i):32+lengthOfNumberOfAddresses+(38*i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
printLock.acquire()
print 'Skipping IPv6 address.', repr(data[20+lengthOfNumberOfAddresses+(38*i):32+lengthOfNumberOfAddresses+(38*i)])
printLock.release()
continue
except Exception, err:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
try:
recaddrStream, = unpack('>I',data[8+lengthOfNumberOfAddresses+(38*i):12+lengthOfNumberOfAddresses+(38*i)])
except Exception, err:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0:
continue
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): #if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
continue
try:
recaddrServices, = unpack('>Q',data[12+lengthOfNumberOfAddresses+(38*i):20+lengthOfNumberOfAddresses+(38*i)])
except Exception, err:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
try:
recaddrPort, = unpack('>H',data[36+lengthOfNumberOfAddresses+(38*i):42+lengthOfNumberOfAddresses+(38*i)])
except Exception, err:
printLock.acquire()
sys.stderr.write('ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
printLock.release()
break #giving up on unpacking any more. We should still be connected however.
#print 'Within recaddr(): IP', recaddrIP, ', Port', recaddrPort, ', i', i
hostFromAddrMessage = socket.inet_ntoa(data[32+lengthOfNumberOfAddresses+(38*i):36+lengthOfNumberOfAddresses+(38*i)])
#print 'hostFromAddrMessage', hostFromAddrMessage
if data[32+lengthOfNumberOfAddresses+(38*i)] == '\x7F':
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
continue
if data[32+lengthOfNumberOfAddresses+(38*i)] == '\x0A':
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
if data[32+lengthOfNumberOfAddresses+(38*i):34+lengthOfNumberOfAddresses+(38*i)] == '\xC0A8':
print 'Ignoring IP address in private range:', hostFromAddrMessage
continue
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q',data[lengthOfNumberOfAddresses+(38*i):8+lengthOfNumberOfAddresses+(38*i)]) #This is the 'time' value in the received addr message. 64-bit.
if recaddrStream not in knownNodes: #knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
knownNodesLock.acquire() knownNodesLock.acquire()
knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode) knownNodes[recaddrStream] = {}
knownNodesLock.release() knownNodesLock.release()
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream if hostFromAddrMessage not in knownNodes[recaddrStream]:
needToWriteKnownNodesToDisk = True if len(knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time())-10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): #If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) knownNodesLock.acquire()
listOfAddressDetailsToBroadcastToPeers.append(hostDetails) knownNodes[recaddrStream][hostFromAddrMessage] = (recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
else: knownNodesLock.release()
PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): needToWriteKnownNodesToDisk = True
knownNodesLock.acquire() hostDetails = (timeSomeoneElseReceivedMessageFromThisNode, recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode) listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
knownNodesLock.release() else:
if PORT != recaddrPort: PORT, timeLastReceivedMessageFromThisNode = knownNodes[recaddrStream][hostFromAddrMessage]#PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.' if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers. knownNodesLock.acquire()
output = open(appdata + 'knownnodes.dat', 'wb') knownNodes[recaddrStream][hostFromAddrMessage] = (PORT, timeSomeoneElseReceivedMessageFromThisNode)
knownNodesLock.acquire() knownNodesLock.release()
pickle.dump(knownNodes, output) if PORT != recaddrPort:
knownNodesLock.release() print 'Strange occurance: The port specified in an addr message', str(recaddrPort),'does not match the port',str(PORT),'that this program (or some other peer) used to connect to it',str(hostFromAddrMessage),'. Perhaps they changed their port or are using a strange NAT configuration.'
output.close() if needToWriteKnownNodesToDisk: #Runs if any nodes were new to us. Also, share those nodes with our peers.
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) output = open(appdata + 'knownnodes.dat', 'wb')
printLock.acquire() knownNodesLock.acquire()
print 'knownNodes currently has', len(knownNodes[self.streamNumber]), 'nodes for this stream.' pickle.dump(knownNodes, output)
printLock.release() knownNodesLock.release()
output.close()
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
printLock.acquire()
print 'knownNodes currently has', len(knownNodes[self.streamNumber]), 'nodes for this stream.'
printLock.release()
#Function runs when we want to broadcast an addr message to all of our peers. Runs when we learn of nodes that we didn't previously know about and want to share them with our peers. #Function runs when we want to broadcast an addr message to all of our peers. Runs when we learn of nodes that we didn't previously know about and want to share them with our peers.
def broadcastaddr(self,listOfAddressDetailsToBroadcastToPeers): def broadcastaddr(self,listOfAddressDetailsToBroadcastToPeers):
@ -1448,7 +1574,7 @@ class receiveDataThread(QThread):
payload = '' payload = ''
for hostDetails in listOfAddressDetailsToBroadcastToPeers: for hostDetails in listOfAddressDetailsToBroadcastToPeers:
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
payload += pack('>I',timeLastReceivedMessageFromThisNode) payload += pack('>Q',timeLastReceivedMessageFromThisNode) #now uses 64-bit time
payload += pack('>I',streamNumber) payload += pack('>I',streamNumber)
payload += pack('>q',services) #service bit flags offered by this node payload += pack('>q',services) #service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(host) payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(host)
@ -1503,7 +1629,10 @@ class receiveDataThread(QThread):
PORT, timeLastReceivedMessageFromThisNode = value PORT, timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old.. if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack('>I',timeLastReceivedMessageFromThisNode) if self.remoteProtocolVersion == 1:
payload += pack('>I',timeLastReceivedMessageFromThisNode) #32-bit time
else:
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #64-bit time
payload += pack('>I',self.streamNumber) payload += pack('>I',self.streamNumber)
payload += pack('>q',1) #service bit flags offered by this node payload += pack('>q',1) #service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST) payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
@ -1512,7 +1641,10 @@ class receiveDataThread(QThread):
PORT, timeLastReceivedMessageFromThisNode = value PORT, timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old.. if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack('>I',timeLastReceivedMessageFromThisNode) if self.remoteProtocolVersion == 1:
payload += pack('>I',timeLastReceivedMessageFromThisNode) #32-bit time
else:
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #64-bit time
payload += pack('>I',self.streamNumber*2) payload += pack('>I',self.streamNumber*2)
payload += pack('>q',1) #service bit flags offered by this node payload += pack('>q',1) #service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST) payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
@ -1521,7 +1653,10 @@ class receiveDataThread(QThread):
PORT, timeLastReceivedMessageFromThisNode = value PORT, timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old.. if timeLastReceivedMessageFromThisNode > (int(time.time())- maximumAgeOfNodesThatIAdvertiseToOthers): #If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1 numberOfAddressesInAddrMessage += 1
payload += pack('>I',timeLastReceivedMessageFromThisNode) if self.remoteProtocolVersion == 1:
payload += pack('>I',timeLastReceivedMessageFromThisNode) #32-bit time
else:
payload += pack('>Q',timeLastReceivedMessageFromThisNode) #64-bit time
payload += pack('>I',(self.streamNumber*2)+1) payload += pack('>I',(self.streamNumber*2)+1)
payload += pack('>q',1) #service bit flags offered by this node payload += pack('>q',1) #service bit flags offered by this node
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST) payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + socket.inet_aton(HOST)
@ -1576,6 +1711,7 @@ class receiveDataThread(QThread):
print 'Closing connection to myself: ', self.HOST print 'Closing connection to myself: ', self.HOST
printLock.release() printLock.release()
return return
broadcastToSendDataQueues((0,'setRemoteProtocolVersion',(self.HOST,self.remoteProtocolVersion)))
knownNodesLock.acquire() knownNodesLock.acquire()
knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time())) knownNodes[self.streamNumber][self.HOST] = (self.remoteNodeIncomingPort, int(time.time()))
@ -1728,19 +1864,31 @@ class sendDataThread(QThread):
print 'setting the stream number in the sendData thread (ID:',id(self), ') to', specifiedStreamNumber print 'setting the stream number in the sendData thread (ID:',id(self), ') to', specifiedStreamNumber
printLock.release() printLock.release()
self.streamNumber = specifiedStreamNumber self.streamNumber = specifiedStreamNumber
elif command == 'setRemoteProtocolVersion':
hostInMessage, specifiedRemoteProtocolVersion = data
if hostInMessage == self.HOST:
printLock.acquire()
print 'setting the remote node\'s protocol version in the sendData thread (ID:',id(self), ') to', specifiedRemoteProtocolVersion
printLock.release()
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
elif command == 'sendaddr': elif command == 'sendaddr':
try: if self.remoteProtocolVersion == 1:
#To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time unless we have a long list of messages in our queue to send. printLock.acquire()
random.seed() print 'a sendData thread is not sending an addr message to this particular peer ('+self.HOST+') because their protocol version is 1.'
time.sleep(random.randrange(0, 10)) printLock.release()
self.sock.sendall(data) else:
self.lastTimeISentData = int(time.time()) try:
except: #To prevent some network analysis, 'leak' the data out to our peer after waiting a random amount of time unless we have a long list of messages in our queue to send.
print 'self.sock.sendall failed' random.seed()
self.sock.close() time.sleep(random.randrange(0, 10))
sendDataQueues.remove(self.mailbox) self.sock.sendall(data)
print 'sendDataThread thread', self, 'ending now' self.lastTimeISentData = int(time.time())
break except:
print 'self.sock.sendall failed'
self.sock.close()
sendDataQueues.remove(self.mailbox)
print 'sendDataThread thread', self, 'ending now'
break
elif command == 'sendinv': elif command == 'sendinv':
if data not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware: if data not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
payload = '\x01' + data payload = '\x01' + data
@ -3993,18 +4141,7 @@ class MyForm(QtGui.QMainWindow):
sqlSubmitQueue.put('commit') sqlSubmitQueue.put('commit')
sqlLock.release() sqlLock.release()
"""try:
fromLabel = config.get(fromAddress, 'label')
except:
fromLabel = ''
if fromLabel == '':
fromLabel = fromAddress"""
toLabel = '' toLabel = ''
t = (toAddress,) t = (toAddress,)
sqlLock.acquire() sqlLock.acquire()
sqlSubmitQueue.put('''select label from addressbook where address=?''') sqlSubmitQueue.put('''select label from addressbook where address=?''')
@ -4018,30 +4155,6 @@ class MyForm(QtGui.QMainWindow):
self.displayNewSentMessage(toAddress,toLabel,fromAddress, subject, message, ackdata) self.displayNewSentMessage(toAddress,toLabel,fromAddress, subject, message, ackdata)
workerQueue.put(('sendmessage',toAddress)) workerQueue.put(('sendmessage',toAddress))
"""self.ui.tableWidgetSent.insertRow(0)
if toLabel == '':
newItem = QtGui.QTableWidgetItem(unicode(toAddress,'utf-8'))
else:
newItem = QtGui.QTableWidgetItem(unicode(toLabel,'utf-8'))
newItem.setData(Qt.UserRole,str(toAddress))
self.ui.tableWidgetSent.setItem(0,0,newItem)
if fromLabel == '':
newItem = QtGui.QTableWidgetItem(unicode(fromAddress,'utf-8'))
else:
newItem = QtGui.QTableWidgetItem(unicode(fromLabel,'utf-8'))
newItem.setData(Qt.UserRole,str(fromAddress))
self.ui.tableWidgetSent.setItem(0,1,newItem)
newItem = QtGui.QTableWidgetItem(unicode(subject,'utf-8)'))
newItem.setData(Qt.UserRole,unicode(message,'utf-8)'))
self.ui.tableWidgetSent.setItem(0,2,newItem)
newItem = myTableWidgetItem('Just pressed ''send'' ' + unicode(strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))
newItem.setData(Qt.UserRole,QByteArray(ackdata))
newItem.setData(33,int(time.time()))
self.ui.tableWidgetSent.setItem(0,3,newItem)
self.ui.textEditSentMessage.setPlainText(self.ui.tableWidgetSent.item(0,2).data(Qt.UserRole).toPyObject())"""
self.ui.comboBoxSendFrom.setCurrentIndex(0) self.ui.comboBoxSendFrom.setCurrentIndex(0)
self.ui.labelFrom.setText('') self.ui.labelFrom.setText('')
self.ui.lineEditTo.setText('') self.ui.lineEditTo.setText('')