From feb6061ccabf7ba63ccecdd7a13a43aacf454e68 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Thu, 4 Apr 2013 13:39:11 -0400 Subject: [PATCH] smarter commits Vastly improves disk performance --- bitmessagemain.py | 151 +++++++++++++------------------------------ defaultKnownNodes.py | 6 +- 2 files changed, 47 insertions(+), 110 deletions(-) diff --git a/bitmessagemain.py b/bitmessagemain.py index c082b037..48a7adae 100755 --- a/bitmessagemain.py +++ b/bitmessagemain.py @@ -623,6 +623,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() workerQueue.put(('newpubkey',(sendersAddressVersion,sendersStream,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. @@ -654,6 +655,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body) @@ -749,6 +751,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('UPDATE sent SET status=? WHERE ackdata=?') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),encryptedData[readPosition:],'Acknowledgement of the message received just now.') return @@ -850,6 +853,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() workerQueue.put(('newpubkey',(sendersAddressVersionNumber,sendersStreamNumber,ripe.digest()))) #This will check to see whether we happen to be awaiting this pubkey in order to send a message. If we are, it will do the POW and send it. blockMessage = False #Gets set to True if the user shouldn't see the message according to black or white lists. @@ -923,6 +927,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO inbox VALUES (?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.emit(SIGNAL("displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),self.inventoryHash,toAddress,fromAddress,subject,body) @@ -954,6 +959,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.emit(SIGNAL("displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"),toAddress,'[Broadcast subscribers]',fromAddress,subject,message,ackdata) @@ -1112,6 +1118,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() printLock.acquire() printLock.release() @@ -1123,6 +1130,7 @@ class receiveDataThread(QThread): sqlSubmitQueue.put('''INSERT INTO pubkeys VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() printLock.acquire() printLock.release() @@ -1770,6 +1778,7 @@ def flushInventory(): sqlSubmitQueue.put(t) sqlReturnQueue.get() del inventory[hash] + sqlSubmitQueue.put('commit') sqlLock.release() def isInSqlInventory(hash): @@ -1974,13 +1983,16 @@ class sqlThread(QThread): while True: item = sqlSubmitQueue.get() - parameters = sqlSubmitQueue.get() - #print 'item', item - #print 'parameters', parameters - self.cur.execute(item, parameters) - sqlReturnQueue.put(self.cur.fetchall()) - sqlSubmitQueue.task_done() - self.conn.commit() + if item == 'commit': + self.conn.commit() + else: + parameters = sqlSubmitQueue.get() + #print 'item', item + #print 'parameters', parameters + self.cur.execute(item, parameters) + sqlReturnQueue.put(self.cur.fetchall()) + #sqlSubmitQueue.task_done() + '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. @@ -2009,12 +2021,13 @@ class singleCleaner(QThread): self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"Doing housekeeping (Flushing inventory in memory to disk...)") for hash, storedValue in inventory.items(): objectType, streamNumber, payload, receivedTime = storedValue - if int(time.time())- 600 > receivedTime: + if int(time.time())- 3600 > receivedTime: t = (hash,objectType,streamNumber,payload,receivedTime) sqlSubmitQueue.put('''INSERT INTO inventory VALUES (?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() del inventory[hash] + sqlSubmitQueue.put('commit') self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"") sqlLock.release() broadcastToSendDataQueues((0, 'pong', 'no data')) #commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. @@ -2033,6 +2046,7 @@ class singleCleaner(QThread): sqlSubmitQueue.put('''DELETE FROM pubkeys WHERE timeI',(int(time.time()))) - payload += encodeVarint(1) #broadcast version - payload += encodeVarint(addressVersionNumber) - payload += encodeVarint(streamNumber) - payload += ripe - payload += encodeVarint(len(nString)) - payload += nString - payload += encodeVarint(len(eString)) - payload += eString - payload += messageToTransmit - signature = rsa.sign(messageToTransmit,myPrivatekey,'SHA-512') - #print 'signature', signature.encode('hex') - payload += signature - - #print 'nString', repr(nString) - #print 'eString', repr(eString) - - nonce = 0 - trialValue = 99999999999999999999 - target = 2**64 / ((len(payload)+payloadLengthExtraBytes+8) * averageProofOfWorkNonceTrialsPerByte) - print '(For broadcast message) Doing proof of work...' - initialHash = hashlib.sha512(payload).digest() - while trialValue > target: - nonce += 1 - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) - print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce - - payload = pack('>Q',nonce) + payload - - inventoryHash = calculateInventoryHash(payload) - objectType = 'broadcast' - inventory[inventoryHash] = (objectType, streamNumber, payload, int(time.time())) - print 'sending inv (within sendBroadcast function)' - broadcastToSendDataQueues((streamNumber, 'sendinv', inventoryHash)) - - self.emit(SIGNAL("updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"),ackdata,'Broadcast sent at '+unicode(strftime(config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8')) - - #Update the status of the message in the 'sent' table to have a 'broadcastsent' status - sqlLock.acquire() - t = ('broadcastsent',int(time.time()),fromaddress, subject, body,'broadcastpending') - sqlSubmitQueue.put('UPDATE sent SET status=?, lastactiontime=? WHERE fromaddress=? AND subject=? AND message=? AND status=?') - sqlSubmitQueue.put(t) - queryreturn = sqlReturnQueue.get() - sqlLock.release()""" else: printLock.acquire() print 'In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version' @@ -2371,6 +2326,7 @@ class singleWorker(QThread): sqlSubmitQueue.put('UPDATE sent SET status=? WHERE status=? AND toripe=?') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() + sqlSubmitQueue.put('commit') t = ('doingpow',toRipe) sqlSubmitQueue.put('SELECT toaddress, fromaddress, subject, message, ackdata FROM sent WHERE status=? AND toripe=?') @@ -2488,41 +2444,6 @@ class singleWorker(QThread): readPosition += 64 encrypted = highlevelcrypto.encrypt(payload,"04"+pubEncryptionKeyBase256.encode('hex')) - """elif toAddressVersionNumber == 1: - sqlLock.acquire() - sqlSubmitQueue.put('SELECT transmitdata FROM pubkeys WHERE hash=?') - sqlSubmitQueue.put((toRipe,)) - queryreturn = sqlReturnQueue.get() - sqlLock.release() - - for row in queryreturn: - pubkeyPayload, = row - - readPosition = 8 #to bypass the nonce - behaviorBitfield = pubkeyPayload[8:12] - readPosition += 4 #to bypass the bitfield of behaviors - addressVersion, addressVersionLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10]) - readPosition += addressVersionLength - streamNumber, streamNumberLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10]) - readPosition += streamNumberLength - nLength, nLengthLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10]) - readPosition += nLengthLength - n = convertStringToInt(pubkeyPayload[readPosition:readPosition+nLength]) - readPosition += nLength - eLength, eLengthLength = decodeVarint(pubkeyPayload[readPosition:readPosition+10]) - readPosition += eLengthLength - e = convertStringToInt(pubkeyPayload[readPosition:readPosition+eLength]) - receiversPubkey = rsa.PublicKey(n,e) - - infile = cStringIO.StringIO(payload) - outfile = cStringIO.StringIO() - #print 'Encrypting using public key:', receiversPubkey - encrypt_bigfile(infile,outfile,receiversPubkey) - - encrypted = outfile.getvalue() - infile.close() - outfile.close()""" - nonce = 0 trialValue = 99999999999999999999 @@ -2561,7 +2482,7 @@ class singleWorker(QThread): sqlSubmitQueue.put('''UPDATE pubkeys SET usedpersonally='yes' WHERE hash=?''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() - + sqlSubmitQueue.put('commit') sqlLock.release() @@ -3032,6 +2953,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): sqlSubmitQueue.put('''UPDATE inbox SET folder='trash' WHERE msgid=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() apiSignalQueue.put(('updateStatusBar','Per API: Trashed message (assuming message existed). UI not updated.')) return 'Trashed message (assuming message existed). UI not updated. To double check, run getAllInboxMessages to see that the message disappeared, or restart Bitmessage and look in the normal Bitmessage GUI.' @@ -3092,6 +3014,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() toLabel = '' @@ -3152,6 +3075,7 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() toLabel = '[Broadcast subscribers]' @@ -4030,6 +3954,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() @@ -4104,6 +4029,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''INSERT INTO sent VALUES (?,?,?,?,?,?,?,?,?,?,?,?)''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() workerQueue.put(('sendbroadcast',(fromAddress,subject,message))) @@ -4342,6 +4268,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''INSERT INTO addressbook VALUES (?,?)''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.rerenderInboxFromLabels() else: @@ -4375,6 +4302,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''INSERT INTO subscriptions VALUES (?,?,?)''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.rerenderInboxFromLabels() self.reloadBroadcastSendersForWhichImWatching() @@ -4544,6 +4472,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''INSERT INTO whitelist VALUES (?,?,?)''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() else: self.statusBar().showMessage('Error: You cannot add the same address to your list twice. Perhaps rename the existing one if you want.') @@ -4702,6 +4631,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''INSERT INTO addressbook VALUES (?,?)''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.ui.tabWidget.setCurrentIndex(5) self.ui.tableWidgetAddressBook.setCurrentCell(0,0) @@ -4720,6 +4650,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''UPDATE inbox SET folder='trash' WHERE msgid=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.ui.textEditInboxMessage.setText("") self.ui.tableWidgetInbox.removeRow(currentRow) @@ -4735,6 +4666,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''UPDATE sent SET folder='trash' WHERE ackdata=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.ui.textEditSentMessage.setPlainText("") self.ui.tableWidgetSent.removeRow(currentRow) @@ -4757,6 +4689,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''DELETE FROM addressbook WHERE label=? AND address=?''') sqlSubmitQueue.put(t) queryreturn = sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.ui.tableWidgetAddressBook.removeRow(currentRow) self.rerenderInboxFromLabels() @@ -4792,6 +4725,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''DELETE FROM subscriptions WHERE label=? AND address=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.ui.tableWidgetSubscriptions.removeRow(currentRow) self.rerenderInboxFromLabels() @@ -4822,6 +4756,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''DELETE FROM whitelist WHERE label=? AND address=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.ui.tableWidgetBlacklist.removeRow(currentRow) def on_action_BlacklistClipboard(self): @@ -4846,6 +4781,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''UPDATE whitelist SET enabled=1 WHERE address=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() def on_action_BlacklistDisable(self): currentRow = self.ui.tableWidgetBlacklist.currentRow() @@ -4862,6 +4798,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''UPDATE whitelist SET enabled=0 WHERE address=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() #Group of functions for the Your Identities dialog box @@ -4941,6 +4878,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''UPDATE addressbook set label=? WHERE address=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.rerenderInboxFromLabels() self.rerenderSentToLabels() @@ -4954,6 +4892,7 @@ class MyForm(QtGui.QMainWindow): sqlSubmitQueue.put('''UPDATE subscriptions set label=? WHERE address=?''') sqlSubmitQueue.put(t) sqlReturnQueue.get() + sqlSubmitQueue.put('commit') sqlLock.release() self.rerenderInboxFromLabels() self.rerenderSentToLabels() diff --git a/defaultKnownNodes.py b/defaultKnownNodes.py index 352b18b8..23570662 100644 --- a/defaultKnownNodes.py +++ b/defaultKnownNodes.py @@ -9,12 +9,10 @@ from time import strftime, localtime def createDefaultKnownNodes(appdata): ############## Stream 1 ################ stream1 = {} - - stream1['80.69.173.220'] = (443,int(time.time())) - stream1['109.95.105.15'] = (8443,int(time.time())) + + stream1['84.48.88.42'] = (8444,int(time.time())) stream1['66.65.120.151'] = (8080,int(time.time())) stream1['76.180.233.38'] = (8444,int(time.time())) - stream1['84.48.88.42'] = (8444,int(time.time())) stream1['74.132.73.137'] = (8444,int(time.time())) stream1['60.242.109.18'] = (8444,int(time.time()))