on close, save objectProcessorQueue to disk

This commit is contained in:
Jonathan Warren 2013-12-02 01:35:34 -05:00
parent 3c79b7bf65
commit df7116bd72
6 changed files with 79 additions and 24 deletions

View File

@ -982,16 +982,16 @@ class Main:
singleWorkerThread.daemon = True # close the main program even if there are threads left singleWorkerThread.daemon = True # close the main program even if there are threads left
singleWorkerThread.start() singleWorkerThread.start()
# Start the thread that calculates POWs
objectProcessorThread = objectProcessor()
objectProcessorThread.daemon = True # close the main program even if there are threads left
objectProcessorThread.start()
# Start the SQL thread # Start the SQL thread
sqlLookup = sqlThread() sqlLookup = sqlThread()
sqlLookup.daemon = False # DON'T close the main program even if there are threads left. The closeEvent should command this thread to exit gracefully. sqlLookup.daemon = False # DON'T close the main program even if there are threads left. The closeEvent should command this thread to exit gracefully.
sqlLookup.start() sqlLookup.start()
# Start the thread that calculates POWs
objectProcessorThread = objectProcessor()
objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object.
objectProcessorThread.start()
# Start the cleanerThread # Start the cleanerThread
singleCleanerThread = singleCleaner() singleCleanerThread = singleCleaner()
singleCleanerThread.daemon = True # close the main program even if there are threads left singleCleanerThread.daemon = True # close the main program even if there are threads left

View File

@ -2146,7 +2146,7 @@ class MyForm(QtGui.QMainWindow):
objectType = 'broadcast' objectType = 'broadcast'
with shared.objectProcessorQueueSizeLock: with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(payload) shared.objectProcessorQueueSize += len(payload)
shared.objectProcessorQueue.put((objectType,payload)) shared.objectProcessorQueue.put((objectType,payload))
def loadBlackWhiteList(self): def loadBlackWhiteList(self):
# Initialize the Blacklist or Whitelist table # Initialize the Blacklist or Whitelist table

View File

@ -27,6 +27,23 @@ class objectProcessor(threading.Thread):
""" """
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self)
"""
It may be the case that the last time Bitmessage was running, the user
closed it before it finished processing everything in the
objectProcessorQueue. Assuming that Bitmessage wasn't closed forcefully,
it should have saved the data in the queue into the objectprocessorqueue
table. Let's pull it out.
"""
queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''')
with shared.objectProcessorQueueSizeLock:
for row in queryreturn:
objectType, data = row
shared.objectProcessorQueueSize += len(data)
shared.objectProcessorQueue.put((objectType,data))
sqlExecute('''DELETE FROM objectprocessorqueue''')
logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
def run(self): def run(self):
while True: while True:
@ -40,13 +57,29 @@ class objectProcessor(threading.Thread):
self.processmsg(data) self.processmsg(data)
elif objectType == 'broadcast': elif objectType == 'broadcast':
self.processbroadcast(data) self.processbroadcast(data)
elif objectType == 'checkShutdownVariable': # is more of a command, not an object type. Is used to get this thread past the queue.get() so that it will check the shutdown variable.
pass
else: else:
logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType)) logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType))
with shared.objectProcessorQueueSizeLock: with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue. shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
#print 'objectProcessorQueueSize:', shared.objectProcessorQueueSize
if shared.shutdown:
time.sleep(.5) # Wait just a moment for most of the connections to close
numberOfObjectsThatWereInTheObjectProcessorQueue = 0
with SqlBulkExecute() as sql:
while shared.objectProcessorQueueSize > 1:
objectType, data = shared.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType,data)
with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue.
numberOfObjectsThatWereInTheObjectProcessorQueue += 1
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue))
shared.shutdown = 2
break
def processgetpubkey(self, data): def processgetpubkey(self, data):
readPosition = 8 # bypass the nonce readPosition = 8 # bypass the nonce
embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) embeddedTime, = unpack('>I', data[readPosition:readPosition + 4])

View File

@ -25,6 +25,7 @@ class sqlThread(threading.Thread):
self.conn = sqlite3.connect(shared.appdata + 'messages.dat') self.conn = sqlite3.connect(shared.appdata + 'messages.dat')
self.conn.text_factory = str self.conn.text_factory = str
self.cur = self.conn.cursor() self.cur = self.conn.cursor()
try: try:
self.cur.execute( self.cur.execute(
'''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' ) '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' )
@ -51,17 +52,15 @@ class sqlThread(threading.Thread):
'''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int, usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''' ) '''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int, usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''' )
self.cur.execute( self.cur.execute(
'''CREATE TABLE inventory (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''' ) '''CREATE TABLE inventory (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''' )
self.cur.execute(
'''CREATE TABLE knownnodes (timelastseen int, stream int, services blob, host blob, port blob, UNIQUE(host, stream, port) ON CONFLICT REPLACE)''' )
# This table isn't used in the program yet but I
# have a feeling that we'll need it.
self.cur.execute( self.cur.execute(
'''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''')
self.cur.execute( self.cur.execute(
'''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' )
self.cur.execute( '''INSERT INTO settings VALUES('version','5')''') self.cur.execute( '''INSERT INTO settings VALUES('version','6')''')
self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', (
int(time.time()),)) int(time.time()),))
self.cur.execute(
'''CREATE TABLE objectprocessorqueue (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' )
self.conn.commit() self.conn.commit()
logger.info('Created messages database file') logger.info('Created messages database file')
except Exception as err: except Exception as err:
@ -290,6 +289,20 @@ class sqlThread(threading.Thread):
with open(shared.appdata + 'keys.dat', 'wb') as configfile: with open(shared.appdata + 'keys.dat', 'wb') as configfile:
shared.config.write(configfile) shared.config.write(configfile)
# Add a new table: objectprocessorqueue with which to hold objects
# that have yet to be processed if the user shuts down Bitmessage.
item = '''SELECT value FROM settings WHERE key='version';'''
parameters = ''
self.cur.execute(item, parameters)
currentVersion = int(self.cur.fetchall()[0][0])
if currentVersion == 5:
self.cur.execute( '''DROP TABLE knownnodes''')
self.cur.execute(
'''CREATE TABLE objectprocessorqueue (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' )
item = '''update settings set value=? WHERE key='version';'''
parameters = (6,)
self.cur.execute(item, parameters)
# Are you hoping to add a new option to the keys.dat file of existing # Are you hoping to add a new option to the keys.dat file of existing
# Bitmessage users? Add it right above this line! # Bitmessage users? Add it right above this line!

View File

@ -57,7 +57,7 @@ def _doFastPoW(target, initialHash):
for i in range(pool_size): for i in range(pool_size):
result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size)))
while True: while True:
if shared.shutdown: if shared.shutdown >= 1:
pool.terminate() pool.terminate()
while True: while True:
time.sleep(10) # Don't let this thread return here; it will return nothing and cause an exception in bitmessagemain.py time.sleep(10) # Don't let this thread return here; it will return nothing and cause an exception in bitmessagemain.py

View File

@ -295,8 +295,12 @@ def isProofOfWorkSufficient(
def doCleanShutdown(): def doCleanShutdown():
global shutdown global shutdown
shutdown = 1 #Used to tell proof of work worker threads to exit. shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
broadcastToSendDataQueues((0, 'shutdown', 'all')) broadcastToSendDataQueues((0, 'shutdown', 'all'))
with shared.objectProcessorQueueSizeLock:
data = 'no data'
shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put(('checkShutdownVariable',data))
knownNodesLock.acquire() knownNodesLock.acquire()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
@ -314,13 +318,18 @@ def doCleanShutdown():
'updateStatusBar', 'updateStatusBar',
'Flushing inventory in memory out to disk. This should normally only take a second...')) 'Flushing inventory in memory out to disk. This should normally only take a second...'))
flushInventory() flushInventory()
# This one last useless query will guarantee that the previous flush committed before we close # Verify that the objectProcessor has finished exiting. It should have incremented the
# the program. # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit.
while shutdown == 1:
time.sleep(.1)
# This one last useless query will guarantee that the previous flush committed and that the
# objectProcessorThread committed before we close the program.
sqlQuery('SELECT address FROM subscriptions') sqlQuery('SELECT address FROM subscriptions')
sqlStoredProcedure('exit')
logger.info('Finished flushing inventory.') logger.info('Finished flushing inventory.')
sqlStoredProcedure('exit')
# Wait long enough to guarantee that any running proof of work worker threads will check the # Wait long enough to guarantee that any running proof of work worker threads will check the
# shutdown variable and exit. If the main thread closes before they do then they won't stop. # shutdown variable and exit. If the main thread closes before they do then they won't stop.
time.sleep(.25) time.sleep(.25)
@ -542,7 +551,7 @@ def checkAndShareMsgWithPeers(data):
time.sleep(2) time.sleep(2)
with shared.objectProcessorQueueSizeLock: with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data) shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data)) objectProcessorQueue.put((objectType,data))
def checkAndSharegetpubkeyWithPeers(data): def checkAndSharegetpubkeyWithPeers(data):
if not isProofOfWorkSufficient(data): if not isProofOfWorkSufficient(data):
@ -606,7 +615,7 @@ def checkAndSharegetpubkeyWithPeers(data):
time.sleep(2) time.sleep(2)
with shared.objectProcessorQueueSizeLock: with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data) shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data)) objectProcessorQueue.put((objectType,data))
def checkAndSharePubkeyWithPeers(data): def checkAndSharePubkeyWithPeers(data):
if len(data) < 146 or len(data) > 420: # sanity check if len(data) < 146 or len(data) > 420: # sanity check
@ -678,7 +687,7 @@ def checkAndSharePubkeyWithPeers(data):
time.sleep(2) time.sleep(2)
with shared.objectProcessorQueueSizeLock: with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data) shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data)) objectProcessorQueue.put((objectType,data))
def checkAndShareBroadcastWithPeers(data): def checkAndShareBroadcastWithPeers(data):
@ -748,7 +757,7 @@ def checkAndShareBroadcastWithPeers(data):
time.sleep(2) time.sleep(2)
with shared.objectProcessorQueueSizeLock: with shared.objectProcessorQueueSizeLock:
shared.objectProcessorQueueSize += len(data) shared.objectProcessorQueueSize += len(data)
objectProcessorQueue.put((objectType,data)) objectProcessorQueue.put((objectType,data))
helper_startup.loadConfig() helper_startup.loadConfig()