Multi-core POW

This commit is contained in:
Jonathan Warren 2013-05-30 16:25:42 -04:00
parent b77e65d3f6
commit ad2457361f
6 changed files with 155 additions and 23 deletions

View File

@ -1506,10 +1506,10 @@ class receiveDataThread(threading.Thread):
#We have received an inv message #We have received an inv message
def recinv(self,data): def recinv(self,data):
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 # ..from all peers, counting duplicates seperately (because they take up memory)
if len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
for key, value in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items(): for key, value in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value
if len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
shared.printLock.acquire() shared.printLock.acquire()
print 'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) print 'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
@ -2434,6 +2434,13 @@ class sqlThread(threading.Thread):
print 'Vacuuming message.dat. You might notice that the file size gets much smaller.' print 'Vacuuming message.dat. You might notice that the file size gets much smaller.'
self.cur.execute( ''' VACUUM ''') self.cur.execute( ''' VACUUM ''')
#After code refactoring, the possible status values for sent messages as changed.
self.cur.execute( '''update sent set status='doingmsgpow' where status='doingpow' ''')
self.cur.execute( '''update sent set status='msgsent' where status='sentmessage' ''')
self.cur.execute( '''update sent set status='doingpubkeypow' where status='findingpubkey' ''')
self.cur.execute( '''update sent set status='broadcastqueued' where status='broadcastpending' ''')
self.conn.commit()
try: try:
testpayload = '\x00\x00' testpayload = '\x00\x00'
t = ('1234',testpayload,'12345678','no') t = ('1234',testpayload,'12345678','no')
@ -2545,7 +2552,6 @@ class singleCleaner(threading.Thread):
shared.sqlReturnQueue.get() shared.sqlReturnQueue.get()
del shared.inventory[hash] del shared.inventory[hash]
shared.sqlSubmitQueue.put('commit') shared.sqlSubmitQueue.put('commit')
#self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"),"")
shared.UISignalQueue.put(('updateStatusBar','')) shared.UISignalQueue.put(('updateStatusBar',''))
shared.sqlLock.release() shared.sqlLock.release()
shared.broadcastToSendDataQueues((0, 'pong', 'no data')) #commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. shared.broadcastToSendDataQueues((0, 'pong', 'no data')) #commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes.
@ -2556,7 +2562,7 @@ class singleCleaner(threading.Thread):
timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
#inventory (moves data from the inventory data structure to the on-disk sql database) #inventory (moves data from the inventory data structure to the on-disk sql database)
shared.sqlLock.acquire() shared.sqlLock.acquire()
#inventory (clears data more than 2 days and 12 hours old) #inventory (clears pubkeys after 28 days and everything else after 2 days and 12 hours)
t = (int(time.time())-lengthOfTimeToLeaveObjectsInInventory,int(time.time())-lengthOfTimeToHoldOnToAllPubkeys) t = (int(time.time())-lengthOfTimeToLeaveObjectsInInventory,int(time.time())-lengthOfTimeToHoldOnToAllPubkeys)
shared.sqlSubmitQueue.put('''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''') shared.sqlSubmitQueue.put('''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''')
shared.sqlSubmitQueue.put(t) shared.sqlSubmitQueue.put(t)

View File

@ -1460,6 +1460,10 @@ class MyForm(QtGui.QMainWindow):
shared.config.set('bitmessagesettings', 'defaultnoncetrialsperbyte',str(int(float(self.settingsDialogInstance.ui.lineEditTotalDifficulty.text())*shared.networkDefaultProofOfWorkNonceTrialsPerByte))) shared.config.set('bitmessagesettings', 'defaultnoncetrialsperbyte',str(int(float(self.settingsDialogInstance.ui.lineEditTotalDifficulty.text())*shared.networkDefaultProofOfWorkNonceTrialsPerByte)))
if float(self.settingsDialogInstance.ui.lineEditSmallMessageDifficulty.text()) >= 1: if float(self.settingsDialogInstance.ui.lineEditSmallMessageDifficulty.text()) >= 1:
shared.config.set('bitmessagesettings', 'defaultpayloadlengthextrabytes',str(int(float(self.settingsDialogInstance.ui.lineEditSmallMessageDifficulty.text())*shared.networkDefaultPayloadLengthExtraBytes))) shared.config.set('bitmessagesettings', 'defaultpayloadlengthextrabytes',str(int(float(self.settingsDialogInstance.ui.lineEditSmallMessageDifficulty.text())*shared.networkDefaultPayloadLengthExtraBytes)))
if str(self.settingsDialogInstance.ui.comboBoxMaxCores.currentText()) == 'All':
shared.config.set('bitmessagesettings', 'maxcores', '99999')
else:
shared.config.set('bitmessagesettings', 'maxcores', str(self.settingsDialogInstance.ui.comboBoxMaxCores.currentText()))
with open(shared.appdata + 'keys.dat', 'wb') as configfile: with open(shared.appdata + 'keys.dat', 'wb') as configfile:
shared.config.write(configfile) shared.config.write(configfile)
@ -2167,6 +2171,25 @@ class settingsDialog(QtGui.QDialog):
self.ui.lineEditTotalDifficulty.setText(str((float(shared.config.getint('bitmessagesettings', 'defaultnoncetrialsperbyte'))/shared.networkDefaultProofOfWorkNonceTrialsPerByte))) self.ui.lineEditTotalDifficulty.setText(str((float(shared.config.getint('bitmessagesettings', 'defaultnoncetrialsperbyte'))/shared.networkDefaultProofOfWorkNonceTrialsPerByte)))
self.ui.lineEditSmallMessageDifficulty.setText(str((float(shared.config.getint('bitmessagesettings', 'defaultpayloadlengthextrabytes'))/shared.networkDefaultPayloadLengthExtraBytes))) self.ui.lineEditSmallMessageDifficulty.setText(str((float(shared.config.getint('bitmessagesettings', 'defaultpayloadlengthextrabytes'))/shared.networkDefaultPayloadLengthExtraBytes)))
#On the System tab
try:
maxCores = shared.config.getint('bitmessagesettings', 'maxcores')
except:
maxCores = 99999
if maxCores <= 1:
self.ui.comboBoxMaxCores.setCurrentIndex(0)
elif maxCores == 2:
self.ui.comboBoxMaxCores.setCurrentIndex(1)
elif maxCores <= 4:
self.ui.comboBoxMaxCores.setCurrentIndex(2)
elif maxCores <= 8:
self.ui.comboBoxMaxCores.setCurrentIndex(3)
elif maxCores <= 16:
self.ui.comboBoxMaxCores.setCurrentIndex(4)
else:
self.ui.comboBoxMaxCores.setCurrentIndex(5)
QtGui.QWidget.resize(self,QtGui.QWidget.sizeHint(self)) QtGui.QWidget.resize(self,QtGui.QWidget.sizeHint(self))
def comboBoxProxyTypeChanged(self,comboBoxIndex): def comboBoxProxyTypeChanged(self,comboBoxIndex):

View File

@ -1,6 +1,10 @@
def _pool_worker(nonce, initialHash, target, pool_size): import shared
import time
from multiprocessing import Pool, cpu_count
import hashlib import hashlib
from struct import unpack, pack from struct import unpack, pack
def _pool_worker(nonce, initialHash, target, pool_size):
trialValue = 99999999999999999999 trialValue = 99999999999999999999
while trialValue > target: while trialValue > target:
nonce += pool_size nonce += pool_size
@ -8,21 +12,31 @@ def _pool_worker(nonce, initialHash, target, pool_size):
return [trialValue, nonce] return [trialValue, nonce]
def run(target, initialHash): def run(target, initialHash):
from multiprocessing import Pool, cpu_count
import time
try: try:
pool_size = cpu_count() pool_size = cpu_count()
except: except:
pool_size = 4 pool_size = 4
try:
maxCores = config.getint('bitmessagesettings', 'maxcores')
except:
maxCores = 99999
if pool_size > maxCores:
pool_size = maxCores
pool = Pool(processes=pool_size) pool = Pool(processes=pool_size)
result = [] result = []
for i in range(pool_size): for i in range(pool_size):
result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size))) result.append(pool.apply_async(_pool_worker, args = (i, initialHash, target, pool_size)))
while True: while True:
for i in range(pool_size): for i in range(pool_size):
if shared.shutdown:
pool.terminate()
time.sleep(5) #Don't return anything (doing so will cause exceptions because we'll return an unusable response). Sit here and wait for this thread to close.
return
if result[i].ready(): if result[i].ready():
result = result[i].get() result = result[i].get()
pool.terminate() pool.terminate()
return result[0], result[1] return result[0], result[1]
time.sleep(1) time.sleep(0.2)

View File

@ -2,8 +2,8 @@
# Form implementation generated from reading ui file 'settings.ui' # Form implementation generated from reading ui file 'settings.ui'
# #
# Created: Sat May 25 13:20:18 2013 # Created: Thu May 30 15:50:32 2013
# by: PyQt4 UI code generator 4.9.5 # by: PyQt4 UI code generator 4.9.4
# #
# WARNING! All changes made in this file will be lost! # WARNING! All changes made in this file will be lost!
@ -172,6 +172,28 @@ class Ui_settingsDialog(object):
self.label_10.setObjectName(_fromUtf8("label_10")) self.label_10.setObjectName(_fromUtf8("label_10"))
self.gridLayout_6.addWidget(self.label_10, 2, 0, 1, 3) self.gridLayout_6.addWidget(self.label_10, 2, 0, 1, 3)
self.tabWidgetSettings.addTab(self.tab, _fromUtf8("")) self.tabWidgetSettings.addTab(self.tab, _fromUtf8(""))
self.tab_2 = QtGui.QWidget()
self.tab_2.setObjectName(_fromUtf8("tab_2"))
self.formLayout = QtGui.QFormLayout(self.tab_2)
self.formLayout.setObjectName(_fromUtf8("formLayout"))
self.label_13 = QtGui.QLabel(self.tab_2)
self.label_13.setObjectName(_fromUtf8("label_13"))
self.formLayout.setWidget(0, QtGui.QFormLayout.LabelRole, self.label_13)
self.comboBoxMaxCores = QtGui.QComboBox(self.tab_2)
sizePolicy = QtGui.QSizePolicy(QtGui.QSizePolicy.Fixed, QtGui.QSizePolicy.Fixed)
sizePolicy.setHorizontalStretch(0)
sizePolicy.setVerticalStretch(0)
sizePolicy.setHeightForWidth(self.comboBoxMaxCores.sizePolicy().hasHeightForWidth())
self.comboBoxMaxCores.setSizePolicy(sizePolicy)
self.comboBoxMaxCores.setObjectName(_fromUtf8("comboBoxMaxCores"))
self.comboBoxMaxCores.addItem(_fromUtf8(""))
self.comboBoxMaxCores.addItem(_fromUtf8(""))
self.comboBoxMaxCores.addItem(_fromUtf8(""))
self.comboBoxMaxCores.addItem(_fromUtf8(""))
self.comboBoxMaxCores.addItem(_fromUtf8(""))
self.comboBoxMaxCores.addItem(_fromUtf8(""))
self.formLayout.setWidget(0, QtGui.QFormLayout.FieldRole, self.comboBoxMaxCores)
self.tabWidgetSettings.addTab(self.tab_2, _fromUtf8(""))
self.gridLayout.addWidget(self.tabWidgetSettings, 0, 0, 1, 1) self.gridLayout.addWidget(self.tabWidgetSettings, 0, 0, 1, 1)
self.retranslateUi(settingsDialog) self.retranslateUi(settingsDialog)
@ -222,4 +244,12 @@ class Ui_settingsDialog(object):
self.label_12.setText(QtGui.QApplication.translate("settingsDialog", "The \'Small message difficulty\' mostly only affects the difficulty of sending small messages. Doubling this value makes it almost twice as difficult to send a small message but doesn\'t really affect large messages.", None, QtGui.QApplication.UnicodeUTF8)) self.label_12.setText(QtGui.QApplication.translate("settingsDialog", "The \'Small message difficulty\' mostly only affects the difficulty of sending small messages. Doubling this value makes it almost twice as difficult to send a small message but doesn\'t really affect large messages.", None, QtGui.QApplication.UnicodeUTF8))
self.label_10.setText(QtGui.QApplication.translate("settingsDialog", "The \'Total difficulty\' affects the absolute amount of work the sender must complete. Doubling this value doubles the amount of work.", None, QtGui.QApplication.UnicodeUTF8)) self.label_10.setText(QtGui.QApplication.translate("settingsDialog", "The \'Total difficulty\' affects the absolute amount of work the sender must complete. Doubling this value doubles the amount of work.", None, QtGui.QApplication.UnicodeUTF8))
self.tabWidgetSettings.setTabText(self.tabWidgetSettings.indexOf(self.tab), QtGui.QApplication.translate("settingsDialog", "Demanded difficulty", None, QtGui.QApplication.UnicodeUTF8)) self.tabWidgetSettings.setTabText(self.tabWidgetSettings.indexOf(self.tab), QtGui.QApplication.translate("settingsDialog", "Demanded difficulty", None, QtGui.QApplication.UnicodeUTF8))
self.label_13.setText(QtGui.QApplication.translate("settingsDialog", "Maximum number of CPU cores to use when doing work:", None, QtGui.QApplication.UnicodeUTF8))
self.comboBoxMaxCores.setItemText(0, QtGui.QApplication.translate("settingsDialog", "1", None, QtGui.QApplication.UnicodeUTF8))
self.comboBoxMaxCores.setItemText(1, QtGui.QApplication.translate("settingsDialog", "2", None, QtGui.QApplication.UnicodeUTF8))
self.comboBoxMaxCores.setItemText(2, QtGui.QApplication.translate("settingsDialog", "4", None, QtGui.QApplication.UnicodeUTF8))
self.comboBoxMaxCores.setItemText(3, QtGui.QApplication.translate("settingsDialog", "8", None, QtGui.QApplication.UnicodeUTF8))
self.comboBoxMaxCores.setItemText(4, QtGui.QApplication.translate("settingsDialog", "16", None, QtGui.QApplication.UnicodeUTF8))
self.comboBoxMaxCores.setItemText(5, QtGui.QApplication.translate("settingsDialog", "All", None, QtGui.QApplication.UnicodeUTF8))
self.tabWidgetSettings.setTabText(self.tabWidgetSettings.indexOf(self.tab_2), QtGui.QApplication.translate("settingsDialog", "System", None, QtGui.QApplication.UnicodeUTF8))

View File

@ -373,6 +373,60 @@
</item> </item>
</layout> </layout>
</widget> </widget>
<widget class="QWidget" name="tab_2">
<attribute name="title">
<string>System</string>
</attribute>
<layout class="QFormLayout" name="formLayout">
<item row="0" column="0">
<widget class="QLabel" name="label_13">
<property name="text">
<string>Maximum number of CPU cores to use when doing work:</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QComboBox" name="comboBoxMaxCores">
<property name="sizePolicy">
<sizepolicy hsizetype="Fixed" vsizetype="Fixed">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<item>
<property name="text">
<string>1</string>
</property>
</item>
<item>
<property name="text">
<string>2</string>
</property>
</item>
<item>
<property name="text">
<string>4</string>
</property>
</item>
<item>
<property name="text">
<string>8</string>
</property>
</item>
<item>
<property name="text">
<string>16</string>
</property>
</item>
<item>
<property name="text">
<string>All</string>
</property>
</item>
</widget>
</item>
</layout>
</widget>
</widget> </widget>
</item> </item>
</layout> </layout>

View File

@ -7,6 +7,7 @@ import highlevelcrypto
import Queue import Queue
import pickle import pickle
import os import os
import time
myECCryptorObjects = {} myECCryptorObjects = {}
MyECSubscriptionCryptorObjects = {} MyECSubscriptionCryptorObjects = {}
@ -27,6 +28,7 @@ printLock = threading.Lock()
appdata = '' #holds the location of the application data storage directory appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red' statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice. connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work. networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
@ -141,6 +143,8 @@ def reloadBroadcastSendersForWhichImWatching():
MyECSubscriptionCryptorObjects[hash] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex')) MyECSubscriptionCryptorObjects[hash] = highlevelcrypto.makeCryptor(privEncryptionKey.encode('hex'))
def doCleanShutdown(): def doCleanShutdown():
global shutdown
shutdown = 1 #Used to tell proof of work worker threads to exit.
knownNodesLock.acquire() knownNodesLock.acquire()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...'))
output = open(appdata + 'knownnodes.dat', 'wb') output = open(appdata + 'knownnodes.dat', 'wb')
@ -173,6 +177,7 @@ def doCleanShutdown():
print 'Finished flushing inventory.' print 'Finished flushing inventory.'
printLock.release() printLock.release()
time.sleep(.25) #Wait long enough to guarantee that any running proof of work worker threads will check the shutdown variable and exit. If the main thread closes before they do then they won't stop.
if safeConfigGetBoolean('bitmessagesettings','daemon'): if safeConfigGetBoolean('bitmessagesettings','daemon'):
printLock.acquire() printLock.acquire()