ignore duplicate messages

This commit is contained in:
Jonathan Warren 2014-07-26 13:15:28 -04:00
parent a303c8d878
commit b41fb616ae
2 changed files with 90 additions and 72 deletions

View File

@ -595,22 +595,29 @@ class objectProcessor(threading.Thread):
if queryreturn == []: if queryreturn == []:
logger.info('Message ignored because address not in whitelist.') logger.info('Message ignored because address not in whitelist.')
blockMessage = True blockMessage = True
if not blockMessage:
toLabel = shared.config.get(toAddress, 'label') toLabel = shared.config.get(toAddress, 'label')
if toLabel == '': if toLabel == '':
toLabel = toAddress toLabel = toAddress
if messageEncodingType == 2: if messageEncodingType == 2:
subject, body = self.decodeType2Message(message) subject, body = self.decodeType2Message(message)
logger.info('Message subject (first 100 characters): %s' % repr(subject)[:100]) logger.info('Message subject (first 100 characters): %s' % repr(subject)[:100])
elif messageEncodingType == 1: elif messageEncodingType == 1:
body = message body = message
subject = '' subject = ''
elif messageEncodingType == 0: elif messageEncodingType == 0:
logger.info('messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them.') logger.info('messageEncodingType == 0. Doing nothing with the message. They probably just sent it so that we would store their public key or send their ack data for them.')
else: subject = ''
body = 'Unknown encoding type.\n\n' + repr(message) body = ''
subject = '' else:
body = 'Unknown encoding type.\n\n' + repr(message)
subject = ''
# Let us make sure that we haven't already received this message
if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
logger.info('This msg is already in our inbox. Ignoring it.')
blockMessage = True
if not blockMessage:
if messageEncodingType != 0: if messageEncodingType != 0:
t = (inventoryHash, toAddress, fromAddress, subject, int( t = (inventoryHash, toAddress, fromAddress, subject, int(
time.time()), body, 'inbox', messageEncodingType, 0) time.time()), body, 'inbox', messageEncodingType, 0)
@ -808,25 +815,28 @@ class objectProcessor(threading.Thread):
toAddress = '[Broadcast subscribers]' toAddress = '[Broadcast subscribers]'
if messageEncodingType != 0: if messageEncodingType != 0:
# Let us make sure that we haven't already received this message
t = (inventoryHash, toAddress, fromAddress, subject, int( if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
time.time()), body, 'inbox', messageEncodingType, 0) logger.info('This broadcast is already in our inbox. Ignoring it.')
helper_inbox.insert(t) else:
t = (inventoryHash, toAddress, fromAddress, subject, int(
shared.UISignalQueue.put(('displayNewInboxMessage', ( time.time()), body, 'inbox', messageEncodingType, 0)
inventoryHash, toAddress, fromAddress, subject, body))) helper_inbox.insert(t)
# If we are behaving as an API then we might need to run an shared.UISignalQueue.put(('displayNewInboxMessage', (
# outside command to let some program know that a new inventoryHash, toAddress, fromAddress, subject, body)))
# message has arrived.
if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'): # If we are behaving as an API then we might need to run an
try: # outside command to let some program know that a new
apiNotifyPath = shared.config.get( # message has arrived.
'bitmessagesettings', 'apinotifypath') if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'):
except: try:
apiNotifyPath = '' apiNotifyPath = shared.config.get(
if apiNotifyPath != '': 'bitmessagesettings', 'apinotifypath')
call([apiNotifyPath, "newBroadcast"]) except:
apiNotifyPath = ''
if apiNotifyPath != '':
call([apiNotifyPath, "newBroadcast"])
# Display timing data # Display timing data
logger.debug('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,)) logger.debug('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))
@ -953,25 +963,27 @@ class objectProcessor(threading.Thread):
toAddress = '[Broadcast subscribers]' toAddress = '[Broadcast subscribers]'
if messageEncodingType != 0: if messageEncodingType != 0:
if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
t = (inventoryHash, toAddress, fromAddress, subject, int( logger.info('This broadcast is already in our inbox. Ignoring it.')
time.time()), body, 'inbox', messageEncodingType, 0) else:
helper_inbox.insert(t) t = (inventoryHash, toAddress, fromAddress, subject, int(
time.time()), body, 'inbox', messageEncodingType, 0)
shared.UISignalQueue.put(('displayNewInboxMessage', ( helper_inbox.insert(t)
inventoryHash, toAddress, fromAddress, subject, body)))
shared.UISignalQueue.put(('displayNewInboxMessage', (
# If we are behaving as an API then we might need to run an inventoryHash, toAddress, fromAddress, subject, body)))
# outside command to let some program know that a new message
# has arrived. # If we are behaving as an API then we might need to run an
if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'): # outside command to let some program know that a new message
try: # has arrived.
apiNotifyPath = shared.config.get( if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'):
'bitmessagesettings', 'apinotifypath') try:
except: apiNotifyPath = shared.config.get(
apiNotifyPath = '' 'bitmessagesettings', 'apinotifypath')
if apiNotifyPath != '': except:
call([apiNotifyPath, "newBroadcast"]) apiNotifyPath = ''
if apiNotifyPath != '':
call([apiNotifyPath, "newBroadcast"])
# Display timing data # Display timing data
logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,)) logger.info('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))
@ -1096,25 +1108,27 @@ class objectProcessor(threading.Thread):
toAddress = '[Broadcast subscribers]' toAddress = '[Broadcast subscribers]'
if messageEncodingType != 0: if messageEncodingType != 0:
if helper_inbox.isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, messageEncodingType):
t = (inventoryHash, toAddress, fromAddress, subject, int( logger.info('This broadcast is already in our inbox. Ignoring it.')
time.time()), body, 'inbox', messageEncodingType, 0) else:
helper_inbox.insert(t) t = (inventoryHash, toAddress, fromAddress, subject, int(
time.time()), body, 'inbox', messageEncodingType, 0)
shared.UISignalQueue.put(('displayNewInboxMessage', ( helper_inbox.insert(t)
inventoryHash, toAddress, fromAddress, subject, body)))
shared.UISignalQueue.put(('displayNewInboxMessage', (
# If we are behaving as an API then we might need to run an inventoryHash, toAddress, fromAddress, subject, body)))
# outside command to let some program know that a new message
# has arrived. # If we are behaving as an API then we might need to run an
if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'): # outside command to let some program know that a new message
try: # has arrived.
apiNotifyPath = shared.config.get( if shared.safeConfigGetBoolean('bitmessagesettings', 'apienabled'):
'bitmessagesettings', 'apinotifypath') try:
except: apiNotifyPath = shared.config.get(
apiNotifyPath = '' 'bitmessagesettings', 'apinotifypath')
if apiNotifyPath != '': except:
call([apiNotifyPath, "newBroadcast"]) apiNotifyPath = ''
if apiNotifyPath != '':
call([apiNotifyPath, "newBroadcast"])
# Display timing data # Display timing data
logger.debug('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,)) logger.debug('Time spent processing this interesting broadcast: %s' % (time.time() - messageProcessingStartTime,))

View File

@ -9,3 +9,7 @@ def trash(msgid):
sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid) sqlExecute('''UPDATE inbox SET folder='trash' WHERE msgid=?''', msgid)
shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid)) shared.UISignalQueue.put(('removeInboxRowByMsgid',msgid))
def isMessageAlreadyInInbox(toAddress, fromAddress, subject, body, encodingType):
queryReturn = sqlQuery(
'''SELECT COUNT(*) FROM inbox WHERE toaddress=? AND fromaddress=? AND subject=? AND message=? AND encodingtype=? ''', toAddress, fromAddress, subject, body, encodingType)
return queryReturn[0][0] != 0