From ad2457361f8c1bc09a04591c39f893615b130f88 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Thu, 30 May 2013 16:25:42 -0400 Subject: [PATCH] Multi-core POW --- src/bitmessagemain.py | 16 +++++++---- src/bitmessageqt/__init__.py | 23 +++++++++++++++ src/proofofwork.py | 44 +++++++++++++++++++---------- src/settings.py | 34 +++++++++++++++++++++-- src/settings.ui | 54 ++++++++++++++++++++++++++++++++++++ src/shared.py | 7 ++++- 6 files changed, 155 insertions(+), 23 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index ba31f7a9..0eb78101 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -1506,10 +1506,10 @@ class receiveDataThread(threading.Thread): #We have received an inv message def recinv(self,data): - totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 - for key, value in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items(): - totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value + totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 # ..from all peers, counting duplicates seperately (because they take up memory) if len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0: + for key, value in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items(): + totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value shared.printLock.acquire() print 'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave @@ -2434,6 +2434,13 @@ class sqlThread(threading.Thread): print 'Vacuuming message.dat. You might notice that the file size gets much smaller.' self.cur.execute( ''' VACUUM ''') + #After code refactoring, the possible status values for sent messages as changed. + self.cur.execute( '''update sent set status='doingmsgpow' where status='doingpow' ''') + self.cur.execute( '''update sent set status='msgsent' where status='sentmessage' ''') + self.cur.execute( '''update sent set status='doingpubkeypow' where status='findingpubkey' ''') + self.cur.execute( '''update sent set status='broadcastqueued' where status='broadcastpending' ''') + self.conn.commit() + try: testpayload = '\x00\x00' t = ('1234',testpayload,'12345678','no') @@ -2545,7 +2552,6 @@ class singleCleaner(threading.Thread): shared.sqlReturnQueue.get() del shared.inventory[hash] shared.sqlSubmitQueue.put('commit') - #self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"") shared.UISignalQueue.put(('updateStatusBar','')) shared.sqlLock.release() shared.broadcastToSendDataQueues((0, 'pong', 'no data')) #commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. @@ -2556,7 +2562,7 @@ class singleCleaner(threading.Thread): timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) #inventory (moves data from the inventory data structure to the on-disk sql database) shared.sqlLock.acquire() - #inventory (clears data more than 2 days and 12 hours old) + #inventory (clears pubkeys after 28 days and everything else after 2 days and 12 hours) t = (int(time.time())-lengthOfTimeToLeaveObjectsInInventory,int(time.time())-lengthOfTimeToHoldOnToAllPubkeys) shared.sqlSubmitQueue.put('''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime= 1: shared.config.set('bitmessagesettings', 'defaultpayloadlengthextrabytes',str(int(float(self.settingsDialogInstance.ui.lineEditSmallMessageDifficulty.text())*shared.networkDefaultPayloadLengthExtraBytes))) + if str(self.settingsDialogInstance.ui.comboBoxMaxCores.currentText()) == 'All': + shared.config.set('bitmessagesettings', 'maxcores', '99999') + else: + shared.config.set('bitmessagesettings', 'maxcores', str(self.settingsDialogInstance.ui.comboBoxMaxCores.currentText())) with open(shared.appdata + 'keys.dat', 'wb') as configfile: shared.config.write(configfile) @@ -2167,6 +2171,25 @@ class settingsDialog(QtGui.QDialog): self.ui.lineEditTotalDifficulty.setText(str((float(shared.config.getint('bitmessagesettings', 'defaultnoncetrialsperbyte'))/shared.networkDefaultProofOfWorkNonceTrialsPerByte))) self.ui.lineEditSmallMessageDifficulty.setText(str((float(shared.config.getint('bitmessagesettings', 'defaultpayloadlengthextrabytes'))/shared.networkDefaultPayloadLengthExtraBytes))) + + #On the System tab + try: + maxCores = shared.config.getint('bitmessagesettings', 'maxcores') + except: + maxCores = 99999 + if maxCores <= 1: + self.ui.comboBoxMaxCores.setCurrentIndex(0) + elif maxCores == 2: + self.ui.comboBoxMaxCores.setCurrentIndex(1) + elif maxCores <= 4: + self.ui.comboBoxMaxCores.setCurrentIndex(2) + elif maxCores <= 8: + self.ui.comboBoxMaxCores.setCurrentIndex(3) + elif maxCores <= 16: + self.ui.comboBoxMaxCores.setCurrentIndex(4) + else: + self.ui.comboBoxMaxCores.setCurrentIndex(5) + QtGui.QWidget.resize(self,QtGui.QWidget.sizeHint(self)) def comboBoxProxyTypeChanged(self,comboBoxIndex): diff --git a/src/proofofwork.py b/src/proofofwork.py index 03d7c22d..144c7d79 100644 --- a/src/proofofwork.py +++ b/src/proofofwork.py @@ -1,28 +1,42 @@ +import shared +import time +from multiprocessing import Pool, cpu_count +import hashlib +from struct import unpack, pack + def _pool_worker(nonce, initialHash, target, pool_size): - import hashlib - from struct import unpack, pack trialValue = 99999999999999999999 while trialValue > target: - nonce += pool_size - trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) + nonce += pool_size + trialValue, = unpack('>Q',hashlib.sha512(hashlib.sha512(pack('>Q',nonce) + initialHash).digest()).digest()[0:8]) return [trialValue, nonce] def run(target, initialHash): - from multiprocessing import Pool, cpu_count - import time try: - pool_size = cpu_count() + pool_size = cpu_count() except: - pool_size = 4 + pool_size = 4 + + try: + maxCores = config.getint('bitmessagesettings', 'maxcores') + except: + maxCores = 99999 + if pool_size > maxCores: + pool_size = maxCores + pool = Pool(processes=pool_size) result = [] for i in range(pool_size): - result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) + result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) while True: - for i in range(pool_size): - if result[i].ready(): - result = result[i].get() - pool.terminate() - return result[0], result[1] - time.sleep(1) + for i in range(pool_size): + if shared.shutdown: + pool.terminate() + time.sleep(5) #Don't return anything (doing so will cause exceptions because we'll return an unusable response). Sit here and wait for this thread to close. + return + if result[i].ready(): + result = result[i].get() + pool.terminate() + return result[0], result[1] + time.sleep(0.2) diff --git a/src/settings.py b/src/settings.py index 0cd7b07e..dac0f788 100644 --- a/src/settings.py +++ b/src/settings.py @@ -2,8 +2,8 @@ # Form implementation generated from reading ui file 'settings.ui' # -# Created: Sat May 25 13:20:18 2013 -# by: PyQt4 UI code generator 4.9.5 +# Created: Thu May 30 15:50:32 2013 +# by: PyQt4 UI code generator 4.9.4 # # WARNING! All changes made in this file will be lost! @@ -172,6 +172,28 @@ class Ui_settingsDialog(object): self.label_10.setObjectName(_fromUtf8("label_10")) self.gridLayout_6.addWidget(self.label_10, 2, 0, 1, 3) self.tabWidgetSettings.addTab(self.tab, _fromUtf8("")) + self.tab_2 = QtGui.QWidget() + self.tab_2.setObjectName(_fromUtf8("tab_2")) + self.formLayout = QtGui.QFormLayout(self.tab_2) + self.formLayout.setObjectName(_fromUtf8("formLayout")) + self.label_13 = QtGui.QLabel(self.tab_2) + self.label_13.setObjectName(_fromUtf8("label_13")) + self.formLayout.setWidget(0, QtGui.QFormLayout.LabelRole, self.label_13) + self.comboBoxMaxCores = QtGui.QComboBox(self.tab_2) + sizePolicy = QtGui.QSizePolicy(QtGui.QSizePolicy.Fixed, QtGui.QSizePolicy.Fixed) + sizePolicy.setHorizontalStretch(0) + sizePolicy.setVerticalStretch(0) + sizePolicy.setHeightForWidth(self.comboBoxMaxCores.sizePolicy().hasHeightForWidth()) + self.comboBoxMaxCores.setSizePolicy(sizePolicy) + self.comboBoxMaxCores.setObjectName(_fromUtf8("comboBoxMaxCores")) + self.comboBoxMaxCores.addItem(_fromUtf8("")) + self.comboBoxMaxCores.addItem(_fromUtf8("")) + self.comboBoxMaxCores.addItem(_fromUtf8("")) + self.comboBoxMaxCores.addItem(_fromUtf8("")) + self.comboBoxMaxCores.addItem(_fromUtf8("")) + self.comboBoxMaxCores.addItem(_fromUtf8("")) + self.formLayout.setWidget(0, QtGui.QFormLayout.FieldRole, self.comboBoxMaxCores) + self.tabWidgetSettings.addTab(self.tab_2, _fromUtf8("")) self.gridLayout.addWidget(self.tabWidgetSettings, 0, 0, 1, 1) self.retranslateUi(settingsDialog) @@ -222,4 +244,12 @@ class Ui_settingsDialog(object): self.label_12.setText(QtGui.QApplication.translate("settingsDialog", "The \'Small message difficulty\' mostly only affects the difficulty of sending small messages. Doubling this value makes it almost twice as difficult to send a small message but doesn\'t really affect large messages.", None, QtGui.QApplication.UnicodeUTF8)) self.label_10.setText(QtGui.QApplication.translate("settingsDialog", "The \'Total difficulty\' affects the absolute amount of work the sender must complete. Doubling this value doubles the amount of work.", None, QtGui.QApplication.UnicodeUTF8)) self.tabWidgetSettings.setTabText(self.tabWidgetSettings.indexOf(self.tab), QtGui.QApplication.translate("settingsDialog", "Demanded difficulty", None, QtGui.QApplication.UnicodeUTF8)) + self.label_13.setText(QtGui.QApplication.translate("settingsDialog", "Maximum number of CPU cores to use when doing work:", None, QtGui.QApplication.UnicodeUTF8)) + self.comboBoxMaxCores.setItemText(0, QtGui.QApplication.translate("settingsDialog", "1", None, QtGui.QApplication.UnicodeUTF8)) + self.comboBoxMaxCores.setItemText(1, QtGui.QApplication.translate("settingsDialog", "2", None, QtGui.QApplication.UnicodeUTF8)) + self.comboBoxMaxCores.setItemText(2, QtGui.QApplication.translate("settingsDialog", "4", None, QtGui.QApplication.UnicodeUTF8)) + self.comboBoxMaxCores.setItemText(3, QtGui.QApplication.translate("settingsDialog", "8", None, QtGui.QApplication.UnicodeUTF8)) + self.comboBoxMaxCores.setItemText(4, QtGui.QApplication.translate("settingsDialog", "16", None, QtGui.QApplication.UnicodeUTF8)) + self.comboBoxMaxCores.setItemText(5, QtGui.QApplication.translate("settingsDialog", "All", None, QtGui.QApplication.UnicodeUTF8)) + self.tabWidgetSettings.setTabText(self.tabWidgetSettings.indexOf(self.tab_2), QtGui.QApplication.translate("settingsDialog", "System", None, QtGui.QApplication.UnicodeUTF8)) diff --git a/src/settings.ui b/src/settings.ui index 971480bd..61419bb6 100644 --- a/src/settings.ui +++ b/src/settings.ui @@ -373,6 +373,60 @@ + + + System + + + + + + Maximum number of CPU cores to use when doing work: + + + + + + + + 0 + 0 + + + + + 1 + + + + + 2 + + + + + 4 + + + + + 8 + + + + + 16 + + + + + All + + + + + + diff --git a/src/shared.py b/src/shared.py index 850c84f3..df524c5d 100644 --- a/src/shared.py +++ b/src/shared.py @@ -7,6 +7,7 @@ import highlevelcrypto import Queue import pickle import os +import time myECCryptorObjects = {} MyECSubscriptionCryptorObjects = {} @@ -27,6 +28,7 @@ printLock = threading.Lock() appdata = '' #holds the location of the application data storage directory statusIconColor = 'red' connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice. +shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit. #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work. @@ -141,6 +143,8 @@ def reloadBroadcastSendersForWhichImWatching(): MyECSubscriptionCryptorObjects[hash] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')) def doCleanShutdown(): + global shutdown + shutdown = 1 #Used to tell proof of work worker threads to exit. knownNodesLock.acquire() UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) output = open(appdata + 'knownnodes.dat', 'wb') @@ -172,7 +176,8 @@ def doCleanShutdown(): printLock.acquire() print 'Finished flushing inventory.' printLock.release() - + + time.sleep(.25) #Wait long enough to guarantee that any running proof of work worker threads will check the shutdown variable and exit. If the main thread closes before they do then they won't stop. if safeConfigGetBoolean('bitmessagesettings','daemon'): printLock.acquire()