From df7116bd72eafcb3d102245468a6f9439c12e8a5 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Mon, 2 Dec 2013 01:35:34 -0500 Subject: [PATCH] on close, save objectProcessorQueue to disk --- src/bitmessagemain.py | 10 +++++----- src/bitmessageqt/__init__.py | 2 +- src/class_objectProcessor.py | 35 ++++++++++++++++++++++++++++++++++- src/class_sqlThread.py | 23 ++++++++++++++++++----- src/proofofwork.py | 2 +- src/shared.py | 31 ++++++++++++++++++++----------- 6 files changed, 79 insertions(+), 24 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 19dfcf12..6caf0c98 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -982,16 +982,16 @@ class Main: singleWorkerThread.daemon = True # close the main program even if there are threads left singleWorkerThread.start() - # Start the thread that calculates POWs - objectProcessorThread = objectProcessor() - objectProcessorThread.daemon = True # close the main program even if there are threads left - objectProcessorThread.start() - # Start the SQL thread sqlLookup = sqlThread() sqlLookup.daemon = False # DON'T close the main program even if there are threads left. The closeEvent should command this thread to exit gracefully. sqlLookup.start() + # Start the thread that calculates POWs + objectProcessorThread = objectProcessor() + objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. + objectProcessorThread.start() + # Start the cleanerThread singleCleanerThread = singleCleaner() singleCleanerThread.daemon = True # close the main program even if there are threads left diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index df1ea4cf..684784b6 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2146,7 +2146,7 @@ class MyForm(QtGui.QMainWindow): objectType = 'broadcast' with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize += len(payload) - shared.objectProcessorQueue.put((objectType,payload)) + shared.objectProcessorQueue.put((objectType,payload)) def loadBlackWhiteList(self): # Initialize the Blacklist or Whitelist table diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 3a389833..e32d5a6d 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -27,6 +27,23 @@ class objectProcessor(threading.Thread): """ def __init__(self): threading.Thread.__init__(self) + """ + It may be the case that the last time Bitmessage was running, the user + closed it before it finished processing everything in the + objectProcessorQueue. Assuming that Bitmessage wasn't closed forcefully, + it should have saved the data in the queue into the objectprocessorqueue + table. Let's pull it out. + """ + queryreturn = sqlQuery( + '''SELECT objecttype, data FROM objectprocessorqueue''') + with shared.objectProcessorQueueSizeLock: + for row in queryreturn: + objectType, data = row + shared.objectProcessorQueueSize += len(data) + shared.objectProcessorQueue.put((objectType,data)) + sqlExecute('''DELETE FROM objectprocessorqueue''') + logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn))) + def run(self): while True: @@ -40,13 +57,29 @@ class objectProcessor(threading.Thread): self.processmsg(data) elif objectType == 'broadcast': self.processbroadcast(data) + elif objectType == 'checkShutdownVariable': # is more of a command, not an object type. Is used to get this thread past the queue.get() so that it will check the shutdown variable. + pass else: logger.critical('Error! Bug! The class_objectProcessor was passed an object type it doesn\'t recognize: %s' % str(objectType)) with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue. - #print 'objectProcessorQueueSize:', shared.objectProcessorQueueSize + if shared.shutdown: + time.sleep(.5) # Wait just a moment for most of the connections to close + numberOfObjectsThatWereInTheObjectProcessorQueue = 0 + with SqlBulkExecute() as sql: + while shared.objectProcessorQueueSize > 1: + objectType, data = shared.objectProcessorQueue.get() + sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', + objectType,data) + with shared.objectProcessorQueueSizeLock: + shared.objectProcessorQueueSize -= len(data) # We maintain objectProcessorQueueSize so that we will slow down requesting objects if too much data accumulates in the queue. + numberOfObjectsThatWereInTheObjectProcessorQueue += 1 + logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsThatWereInTheObjectProcessorQueue)) + shared.shutdown = 2 + break + def processgetpubkey(self, data): readPosition = 8 # bypass the nonce embeddedTime, = unpack('>I', data[readPosition:readPosition + 4]) diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index ad8ee4f6..2df6c05f 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -25,6 +25,7 @@ class sqlThread(threading.Thread): self.conn = sqlite3.connect(shared.appdata + 'messages.dat') self.conn.text_factory = str self.cur = self.conn.cursor() + try: self.cur.execute( '''CREATE TABLE inbox (msgid blob, toaddress text, fromaddress text, subject text, received text, message text, folder text, encodingtype int, read bool, UNIQUE(msgid) ON CONFLICT REPLACE)''' ) @@ -51,17 +52,15 @@ class sqlThread(threading.Thread): '''CREATE TABLE pubkeys (hash blob, addressversion int, transmitdata blob, time int, usedpersonally text, UNIQUE(hash, addressversion) ON CONFLICT REPLACE)''' ) self.cur.execute( '''CREATE TABLE inventory (hash blob, objecttype text, streamnumber int, payload blob, receivedtime integer, tag blob, UNIQUE(hash) ON CONFLICT REPLACE)''' ) - self.cur.execute( - '''CREATE TABLE knownnodes (timelastseen int, stream int, services blob, host blob, port blob, UNIQUE(host, stream, port) ON CONFLICT REPLACE)''' ) - # This table isn't used in the program yet but I - # have a feeling that we'll need it. self.cur.execute( '''INSERT INTO subscriptions VALUES('Bitmessage new releases/announcements','BM-GtovgYdgs7qXPkoYaRgrLFuFKz1SFpsw',1)''') self.cur.execute( '''CREATE TABLE settings (key blob, value blob, UNIQUE(key) ON CONFLICT REPLACE)''' ) - self.cur.execute( '''INSERT INTO settings VALUES('version','5')''') + self.cur.execute( '''INSERT INTO settings VALUES('version','6')''') self.cur.execute( '''INSERT INTO settings VALUES('lastvacuumtime',?)''', ( int(time.time()),)) + self.cur.execute( + '''CREATE TABLE objectprocessorqueue (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' ) self.conn.commit() logger.info('Created messages database file') except Exception as err: @@ -290,6 +289,20 @@ class sqlThread(threading.Thread): with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) + # Add a new table: objectprocessorqueue with which to hold objects + # that have yet to be processed if the user shuts down Bitmessage. + item = '''SELECT value FROM settings WHERE key='version';''' + parameters = '' + self.cur.execute(item, parameters) + currentVersion = int(self.cur.fetchall()[0][0]) + if currentVersion == 5: + self.cur.execute( '''DROP TABLE knownnodes''') + self.cur.execute( + '''CREATE TABLE objectprocessorqueue (objecttype text, data blob, UNIQUE(objecttype, data) ON CONFLICT REPLACE)''' ) + item = '''update settings set value=? WHERE key='version';''' + parameters = (6,) + self.cur.execute(item, parameters) + # Are you hoping to add a new option to the keys.dat file of existing # Bitmessage users? Add it right above this line! diff --git a/src/proofofwork.py b/src/proofofwork.py index 17a13eb8..c26f2e1b 100644 --- a/src/proofofwork.py +++ b/src/proofofwork.py @@ -57,7 +57,7 @@ def _doFastPoW(target, initialHash): for i in range(pool_size): result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) while True: - if shared.shutdown: + if shared.shutdown >= 1: pool.terminate() while True: time.sleep(10) # Don't let this thread return here; it will return nothing and cause an exception in bitmessagemain.py diff --git a/src/shared.py b/src/shared.py index 9350fdaf..0f1f3690 100644 --- a/src/shared.py +++ b/src/shared.py @@ -295,8 +295,12 @@ def isProofOfWorkSufficient( def doCleanShutdown(): global shutdown - shutdown = 1 #Used to tell proof of work worker threads to exit. - broadcastToSendDataQueues((0, 'shutdown', 'all')) + shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. + broadcastToSendDataQueues((0, 'shutdown', 'all')) + with shared.objectProcessorQueueSizeLock: + data = 'no data' + shared.objectProcessorQueueSize += len(data) + objectProcessorQueue.put(('checkShutdownVariable',data)) knownNodesLock.acquire() UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) @@ -314,13 +318,18 @@ def doCleanShutdown(): 'updateStatusBar', 'Flushing inventory in memory out to disk. This should normally only take a second...')) flushInventory() - - # This one last useless query will guarantee that the previous flush committed before we close - # the program. + + # Verify that the objectProcessor has finished exiting. It should have incremented the + # shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit. + while shutdown == 1: + time.sleep(.1) + + # This one last useless query will guarantee that the previous flush committed and that the + # objectProcessorThread committed before we close the program. sqlQuery('SELECT address FROM subscriptions') - sqlStoredProcedure('exit') logger.info('Finished flushing inventory.') - + sqlStoredProcedure('exit') + # Wait long enough to guarantee that any running proof of work worker threads will check the # shutdown variable and exit. If the main thread closes before they do then they won't stop. time.sleep(.25) @@ -542,7 +551,7 @@ def checkAndShareMsgWithPeers(data): time.sleep(2) with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def checkAndSharegetpubkeyWithPeers(data): if not isProofOfWorkSufficient(data): @@ -606,7 +615,7 @@ def checkAndSharegetpubkeyWithPeers(data): time.sleep(2) with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def checkAndSharePubkeyWithPeers(data): if len(data) < 146 or len(data) > 420: # sanity check @@ -678,7 +687,7 @@ def checkAndSharePubkeyWithPeers(data): time.sleep(2) with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) def checkAndShareBroadcastWithPeers(data): @@ -748,7 +757,7 @@ def checkAndShareBroadcastWithPeers(data): time.sleep(2) with shared.objectProcessorQueueSizeLock: shared.objectProcessorQueueSize += len(data) - objectProcessorQueue.put((objectType,data)) + objectProcessorQueue.put((objectType,data)) helper_startup.loadConfig()