From 3527983fa6026a9d0f621c59b41a5edf6dbd93de Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Fri, 20 Feb 2015 17:33:17 -0500 Subject: [PATCH 1/2] Minor refactoring --- src/class_receiveDataThread.py | 160 +++++++++++++-------------------- 1 file changed, 60 insertions(+), 100 deletions(-) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 53281aca..5d56e98a 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -151,9 +151,8 @@ class receiveDataThread(threading.Thread): # just received valid data from it. So update the knownNodes list so # that other peers can be made aware of its existance. if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port). - shared.knownNodesLock.acquire() - shared.knownNodes[self.streamNumber][self.peer] = int(time.time()) - shared.knownNodesLock.release() + with shared.knownNodesLock: + shared.knownNodes[self.streamNumber][self.peer] = int(time.time()) #Strip the nulls command = command.rstrip('\x00') @@ -488,35 +487,33 @@ class receiveDataThread(threading.Thread): self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload))) - def _checkIPv4Address(self, host, hostFromAddrMessage): - # print 'hostFromAddrMessage', hostFromAddrMessage + def _checkIPv4Address(self, host, hostStandardFormat): + # print 'hostStandardFormat', hostStandardFormat if host[0] == '\x7F': - print 'Ignoring IP address in loopback range:', hostFromAddrMessage + print 'Ignoring IP address in loopback range:', hostStandardFormat return False if host[0] == '\x0A': - print 'Ignoring IP address in private range:', hostFromAddrMessage + print 'Ignoring IP address in private range:', hostStandardFormat return False if host[0:2] == '\xC0\xA8': - print 'Ignoring IP address in private range:', hostFromAddrMessage + print 'Ignoring IP address in private range:', hostStandardFormat return False return True - def _checkIPv6Address(self, host, hostFromAddrMessage): + def _checkIPv6Address(self, host, hostStandardFormat): if host == ('\x00' * 15) + '\x01': - print 'Ignoring loopback address:', hostFromAddrMessage + print 'Ignoring loopback address:', hostStandardFormat return False if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: - print 'Ignoring local address:', hostFromAddrMessage + print 'Ignoring local address:', hostStandardFormat return False if (ord(host[0]) & 0xfe) == 0xfc: - print 'Ignoring unique local address:', hostFromAddrMessage + print 'Ignoring unique local address:', hostStandardFormat return False return True # We have received an addr message. def recaddr(self, data): - #listOfAddressDetailsToBroadcastToPeers = [] - numberOfAddressesIncluded = 0 numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint( data[:10]) @@ -531,91 +528,56 @@ class receiveDataThread(threading.Thread): return for i in range(0, numberOfAddressesIncluded): - try: - fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)] - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrHost). Message: %s\n' % str(err)) - break # giving up on unpacking any more. We should still be connected however. - - try: - recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + ( - 38 * i):12 + lengthOfNumberOfAddresses + (38 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - break # giving up on unpacking any more. We should still be connected however. + fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)] + recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + ( + 38 * i):12 + lengthOfNumberOfAddresses + (38 * i)]) if recaddrStream == 0: continue if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business. continue - try: - recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + ( - 38 * i):20 + lengthOfNumberOfAddresses + (38 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) - break # giving up on unpacking any more. We should still be connected however. - - try: - recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + ( - 38 * i):38 + lengthOfNumberOfAddresses + (38 * i)]) - except Exception as err: - with shared.printLock: - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) - break # giving up on unpacking any more. We should still be connected however. - # print 'Within recaddr(): IP', recaddrIP, ', Port', - # recaddrPort, ', i', i + recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + ( + 38 * i):20 + lengthOfNumberOfAddresses + (38 * i)]) + recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + ( + 38 * i):38 + lengthOfNumberOfAddresses + (38 * i)]) if fullHost[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': ipv4Host = fullHost[12:] - hostFromAddrMessage = socket.inet_ntop(socket.AF_INET, ipv4Host) - if not self._checkIPv4Address(ipv4Host, hostFromAddrMessage): + hostStandardFormat = socket.inet_ntop(socket.AF_INET, ipv4Host) + if not self._checkIPv4Address(ipv4Host, hostStandardFormat): continue else: - hostFromAddrMessage = socket.inet_ntop(socket.AF_INET6, fullHost) - if hostFromAddrMessage == "": + hostStandardFormat = socket.inet_ntop(socket.AF_INET6, fullHost) + if hostStandardFormat == "": # This can happen on Windows systems which are not 64-bit compatible # so let us drop the IPv6 address. continue - if not self._checkIPv6Address(fullHost, hostFromAddrMessage): + if not self._checkIPv6Address(fullHost, hostStandardFormat): continue timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + ( 38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit. if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it. - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream] = {} - shared.knownNodesLock.release() - peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort) + with shared.knownNodesLock: + shared.knownNodes[recaddrStream] = {} + peerFromAddrMessage = shared.Peer(hostStandardFormat, recaddrPort) if peerFromAddrMessage not in shared.knownNodes[recaddrStream]: if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][peerFromAddrMessage] = ( - timeSomeoneElseReceivedMessageFromThisNode) - shared.knownNodesLock.release() + with shared.knownNodesLock: + shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode with shared.printLock: print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream shared.needToWriteKnownNodesToDisk = True hostDetails = ( timeSomeoneElseReceivedMessageFromThisNode, - recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort) - #listOfAddressDetailsToBroadcastToPeers.append(hostDetails) + recaddrStream, recaddrServices, hostStandardFormat, recaddrPort) shared.broadcastToSendDataQueues(( self.streamNumber, 'advertisepeer', hostDetails)) else: timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][ - peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message. - if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())): - shared.knownNodesLock.acquire() - shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode - shared.knownNodesLock.release() + peerFromAddrMessage] + if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())+900): # 900 seconds for wiggle-room in case other nodes' clocks aren't quite right. + with shared.knownNodesLock: + shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode - #if listOfAddressDetailsToBroadcastToPeers != []: - # self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) with shared.printLock: print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' @@ -633,31 +595,30 @@ class receiveDataThread(threading.Thread): # We are going to share a maximum number of 1000 addrs with our peer. # 500 from this stream, 250 from the left child stream, and 250 from # the right child stream. - shared.knownNodesLock.acquire() - if len(shared.knownNodes[self.streamNumber]) > 0: - for i in range(500): - peer, = random.sample(shared.knownNodes[self.streamNumber], 1) - if isHostInPrivateIPRange(peer.host): - continue - addrsInMyStream[peer] = shared.knownNodes[ - self.streamNumber][peer] - if len(shared.knownNodes[self.streamNumber * 2]) > 0: - for i in range(250): - peer, = random.sample(shared.knownNodes[ - self.streamNumber * 2], 1) - if isHostInPrivateIPRange(peer.host): - continue - addrsInChildStreamLeft[peer] = shared.knownNodes[ - self.streamNumber * 2][peer] - if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0: - for i in range(250): - peer, = random.sample(shared.knownNodes[ - (self.streamNumber * 2) + 1], 1) - if isHostInPrivateIPRange(peer.host): - continue - addrsInChildStreamRight[peer] = shared.knownNodes[ - (self.streamNumber * 2) + 1][peer] - shared.knownNodesLock.release() + with shared.knownNodesLock: + if len(shared.knownNodes[self.streamNumber]) > 0: + for i in range(500): + peer, = random.sample(shared.knownNodes[self.streamNumber], 1) + if isHostInPrivateIPRange(peer.host): + continue + addrsInMyStream[peer] = shared.knownNodes[ + self.streamNumber][peer] + if len(shared.knownNodes[self.streamNumber * 2]) > 0: + for i in range(250): + peer, = random.sample(shared.knownNodes[ + self.streamNumber * 2], 1) + if isHostInPrivateIPRange(peer.host): + continue + addrsInChildStreamLeft[peer] = shared.knownNodes[ + self.streamNumber * 2][peer] + if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0: + for i in range(250): + peer, = random.sample(shared.knownNodes[ + (self.streamNumber * 2) + 1], 1) + if isHostInPrivateIPRange(peer.host): + continue + addrsInChildStreamRight[peer] = shared.knownNodes[ + (self.streamNumber * 2) + 1][peer] numberOfAddressesInAddrMessage = 0 payload = '' # print 'addrsInMyStream.items()', addrsInMyStream.items() @@ -771,10 +732,9 @@ class receiveDataThread(threading.Thread): # in this version message. Let us inform the sendDataThread. self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion)) - shared.knownNodesLock.acquire() - shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time()) - shared.needToWriteKnownNodesToDisk = True - shared.knownNodesLock.release() + with shared.knownNodesLock: + shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time()) + shared.needToWriteKnownNodesToDisk = True self.sendverack() if self.initiatedConnection == False: From 95c939a2a06a6098199f6fad208d355bc93ef402 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Fri, 20 Feb 2015 21:03:20 -0500 Subject: [PATCH 2/2] Fix #748 - Check hash of sig instead of message contents --- src/class_objectProcessor.py | 46 +++++++++++++++++++----------------- src/class_singleWorker.py | 3 ++- src/class_sqlThread.py | 20 +++++++++++++--- src/helper_inbox.py | 6 ++--- 4 files changed, 46 insertions(+), 29 deletions(-) diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 7a126f4a..6cb55c25 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -427,6 +427,7 @@ class objectProcessor(threading.Thread): logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' % (helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey)) ) + sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:] # Used to detect and ignore duplicate messages in our inbox # calculate the fromRipe. sha = hashlib.new('sha512') @@ -503,13 +504,13 @@ class objectProcessor(threading.Thread): body = 'Unknown encoding type.\n\n' + repr(message) subject = '' # Let us make sure that we haven't already received this message - if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType): + if helper_inbox.isMessageAlreadyInInbox(sigHash): logger.info('This msg is already in our inbox. Ignoring it.') blockMessage = True if not blockMessage: if messageEncodingType != 0: t = (inventoryHash, toAddress, fromAddress, subject, int( - time.time()), body, 'inbox', messageEncodingType, 0) + time.time()), body, 'inbox', messageEncodingType, 0, sigHash) helper_inbox.insert(t) shared.UISignalQueue.put(('displayNewInboxMessage', ( @@ -701,6 +702,7 @@ class objectProcessor(threading.Thread): logger.debug('ECDSA verify failed') return logger.debug('ECDSA verify passed') + sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:] # Used to detect and ignore duplicate messages in our inbox fromAddress = encodeAddress( sendersAddressVersion, sendersStream, calculatedRipe) @@ -735,33 +737,33 @@ class objectProcessor(threading.Thread): subject = '' elif messageEncodingType == 0: logger.info('messageEncodingType == 0. Doing nothing with the message.') + return else: body = 'Unknown encoding type.\n\n' + repr(message) subject = '' toAddress = '[Broadcast subscribers]' - if messageEncodingType != 0: - if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType): - logger.info('This broadcast is already in our inbox. Ignoring it.') - else: - t = (inventoryHash, toAddress, fromAddress, subject, int( - time.time()), body, 'inbox', messageEncodingType, 0) - helper_inbox.insert(t) + if helper_inbox.isMessageAlreadyInInbox(sigHash): + logger.info('This broadcast is already in our inbox. Ignoring it.') + return + t = (inventoryHash, toAddress, fromAddress, subject, int( + time.time()), body, 'inbox', messageEncodingType, 0, sigHash) + helper_inbox.insert(t) - shared.UISignalQueue.put(('displayNewInboxMessage', ( - inventoryHash, toAddress, fromAddress, subject, body))) + shared.UISignalQueue.put(('displayNewInboxMessage', ( + inventoryHash, toAddress, fromAddress, subject, body))) - # If we are behaving as an API then we might need to run an - # outside command to let some program know that a new message - # has arrived. - if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'): - try: - apiNotifyPath = shared.config.get( - 'bitmessagesettings', 'apinotifypath') - except: - apiNotifyPath = '' - if apiNotifyPath != '': - call([apiNotifyPath, "newBroadcast"]) + # If we are behaving as an API then we might need to run an + # outside command to let some program know that a new message + # has arrived. + if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'): + try: + apiNotifyPath = shared.config.get( + 'bitmessagesettings', 'apinotifypath') + except: + apiNotifyPath = '' + if apiNotifyPath != '': + call([apiNotifyPath, "newBroadcast"]) # Display timing data logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,)) diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 84f0403c..feda7638 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -821,8 +821,9 @@ class singleWorker(threading.Thread): # If we are sending to ourselves or a chan, let's put the message in # our own inbox. if shared.config.has_section(toaddress): + sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:] # Used to detect and ignore duplicate messages in our inbox t = (inventoryHash, toaddress, fromaddress, subject, int( - time.time()), message, 'inbox', 2, 0) + time.time()), message, 'inbox', 2, 0, sigHash) helper_inbox.insert(t) shared.UISignalQueue.put(('displayNewInboxMessage', ( diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index 4eadc6f0..d4c69b59 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -28,7 +28,7 @@ class sqlThread(threading.Thread): try: self.cur.execute( - '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' ) + '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, sighash blob, UNIQUE(msgid) ON CONFLICT REPLACE)''' ) self.cur.execute( '''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text, ackdata blob, lastactiontime integer, status text, pubkeyretrynumber integer, msgretrynumber integer, folder text, encodingtype int)''' ) self.cur.execute( @@ -61,7 +61,7 @@ class sqlThread(threading.Thread): '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') self.cur.execute( '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) - self.cur.execute( '''INSERT INTO settings VALUES('version','8')''') + self.cur.execute( '''INSERT INTO settings VALUES('version','9')''') self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( int(time.time()),)) self.cur.execute( @@ -360,7 +360,21 @@ class sqlThread(threading.Thread): parameters = (8,) self.cur.execute(query, parameters) logger.debug('Finished clearing currently held pubkeys.') - + + # Add a new column to the inbox table to store the hash of the message signature. + # We'll use this as temporary message UUID in order to detect duplicates. + item = '''SELECT value FROM settings WHERE key='version';''' + parameters = '' + self.cur.execute(item, parameters) + currentVersion = int(self.cur.fetchall()[0][0]) + if currentVersion == 8: + logger.debug('In messages.dat database, adding sighash field to the inbox table.') + item = '''ALTER TABLE inbox ADD sighash blob DEFAULT '' ''' + parameters = '' + self.cur.execute(item, parameters) + item = '''update settings set value=? WHERE key='version';''' + parameters = (9,) + self.cur.execute(item, parameters) # Are you hoping to add a new option to the keys.dat file of existing # Bitmessage users or modify the SQLite database? Add it right above this line! diff --git a/src/helper_inbox.py b/src/helper_inbox.py index 04ca1d0c..09c7edbc 100644 --- a/src/helper_inbox.py +++ b/src/helper_inbox.py @@ -2,14 +2,14 @@ from helper_sql import * import shared def insert(t): - sqlExecute('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?)''', *t) + sqlExecute('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?,?)''', *t) shared.UISignalQueue.put(('changedInboxUnread', None)) def trash(msgid): sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid) shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid)) -def isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, encodingType): +def isMessageAlreadyInInbox(sigHash): queryReturn = sqlQuery( - '''SELECT COUNT(*) FROM inbox WHERE toaddress=? AND fromaddress=? AND subject=? AND message=? AND encodingtype=? ''', toAddress, fromAddress, subject, body, encodingType) + '''SELECT COUNT(*) FROM inbox WHERE sighash=?''', sigHash) return queryReturn[0][0] != 0 \ No newline at end of file