Fix #748 - Check hash of sig instead of message contents #783
|
@ -427,6 +427,7 @@ class objectProcessor(threading.Thread):
|
||||||
logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' %
|
logger.debug('As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person: %s ..and here is the testnet address: %s. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' %
|
||||||
(helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey))
|
(helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey))
|
||||||
)
|
)
|
||||||
|
sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:] # Used to detect and ignore duplicate messages in our inbox
|
||||||
|
|
||||||
# calculate the fromRipe.
|
# calculate the fromRipe.
|
||||||
sha = hashlib.new('sha512')
|
sha = hashlib.new('sha512')
|
||||||
|
@ -503,13 +504,13 @@ class objectProcessor(threading.Thread):
|
||||||
body = 'Unknown encoding type.\n\n' + repr(message)
|
body = 'Unknown encoding type.\n\n' + repr(message)
|
||||||
subject = ''
|
subject = ''
|
||||||
# Let us make sure that we haven't already received this message
|
# Let us make sure that we haven't already received this message
|
||||||
if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
|
if helper_inbox.isMessageAlreadyInInbox(sigHash):
|
||||||
logger.info('This msg is already in our inbox. Ignoring it.')
|
logger.info('This msg is already in our inbox. Ignoring it.')
|
||||||
blockMessage = True
|
blockMessage = True
|
||||||
if not blockMessage:
|
if not blockMessage:
|
||||||
if messageEncodingType != 0:
|
if messageEncodingType != 0:
|
||||||
t = (inventoryHash, toAddress, fromAddress, subject, int(
|
t = (inventoryHash, toAddress, fromAddress, subject, int(
|
||||||
time.time()), body, 'inbox', messageEncodingType, 0)
|
time.time()), body, 'inbox', messageEncodingType, 0, sigHash)
|
||||||
helper_inbox.insert(t)
|
helper_inbox.insert(t)
|
||||||
|
|
||||||
shared.UISignalQueue.put(('displayNewInboxMessage', (
|
shared.UISignalQueue.put(('displayNewInboxMessage', (
|
||||||
|
@ -701,6 +702,7 @@ class objectProcessor(threading.Thread):
|
||||||
logger.debug('ECDSA verify failed')
|
logger.debug('ECDSA verify failed')
|
||||||
return
|
return
|
||||||
logger.debug('ECDSA verify passed')
|
logger.debug('ECDSA verify passed')
|
||||||
|
sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:] # Used to detect and ignore duplicate messages in our inbox
|
||||||
|
|
||||||
fromAddress = encodeAddress(
|
fromAddress = encodeAddress(
|
||||||
sendersAddressVersion, sendersStream, calculatedRipe)
|
sendersAddressVersion, sendersStream, calculatedRipe)
|
||||||
|
@ -735,33 +737,33 @@ class objectProcessor(threading.Thread):
|
||||||
subject = ''
|
subject = ''
|
||||||
elif messageEncodingType == 0:
|
elif messageEncodingType == 0:
|
||||||
logger.info('messageEncodingType == 0. Doing nothing with the message.')
|
logger.info('messageEncodingType == 0. Doing nothing with the message.')
|
||||||
|
return
|
||||||
else:
|
else:
|
||||||
body = 'Unknown encoding type.\n\n' + repr(message)
|
body = 'Unknown encoding type.\n\n' + repr(message)
|
||||||
subject = ''
|
subject = ''
|
||||||
|
|
||||||
toAddress = '[Broadcast subscribers]'
|
toAddress = '[Broadcast subscribers]'
|
||||||
if messageEncodingType != 0:
|
if helper_inbox.isMessageAlreadyInInbox(sigHash):
|
||||||
if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
|
logger.info('This broadcast is already in our inbox. Ignoring it.')
|
||||||
logger.info('This broadcast is already in our inbox. Ignoring it.')
|
return
|
||||||
else:
|
t = (inventoryHash, toAddress, fromAddress, subject, int(
|
||||||
t = (inventoryHash, toAddress, fromAddress, subject, int(
|
time.time()), body, 'inbox', messageEncodingType, 0, sigHash)
|
||||||
time.time()), body, 'inbox', messageEncodingType, 0)
|
helper_inbox.insert(t)
|
||||||
helper_inbox.insert(t)
|
|
||||||
|
|
||||||
shared.UISignalQueue.put(('displayNewInboxMessage', (
|
shared.UISignalQueue.put(('displayNewInboxMessage', (
|
||||||
inventoryHash, toAddress, fromAddress, subject, body)))
|
inventoryHash, toAddress, fromAddress, subject, body)))
|
||||||
|
|
||||||
# If we are behaving as an API then we might need to run an
|
# If we are behaving as an API then we might need to run an
|
||||||
# outside command to let some program know that a new message
|
# outside command to let some program know that a new message
|
||||||
# has arrived.
|
# has arrived.
|
||||||
if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'):
|
if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'):
|
||||||
try:
|
try:
|
||||||
apiNotifyPath = shared.config.get(
|
apiNotifyPath = shared.config.get(
|
||||||
'bitmessagesettings', 'apinotifypath')
|
'bitmessagesettings', 'apinotifypath')
|
||||||
except:
|
except:
|
||||||
apiNotifyPath = ''
|
apiNotifyPath = ''
|
||||||
if apiNotifyPath != '':
|
if apiNotifyPath != '':
|
||||||
call([apiNotifyPath, "newBroadcast"])
|
call([apiNotifyPath, "newBroadcast"])
|
||||||
|
|
||||||
# Display timing data
|
# Display timing data
|
||||||
logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))
|
logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))
|
||||||
|
|
|
@ -151,9 +151,8 @@ class receiveDataThread(threading.Thread):
|
||||||
# just received valid data from it. So update the knownNodes list so
|
# just received valid data from it. So update the knownNodes list so
|
||||||
# that other peers can be made aware of its existance.
|
# that other peers can be made aware of its existance.
|
||||||
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
||||||
shared.knownNodesLock.acquire()
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
||||||
shared.knownNodesLock.release()
|
|
||||||
|
|
||||||
#Strip the nulls
|
#Strip the nulls
|
||||||
command = command.rstrip('\x00')
|
command = command.rstrip('\x00')
|
||||||
|
@ -488,35 +487,33 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload)))
|
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload)))
|
||||||
|
|
||||||
|
|
||||||
def _checkIPv4Address(self, host, hostFromAddrMessage):
|
def _checkIPv4Address(self, host, hostStandardFormat):
|
||||||
# print 'hostFromAddrMessage', hostFromAddrMessage
|
# print 'hostStandardFormat', hostStandardFormat
|
||||||
if host[0] == '\x7F':
|
if host[0] == '\x7F':
|
||||||
print 'Ignoring IP address in loopback range:', hostFromAddrMessage
|
print 'Ignoring IP address in loopback range:', hostStandardFormat
|
||||||
return False
|
return False
|
||||||
if host[0] == '\x0A':
|
if host[0] == '\x0A':
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||||
return False
|
return False
|
||||||
if host[0:2] == '\xC0\xA8':
|
if host[0:2] == '\xC0\xA8':
|
||||||
print 'Ignoring IP address in private range:', hostFromAddrMessage
|
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _checkIPv6Address(self, host, hostFromAddrMessage):
|
def _checkIPv6Address(self, host, hostStandardFormat):
|
||||||
if host == ('\x00' * 15) + '\x01':
|
if host == ('\x00' * 15) + '\x01':
|
||||||
print 'Ignoring loopback address:', hostFromAddrMessage
|
print 'Ignoring loopback address:', hostStandardFormat
|
||||||
return False
|
return False
|
||||||
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
|
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
|
||||||
print 'Ignoring local address:', hostFromAddrMessage
|
print 'Ignoring local address:', hostStandardFormat
|
||||||
return False
|
return False
|
||||||
if (ord(host[0]) & 0xfe) == 0xfc:
|
if (ord(host[0]) & 0xfe) == 0xfc:
|
||||||
print 'Ignoring unique local address:', hostFromAddrMessage
|
print 'Ignoring unique local address:', hostStandardFormat
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# We have received an addr message.
|
# We have received an addr message.
|
||||||
def recaddr(self, data):
|
def recaddr(self, data):
|
||||||
#listOfAddressDetailsToBroadcastToPeers = []
|
|
||||||
numberOfAddressesIncluded = 0
|
|
||||||
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
|
||||||
data[:10])
|
data[:10])
|
||||||
|
|
||||||
|
@ -531,91 +528,56 @@ class receiveDataThread(threading.Thread):
|
||||||
return
|
return
|
||||||
|
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
for i in range(0, numberOfAddressesIncluded):
|
||||||
try:
|
fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]
|
||||||
fullHost = data[20 + lengthOfNumberOfAddresses + (38 * i):36 + lengthOfNumberOfAddresses + (38 * i)]
|
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
||||||
except Exception as err:
|
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrHost). Message: %s\n' % str(err))
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
if recaddrStream == 0:
|
if recaddrStream == 0:
|
||||||
continue
|
continue
|
||||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
||||||
continue
|
continue
|
||||||
try:
|
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
||||||
except Exception as err:
|
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
|
|
||||||
try:
|
|
||||||
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
|
||||||
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
|
||||||
except Exception as err:
|
|
||||||
with shared.printLock:
|
|
||||||
sys.stderr.write(
|
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
|
||||||
# recaddrPort, ', i', i
|
|
||||||
if fullHost[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
if fullHost[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
ipv4Host = fullHost[12:]
|
ipv4Host = fullHost[12:]
|
||||||
hostFromAddrMessage = socket.inet_ntop(socket.AF_INET, ipv4Host)
|
hostStandardFormat = socket.inet_ntop(socket.AF_INET, ipv4Host)
|
||||||
if not self._checkIPv4Address(ipv4Host, hostFromAddrMessage):
|
if not self._checkIPv4Address(ipv4Host, hostStandardFormat):
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
hostFromAddrMessage = socket.inet_ntop(socket.AF_INET6, fullHost)
|
hostStandardFormat = socket.inet_ntop(socket.AF_INET6, fullHost)
|
||||||
if hostFromAddrMessage == "":
|
if hostStandardFormat == "":
|
||||||
# This can happen on Windows systems which are not 64-bit compatible
|
# This can happen on Windows systems which are not 64-bit compatible
|
||||||
# so let us drop the IPv6 address.
|
# so let us drop the IPv6 address.
|
||||||
continue
|
continue
|
||||||
if not self._checkIPv6Address(fullHost, hostFromAddrMessage):
|
if not self._checkIPv6Address(fullHost, hostStandardFormat):
|
||||||
continue
|
continue
|
||||||
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
timeSomeoneElseReceivedMessageFromThisNode, = unpack('>Q', data[lengthOfNumberOfAddresses + (
|
||||||
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
38 * i):8 + lengthOfNumberOfAddresses + (38 * i)]) # This is the 'time' value in the received addr message. 64-bit.
|
||||||
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
if recaddrStream not in shared.knownNodes: # knownNodes is a dictionary of dictionaries with one outer dictionary for each stream. If the outer stream dictionary doesn't exist yet then we must make it.
|
||||||
shared.knownNodesLock.acquire()
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[recaddrStream] = {}
|
shared.knownNodes[recaddrStream] = {}
|
||||||
shared.knownNodesLock.release()
|
peerFromAddrMessage = shared.Peer(hostStandardFormat, recaddrPort)
|
||||||
peerFromAddrMessage = shared.Peer(hostFromAddrMessage, recaddrPort)
|
|
||||||
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
if peerFromAddrMessage not in shared.knownNodes[recaddrStream]:
|
||||||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||||
shared.knownNodesLock.acquire()
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = (
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
timeSomeoneElseReceivedMessageFromThisNode)
|
|
||||||
shared.knownNodesLock.release()
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||||
|
|
||||||
shared.needToWriteKnownNodesToDisk = True
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
hostDetails = (
|
hostDetails = (
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
timeSomeoneElseReceivedMessageFromThisNode,
|
||||||
recaddrStream, recaddrServices, hostFromAddrMessage, recaddrPort)
|
recaddrStream, recaddrServices, hostStandardFormat, recaddrPort)
|
||||||
#listOfAddressDetailsToBroadcastToPeers.append(hostDetails)
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
self.streamNumber, 'advertisepeer', hostDetails))
|
self.streamNumber, 'advertisepeer', hostDetails))
|
||||||
else:
|
else:
|
||||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||||
peerFromAddrMessage] # PORT in this case is either the port we used to connect to the remote node, or the port that was specified by someone else in a past addr message.
|
peerFromAddrMessage]
|
||||||
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())):
|
if (timeLastReceivedMessageFromThisNode < timeSomeoneElseReceivedMessageFromThisNode) and (timeSomeoneElseReceivedMessageFromThisNode < int(time.time())+900): # 900 seconds for wiggle-room in case other nodes' clocks aren't quite right.
|
||||||
shared.knownNodesLock.acquire()
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
shared.knownNodesLock.release()
|
|
||||||
|
|
||||||
#if listOfAddressDetailsToBroadcastToPeers != []:
|
|
||||||
# self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
|
|
||||||
|
@ -633,31 +595,30 @@ class receiveDataThread(threading.Thread):
|
||||||
# We are going to share a maximum number of 1000 addrs with our peer.
|
# We are going to share a maximum number of 1000 addrs with our peer.
|
||||||
# 500 from this stream, 250 from the left child stream, and 250 from
|
# 500 from this stream, 250 from the left child stream, and 250 from
|
||||||
# the right child stream.
|
# the right child stream.
|
||||||
shared.knownNodesLock.acquire()
|
with shared.knownNodesLock:
|
||||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
if len(shared.knownNodes[self.streamNumber]) > 0:
|
||||||
for i in range(500):
|
for i in range(500):
|
||||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
||||||
if isHostInPrivateIPRange(peer.host):
|
if isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
addrsInMyStream[peer] = shared.knownNodes[
|
addrsInMyStream[peer] = shared.knownNodes[
|
||||||
self.streamNumber][peer]
|
self.streamNumber][peer]
|
||||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
peer, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
self.streamNumber * 2], 1)
|
self.streamNumber * 2], 1)
|
||||||
if isHostInPrivateIPRange(peer.host):
|
if isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
||||||
self.streamNumber * 2][peer]
|
self.streamNumber * 2][peer]
|
||||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
||||||
for i in range(250):
|
for i in range(250):
|
||||||
peer, = random.sample(shared.knownNodes[
|
peer, = random.sample(shared.knownNodes[
|
||||||
(self.streamNumber * 2) + 1], 1)
|
(self.streamNumber * 2) + 1], 1)
|
||||||
if isHostInPrivateIPRange(peer.host):
|
if isHostInPrivateIPRange(peer.host):
|
||||||
continue
|
continue
|
||||||
addrsInChildStreamRight[peer] = shared.knownNodes[
|
addrsInChildStreamRight[peer] = shared.knownNodes[
|
||||||
(self.streamNumber * 2) + 1][peer]
|
(self.streamNumber * 2) + 1][peer]
|
||||||
shared.knownNodesLock.release()
|
|
||||||
numberOfAddressesInAddrMessage = 0
|
numberOfAddressesInAddrMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
# print 'addrsInMyStream.items()', addrsInMyStream.items()
|
# print 'addrsInMyStream.items()', addrsInMyStream.items()
|
||||||
|
@ -771,10 +732,9 @@ class receiveDataThread(threading.Thread):
|
||||||
# in this version message. Let us inform the sendDataThread.
|
# in this version message. Let us inform the sendDataThread.
|
||||||
self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion))
|
self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion))
|
||||||
|
|
||||||
shared.knownNodesLock.acquire()
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||||
shared.needToWriteKnownNodesToDisk = True
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
shared.knownNodesLock.release()
|
|
||||||
|
|
||||||
self.sendverack()
|
self.sendverack()
|
||||||
if self.initiatedConnection == False:
|
if self.initiatedConnection == False:
|
||||||
|
|
|
@ -821,8 +821,9 @@ class singleWorker(threading.Thread):
|
||||||
# If we are sending to ourselves or a chan, let's put the message in
|
# If we are sending to ourselves or a chan, let's put the message in
|
||||||
# our own inbox.
|
# our own inbox.
|
||||||
if shared.config.has_section(toaddress):
|
if shared.config.has_section(toaddress):
|
||||||
|
sigHash = hashlib.sha512(hashlib.sha512(signature).digest()).digest()[32:] # Used to detect and ignore duplicate messages in our inbox
|
||||||
t = (inventoryHash, toaddress, fromaddress, subject, int(
|
t = (inventoryHash, toaddress, fromaddress, subject, int(
|
||||||
time.time()), message, 'inbox', 2, 0)
|
time.time()), message, 'inbox', 2, 0, sigHash)
|
||||||
helper_inbox.insert(t)
|
helper_inbox.insert(t)
|
||||||
|
|
||||||
shared.UISignalQueue.put(('displayNewInboxMessage', (
|
shared.UISignalQueue.put(('displayNewInboxMessage', (
|
||||||
|
|
|
@ -28,7 +28,7 @@ class sqlThread(threading.Thread):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.cur.execute(
|
self.cur.execute(
|
||||||
'''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' )
|
'''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, sighash blob, UNIQUE(msgid) ON CONFLICT REPLACE)''' )
|
||||||
self.cur.execute(
|
self.cur.execute(
|
||||||
'''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text, ackdata blob, lastactiontime integer, status text, pubkeyretrynumber integer, msgretrynumber integer, folder text, encodingtype int)''' )
|
'''CREATE TABLE sent (msgid blob, toaddress text, toripe blob, fromaddress text, subject text, message text, ackdata blob, lastactiontime integer, status text, pubkeyretrynumber integer, msgretrynumber integer, folder text, encodingtype int)''' )
|
||||||
self.cur.execute(
|
self.cur.execute(
|
||||||
|
@ -61,7 +61,7 @@ class sqlThread(threading.Thread):
|
||||||
'''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
|
'''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
|
||||||
self.cur.execute(
|
self.cur.execute(
|
||||||
'''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' )
|
'''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' )
|
||||||
self.cur.execute( '''INSERT INTO settings VALUES('version','8')''')
|
self.cur.execute( '''INSERT INTO settings VALUES('version','9')''')
|
||||||
self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
|
||||||
int(time.time()),))
|
int(time.time()),))
|
||||||
self.cur.execute(
|
self.cur.execute(
|
||||||
|
@ -360,7 +360,21 @@ class sqlThread(threading.Thread):
|
||||||
parameters = (8,)
|
parameters = (8,)
|
||||||
self.cur.execute(query, parameters)
|
self.cur.execute(query, parameters)
|
||||||
logger.debug('Finished clearing currently held pubkeys.')
|
logger.debug('Finished clearing currently held pubkeys.')
|
||||||
|
|
||||||
|
# Add a new column to the inbox table to store the hash of the message signature.
|
||||||
|
# We'll use this as temporary message UUID in order to detect duplicates.
|
||||||
|
item = '''SELECT value FROM settings WHERE key='version';'''
|
||||||
|
parameters = ''
|
||||||
|
self.cur.execute(item, parameters)
|
||||||
|
currentVersion = int(self.cur.fetchall()[0][0])
|
||||||
|
if currentVersion == 8:
|
||||||
|
logger.debug('In messages.dat database, adding sighash field to the inbox table.')
|
||||||
|
item = '''ALTER TABLE inbox ADD sighash blob DEFAULT '' '''
|
||||||
|
parameters = ''
|
||||||
|
self.cur.execute(item, parameters)
|
||||||
|
item = '''update settings set value=? WHERE key='version';'''
|
||||||
|
parameters = (9,)
|
||||||
|
self.cur.execute(item, parameters)
|
||||||
|
|
||||||
# Are you hoping to add a new option to the keys.dat file of existing
|
# Are you hoping to add a new option to the keys.dat file of existing
|
||||||
# Bitmessage users or modify the SQLite database? Add it right above this line!
|
# Bitmessage users or modify the SQLite database? Add it right above this line!
|
||||||
|
|
|
@ -2,14 +2,14 @@ from helper_sql import *
|
||||||
import shared
|
import shared
|
||||||
|
|
||||||
def insert(t):
|
def insert(t):
|
||||||
sqlExecute('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?)''', *t)
|
sqlExecute('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?,?,?,?)''', *t)
|
||||||
shared.UISignalQueue.put(('changedInboxUnread', None))
|
shared.UISignalQueue.put(('changedInboxUnread', None))
|
||||||
|
|
||||||
def trash(msgid):
|
def trash(msgid):
|
||||||
sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid)
|
sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid)
|
||||||
shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid))
|
shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid))
|
||||||
|
|
||||||
def isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, encodingType):
|
def isMessageAlreadyInInbox(sigHash):
|
||||||
queryReturn = sqlQuery(
|
queryReturn = sqlQuery(
|
||||||
'''SELECT COUNT(*) FROM inbox WHERE toaddress=? AND fromaddress=? AND subject=? AND message=? AND encodingtype=? ''', toAddress, fromAddress, subject, body, encodingType)
|
'''SELECT COUNT(*) FROM inbox WHERE sighash=?''', sigHash)
|
||||||
return queryReturn[0][0] != 0
|
return queryReturn[0][0] != 0
|
Reference in New Issue
Block a user