Inventory refactoring

- minor refactoring, made it into singleton instead of a shared global
  variable. This makes it a little bit cleaner and moves the class into
a separate file
- removed duplicate inventory locking
- renamed singleton.py to singleinstance.py (this is the code that
  ensures only one instance of PyBitmessage runs at the same time)
This commit is contained in:
Peter Šurda 2017-01-10 21:15:35 +01:00
parent 4f543e14c1
commit e84b19613e
Signed by untrusted user: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
11 changed files with 214 additions and 227 deletions

View File

@ -26,6 +26,7 @@ import ConfigParser
from addresses import * from addresses import *
from pyelliptic.openssl import OpenSSL from pyelliptic.openssl import OpenSSL
import l10n import l10n
from inventory import Inventory
quit = False quit = False
menutab = 1 menutab = 1
@ -108,8 +109,8 @@ def scrollbox(d, text, height=None, width=None):
def resetlookups(): def resetlookups():
global inventorydata global inventorydata
inventorydata = shared.numberOfInventoryLookupsPerformed inventorydata = Inventory().numberOfInventoryLookupsPerformed
shared.numberOfInventoryLookupsPerformed = 0 Inventory().numberOfInventoryLookupsPerformed = 0
Timer(1, resetlookups, ()).start() Timer(1, resetlookups, ()).start()
def drawtab(stdscr): def drawtab(stdscr):
if menutab in range(1, len(menu)+1): if menutab in range(1, len(menu)+1):

View File

@ -14,7 +14,7 @@ depends.check_dependencies()
import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
# The next 3 are used for the API # The next 3 are used for the API
import singleton from singleinstance import singleinstance
import os import os
import socket import socket
import ctypes import ctypes
@ -162,7 +162,7 @@ class Main:
shared.curses = True shared.curses = True
# is the application already running? If yes then exit. # is the application already running? If yes then exit.
shared.thisapp = singleton.singleinstance("", daemon) shared.thisapp = singleinstance("", daemon)
if daemon: if daemon:
with shared.printLock: with shared.printLock:

View File

@ -4428,7 +4428,7 @@ class MySingleApplication(QApplication):
# Checks if there's an instance of the local server id running # Checks if there's an instance of the local server id running
if self.is_running: if self.is_running:
# This should be ignored, singleton.py will take care of exiting me. # This should be ignored, singleinstance.py will take care of exiting me.
pass pass
else: else:
# Nope, create a local server with this id and assign on_new_connection # Nope, create a local server with this id and assign on_new_connection

View File

@ -2,6 +2,7 @@ from PyQt4 import QtCore, QtGui
import time import time
import shared import shared
from tr import _translate from tr import _translate
from inventory import Inventory
import l10n import l10n
from retranslateui import RetranslateMixin from retranslateui import RetranslateMixin
from uisignaler import UISignaler from uisignaler import UISignaler
@ -127,8 +128,8 @@ class NetworkStatus(QtGui.QWidget, RetranslateMixin):
# timer driven # timer driven
def runEveryTwoSeconds(self): def runEveryTwoSeconds(self):
self.labelLookupsPerSecond.setText(_translate( self.labelLookupsPerSecond.setText(_translate(
"networkstatus", "Inventory lookups per second: %1").arg(str(shared.numberOfInventoryLookupsPerformed/2))) "networkstatus", "Inventory lookups per second: %1").arg(str(Inventory().numberOfInventoryLookupsPerformed/2)))
shared.numberOfInventoryLookupsPerformed = 0 Inventory().numberOfInventoryLookupsPerformed = 0
self.updateNumberOfBytes() self.updateNumberOfBytes()
self.updateNumberOfObjectsToBeSynced() self.updateNumberOfObjectsToBeSynced()

View File

@ -27,6 +27,7 @@ from class_objectHashHolder import objectHashHolder
from helper_generic import addDataPadding, isHostInPrivateIPRange from helper_generic import addDataPadding, isHostInPrivateIPRange
from helper_sql import sqlQuery from helper_sql import sqlQuery
from debug import logger from debug import logger
from inventory import Inventory
import tr import tr
# This thread is created either by the synSenderThread(for outgoing # This thread is created either by the synSenderThread(for outgoing
@ -230,10 +231,9 @@ class receiveDataThread(threading.Thread):
if self.data == '': # if there are no more messages if self.data == '': # if there are no more messages
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
shared.numberOfInventoryLookupsPerformed += 1
objectHash, = random.sample( objectHash, = random.sample(
self.objectsThatWeHaveYetToGetFromThisPeer, 1) self.objectsThatWeHaveYetToGetFromThisPeer, 1)
if objectHash in shared.inventory: if objectHash in Inventory():
logger.debug('Inventory already has object listed in inv message.') logger.debug('Inventory already has object listed in inv message.')
del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash] del self.objectsThatWeHaveYetToGetFromThisPeer[objectHash]
else: else:
@ -336,7 +336,7 @@ class receiveDataThread(threading.Thread):
def sendBigInv(self): def sendBigInv(self):
# Select all hashes for objects in this stream. # Select all hashes for objects in this stream.
bigInvList = {} bigInvList = {}
for hash in shared.inventory.unexpired_hashes_by_stream(self.streamNumber): for hash in Inventory().unexpired_hashes_by_stream(self.streamNumber):
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash): if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
bigInvList[hash] = 0 bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0 numberOfObjectsInInvMessage = 0
@ -442,8 +442,7 @@ class receiveDataThread(threading.Thread):
return return
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
data[lengthOfVarint:32 + lengthOfVarint]] = 0 data[lengthOfVarint:32 + lengthOfVarint]] = 0
shared.numberOfInventoryLookupsPerformed += 1 if data[lengthOfVarint:32 + lengthOfVarint] in Inventory():
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
logger.debug('Inventory has inventory item already.') logger.debug('Inventory has inventory item already.')
else: else:
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
@ -455,7 +454,7 @@ class receiveDataThread(threading.Thread):
advertisedSet = set() advertisedSet = set()
for i in range(numberOfItemsInInv): for i in range(numberOfItemsInInv):
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
objectsNewToMe = advertisedSet - shared.inventory.hashes_by_stream(self.streamNumber) objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber)
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe: for item in objectsNewToMe:
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000 and shared.trustedPeer == None: # inv flooding attack mitigation
@ -488,15 +487,11 @@ class receiveDataThread(threading.Thread):
i * 32):32 + lengthOfVarint + (i * 32)] i * 32):32 + lengthOfVarint + (i * 32)]
logger.debug('received getdata request for item:' + hexlify(hash)) logger.debug('received getdata request for item:' + hexlify(hash))
shared.numberOfInventoryLookupsPerformed += 1
shared.inventoryLock.acquire()
if self.objectHashHolderInstance.hasHash(hash): if self.objectHashHolderInstance.hasHash(hash):
shared.inventoryLock.release()
self.antiIntersectionDelay() self.antiIntersectionDelay()
else: else:
shared.inventoryLock.release() if hash in Inventory():
if hash in shared.inventory: self.sendObject(Inventory()[hash].payload)
self.sendObject(shared.inventory[hash].payload)
else: else:
self.antiIntersectionDelay() self.antiIntersectionDelay()
logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,)) logger.warning('%s asked for an object with a getdata which is not in either our memory inventory or our SQL inventory. We probably cleaned it out after advertising it but before they got around to asking for it.' % (self.peer,))

View File

@ -8,6 +8,7 @@ import pickle
import tr#anslate import tr#anslate
from helper_sql import * from helper_sql import *
from helper_threading import * from helper_threading import *
from inventory import Inventory
from debug import logger from debug import logger
""" """
@ -47,7 +48,7 @@ class singleCleaner(threading.Thread, StoppableThread):
while shared.shutdown == 0: while shared.shutdown == 0:
shared.UISignalQueue.put(( shared.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) 'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)'))
shared.inventory.flush() Inventory().flush()
shared.UISignalQueue.put(('updateStatusBar', '')) shared.UISignalQueue.put(('updateStatusBar', ''))
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
@ -59,7 +60,7 @@ class singleCleaner(threading.Thread, StoppableThread):
shared.UISignalQueue.queue.clear() shared.UISignalQueue.queue.clear()
if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380:
timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
shared.inventory.clean() Inventory().clean()
# pubkeys # pubkeys
sqlExecute( sqlExecute(
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''', '''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''',

View File

@ -17,6 +17,7 @@ import helper_inbox
from helper_generic import addDataPadding from helper_generic import addDataPadding
import helper_msgcoding import helper_msgcoding
from helper_threading import * from helper_threading import *
from inventory import Inventory
import l10n import l10n
from protocol import * from protocol import *
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
@ -171,7 +172,7 @@ class singleWorker(threading.Thread, StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
shared.inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime,'') objectType, streamNumber, payload, embeddedTime,'')
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
@ -261,7 +262,7 @@ class singleWorker(threading.Thread, StoppableThread):
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
shared.inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime,'') objectType, streamNumber, payload, embeddedTime,'')
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
@ -351,7 +352,7 @@ class singleWorker(threading.Thread, StoppableThread):
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
shared.inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:])
logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash)) logger.info('broadcasting inv with hash: ' + hexlify(inventoryHash))
@ -482,7 +483,7 @@ class singleWorker(threading.Thread, StoppableThread):
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 3 objectType = 3
shared.inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, tag) objectType, streamNumber, payload, embeddedTime, tag)
logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash)) logger.info('sending inv (within sendBroadcast function) for object: ' + hexlify(inventoryHash))
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
@ -576,7 +577,7 @@ class singleWorker(threading.Thread, StoppableThread):
tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash. tag = doubleHashOfToAddressData[32:] # The second half of the sha512 hash.
shared.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey))) shared.neededPubkeys[tag] = (toaddress, highlevelcrypto.makeCryptor(hexlify(privEncryptionKey)))
for value in shared.inventory.by_type_and_tag(1, toTag): for value in Inventory().by_type_and_tag(1, toTag):
if shared.decryptAndCheckPubkeyPayload(value.payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table. if shared.decryptAndCheckPubkeyPayload(value.payload, toaddress) == 'successful': #if valid, this function also puts it in the pubkeys table.
needToRequestPubkey = False needToRequestPubkey = False
sqlExecute( sqlExecute(
@ -808,7 +809,7 @@ class singleWorker(threading.Thread, StoppableThread):
inventoryHash = calculateInventoryHash(encryptedPayload) inventoryHash = calculateInventoryHash(encryptedPayload)
objectType = 2 objectType = 2
shared.inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, embeddedTime, '') objectType, toStreamNumber, encryptedPayload, embeddedTime, '')
if shared.config.has_section(toaddress) or not checkBitfield(behaviorBitfield, shared.BITFIELD_DOESACK): if shared.config.has_section(toaddress) or not checkBitfield(behaviorBitfield, shared.BITFIELD_DOESACK):
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp())))) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr._translate("MainWindow", "Message sent. Sent at %1").arg(l10n.formatTimestamp()))))
@ -917,7 +918,7 @@ class singleWorker(threading.Thread, StoppableThread):
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
objectType = 1 objectType = 1
shared.inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime, '')
logger.info('sending inv (for the getpubkey message)') logger.info('sending inv (for the getpubkey message)')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((

84
src/inventory.py Normal file
View File

@ -0,0 +1,84 @@
import collections
from threading import RLock
import time
from helper_sql import *
from singleton import Singleton
inventoryLock = RLock() # Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
@Singleton
class Inventory(collections.MutableMapping):
def __init__(self):
super(self.__class__, self).__init__()
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
self.numberOfInventoryLookupsPerformed = 0
self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
def __contains__(self, hash):
with inventoryLock:
self.numberOfInventoryLookupsPerformed += 1
if hash in self._inventory:
return True
return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
def __getitem__(self, hash):
with inventoryLock:
if hash in self._inventory:
return self._inventory[hash]
rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
if not rows:
raise KeyError(hash)
return InventoryItem(*rows[0])
def __setitem__(self, hash, value):
with inventoryLock:
value = InventoryItem(*value)
self._inventory[hash] = value
self._streams[value.stream].add(hash)
def __delitem__(self, hash):
raise NotImplementedError
def __iter__(self):
with inventoryLock:
hashes = self._inventory.keys()[:]
hashes += (hash for hash, in sqlQuery('SELECT hash FROM inventory'))
return hashes.__iter__()
def __len__(self):
with inventoryLock:
return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
def by_type_and_tag(self, type, tag):
with inventoryLock:
values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
return values
def hashes_by_stream(self, stream):
with inventoryLock:
return self._streams[stream]
def unexpired_hashes_by_stream(self, stream):
with inventoryLock:
t = int(time.time())
hashes = [hash for hash, value in self._inventory.items() if value.stream == stream and value.expires > t]
hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
return hashes
def flush(self):
with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute() as sql:
for hash, value in self._inventory.items():
sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value)
self._inventory.clear()
def clean(self):
with inventoryLock:
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
self._streams.clear()
for hash, value in self.items():
self._streams[value.stream].add(hash)

View File

@ -37,6 +37,7 @@ import shared
#import helper_startup #import helper_startup
from helper_sql import * from helper_sql import *
from helper_threading import * from helper_threading import *
from inventory import Inventory
config = BMConfigParser() config = BMConfigParser()
@ -55,7 +56,6 @@ addressGeneratorQueue = Queue.Queue()
knownNodesLock = threading.Lock() knownNodesLock = threading.Lock()
knownNodes = {} knownNodes = {}
sendDataQueues = [] #each sendData thread puts its queue in this list. sendDataQueues = [] #each sendData thread puts its queue in this list.
inventoryLock = threading.RLock() #Guarantees that two receiveDataThreads don't receive and process the same message concurrently (probably sent by a malicious individual)
printLock = threading.Lock() printLock = threading.Lock()
appdata = '' #holds the location of the application data storage directory appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red' statusIconColor = 'red'
@ -80,7 +80,6 @@ clientHasReceivedIncomingConnections = False #used by API command clientStatus
numberOfMessagesProcessed = 0 numberOfMessagesProcessed = 0
numberOfBroadcastsProcessed = 0 numberOfBroadcastsProcessed = 0
numberOfPubkeysProcessed = 0 numberOfPubkeysProcessed = 0
numberOfInventoryLookupsPerformed = 0
numberOfBytesReceived = 0 # Used for the 'network status' page numberOfBytesReceived = 0 # Used for the 'network status' page
numberOfBytesSent = 0 # Used for the 'network status' page numberOfBytesSent = 0 # Used for the 'network status' page
numberOfBytesReceivedLastSecond = 0 # used for the bandwidth rate limit numberOfBytesReceivedLastSecond = 0 # used for the bandwidth rate limit
@ -143,88 +142,6 @@ NODE_SSL = 2
#Bitfield flags #Bitfield flags
BITFIELD_DOESACK = 1 BITFIELD_DOESACK = 1
import collections
InventoryItem = collections.namedtuple('InventoryItem', 'type stream payload expires tag')
class Inventory(collections.MutableMapping):
def __init__(self):
super(Inventory, self).__init__()
self._inventory = {} #of objects (like msg payloads and pubkey payloads) Does not include protocol headers (the first 24 bytes of each packet).
self._streams = collections.defaultdict(set) # key = streamNumer, value = a set which holds the inventory object hashes that we are aware of. This is used whenever we receive an inv message from a peer to check to see what items are new to us. We don't delete things out of it; instead, the singleCleaner thread clears and refills it every couple hours.
def __contains__(self, hash):
global numberOfInventoryLookupsPerformed
with inventoryLock:
numberOfInventoryLookupsPerformed += 1
if hash in self._inventory:
return True
return bool(sqlQuery('SELECT 1 FROM inventory WHERE hash=?', hash))
def __getitem__(self, hash):
with inventoryLock:
if hash in self._inventory:
return self._inventory[hash]
rows = sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE hash=?', hash)
if not rows:
raise KeyError(hash)
return InventoryItem(*rows[0])
def __setitem__(self, hash, value):
with inventoryLock:
value = InventoryItem(*value)
self._inventory[hash] = value
self._streams[value.stream].add(hash)
def __delitem__(self, hash):
raise NotImplementedError
def __iter__(self):
with inventoryLock:
hashes = self._inventory.keys()[:]
hashes += (hash for hash, in sqlQuery('SELECT hash FROM inventory'))
return hashes.__iter__()
def __len__(self):
with inventoryLock:
return len(self._inventory) + sqlQuery('SELECT count(*) FROM inventory')[0][0]
def by_type_and_tag(self, type, tag):
with inventoryLock:
values = [value for value in self._inventory.values() if value.type == type and value.tag == tag]
values += (InventoryItem(*value) for value in sqlQuery('SELECT objecttype, streamnumber, payload, expirestime, tag FROM inventory WHERE objecttype=? AND tag=?', type, tag))
return values
def hashes_by_stream(self, stream):
with inventoryLock:
return self._streams[stream]
def unexpired_hashes_by_stream(self, stream):
with inventoryLock:
t = int(time.time())
hashes = [hash for hash, value in self._inventory.items() if value.stream == stream and value.expires > t]
hashes += (payload for payload, in sqlQuery('SELECT hash FROM inventory WHERE streamnumber=? AND expirestime>?', stream, t))
return hashes
def flush(self):
with inventoryLock: # If you use both the inventoryLock and the sqlLock, always use the inventoryLock OUTSIDE of the sqlLock.
with SqlBulkExecute() as sql:
for hash, value in self._inventory.items():
sql.execute('INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)', hash, *value)
self._inventory.clear()
def clean(self):
with inventoryLock:
sqlExecute('DELETE FROM inventory WHERE expirestime<?',int(time.time()) - (60 * 60 * 3))
self._streams.clear()
for hash, value in self.items():
self._streams[value.stream].add(hash)
inventory = Inventory()
#Create a packet #Create a packet
def CreatePacket(command, payload=''): def CreatePacket(command, payload=''):
payload_length = len(payload) payload_length = len(payload)
@ -533,7 +450,7 @@ def doCleanShutdown():
UISignalQueue.put(( UISignalQueue.put((
'updateStatusBar', 'updateStatusBar',
'Flushing inventory in memory out to disk. This should normally only take a second...')) 'Flushing inventory in memory out to disk. This should normally only take a second...'))
inventory.flush() Inventory().flush()
# Verify that the objectProcessor has finished exiting. It should have incremented the # Verify that the objectProcessor has finished exiting. It should have incremented the
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit. # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
@ -806,16 +723,12 @@ def _checkAndShareUndefinedObjectWithPeers(data):
return return
inventoryHash = calculateInventoryHash(data) inventoryHash = calculateInventoryHash(data)
shared.numberOfInventoryLookupsPerformed += 1 if inventoryHash in Inventory():
inventoryLock.acquire()
if inventoryHash in inventory:
logger.debug('We have already received this undefined object. Ignoring.') logger.debug('We have already received this undefined object. Ignoring.')
inventoryLock.release()
return return
objectType, = unpack('>I', data[16:20]) objectType, = unpack('>I', data[16:20])
inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime,'') objectType, streamNumber, data, embeddedTime,'')
inventoryLock.release()
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
@ -833,17 +746,13 @@ def _checkAndShareMsgWithPeers(data):
return return
readPosition += streamNumberLength readPosition += streamNumberLength
inventoryHash = calculateInventoryHash(data) inventoryHash = calculateInventoryHash(data)
shared.numberOfInventoryLookupsPerformed += 1 if inventoryHash in Inventory():
inventoryLock.acquire()
if inventoryHash in inventory:
logger.debug('We have already received this msg message. Ignoring.') logger.debug('We have already received this msg message. Ignoring.')
inventoryLock.release()
return return
# This msg message is valid. Let's let our peers know about it. # This msg message is valid. Let's let our peers know about it.
objectType = 2 objectType = 2
inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime,'') objectType, streamNumber, data, embeddedTime,'')
inventoryLock.release()
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
@ -868,18 +777,14 @@ def _checkAndShareGetpubkeyWithPeers(data):
return return
readPosition += streamNumberLength readPosition += streamNumberLength
shared.numberOfInventoryLookupsPerformed += 1
inventoryHash = calculateInventoryHash(data) inventoryHash = calculateInventoryHash(data)
inventoryLock.acquire() if inventoryHash in Inventory():
if inventoryHash in inventory:
logger.debug('We have already received this getpubkey request. Ignoring it.') logger.debug('We have already received this getpubkey request. Ignoring it.')
inventoryLock.release()
return return
objectType = 0 objectType = 0
inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime,'') objectType, streamNumber, data, embeddedTime,'')
inventoryLock.release()
# This getpubkey request is valid. Forward to peers. # This getpubkey request is valid. Forward to peers.
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
@ -907,17 +812,13 @@ def _checkAndSharePubkeyWithPeers(data):
else: else:
tag = '' tag = ''
shared.numberOfInventoryLookupsPerformed += 1
inventoryHash = calculateInventoryHash(data) inventoryHash = calculateInventoryHash(data)
inventoryLock.acquire() if inventoryHash in Inventory():
if inventoryHash in inventory:
logger.debug('We have already received this pubkey. Ignoring it.') logger.debug('We have already received this pubkey. Ignoring it.')
inventoryLock.release()
return return
objectType = 1 objectType = 1
inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime, tag) objectType, streamNumber, data, embeddedTime, tag)
inventoryLock.release()
# This object is valid. Forward it to peers. # This object is valid. Forward it to peers.
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))
@ -946,18 +847,14 @@ def _checkAndShareBroadcastWithPeers(data):
tag = data[readPosition:readPosition+32] tag = data[readPosition:readPosition+32]
else: else:
tag = '' tag = ''
shared.numberOfInventoryLookupsPerformed += 1
inventoryLock.acquire()
inventoryHash = calculateInventoryHash(data) inventoryHash = calculateInventoryHash(data)
if inventoryHash in inventory: if inventoryHash in Inventory():
logger.debug('We have already received this broadcast object. Ignoring.') logger.debug('We have already received this broadcast object. Ignoring.')
inventoryLock.release()
return return
# It is valid. Let's let our peers know about it. # It is valid. Let's let our peers know about it.
objectType = 3 objectType = 3
inventory[inventoryHash] = ( Inventory()[inventoryHash] = (
objectType, streamNumber, data, embeddedTime, tag) objectType, streamNumber, data, embeddedTime, tag)
inventoryLock.release()
# This object is valid. Forward it to peers. # This object is valid. Forward it to peers.
logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash)) logger.debug('advertising inv with hash: %s' % hexlify(inventoryHash))
broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash)) broadcastToSendDataQueues((streamNumber, 'advertiseobject', inventoryHash))

84
src/singleinstance.py Normal file
View File

@ -0,0 +1,84 @@
#! /usr/bin/env python
import atexit
import errno
from multiprocessing import Process
import os
import sys
import shared
try:
import fcntl # @UnresolvedImport
except:
pass
class singleinstance:
"""
Implements a single instance application by creating a lock file at appdata.
This is based upon the singleton class from tendo https://github.com/pycontribs/tendo
which is under the Python Software Foundation License version 2
"""
def __init__(self, flavor_id="", daemon=False):
self.initialized = False
self.counter = 0
self.daemon = daemon
self.lockPid = None
self.lockfile = os.path.normpath(os.path.join(shared.appdata, 'singleton%s.lock' % flavor_id))
if not self.daemon and not shared.curses:
# Tells the already running (if any) application to get focus.
import bitmessageqt
bitmessageqt.init()
self.lock()
self.initialized = True
atexit.register(self.cleanup)
def lock(self):
if self.lockPid is None:
self.lockPid = os.getpid()
if sys.platform == 'win32':
try:
# file already exists, we try to remove (in case previous execution was interrupted)
if os.path.exists(self.lockfile):
os.unlink(self.lockfile)
self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
except OSError:
type, e, tb = sys.exc_info()
if e.errno == 13:
print 'Another instance of this application is already running'
sys.exit(-1)
print(e.errno)
raise
else: # non Windows
self.fp = open(self.lockfile, 'w')
try:
if self.daemon and self.lockPid != os.getpid():
fcntl.lockf(self.fp, fcntl.LOCK_EX) # wait for parent to finish
else:
fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.lockPid = os.getpid()
except IOError:
print 'Another instance of this application is already running'
sys.exit(-1)
def cleanup(self):
if not self.initialized:
return
if self.daemon and self.lockPid == os.getpid():
# these are the two initial forks while daemonizing
return
print "Cleaning up lockfile"
try:
if sys.platform == 'win32':
if hasattr(self, 'fd'):
os.close(self.fd)
os.unlink(self.lockfile)
else:
fcntl.lockf(self.fp, fcntl.LOCK_UN)
if os.path.isfile(self.lockfile):
os.unlink(self.lockfile)
except Exception, e:
pass

View File

@ -1,84 +1,7 @@
#! /usr/bin/env python def Singleton(cls):
instances = {}
import atexit def getinstance():
import errno if cls not in instances:
from multiprocessing import Process instances[cls] = cls()
import os return instances[cls]
import sys return getinstance
import shared
try:
import fcntl # @UnresolvedImport
except:
pass
class singleinstance:
"""
Implements a single instance application by creating a lock file at appdata.
This is based upon the singleton class from tendo https://github.com/pycontribs/tendo
which is under the Python Software Foundation License version 2
"""
def __init__(self, flavor_id="", daemon=False):
self.initialized = False
self.counter = 0
self.daemon = daemon
self.lockPid = None
self.lockfile = os.path.normpath(os.path.join(shared.appdata, 'singleton%s.lock' % flavor_id))
if not self.daemon and not shared.curses:
# Tells the already running (if any) application to get focus.
import bitmessageqt
bitmessageqt.init()
self.lock()
self.initialized = True
atexit.register(self.cleanup)
def lock(self):
if self.lockPid is None:
self.lockPid = os.getpid()
if sys.platform == 'win32':
try:
# file already exists, we try to remove (in case previous execution was interrupted)
if os.path.exists(self.lockfile):
os.unlink(self.lockfile)
self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
except OSError:
type, e, tb = sys.exc_info()
if e.errno == 13:
print 'Another instance of this application is already running'
sys.exit(-1)
print(e.errno)
raise
else: # non Windows
self.fp = open(self.lockfile, 'w')
try:
if self.daemon and self.lockPid != os.getpid():
fcntl.lockf(self.fp, fcntl.LOCK_EX) # wait for parent to finish
else:
fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.lockPid = os.getpid()
except IOError:
print 'Another instance of this application is already running'
sys.exit(-1)
def cleanup(self):
if not self.initialized:
return
if self.daemon and self.lockPid == os.getpid():
# these are the two initial forks while daemonizing
return
print "Cleaning up lockfile"
try:
if sys.platform == 'win32':
if hasattr(self, 'fd'):
os.close(self.fd)
os.unlink(self.lockfile)
else:
fcntl.lockf(self.fp, fcntl.LOCK_UN)
if os.path.isfile(self.lockfile):
os.unlink(self.lockfile)
except Exception, e:
pass