diff --git a/src/bitmessagecurses/__init__.py b/src/bitmessagecurses/__init__.py index 3b740247..1d667b2b 100644 --- a/src/bitmessagecurses/__init__.py +++ b/src/bitmessagecurses/__init__.py @@ -26,6 +26,7 @@ import ConfigParser from addresses import * from pyelliptic.openssl import OpenSSL import l10n +from inventory import Inventory quit = False menutab = 1 @@ -108,8 +109,8 @@ def scrollbox(d, text, height=None, width=None): def resetlookups(): global inventorydata - inventorydata = shared.numberOfInventoryLookupsPerformed - shared.numberOfInventoryLookupsPerformed = 0 + inventorydata = Inventory().numberOfInventoryLookupsPerformed + Inventory().numberOfInventoryLookupsPerformed = 0 Timer(1, resetlookups, ()).start() def drawtab(stdscr): if menutab in range(1, len(menu)+1): diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 830dca31..3b2d1aae 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -14,7 +14,7 @@ depends.check_dependencies() import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. # The next 3 are used for the API -import singleton +from singleinstance import singleinstance import os import socket import ctypes @@ -162,7 +162,7 @@ class Main: shared.curses = True # is the application already running? If yes then exit. - shared.thisapp = singleton.singleinstance("", daemon) + shared.thisapp = singleinstance("", daemon) if daemon: with shared.printLock: diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 37c52917..4e3366c7 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -4428,7 +4428,7 @@ class MySingleApplication(QApplication): # Checks if there's an instance of the local server id running if self.is_running: - # This should be ignored, singleton.py will take care of exiting me. + # This should be ignored, singleinstance.py will take care of exiting me. pass else: # Nope, create a local server with this id and assign on_new_connection diff --git a/src/bitmessageqt/networkstatus.py b/src/bitmessageqt/networkstatus.py index 09d05c63..c792f81c 100644 --- a/src/bitmessageqt/networkstatus.py +++ b/src/bitmessageqt/networkstatus.py @@ -2,6 +2,7 @@ from PyQt4 import QtCore, QtGui import time import shared from tr import _translate +from inventory import Inventory import l10n from retranslateui import RetranslateMixin from uisignaler import UISignaler @@ -127,8 +128,8 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin): # timer driven def runEveryTwoSeconds(self): self.labelLookupsPerSecond.setText(_translate( - "networkstatus", "Inventory lookups per second: %1").arg(str(shared.numberOfInventoryLookupsPerformed/2))) - shared.numberOfInventoryLookupsPerformed = 0 + "networkstatus", "Inventory lookups per second: %1").arg(str(Inventory().numberOfInventoryLookupsPerformed/2))) + Inventory().numberOfInventoryLookupsPerformed = 0 self.updateNumberOfBytes() self.updateNumberOfObjectsToBeSynced() diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index c69697d6..f29b1f4c 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -27,6 +27,7 @@ from class_objectHashHolder import objectHashHolder from helper_generic import addDataPadding, isHostInPrivateIPRange from helper_sql import sqlQuery from debug import logger +from inventory import Inventory import tr # This thread is created either by the synSenderThread(for outgoing @@ -230,10 +231,9 @@ class receiveDataThread(threading.Thread): if self.data == '': # if there are no more messages while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: - shared.numberOfInventoryLookupsPerformed += 1 objectHash, = random.sample( self.objectsThatWeHaveYetToGetFromThisPeer, 1) - if objectHash in shared.inventory: + if objectHash in Inventory(): logger.debug('Inventory already has object listed in inv message.') del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash] else: @@ -336,7 +336,7 @@ class receiveDataThread(threading.Thread): def sendBigInv(self): # Select all hashes for objects in this stream. bigInvList = {} - for hash in shared.inventory.unexpired_hashes_by_stream(self.streamNumber): + for hash in Inventory().unexpired_hashes_by_stream(self.streamNumber): if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash): bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 @@ -442,8 +442,7 @@ class receiveDataThread(threading.Thread): return self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ data[lengthOfVarint:32 + lengthOfVarint]] = 0 - shared.numberOfInventoryLookupsPerformed += 1 - if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory: + if data[lengthOfVarint:32 + lengthOfVarint] in Inventory(): logger.debug('Inventory has inventory item already.') else: self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) @@ -455,7 +454,7 @@ class receiveDataThread(threading.Thread): advertisedSet = set() for i in range(numberOfItemsInInv): advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) - objectsNewToMe = advertisedSet - shared.inventory.hashes_by_stream(self.streamNumber) + objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) for item in objectsNewToMe: if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation @@ -488,15 +487,11 @@ class receiveDataThread(threading.Thread): i * 32):32 + lengthOfVarint + (i * 32)] logger.debug('received getdata request for item:' + hexlify(hash)) - shared.numberOfInventoryLookupsPerformed += 1 - shared.inventoryLock.acquire() if self.objectHashHolderInstance.hasHash(hash): - shared.inventoryLock.release() self.antiIntersectionDelay() else: - shared.inventoryLock.release() - if hash in shared.inventory: - self.sendObject(shared.inventory[hash].payload) + if hash in Inventory(): + self.sendObject(Inventory()[hash].payload) else: self.antiIntersectionDelay() logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 04ee3f9b..533184d3 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -8,6 +8,7 @@ import pickle import tr#anslate from helper_sql import * from helper_threading import * +from inventory import Inventory from debug import logger """ @@ -47,7 +48,7 @@ class singleCleaner(threading.Thread, StoppableThread): while shared.shutdown == 0: shared.UISignalQueue.put(( 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) - shared.inventory.flush() + Inventory().flush() shared.UISignalQueue.put(('updateStatusBar', '')) shared.broadcastToSendDataQueues(( @@ -59,7 +60,7 @@ class singleCleaner(threading.Thread, StoppableThread): shared.UISignalQueue.queue.clear() if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) - shared.inventory.clean() + Inventory().clean() # pubkeys sqlExecute( '''DELETE FROM pubkeys WHERE timeQ', nonce) + payload inventoryHash = calculateInventoryHash(payload) objectType = 1 - shared.inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime,'') logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -351,7 +352,7 @@ class singleWorker(threading.Thread, StoppableThread): payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) objectType = 1 - shared.inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) @@ -482,7 +483,7 @@ class singleWorker(threading.Thread, StoppableThread): inventoryHash = calculateInventoryHash(payload) objectType = 3 - shared.inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, tag) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) shared.broadcastToSendDataQueues(( @@ -576,7 +577,7 @@ class singleWorker(threading.Thread, StoppableThread): tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash. shared.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) - for value in shared.inventory.by_type_and_tag(1, toTag): + for value in Inventory().by_type_and_tag(1, toTag): if shared.decryptAndCheckPubkeyPayload(value.payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table. needToRequestPubkey = False sqlExecute( @@ -808,7 +809,7 @@ class singleWorker(threading.Thread, StoppableThread): inventoryHash = calculateInventoryHash(encryptedPayload) objectType = 2 - shared.inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, embeddedTime, '') if shared.config.has_section(toaddress) or not checkBitfield(behaviorBitfield, shared.BITFIELD_DOESACK): shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp())))) @@ -917,7 +918,7 @@ class singleWorker(threading.Thread, StoppableThread): payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) objectType = 1 - shared.inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') logger.info('sending inv (for the getpubkey message)') shared.broadcastToSendDataQueues(( diff --git a/src/inventory.py b/src/inventory.py new file mode 100644 index 00000000..83a9ce82 --- /dev/null +++ b/src/inventory.py @@ -0,0 +1,84 @@ +import collections +from threading import RLock +import time + +from helper_sql import * +from singleton import Singleton + +inventoryLock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual) +InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag') + + +@Singleton +class Inventory(collections.MutableMapping): + def __init__(self): + super(self.__class__, self).__init__() + self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet). + self.numberOfInventoryLookupsPerformed = 0 + self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours. + + def __contains__(self, hash): + with inventoryLock: + self.numberOfInventoryLookupsPerformed += 1 + if hash in self._inventory: + return True + return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash)) + + def __getitem__(self, hash): + with inventoryLock: + if hash in self._inventory: + return self._inventory[hash] + rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash) + if not rows: + raise KeyError(hash) + return InventoryItem(*rows[0]) + + def __setitem__(self, hash, value): + with inventoryLock: + value = InventoryItem(*value) + self._inventory[hash] = value + self._streams[value.stream].add(hash) + + def __delitem__(self, hash): + raise NotImplementedError + + def __iter__(self): + with inventoryLock: + hashes = self._inventory.keys()[:] + hashes += (hash for hash, in sqlQuery('SELECT hash FROM inventory')) + return hashes.__iter__() + + def __len__(self): + with inventoryLock: + return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0] + + def by_type_and_tag(self, type, tag): + with inventoryLock: + values = [value for value in self._inventory.values() if value.type == type and value.tag == tag] + values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag)) + return values + + def hashes_by_stream(self, stream): + with inventoryLock: + return self._streams[stream] + + def unexpired_hashes_by_stream(self, stream): + with inventoryLock: + t = int(time.time()) + hashes = [hash for hash, value in self._inventory.items() if value.stream == stream and value.expires > t] + hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t)) + return hashes + + def flush(self): + with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. + with SqlBulkExecute() as sql: + for hash, value in self._inventory.items(): + sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value) + self._inventory.clear() + + def clean(self): + with inventoryLock: + sqlExecute('DELETE FROM inventory WHERE expirestime t] - hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t)) - return hashes - - def flush(self): - with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock. - with SqlBulkExecute() as sql: - for hash, value in self._inventory.items(): - sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value) - self._inventory.clear() - - def clean(self): - with inventoryLock: - sqlExecute('DELETE FROM inventory WHERE expirestimeI', data[16:20]) - inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') - inventoryLock.release() logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) @@ -833,17 +746,13 @@ def _checkAndShareMsgWithPeers(data): return readPosition += streamNumberLength inventoryHash = calculateInventoryHash(data) - shared.numberOfInventoryLookupsPerformed += 1 - inventoryLock.acquire() - if inventoryHash in inventory: + if inventoryHash in Inventory(): logger.debug('We have already received this msg message. Ignoring.') - inventoryLock.release() return # This msg message is valid. Let's let our peers know about it. objectType = 2 - inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') - inventoryLock.release() logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) @@ -868,18 +777,14 @@ def _checkAndShareGetpubkeyWithPeers(data): return readPosition += streamNumberLength - shared.numberOfInventoryLookupsPerformed += 1 inventoryHash = calculateInventoryHash(data) - inventoryLock.acquire() - if inventoryHash in inventory: + if inventoryHash in Inventory(): logger.debug('We have already received this getpubkey request. Ignoring it.') - inventoryLock.release() return objectType = 0 - inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, data, embeddedTime,'') - inventoryLock.release() # This getpubkey request is valid. Forward to peers. logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) @@ -907,17 +812,13 @@ def _checkAndSharePubkeyWithPeers(data): else: tag = '' - shared.numberOfInventoryLookupsPerformed += 1 inventoryHash = calculateInventoryHash(data) - inventoryLock.acquire() - if inventoryHash in inventory: + if inventoryHash in Inventory(): logger.debug('We have already received this pubkey. Ignoring it.') - inventoryLock.release() return objectType = 1 - inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, data, embeddedTime, tag) - inventoryLock.release() # This object is valid. Forward it to peers. logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) @@ -946,18 +847,14 @@ def _checkAndShareBroadcastWithPeers(data): tag = data[readPosition:readPosition+32] else: tag = '' - shared.numberOfInventoryLookupsPerformed += 1 - inventoryLock.acquire() inventoryHash = calculateInventoryHash(data) - if inventoryHash in inventory: + if inventoryHash in Inventory(): logger.debug('We have already received this broadcast object. Ignoring.') - inventoryLock.release() return # It is valid. Let's let our peers know about it. objectType = 3 - inventory[inventoryHash] = ( + Inventory()[inventoryHash] = ( objectType, streamNumber, data, embeddedTime, tag) - inventoryLock.release() # This object is valid. Forward it to peers. logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) diff --git a/src/singleinstance.py b/src/singleinstance.py new file mode 100644 index 00000000..8bf7ed01 --- /dev/null +++ b/src/singleinstance.py @@ -0,0 +1,84 @@ +#! /usr/bin/env python + +import atexit +import errno +from multiprocessing import Process +import os +import sys +import shared + +try: + import fcntl # @UnresolvedImport +except: + pass + +class singleinstance: + """ + Implements a single instance application by creating a lock file at appdata. + + This is based upon the singleton class from tendo https://github.com/pycontribs/tendo + which is under the Python Software Foundation License version 2 + """ + def __init__(self, flavor_id="", daemon=False): + self.initialized = False + self.counter = 0 + self.daemon = daemon + self.lockPid = None + self.lockfile = os.path.normpath(os.path.join(shared.appdata, 'singleton%s.lock' % flavor_id)) + + if not self.daemon and not shared.curses: + # Tells the already running (if any) application to get focus. + import bitmessageqt + bitmessageqt.init() + + self.lock() + + self.initialized = True + atexit.register(self.cleanup) + + def lock(self): + if self.lockPid is None: + self.lockPid = os.getpid() + if sys.platform == 'win32': + try: + # file already exists, we try to remove (in case previous execution was interrupted) + if os.path.exists(self.lockfile): + os.unlink(self.lockfile) + self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR) + except OSError: + type, e, tb = sys.exc_info() + if e.errno == 13: + print 'Another instance of this application is already running' + sys.exit(-1) + print(e.errno) + raise + else: # non Windows + self.fp = open(self.lockfile, 'w') + try: + if self.daemon and self.lockPid != os.getpid(): + fcntl.lockf(self.fp, fcntl.LOCK_EX) # wait for parent to finish + else: + fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB) + self.lockPid = os.getpid() + except IOError: + print 'Another instance of this application is already running' + sys.exit(-1) + + def cleanup(self): + if not self.initialized: + return + if self.daemon and self.lockPid == os.getpid(): + # these are the two initial forks while daemonizing + return + print "Cleaning up lockfile" + try: + if sys.platform == 'win32': + if hasattr(self, 'fd'): + os.close(self.fd) + os.unlink(self.lockfile) + else: + fcntl.lockf(self.fp, fcntl.LOCK_UN) + if os.path.isfile(self.lockfile): + os.unlink(self.lockfile) + except Exception, e: + pass diff --git a/src/singleton.py b/src/singleton.py index 8bf7ed01..1eef08e1 100644 --- a/src/singleton.py +++ b/src/singleton.py @@ -1,84 +1,7 @@ -#! /usr/bin/env python - -import atexit -import errno -from multiprocessing import Process -import os -import sys -import shared - -try: - import fcntl # @UnresolvedImport -except: - pass - -class singleinstance: - """ - Implements a single instance application by creating a lock file at appdata. - - This is based upon the singleton class from tendo https://github.com/pycontribs/tendo - which is under the Python Software Foundation License version 2 - """ - def __init__(self, flavor_id="", daemon=False): - self.initialized = False - self.counter = 0 - self.daemon = daemon - self.lockPid = None - self.lockfile = os.path.normpath(os.path.join(shared.appdata, 'singleton%s.lock' % flavor_id)) - - if not self.daemon and not shared.curses: - # Tells the already running (if any) application to get focus. - import bitmessageqt - bitmessageqt.init() - - self.lock() - - self.initialized = True - atexit.register(self.cleanup) - - def lock(self): - if self.lockPid is None: - self.lockPid = os.getpid() - if sys.platform == 'win32': - try: - # file already exists, we try to remove (in case previous execution was interrupted) - if os.path.exists(self.lockfile): - os.unlink(self.lockfile) - self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR) - except OSError: - type, e, tb = sys.exc_info() - if e.errno == 13: - print 'Another instance of this application is already running' - sys.exit(-1) - print(e.errno) - raise - else: # non Windows - self.fp = open(self.lockfile, 'w') - try: - if self.daemon and self.lockPid != os.getpid(): - fcntl.lockf(self.fp, fcntl.LOCK_EX) # wait for parent to finish - else: - fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB) - self.lockPid = os.getpid() - except IOError: - print 'Another instance of this application is already running' - sys.exit(-1) - - def cleanup(self): - if not self.initialized: - return - if self.daemon and self.lockPid == os.getpid(): - # these are the two initial forks while daemonizing - return - print "Cleaning up lockfile" - try: - if sys.platform == 'win32': - if hasattr(self, 'fd'): - os.close(self.fd) - os.unlink(self.lockfile) - else: - fcntl.lockf(self.fp, fcntl.LOCK_UN) - if os.path.isfile(self.lockfile): - os.unlink(self.lockfile) - except Exception, e: - pass +def Singleton(cls): + instances = {} + def getinstance(): + if cls not in instances: + instances[cls] = cls() + return instances[cls] + return getinstance