Improved logging

Fixes #118

- changed almost all "print" into logger
- threads have nicer names
- logger can have configuration in "logger.dat" in the same directory as
"keys.dat", and the logger will pick the one named "default" to replace
the "console" and "file" that are in PyBitmessage otherwise

Example file for logging to syslog:

[loggers]
keys = root,syslog

[logger_root]
level=NOTSET
handlers=syslog

[logger_syslog]
level=DEBUG
handlers=syslog
qualname=default

[handlers]
keys = syslog

[handler_syslog]
class = handlers.SysLogHandler
formatter = syslog
level = DEBUG
args=(('localhost', handlers.SYSLOG_UDP_PORT),
handlers.SysLogHandler.LOG_LOCAL7)

[formatters]
keys = syslog

[formatter_syslog]
format=%(asctime)s %(threadName)s %(filename)s@%(lineno)d %(message)s
datefmt=%b %d %H:%M:%S
This commit is contained in:
mailchuck 2015-11-18 16:22:17 +01:00
parent ef8f509b3a
commit cee1a59a86
Signed by untrusted user who does not match committer: PeterSurda
GPG Key ID: 0C5F50C0B5F37D87
12 changed files with 208 additions and 279 deletions

View File

@ -55,7 +55,7 @@ def decodeBase58(string, alphabet=ALPHABET):
def encodeVarint(integer): def encodeVarint(integer):
if integer < 0: if integer < 0:
print 'varint cannot be < 0' logger.error('varint cannot be < 0')
raise SystemExit raise SystemExit
if integer < 253: if integer < 253:
return pack('>B',integer) return pack('>B',integer)
@ -66,7 +66,7 @@ def encodeVarint(integer):
if integer >= 4294967296 and integer < 18446744073709551616: if integer >= 4294967296 and integer < 18446744073709551616:
return pack('>B',255) + pack('>Q',integer) return pack('>B',255) + pack('>Q',integer)
if integer >= 18446744073709551616: if integer >= 18446744073709551616:
print 'varint cannot be >= 18446744073709551616' logger.error('varint cannot be >= 18446744073709551616')
raise SystemExit raise SystemExit
class varintDecodeError(Exception): class varintDecodeError(Exception):
@ -185,25 +185,25 @@ def decodeAddress(address):
try: try:
addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9]) addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9])
except varintDecodeError as e: except varintDecodeError as e:
print e logger.error(str(e))
status = 'varintmalformed' status = 'varintmalformed'
return status,0,0,"" return status,0,0,""
#print 'addressVersionNumber', addressVersionNumber #print 'addressVersionNumber', addressVersionNumber
#print 'bytesUsedByVersionNumber', bytesUsedByVersionNumber #print 'bytesUsedByVersionNumber', bytesUsedByVersionNumber
if addressVersionNumber > 4: if addressVersionNumber > 4:
print 'cannot decode address version numbers this high' logger.error('cannot decode address version numbers this high')
status = 'versiontoohigh' status = 'versiontoohigh'
return status,0,0,"" return status,0,0,""
elif addressVersionNumber == 0: elif addressVersionNumber == 0:
print 'cannot decode address version numbers of zero.' logger.error('cannot decode address version numbers of zero.')
status = 'versiontoohigh' status = 'versiontoohigh'
return status,0,0,"" return status,0,0,""
try: try:
streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:]) streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:])
except varintDecodeError as e: except varintDecodeError as e:
print e logger.error(str(e))
status = 'varintmalformed' status = 'varintmalformed'
return status,0,0,"" return status,0,0,""
#print streamNumber #print streamNumber

View File

@ -13,14 +13,14 @@ try:
from PyQt4.QtNetwork import QLocalSocket, QLocalServer from PyQt4.QtNetwork import QLocalSocket, QLocalServer
except Exception as err: except Exception as err:
print 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download it from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\' (without quotes).' logger.error( 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download it from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\' (without quotes).')
print 'Error message:', err logger.error('Error message: ' + str(err))
sys.exit() sys.exit()
try: try:
_encoding = QtGui.QApplication.UnicodeUTF8 _encoding = QtGui.QApplication.UnicodeUTF8
except AttributeError: except AttributeError:
print 'QtGui.QApplication.UnicodeUTF8 error:', err logger.error('QtGui.QApplication.UnicodeUTF8 error: ' + str(err))
from addresses import * from addresses import *
import shared import shared
@ -491,7 +491,6 @@ class MyForm(settingsmixin.SMainWindow):
if len(db[toAddress]) > 0: if len(db[toAddress]) > 0:
j = 0 j = 0
for f, c in db[toAddress].iteritems(): for f, c in db[toAddress].iteritems():
print "adding %s, %i" % (f, c)
subwidget = Ui_FolderWidget(widget, j, toAddress, f, c) subwidget = Ui_FolderWidget(widget, j, toAddress, f, c)
j += 1 j += 1
widget.setUnreadCount(unread) widget.setUnreadCount(unread)
@ -730,7 +729,7 @@ class MyForm(settingsmixin.SMainWindow):
if nc.test()[0] == 'failed': if nc.test()[0] == 'failed':
self.ui.pushButtonFetchNamecoinID.hide() self.ui.pushButtonFetchNamecoinID.hide()
except: except:
print 'There was a problem testing for a Namecoin daemon. Hiding the Fetch Namecoin ID button' logger.error('There was a problem testing for a Namecoin daemon. Hiding the Fetch Namecoin ID button')
self.ui.pushButtonFetchNamecoinID.hide() self.ui.pushButtonFetchNamecoinID.hide()
def updateTTL(self, sliderPosition): def updateTTL(self, sliderPosition):
@ -1270,7 +1269,7 @@ class MyForm(settingsmixin.SMainWindow):
# has messageing menu been installed # has messageing menu been installed
if not withMessagingMenu: if not withMessagingMenu:
print 'WARNING: MessagingMenu is not available. Is libmessaging-menu-dev installed?' logger.warning('WARNING: MessagingMenu is not available. Is libmessaging-menu-dev installed?')
return return
# create the menu server # create the menu server
@ -1283,7 +1282,7 @@ class MyForm(settingsmixin.SMainWindow):
self.ubuntuMessagingMenuUnread(True) self.ubuntuMessagingMenuUnread(True)
except Exception: except Exception:
withMessagingMenu = False withMessagingMenu = False
print 'WARNING: messaging menu disabled' logger.warning('WARNING: messaging menu disabled')
# update the Ubuntu messaging menu # update the Ubuntu messaging menu
def ubuntuMessagingMenuUpdate(self, drawAttention, newItem, toLabel): def ubuntuMessagingMenuUpdate(self, drawAttention, newItem, toLabel):
@ -1295,7 +1294,7 @@ class MyForm(settingsmixin.SMainWindow):
# has messageing menu been installed # has messageing menu been installed
if not withMessagingMenu: if not withMessagingMenu:
print 'WARNING: messaging menu disabled or libmessaging-menu-dev not installed' logger.warning('WARNING: messaging menu disabled or libmessaging-menu-dev not installed')
return return
# remember this item to that the messaging menu can find it # remember this item to that the messaging menu can find it
@ -1389,7 +1388,7 @@ class MyForm(settingsmixin.SMainWindow):
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
gst_available=True gst_available=True
except: except:
print "WARNING: gst123 must be installed in order to play mp3 sounds" logger.warning("WARNING: gst123 must be installed in order to play mp3 sounds")
if not gst_available: if not gst_available:
try: try:
subprocess.call(["mpg123", soundFilename], subprocess.call(["mpg123", soundFilename],
@ -1397,14 +1396,14 @@ class MyForm(settingsmixin.SMainWindow):
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
gst_available=True gst_available=True
except: except:
print "WARNING: mpg123 must be installed in order to play mp3 sounds" logger.warning("WARNING: mpg123 must be installed in order to play mp3 sounds")
else: else:
try: try:
subprocess.call(["aplay", soundFilename], subprocess.call(["aplay", soundFilename],
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE) stdout=subprocess.PIPE)
except: except:
print "WARNING: aplay must be installed in order to play WAV sounds" logger.warning("WARNING: aplay must be installed in order to play WAV sounds")
elif sys.platform[0:3] == 'win': elif sys.platform[0:3] == 'win':
# use winsound on Windows # use winsound on Windows
import winsound import winsound
@ -1521,7 +1520,7 @@ class MyForm(settingsmixin.SMainWindow):
shared.apiAddressGeneratorReturnQueue.queue.clear() shared.apiAddressGeneratorReturnQueue.queue.clear()
shared.addressGeneratorQueue.put(('createChan', 4, 1, self.str_chan + ' ' + str(self.newChanDialogInstance.ui.lineEditChanNameCreate.text().toUtf8()), self.newChanDialogInstance.ui.lineEditChanNameCreate.text().toUtf8())) shared.addressGeneratorQueue.put(('createChan', 4, 1, self.str_chan + ' ' + str(self.newChanDialogInstance.ui.lineEditChanNameCreate.text().toUtf8()), self.newChanDialogInstance.ui.lineEditChanNameCreate.text().toUtf8()))
addressGeneratorReturnValue = shared.apiAddressGeneratorReturnQueue.get() addressGeneratorReturnValue = shared.apiAddressGeneratorReturnQueue.get()
print 'addressGeneratorReturnValue', addressGeneratorReturnValue logger.debug('addressGeneratorReturnValue ' + addressGeneratorReturnValue)
if len(addressGeneratorReturnValue) == 0: if len(addressGeneratorReturnValue) == 0:
QMessageBox.about(self, _translate("MainWindow", "Address already present"), _translate( QMessageBox.about(self, _translate("MainWindow", "Address already present"), _translate(
"MainWindow", "Could not add chan because it appears to already be one of your identities.")) "MainWindow", "Could not add chan because it appears to already be one of your identities."))
@ -1546,7 +1545,7 @@ class MyForm(settingsmixin.SMainWindow):
shared.apiAddressGeneratorReturnQueue.queue.clear() shared.apiAddressGeneratorReturnQueue.queue.clear()
shared.addressGeneratorQueue.put(('joinChan', addBMIfNotPresent(self.newChanDialogInstance.ui.lineEditChanBitmessageAddress.text()), self.str_chan + ' ' + str(self.newChanDialogInstance.ui.lineEditChanNameJoin.text().toUtf8()), self.newChanDialogInstance.ui.lineEditChanNameJoin.text().toUtf8())) shared.addressGeneratorQueue.put(('joinChan', addBMIfNotPresent(self.newChanDialogInstance.ui.lineEditChanBitmessageAddress.text()), self.str_chan + ' ' + str(self.newChanDialogInstance.ui.lineEditChanNameJoin.text().toUtf8()), self.newChanDialogInstance.ui.lineEditChanNameJoin.text().toUtf8()))
addressGeneratorReturnValue = shared.apiAddressGeneratorReturnQueue.get() addressGeneratorReturnValue = shared.apiAddressGeneratorReturnQueue.get()
print 'addressGeneratorReturnValue', addressGeneratorReturnValue logger.debug('addressGeneratorReturnValue ' + addressGeneratorReturnValue)
if addressGeneratorReturnValue == 'chan name does not match address': if addressGeneratorReturnValue == 'chan name does not match address':
QMessageBox.about(self, _translate("MainWindow", "Address does not match chan name"), _translate( QMessageBox.about(self, _translate("MainWindow", "Address does not match chan name"), _translate(
"MainWindow", "Although the Bitmessage address you entered was valid, it doesn\'t match the chan name.")) "MainWindow", "Although the Bitmessage address you entered was valid, it doesn\'t match the chan name."))
@ -2113,13 +2112,12 @@ class MyForm(settingsmixin.SMainWindow):
acct.createMessage(toAddress, fromAddress, subject, message) acct.createMessage(toAddress, fromAddress, subject, message)
subject = acct.subject subject = acct.subject
toAddress = acct.toAddress toAddress = acct.toAddress
print "Subject: %s" % (subject) logger.debug("Subject: %s" % (subject))
print "address: %s" % (toAddress) logger.debug("address: %s" % (toAddress))
status, addressVersionNumber, streamNumber, ripe = decodeAddress( status, addressVersionNumber, streamNumber, ripe = decodeAddress(
toAddress) toAddress)
if status != 'success': if status != 'success':
with shared.printLock: logger.error('Error: Could not decode ' + toAddress + ':' + status)
print 'Error: Could not decode', toAddress, ':', status
if status == 'missingbm': if status == 'missingbm':
self.statusBar().showMessage(_translate( self.statusBar().showMessage(_translate(
@ -2487,7 +2485,7 @@ class MyForm(settingsmixin.SMainWindow):
shared.objectProcessorQueue.put((objectType,payload)) shared.objectProcessorQueue.put((objectType,payload))
def click_pushButtonStatusIcon(self): def click_pushButtonStatusIcon(self):
print 'click_pushButtonStatusIcon' logger.debug('click_pushButtonStatusIcon')
self.iconGlossaryInstance = iconGlossaryDialog(self) self.iconGlossaryInstance = iconGlossaryDialog(self)
if self.iconGlossaryInstance.exec_(): if self.iconGlossaryInstance.exec_():
pass pass
@ -2798,12 +2796,10 @@ class MyForm(settingsmixin.SMainWindow):
if acct.type != 'normal': if acct.type != 'normal':
return return
if self.dialog.ui.radioButtonUnregister.isChecked() and isinstance(acct, GatewayAccount): if self.dialog.ui.radioButtonUnregister.isChecked() and isinstance(acct, GatewayAccount):
print "unregister"
acct.unregister() acct.unregister()
shared.config.remove_option(addressAtCurrentRow, 'gateway') shared.config.remove_option(addressAtCurrentRow, 'gateway')
shared.writeKeysFile() shared.writeKeysFile()
elif self.dialog.ui.radioButtonRegister.isChecked(): elif self.dialog.ui.radioButtonRegister.isChecked():
print "register"
email = str(self.dialog.ui.lineEditEmail.text().toUtf8()) email = str(self.dialog.ui.lineEditEmail.text().toUtf8())
acct = MailchuckAccount(addressAtCurrentRow) acct = MailchuckAccount(addressAtCurrentRow)
acct.register(email) acct.register(email)
@ -2852,7 +2848,7 @@ class MyForm(settingsmixin.SMainWindow):
shared.addressGeneratorQueue.put(('createDeterministicAddresses', 4, streamNumberForAddress, "unused deterministic address", self.dialog.ui.spinBoxNumberOfAddressesToMake.value( shared.addressGeneratorQueue.put(('createDeterministicAddresses', 4, streamNumberForAddress, "unused deterministic address", self.dialog.ui.spinBoxNumberOfAddressesToMake.value(
), self.dialog.ui.lineEditPassphrase.text().toUtf8(), self.dialog.ui.checkBoxEighteenByteRipe.isChecked())) ), self.dialog.ui.lineEditPassphrase.text().toUtf8(), self.dialog.ui.checkBoxEighteenByteRipe.isChecked()))
else: else:
print 'new address dialog box rejected' logger.debug('new address dialog box rejected')
# Quit selected from menu or application indicator # Quit selected from menu or application indicator
def quit(self): def quit(self):
@ -3040,7 +3036,7 @@ class MyForm(settingsmixin.SMainWindow):
# If the previous message was to a chan then we should send our reply to the chan rather than to the particular person who sent the message. # If the previous message was to a chan then we should send our reply to the chan rather than to the particular person who sent the message.
if shared.config.has_section(toAddressAtCurrentInboxRow): if shared.config.has_section(toAddressAtCurrentInboxRow):
if shared.safeConfigGetBoolean(toAddressAtCurrentInboxRow, 'chan'): if shared.safeConfigGetBoolean(toAddressAtCurrentInboxRow, 'chan'):
print 'original sent to a chan. Setting the to address in the reply to the chan address.' logger.debug('original sent to a chan. Setting the to address in the reply to the chan address.')
self.ui.lineEditTo.setText(str(toAddressAtCurrentInboxRow)) self.ui.lineEditTo.setText(str(toAddressAtCurrentInboxRow))
listOfAddressesInComboBoxSendFrom = [str(widget['from'].itemData(i).toPyObject()) for i in range(widget['from'].count())] listOfAddressesInComboBoxSendFrom = [str(widget['from'].itemData(i).toPyObject()) for i in range(widget['from'].count())]
@ -3715,7 +3711,7 @@ class MyForm(settingsmixin.SMainWindow):
if sourcefile != '': if sourcefile != '':
copied = QtCore.QFile.copy(sourcefile, destination) copied = QtCore.QFile.copy(sourcefile, destination)
if not copied: if not copied:
print 'couldn\'t copy :(' logger.error('couldn\'t copy :(')
# set the icon # set the icon
self.rerenderTabTreeMessages() self.rerenderTabTreeMessages()
self.rerenderTabTreeSubscriptions() self.rerenderTabTreeSubscriptions()
@ -3953,8 +3949,7 @@ class MyForm(settingsmixin.SMainWindow):
def updateStatusBar(self, data): def updateStatusBar(self, data):
if data != "": if data != "":
with shared.printLock: logger.info('Status bar: ' + data)
print 'Status bar:', data
self.statusBar().showMessage(data) self.statusBar().showMessage(data)

View File

@ -29,7 +29,7 @@ class objectProcessor(threading.Thread):
objecs (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads. objecs (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads.
""" """
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="objectProcessor")
""" """
It may be the case that the last time Bitmessage was running, the user It may be the case that the last time Bitmessage was running, the user
closed it before it finished processing everything in the closed it before it finished processing everything in the
@ -741,8 +741,7 @@ class objectProcessor(threading.Thread):
fromAddress = encodeAddress( fromAddress = encodeAddress(
sendersAddressVersion, sendersStream, calculatedRipe) sendersAddressVersion, sendersStream, calculatedRipe)
with shared.printLock: logger.debug('fromAddress: ' + fromAddress)
print 'fromAddress:', fromAddress
if messageEncodingType == 2: if messageEncodingType == 2:
subject, body = self.decodeType2Message(message) subject, body = self.decodeType2Message(message)

View File

@ -16,7 +16,7 @@ from class_receiveDataThread import *
class outgoingSynSender(threading.Thread): class outgoingSynSender(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="outgoingSynSender")
def setup(self, streamNumber, selfInitiatedConnections): def setup(self, streamNumber, selfInitiatedConnections):
self.streamNumber = streamNumber self.streamNumber = streamNumber
@ -40,6 +40,7 @@ class outgoingSynSender(threading.Thread):
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'): while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'):
time.sleep(2) time.sleep(2)
while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections'): while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections'):
self.name = "outgoingSynSender"
maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8 maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections: while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
time.sleep(10) time.sleep(10)
@ -64,6 +65,7 @@ class outgoingSynSender(threading.Thread):
shared.alreadyAttemptedConnectionsListLock.acquire() shared.alreadyAttemptedConnectionsListLock.acquire()
shared.alreadyAttemptedConnectionsList[peer] = 0 shared.alreadyAttemptedConnectionsList[peer] = 0
shared.alreadyAttemptedConnectionsListLock.release() shared.alreadyAttemptedConnectionsListLock.release()
self.name = "outgoingSynSender-" + peer.host
if peer.host.find(':') == -1: if peer.host.find(':') == -1:
address_family = socket.AF_INET address_family = socket.AF_INET
else: else:
@ -86,22 +88,19 @@ class outgoingSynSender(threading.Thread):
except: except:
pass pass
shared.knownNodesLock.release() shared.knownNodesLock.release()
with shared.printLock: logger.debug('deleting ' + str(peer) + ' from shared.knownNodes because it caused a socks.socksocket exception. We must not be 64-bit compatible.')
print 'deleting ', peer, 'from shared.knownNodes because it caused a socks.socksocket exception. We must not be 64-bit compatible.'
continue continue
# This option apparently avoids the TIME_WAIT state so that we # This option apparently avoids the TIME_WAIT state so that we
# can rebind faster # can rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(20) sock.settimeout(20)
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2: if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
with shared.printLock: logger.debug('Trying an outgoing connection to ' + str(peer))
print 'Trying an outgoing connection to', peer
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a': elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
if shared.verbose >= 2: if shared.verbose >= 2:
with shared.printLock: logger.debug ('(Using SOCKS4a) Trying an outgoing connection to ' + str(peer))
print '(Using SOCKS4a) Trying an outgoing connection to', peer
proxytype = socks.PROXY_TYPE_SOCKS4 proxytype = socks.PROXY_TYPE_SOCKS4
sockshostname = shared.config.get( sockshostname = shared.config.get(
@ -121,8 +120,7 @@ class outgoingSynSender(threading.Thread):
proxytype, sockshostname, socksport, rdns) proxytype, sockshostname, socksport, rdns)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
if shared.verbose >= 2: if shared.verbose >= 2:
with shared.printLock: logger.debug ('(Using SOCKS5) Trying an outgoing connection to ' + str(peer))
print '(Using SOCKS5) Trying an outgoing connection to', peer
proxytype = socks.PROXY_TYPE_SOCKS5 proxytype = socks.PROXY_TYPE_SOCKS5
sockshostname = shared.config.get( sockshostname = shared.config.get(
@ -155,8 +153,7 @@ class outgoingSynSender(threading.Thread):
self.selfInitiatedConnections, self.selfInitiatedConnections,
sendDataThreadQueue) sendDataThreadQueue)
rd.start() rd.start()
with shared.printLock: logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.')
print self, 'connected to', peer, 'during an outgoing attempt.'
sd = sendDataThread(sendDataThreadQueue) sd = sendDataThread(sendDataThreadQueue)
@ -167,8 +164,7 @@ class outgoingSynSender(threading.Thread):
except socks.GeneralProxyError as err: except socks.GeneralProxyError as err:
if shared.verbose >= 2: if shared.verbose >= 2:
with shared.printLock: logger.debug('Could NOT connect to ' + str(peer) + ' during outgoing attempt. ' + str(err))
print 'Could NOT connect to', peer, 'during outgoing attempt.', err
deletedPeer = None deletedPeer = None
with shared.knownNodesLock: with shared.knownNodesLock:
@ -186,8 +182,7 @@ class outgoingSynSender(threading.Thread):
del shared.knownNodes[self.streamNumber][peer] del shared.knownNodes[self.streamNumber][peer]
deletedPeer = peer deletedPeer = peer
if deletedPeer: if deletedPeer:
with shared.printLock: str ('deleting ' + str(peer) + ' from shared.knownNodes because it is more than 48 hours old and we could not connect to it.')
print 'deleting', peer, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
except socks.Socks5AuthError as err: except socks.Socks5AuthError as err:
shared.UISignalQueue.put(( shared.UISignalQueue.put((
@ -195,16 +190,15 @@ class outgoingSynSender(threading.Thread):
"MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err)))) "MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err))))
except socks.Socks5Error as err: except socks.Socks5Error as err:
pass pass
print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err) logger.error('SOCKS5 error. (It is possible that the server wants authentication).) ' + str(err))
except socks.Socks4Error as err: except socks.Socks4Error as err:
print 'Socks4Error:', err logger.error('Socks4Error: ' + str(err))
except socket.error as err: except socket.error as err:
if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err) logger.error('Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err))
else: else:
if shared.verbose >= 1: if shared.verbose >= 1:
with shared.printLock: logger.error('Could NOT connect to ' + str(peer) + 'during outgoing attempt. ' + str(err))
print 'Could NOT connect to', peer, 'during outgoing attempt.', err
deletedPeer = None deletedPeer = None
with shared.knownNodesLock: with shared.knownNodesLock:
@ -222,12 +216,9 @@ class outgoingSynSender(threading.Thread):
del shared.knownNodes[self.streamNumber][peer] del shared.knownNodes[self.streamNumber][peer]
deletedPeer = peer deletedPeer = peer
if deletedPeer: if deletedPeer:
with shared.printLock: logger.debug('deleting ' + str(peer) + ' from shared.knownNodes because it is more than 48 hours old and we could not connect to it.')
print 'deleting', peer, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
except Exception as err: except Exception as err:
sys.stderr.write(
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ')
import traceback import traceback
traceback.print_exc() logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:')
time.sleep(0.1) time.sleep(0.1)

View File

@ -29,7 +29,7 @@ from debug import logger
class receiveDataThread(threading.Thread): class receiveDataThread(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="receiveData")
self.data = '' self.data = ''
self.verackSent = False self.verackSent = False
self.verackReceived = False self.verackReceived = False
@ -46,6 +46,7 @@ class receiveDataThread(threading.Thread):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, port) self.peer = shared.Peer(HOST, port)
self.name = "receiveData-" + self.peer.host
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.objectsThatWeHaveYetToGetFromThisPeer = {} self.objectsThatWeHaveYetToGetFromThisPeer = {}
self.selfInitiatedConnections = selfInitiatedConnections self.selfInitiatedConnections = selfInitiatedConnections
@ -62,8 +63,7 @@ class receiveDataThread(threading.Thread):
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
def run(self): def run(self):
with shared.printLock: logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
print 'receiveDataThread starting. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
while True: while True:
if shared.config.getint('bitmessagesettings', 'maxdownloadrate') == 0: if shared.config.getint('bitmessagesettings', 'maxdownloadrate') == 0:
@ -89,36 +89,31 @@ class receiveDataThread(threading.Thread):
shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates. shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates.
shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit
except socket.timeout: except socket.timeout:
with shared.printLock: logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
print 'Timeout occurred waiting for data from', self.peer, '. Closing receiveData thread. (ID:', str(id(self)) + ')'
break break
except Exception as err: except Exception as err:
if (sys.platform == 'win32' and err.errno in ([2, 10035])) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK): if (sys.platform == 'win32' and err.errno in ([2, 10035])) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK):
select.select([self.sslSock], [], []) select.select([self.sslSock], [], [])
continue continue
with shared.printLock: logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err))
print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', str(err.errno), "/", err
break break
# print 'Received', repr(self.data) # print 'Received', repr(self.data)
if len(self.data) == dataLen: # If self.sock.recv returned no data: if len(self.data) == dataLen: # If self.sock.recv returned no data:
with shared.printLock: logger.debug('Connection to ' + str(self.peer) + ' closed. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
print 'Connection to', self.peer, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
break break
else: else:
self.processData() self.processData()
try: try:
del self.selfInitiatedConnections[self.streamNumber][self] del self.selfInitiatedConnections[self.streamNumber][self]
with shared.printLock: logger.debug('removed self (a receiveDataThread) from selfInitiatedConnections')
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
except: except:
pass pass
self.sendDataThreadQueue.put((0, 'shutdown','no data')) # commands the corresponding sendDataThread to shut itself down. self.sendDataThreadQueue.put((0, 'shutdown','no data')) # commands the corresponding sendDataThread to shut itself down.
try: try:
del shared.connectedHostsList[self.peer.host] del shared.connectedHostsList[self.peer.host]
except Exception as err: except Exception as err:
with shared.printLock: logger.error('Could not delete ' + str(self.peer.host) + ' from shared.connectedHostsList.' + str(err))
print 'Could not delete', self.peer.host, 'from shared.connectedHostsList.', err
try: try:
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
@ -126,8 +121,7 @@ class receiveDataThread(threading.Thread):
except: except:
pass pass
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
with shared.printLock: logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
print 'receiveDataThread ending. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
def processData(self): def processData(self):
@ -148,7 +142,7 @@ class receiveDataThread(threading.Thread):
return return
payload = self.data[shared.Header.size:payloadLength + shared.Header.size] payload = self.data[shared.Header.size:payloadLength + shared.Header.size]
if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message. if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message.
print 'Checksum incorrect. Clearing this message.' logger.error('Checksum incorrect. Clearing this message.')
self.data = self.data[payloadLength + shared.Header.size:] self.data = self.data[payloadLength + shared.Header.size:]
del magic,command,payloadLength,checksum,payload # better to clean up before the recursive call del magic,command,payloadLength,checksum,payload # better to clean up before the recursive call
self.processData() self.processData()
@ -163,8 +157,7 @@ class receiveDataThread(threading.Thread):
#Strip the nulls #Strip the nulls
command = command.rstrip('\x00') command = command.rstrip('\x00')
with shared.printLock: logger.debug('remoteCommand ' + repr(command) + ' from ' + str(self.peer))
print 'remoteCommand', repr(command), ' from', self.peer
try: try:
#TODO: Use a dispatcher here #TODO: Use a dispatcher here
@ -202,14 +195,12 @@ class receiveDataThread(threading.Thread):
objectHash, = random.sample( objectHash, = random.sample(
self.objectsThatWeHaveYetToGetFromThisPeer, 1) self.objectsThatWeHaveYetToGetFromThisPeer, 1)
if objectHash in shared.inventory: if objectHash in shared.inventory:
with shared.printLock: logger.debug('Inventory (in memory) already has object listed in inv message.')
print 'Inventory (in memory) already has object listed in inv message.'
del self.objectsThatWeHaveYetToGetFromThisPeer[ del self.objectsThatWeHaveYetToGetFromThisPeer[
objectHash] objectHash]
elif shared.isInSqlInventory(objectHash): elif shared.isInSqlInventory(objectHash):
if shared.verbose >= 3: if shared.verbose >= 3:
with shared.printLock: logger.debug('Inventory (SQL on disk) already has object listed in inv message.')
print 'Inventory (SQL on disk) already has object listed in inv message.'
del self.objectsThatWeHaveYetToGetFromThisPeer[ del self.objectsThatWeHaveYetToGetFromThisPeer[
objectHash] objectHash]
else: else:
@ -218,8 +209,7 @@ class receiveDataThread(threading.Thread):
del self.objectsThatWeHaveYetToGetFromThisPeer[ del self.objectsThatWeHaveYetToGetFromThisPeer[
objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway. objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
with shared.printLock: logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
try: try:
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
@ -228,16 +218,14 @@ class receiveDataThread(threading.Thread):
break break
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore. # We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
with shared.printLock: logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
try: try:
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
except: except:
pass pass
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
with shared.printLock: logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)))
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len( shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len(
self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
@ -245,14 +233,12 @@ class receiveDataThread(threading.Thread):
def sendpong(self): def sendpong(self):
with shared.printLock: logger.debug('Sending pong')
print 'Sending pong'
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong'))) self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong')))
def recverack(self): def recverack(self):
with shared.printLock: logger.debug('verack received')
print 'verack received'
self.verackReceived = True self.verackReceived = True
if self.verackSent: if self.verackSent:
# We have thus both sent and received a verack. # We have thus both sent and received a verack.
@ -289,11 +275,10 @@ class receiveDataThread(threading.Thread):
self.sock.settimeout( self.sock.settimeout(
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately. 600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
with shared.printLock: logger.debug('Connection fully established with ' + str(self.peer) + "\n" + \
print 'Connection fully established with', self.peer 'The size of the connectedHostsList is now ' + str(len(shared.connectedHostsList)) + "\n" + \
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList) 'The length of sendDataQueues is now: ' + str(len(shared.sendDataQueues)) + "\n" + \
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues) 'broadcasting addr from within connectionFullyEstablished function.')
print 'broadcasting addr from within connectionFullyEstablished function.'
# Let all of our peers know about this new node. # Let all of our peers know about this new node.
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort) dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
@ -302,8 +287,7 @@ class receiveDataThread(threading.Thread):
self.sendaddr() # This is one large addr message to this one peer. self.sendaddr() # This is one large addr message to this one peer.
if not self.initiatedConnection and len(shared.connectedHostsList) > 200: if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
with shared.printLock: logger.info ('We are connected to too many people. Closing connection.')
print 'We are connected to too many people. Closing connection.'
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
return return
@ -349,8 +333,7 @@ class receiveDataThread(threading.Thread):
# function for broadcasting invs to everyone in our stream. # function for broadcasting invs to everyone in our stream.
def sendinvMessageToJustThisOnePeer(self, numberOfObjects, payload): def sendinvMessageToJustThisOnePeer(self, numberOfObjects, payload):
payload = encodeVarint(numberOfObjects) + payload payload = encodeVarint(numberOfObjects) + payload
with shared.printLock: logger.debug('Sending huge inv message with ' + str(numberOfObjects) + ' objects to just this one peer')
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('inv', payload))) self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('inv', payload)))
def _sleepForTimingAttackMitigation(self, sleepTime): def _sleepForTimingAttackMitigation(self, sleepTime):
@ -358,8 +341,7 @@ class receiveDataThread(threading.Thread):
# only connected to the trusted peer because we can trust the # only connected to the trusted peer because we can trust the
# peer not to attack # peer not to attack
if sleepTime > 0 and doTimingAttackMitigation and shared.trustedPeer == None: if sleepTime > 0 and doTimingAttackMitigation and shared.trustedPeer == None:
with shared.printLock: logger.debug('Timing attack mitigation: Sleeping for ' + str(sleepTime) + ' seconds.')
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
time.sleep(sleepTime) time.sleep(sleepTime)
def recerror(self, data): def recerror(self, data):
@ -417,30 +399,27 @@ class receiveDataThread(threading.Thread):
if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0: if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0:
for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items(): for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items():
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value
with shared.printLock: logger.debug('number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: ' + str(len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer)) + "\n" + \
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer:', len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) 'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers))
print 'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10]) numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
if numberOfItemsInInv > 50000: if numberOfItemsInInv > 50000:
sys.stderr.write('Too many items in inv message!') sys.stderr.write('Too many items in inv message!')
return return
if len(data) < lengthOfVarint + (numberOfItemsInInv * 32): if len(data) < lengthOfVarint + (numberOfItemsInInv * 32):
print 'inv message doesn\'t contain enough data. Ignoring.' logger.info('inv message doesn\'t contain enough data. Ignoring.')
return return
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object. if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
with shared.printLock: logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.')
print 'We already have', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
return return
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
data[lengthOfVarint:32 + lengthOfVarint]] = 0 data[lengthOfVarint:32 + lengthOfVarint]] = 0
shared.numberOfInventoryLookupsPerformed += 1 shared.numberOfInventoryLookupsPerformed += 1
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory: if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
with shared.printLock: logger.debug('Inventory (in memory) has inventory item already.')
print 'Inventory (in memory) has inventory item already.'
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
print 'Inventory (SQL on disk) has inventory item already.' logger.debug('Inventory (SQL on disk) has inventory item already.')
else: else:
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
else: else:
@ -455,8 +434,7 @@ class receiveDataThread(threading.Thread):
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime) logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
for item in objectsNewToMe: for item in objectsNewToMe:
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
with shared.printLock: logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)), ' from this node in particular. Ignoring the rest of this inv message.')
print 'We already have', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToGetFromThisPeer), 'from this node in particular. Ignoring the rest of this inv message.'
break break
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers. self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
@ -467,8 +445,7 @@ class receiveDataThread(threading.Thread):
# Send a getdata message to our peer to request the object with the given # Send a getdata message to our peer to request the object with the given
# hash # hash
def sendgetdata(self, hash): def sendgetdata(self, hash):
with shared.printLock: logger.debug('sending getdata to retrieve object with hash: ' + hash.encode('hex'))
print 'sending getdata to retrieve object with hash:', hash.encode('hex')
payload = '\x01' + hash payload = '\x01' + hash
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('getdata', payload))) self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('getdata', payload)))
@ -478,13 +455,12 @@ class receiveDataThread(threading.Thread):
numberOfRequestedInventoryItems, lengthOfVarint = decodeVarint( numberOfRequestedInventoryItems, lengthOfVarint = decodeVarint(
data[:10]) data[:10])
if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems): if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems):
print 'getdata message does not contain enough data. Ignoring.' logger.debug('getdata message does not contain enough data. Ignoring.')
return return
for i in xrange(numberOfRequestedInventoryItems): for i in xrange(numberOfRequestedInventoryItems):
hash = data[lengthOfVarint + ( hash = data[lengthOfVarint + (
i * 32):32 + lengthOfVarint + (i * 32)] i * 32):32 + lengthOfVarint + (i * 32)]
with shared.printLock: logger.debug('received getdata request for item:' + hash.encode('hex'))
print 'received getdata request for item:', hash.encode('hex')
shared.numberOfInventoryLookupsPerformed += 1 shared.numberOfInventoryLookupsPerformed += 1
shared.inventoryLock.acquire() shared.inventoryLock.acquire()
@ -507,36 +483,35 @@ class receiveDataThread(threading.Thread):
# Our peer has requested (in a getdata message) that we send an object. # Our peer has requested (in a getdata message) that we send an object.
def sendObject(self, payload): def sendObject(self, payload):
with shared.printLock: logger.debug('sending an object.')
print 'sending an object.'
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload))) self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload)))
def _checkIPv4Address(self, host, hostStandardFormat): def _checkIPv4Address(self, host, hostStandardFormat):
# print 'hostStandardFormat', hostStandardFormat # print 'hostStandardFormat', hostStandardFormat
if host[0] == '\x7F': # 127/8 if host[0] == '\x7F': # 127/8
print 'Ignoring IP address in loopback range:', hostStandardFormat logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat)
return False return False
if host[0] == '\x0A': # 10/8 if host[0] == '\x0A': # 10/8
print 'Ignoring IP address in private range:', hostStandardFormat logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
return False return False
if host[0:2] == '\xC0\xA8': # 192.168/16 if host[0:2] == '\xC0\xA8': # 192.168/16
print 'Ignoring IP address in private range:', hostStandardFormat logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
return False return False
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12
print 'Ignoring IP address in private range:', hostStandardFormat logger.debug('Ignoring IP address in private range:' + hostStandardFormat)
return False return False
return True return True
def _checkIPv6Address(self, host, hostStandardFormat): def _checkIPv6Address(self, host, hostStandardFormat):
if host == ('\x00' * 15) + '\x01': if host == ('\x00' * 15) + '\x01':
print 'Ignoring loopback address:', hostStandardFormat logger.debug('Ignoring loopback address: ' + hostStandardFormat)
return False return False
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80: if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
print 'Ignoring local address:', hostStandardFormat logger.debug ('Ignoring local address: ' + hostStandardFormat)
return False return False
if (ord(host[0]) & 0xfe) == 0xfc: if (ord(host[0]) & 0xfe) == 0xfc:
print 'Ignoring unique local address:', hostStandardFormat logger.debug ('Ignoring unique local address: ' + hostStandardFormat)
return False return False
return True return True
@ -546,13 +521,12 @@ class receiveDataThread(threading.Thread):
data[:10]) data[:10])
if shared.verbose >= 1: if shared.verbose >= 1:
with shared.printLock: logger.debug('addr message contains ' + str(numberOfAddressesIncluded) + ' IP addresses.')
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
return return
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded): if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
print 'addr message does not contain the correct amount of data. Ignoring.' logger.debug('addr message does not contain the correct amount of data. Ignoring.')
return return
for i in range(0, numberOfAddressesIncluded): for i in range(0, numberOfAddressesIncluded):
@ -590,8 +564,7 @@ class receiveDataThread(threading.Thread):
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now. if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
with shared.knownNodesLock: with shared.knownNodesLock:
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
with shared.printLock: logger.debug('added new node ' + str(peerFromAddrMessage) + ' to knownNodes in stream ' + str(recaddrStream))
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
shared.needToWriteKnownNodesToDisk = True shared.needToWriteKnownNodesToDisk = True
hostDetails = ( hostDetails = (
@ -606,8 +579,7 @@ class receiveDataThread(threading.Thread):
with shared.knownNodesLock: with shared.knownNodesLock:
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
with shared.printLock: logger.debug('knownNodes currently has ' + str(len(shared.knownNodes[self.streamNumber])) + ' nodes for this stream.')
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
# Send a huge addr message to our peer. This is only used # Send a huge addr message to our peer. This is only used
@ -706,8 +678,7 @@ class receiveDataThread(threading.Thread):
self.services, = unpack('>q', data[4:12]) self.services, = unpack('>q', data[4:12])
if self.remoteProtocolVersion < 3: if self.remoteProtocolVersion < 3:
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
with shared.printLock: logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer))
print 'Closing connection to old protocol version', self.remoteProtocolVersion, 'node: ', self.peer
return return
timestamp, = unpack('>Q', data[12:20]) timestamp, = unpack('>Q', data[12:20])
timeOffset = timestamp - int(time.time()) timeOffset = timestamp - int(time.time())
@ -748,13 +719,11 @@ class receiveDataThread(threading.Thread):
readPosition += lengthOfNumberOfStreamsInVersionMessage readPosition += lengthOfNumberOfStreamsInVersionMessage
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint( self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
data[readPosition:]) data[readPosition:])
with shared.printLock: logger.debug('Remote node useragent: ' + useragent + ' stream number:' + str(self.streamNumber) + ' time offset: ' + str(timeOffset) + ' seconds.')
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber, ' time offset:', timeOffset, 'seconds.'
if self.streamNumber != 1: if self.streamNumber != 1:
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
with shared.printLock: logger.debug ('Closed connection to ' + str(self.peer) + ' because they are interested in stream ' + str(self.streamNumber) + '.')
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
return return
shared.connectedHostsList[ shared.connectedHostsList[
self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab. self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
@ -764,8 +733,7 @@ class receiveDataThread(threading.Thread):
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber)) self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
self.sendDataThreadQueue.put((0, 'shutdown','no data')) self.sendDataThreadQueue.put((0, 'shutdown','no data'))
with shared.printLock: logger.debug('Closing connection to myself: ' + str(self.peer))
print 'Closing connection to myself: ', self.peer
return return
# The other peer's protocol version is of interest to the sendDataThread but we learn of it # The other peer's protocol version is of interest to the sendDataThread but we learn of it
@ -782,16 +750,14 @@ class receiveDataThread(threading.Thread):
# Sends a version message # Sends a version message
def sendversion(self): def sendversion(self):
with shared.printLock: logger.debug('Sending version message')
print 'Sending version message'
self.sendDataThreadQueue.put((0, 'sendRawData', shared.assembleVersionMessage( self.sendDataThreadQueue.put((0, 'sendRawData', shared.assembleVersionMessage(
self.peer.host, self.peer.port, self.streamNumber))) self.peer.host, self.peer.port, self.streamNumber)))
# Sends a verack message # Sends a verack message
def sendverack(self): def sendverack(self):
with shared.printLock: logger.debug('Sending verack')
print 'Sending verack'
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('verack'))) self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('verack')))
self.verackSent = True self.verackSent = True
if self.verackReceived: if self.verackReceived:

View File

@ -11,13 +11,14 @@ import socket
from helper_generic import addDataPadding from helper_generic import addDataPadding
from class_objectHashHolder import * from class_objectHashHolder import *
from addresses import * from addresses import *
from debug import logger
# Every connection to a peer has a sendDataThread (and also a # Every connection to a peer has a sendDataThread (and also a
# receiveDataThread). # receiveDataThread).
class sendDataThread(threading.Thread): class sendDataThread(threading.Thread):
def __init__(self, sendDataThreadQueue): def __init__(self, sendDataThreadQueue):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="sendData")
self.sendDataThreadQueue = sendDataThreadQueue self.sendDataThreadQueue = sendDataThreadQueue
shared.sendDataQueues.append(self.sendDataThreadQueue) shared.sendDataQueues.append(self.sendDataThreadQueue)
self.data = '' self.data = ''
@ -35,6 +36,7 @@ class sendDataThread(threading.Thread):
someObjectsOfWhichThisRemoteNodeIsAlreadyAware): someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
self.sock = sock self.sock = sock
self.peer = shared.Peer(HOST, PORT) self.peer = shared.Peer(HOST, PORT)
self.name = "sendData-" + self.peer.host
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.services = 0 self.services = 0
self.remoteProtocolVersion = - \ self.remoteProtocolVersion = - \
@ -42,23 +44,20 @@ class sendDataThread(threading.Thread):
self.lastTimeISentData = int( self.lastTimeISentData = int(
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive. time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
with shared.printLock: logger.debug('The streamNumber of this sendDataThread (ID: ' + str(id(self)) + ') at setup() is' + str(self.streamNumber))
print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber
def sendVersionMessage(self): def sendVersionMessage(self):
datatosend = shared.assembleVersionMessage( datatosend = shared.assembleVersionMessage(
self.peer.host, self.peer.port, self.streamNumber) # the IP and port of the remote host, and my streamNumber. self.peer.host, self.peer.port, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
with shared.printLock: logger.debug('Sending version packet: ' + repr(datatosend))
print 'Sending version packet: ', repr(datatosend)
try: try:
self.sendBytes(datatosend) self.sendBytes(datatosend)
except Exception as err: except Exception as err:
# if not 'Bad file descriptor' in err: # if not 'Bad file descriptor' in err:
with shared.printLock: logger.error('sock.sendall error: %s\n' % err)
sys.stderr.write('sock.sendall error: %s\n' % err)
self.versionSent = 1 self.versionSent = 1
@ -94,15 +93,13 @@ class sendDataThread(threading.Thread):
def run(self): def run(self):
with shared.printLock: logger.debug('sendDataThread starting. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(shared.sendDataQueues)))
print 'sendDataThread starting. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
while True: while True:
deststream, command, data = self.sendDataThreadQueue.get() deststream, command, data = self.sendDataThreadQueue.get()
if deststream == self.streamNumber or deststream == 0: if deststream == self.streamNumber or deststream == 0:
if command == 'shutdown': if command == 'shutdown':
with shared.printLock: logger.debug('sendDataThread (associated with ' + str(self.peer) + ') ID: ' + str(id(self)) + ' shutting down now.')
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
break break
# When you receive an incoming connection, a sendDataThread is # When you receive an incoming connection, a sendDataThread is
# created even though you don't yet know what stream number the # created even though you don't yet know what stream number the
@ -112,12 +109,10 @@ class sendDataThread(threading.Thread):
# streamNumber of this send data thread here: # streamNumber of this send data thread here:
elif command == 'setStreamNumber': elif command == 'setStreamNumber':
self.streamNumber = data self.streamNumber = data
with shared.printLock: logger.debug('setting the stream number in the sendData thread (ID: ' + str(id(self)) + ') to ' + str(self.streamNumber))
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', self.streamNumber
elif command == 'setRemoteProtocolVersion': elif command == 'setRemoteProtocolVersion':
specifiedRemoteProtocolVersion = data specifiedRemoteProtocolVersion = data
with shared.printLock: logger.debug('setting the remote node\'s protocol version in the sendDataThread (ID: ' + str(id(self)) + ') to ' + str(specifiedRemoteProtocolVersion))
print 'setting the remote node\'s protocol version in the sendDataThread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
self.remoteProtocolVersion = specifiedRemoteProtocolVersion self.remoteProtocolVersion = specifiedRemoteProtocolVersion
elif command == 'advertisepeer': elif command == 'advertisepeer':
self.objectHashHolderInstance.holdPeer(data) self.objectHashHolderInstance.holdPeer(data)
@ -140,8 +135,7 @@ class sendDataThread(threading.Thread):
try: try:
self.sendBytes(packet) self.sendBytes(packet)
except: except:
with shared.printLock: logger.error('sendaddr: self.sock.sendall failed')
print 'sendaddr: self.sock.sendall failed'
break break
elif command == 'advertiseobject': elif command == 'advertiseobject':
self.objectHashHolderInstance.holdHash(data) self.objectHashHolderInstance.holdHash(data)
@ -157,35 +151,30 @@ class sendDataThread(threading.Thread):
try: try:
self.sendBytes(packet) self.sendBytes(packet)
except: except:
with shared.printLock: logger.error('sendinv: self.sock.sendall failed')
print 'sendinv: self.sock.sendall failed'
break break
elif command == 'pong': elif command == 'pong':
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
if self.lastTimeISentData < (int(time.time()) - 298): if self.lastTimeISentData < (int(time.time()) - 298):
# Send out a pong message to keep the connection alive. # Send out a pong message to keep the connection alive.
with shared.printLock: logger.debug('Sending pong to ' + str(self.peer) + ' to keep connection alive.')
print 'Sending pong to', self.peer, 'to keep connection alive.'
packet = shared.CreatePacket('pong') packet = shared.CreatePacket('pong')
try: try:
self.sendBytes(packet) self.sendBytes(packet)
except: except:
with shared.printLock: logger.error('send pong failed')
print 'send pong failed'
break break
elif command == 'sendRawData': elif command == 'sendRawData':
try: try:
self.sendBytes(data) self.sendBytes(data)
except: except:
with shared.printLock: logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.')
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
break break
elif command == 'connectionIsOrWasFullyEstablished': elif command == 'connectionIsOrWasFullyEstablished':
self.connectionIsOrWasFullyEstablished = True self.connectionIsOrWasFullyEstablished = True
self.services, self.sslSock = data self.services, self.sslSock = data
else: else:
with shared.printLock: logger.error('sendDataThread ID: ' + id(self) + ' ignoring command ' + command + ' because the thread is not in stream' + deststream)
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
try: try:
self.sock.shutdown(socket.SHUT_RDWR) self.sock.shutdown(socket.SHUT_RDWR)
@ -193,6 +182,5 @@ class sendDataThread(threading.Thread):
except: except:
pass pass
shared.sendDataQueues.remove(self.sendDataThreadQueue) shared.sendDataQueues.remove(self.sendDataThreadQueue)
with shared.printLock: logger.info('sendDataThread ending. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(shared.sendDataQueues)))
print 'sendDataThread ending. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
self.objectHashHolderInstance.close() self.objectHashHolderInstance.close()

View File

@ -31,7 +31,7 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...)
class singleCleaner(threading.Thread): class singleCleaner(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="singleCleaner")
def run(self): def run(self):
timeWeLastClearedInventoryAndPubkeysTables = 0 timeWeLastClearedInventoryAndPubkeysTables = 0
@ -83,9 +83,7 @@ class singleCleaner(threading.Thread):
int(time.time()) - shared.maximumLengthOfTimeToBotherResendingMessages) int(time.time()) - shared.maximumLengthOfTimeToBotherResendingMessages)
for row in queryreturn: for row in queryreturn:
if len(row) < 2: if len(row) < 2:
with shared.printLock: logger.error('Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
sys.stderr.write(
'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
time.sleep(3) time.sleep(3)
break break
toAddress, ackData, status = row toAddress, ackData, status = row

View File

@ -18,7 +18,7 @@ import re
class singleListener(threading.Thread): class singleListener(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="singleListener")
def setup(self, selfInitiatedConnections): def setup(self, selfInitiatedConnections):
self.selfInitiatedConnections = selfInitiatedConnections self.selfInitiatedConnections = selfInitiatedConnections
@ -54,8 +54,7 @@ class singleListener(threading.Thread):
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'): while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'):
time.sleep(5) time.sleep(5)
with shared.printLock: logger.info('Listening for incoming connections.')
print 'Listening for incoming connections.'
# First try listening on an IPv6 socket. This should also be # First try listening on an IPv6 socket. This should also be
# able to accept connections on IPv4. If that's not available # able to accept connections on IPv4. If that's not available
@ -82,8 +81,7 @@ class singleListener(threading.Thread):
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'): while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'):
time.sleep(10) time.sleep(10)
while len(shared.connectedHostsList) > 220: while len(shared.connectedHostsList) > 220:
with shared.printLock: logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.'
time.sleep(10) time.sleep(10)
@ -104,8 +102,7 @@ class singleListener(threading.Thread):
# connection flooding. # connection flooding.
if HOST in shared.connectedHostsList: if HOST in shared.connectedHostsList:
socketObject.close() socketObject.close()
with shared.printLock: logger.info('We are already connected to ' + str(HOST) + '. Ignoring connection.')
print 'We are already connected to', HOST + '. Ignoring connection.'
else: else:
break break
@ -124,6 +121,5 @@ class singleListener(threading.Thread):
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue) socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue)
rd.start() rd.start()
with shared.printLock: logger.info('connected to ' + HOST + ' during INCOMING request.')
print self, 'connected to', HOST, 'during INCOMING request.'

View File

@ -25,7 +25,7 @@ class singleWorker(threading.Thread):
def __init__(self): def __init__(self):
# QThread.__init__(self, parent) # QThread.__init__(self, parent)
threading.Thread.__init__(self) threading.Thread.__init__(self, name="singleWorker")
def run(self): def run(self):
@ -49,7 +49,7 @@ class singleWorker(threading.Thread):
'''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''') '''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''')
for row in queryreturn: for row in queryreturn:
ackdata, = row ackdata, = row
print 'Watching for ackdata', ackdata.encode('hex') logger.info('Watching for ackdata ' + ackdata.encode('hex'))
shared.ackdataForWhichImWatching[ackdata] = 0 shared.ackdataForWhichImWatching[ackdata] = 0
time.sleep( time.sleep(
@ -81,9 +81,7 @@ class singleWorker(threading.Thread):
elif command == 'sendOutOrStoreMyV4Pubkey': elif command == 'sendOutOrStoreMyV4Pubkey':
self.sendOutOrStoreMyV4Pubkey(data) self.sendOutOrStoreMyV4Pubkey(data)
else: else:
with shared.printLock: logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
sys.stderr.write(
'Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
shared.workerQueue.task_done() shared.workerQueue.task_done()
@ -114,9 +112,7 @@ class singleWorker(threading.Thread):
privEncryptionKeyBase58 = shared.config.get( privEncryptionKeyBase58 = shared.config.get(
myAddress, 'privencryptionkey') myAddress, 'privencryptionkey')
except Exception as err: except Exception as err:
with shared.printLock: logger.error('Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
sys.stderr.write(
'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
return return
privSigningKeyHex = shared.decodeWalletImportFormat( privSigningKeyHex = shared.decodeWalletImportFormat(
@ -133,10 +129,10 @@ class singleWorker(threading.Thread):
# Do the POW for this pubkey message # Do the POW for this pubkey message
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
print '(For pubkey message) Doing proof of work...' logger.info('(For pubkey message) Doing proof of work...')
initialHash = hashlib.sha512(payload).digest() initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
print '(For pubkey message) Found proof of work', trialValue, 'Nonce:', nonce logger.info('(For pubkey message) Found proof of work ' + str(trialValue), ' Nonce: ', str(nonce))
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
@ -145,8 +141,7 @@ class singleWorker(threading.Thread):
objectType, streamNumber, payload, embeddedTime,'') objectType, streamNumber, payload, embeddedTime,'')
shared.inventorySets[streamNumber].add(inventoryHash) shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
@ -171,8 +166,7 @@ class singleWorker(threading.Thread):
#The address has been deleted. #The address has been deleted.
return return
if shared.safeConfigGetBoolean(myAddress, 'chan'): if shared.safeConfigGetBoolean(myAddress, 'chan'):
with shared.printLock: logger.info('This is a chan address. Not sending pubkey.')
print 'This is a chan address. Not sending pubkey.'
return return
status, addressVersionNumber, streamNumber, hash = decodeAddress( status, addressVersionNumber, streamNumber, hash = decodeAddress(
myAddress) myAddress)
@ -199,9 +193,7 @@ class singleWorker(threading.Thread):
privEncryptionKeyBase58 = shared.config.get( privEncryptionKeyBase58 = shared.config.get(
myAddress, 'privencryptionkey') myAddress, 'privencryptionkey')
except Exception as err: except Exception as err:
with shared.printLock: logger.error('Error within sendOutOrStoreMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
sys.stderr.write(
'Error within sendOutOrStoreMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
return return
@ -228,12 +220,10 @@ class singleWorker(threading.Thread):
# Do the POW for this pubkey message # Do the POW for this pubkey message
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
with shared.printLock: logger.info('(For pubkey message) Doing proof of work...')
print '(For pubkey message) Doing proof of work...'
initialHash = hashlib.sha512(payload).digest() initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
with shared.printLock: logger.info('(For pubkey message) Found proof of work. Nonce: ' + str(nonce))
print '(For pubkey message) Found proof of work. Nonce:', nonce
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
@ -242,8 +232,7 @@ class singleWorker(threading.Thread):
objectType, streamNumber, payload, embeddedTime,'') objectType, streamNumber, payload, embeddedTime,'')
shared.inventorySets[streamNumber].add(inventoryHash) shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
@ -264,8 +253,7 @@ class singleWorker(threading.Thread):
#The address has been deleted. #The address has been deleted.
return return
if shared.safeConfigGetBoolean(myAddress, 'chan'): if shared.safeConfigGetBoolean(myAddress, 'chan'):
with shared.printLock: logger.info('This is a chan address. Not sending pubkey.')
print 'This is a chan address. Not sending pubkey.'
return return
status, addressVersionNumber, streamNumber, hash = decodeAddress( status, addressVersionNumber, streamNumber, hash = decodeAddress(
myAddress) myAddress)
@ -285,9 +273,7 @@ class singleWorker(threading.Thread):
privEncryptionKeyBase58 = shared.config.get( privEncryptionKeyBase58 = shared.config.get(
myAddress, 'privencryptionkey') myAddress, 'privencryptionkey')
except Exception as err: except Exception as err:
with shared.printLock: logger.error('Error within sendOutOrStoreMyV4Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
sys.stderr.write(
'Error within sendOutOrStoreMyV4Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
return return
privSigningKeyHex = shared.decodeWalletImportFormat( privSigningKeyHex = shared.decodeWalletImportFormat(
@ -326,10 +312,10 @@ class singleWorker(threading.Thread):
# Do the POW for this pubkey message # Do the POW for this pubkey message
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
print '(For pubkey message) Doing proof of work...' logger.info('(For pubkey message) Doing proof of work...')
initialHash = hashlib.sha512(payload).digest() initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
print '(For pubkey message) Found proof of work', trialValue, 'Nonce:', nonce logger.info('(For pubkey message) Found proof of work ' + str(trialValue) + 'Nonce: ' + str(nonce))
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
@ -338,8 +324,7 @@ class singleWorker(threading.Thread):
objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:]) objectType, streamNumber, payload, embeddedTime, doubleHashOfAddressData[32:])
shared.inventorySets[streamNumber].add(inventoryHash) shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
@ -359,9 +344,7 @@ class singleWorker(threading.Thread):
status, addressVersionNumber, streamNumber, ripe = decodeAddress( status, addressVersionNumber, streamNumber, ripe = decodeAddress(
fromaddress) fromaddress)
if addressVersionNumber <= 1: if addressVersionNumber <= 1:
with shared.printLock: logger.error('Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n')
sys.stderr.write(
'Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n')
return return
# We need to convert our private keys to public keys in order # We need to convert our private keys to public keys in order
# to include them. # to include them.
@ -442,12 +425,12 @@ class singleWorker(threading.Thread):
dataToEncrypt, pubEncryptionKey.encode('hex')) dataToEncrypt, pubEncryptionKey.encode('hex'))
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
print '(For broadcast message) Doing proof of work...' logger.info('(For broadcast message) Doing proof of work...')
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast...")))) ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast..."))))
initialHash = hashlib.sha512(payload).digest() initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce logger.info('(For broadcast message) Found proof of work ' + str(trialValue) + ' Nonce: ' + str(nonce))
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
@ -463,8 +446,7 @@ class singleWorker(threading.Thread):
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, tag) objectType, streamNumber, payload, embeddedTime, tag)
shared.inventorySets[streamNumber].add(inventoryHash) shared.inventorySets[streamNumber].add(inventoryHash)
with shared.printLock: logger.info('sending inv (within sendBroadcast function) for object: ' + inventoryHash.encode('hex'))
print 'sending inv (within sendBroadcast function) for object:', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
@ -612,8 +594,8 @@ class singleWorker(threading.Thread):
shared.ackdataForWhichImWatching[ackdata] = 0 shared.ackdataForWhichImWatching[ackdata] = 0
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key")))) ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key"))))
with shared.printLock: logger.info('Sending a message.')
print 'Sending a message. First 150 characters of message:', repr(message[:150]) logger.debug('First 150 characters of message: ' + repr(message[:150]))
# Let us fetch the recipient's public key out of our database. If # Let us fetch the recipient's public key out of our database. If
# the required proof of work difficulty is too hard then we'll # the required proof of work difficulty is too hard then we'll
@ -685,8 +667,8 @@ class singleWorker(threading.Thread):
requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(l10n.formatTimestamp())))) requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(l10n.formatTimestamp()))))
continue continue
else: # if we are sending a message to ourselves or a chan.. else: # if we are sending a message to ourselves or a chan..
with shared.printLock: logger.info('Sending a message.')
print 'Sending a message. First 150 characters of message:', repr(message[:150]) logger.debug('First 150 characters of message: ' + repr(message[:150]))
behaviorBitfield = '\x00\x00\x00\x01' behaviorBitfield = '\x00\x00\x00\x01'
try: try:
@ -694,9 +676,7 @@ class singleWorker(threading.Thread):
toaddress, 'privencryptionkey') toaddress, 'privencryptionkey')
except Exception as err: except Exception as err:
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: You are trying to send a message to yourself or a chan but your encryption key could not be found in the keys.dat file. Could not encrypt message. %1').arg(l10n.formatTimestamp())))) shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: You are trying to send a message to yourself or a chan but your encryption key could not be found in the keys.dat file. Could not encrypt message. %1').arg(l10n.formatTimestamp()))))
with shared.printLock: logger.error('Error within sendMsg. Could not read the keys from the keys.dat file for our own address. %s\n' % err)
sys.stderr.write(
'Error within sendMsg. Could not read the keys from the keys.dat file for our own address. %s\n' % err)
continue continue
privEncryptionKeyHex = shared.decodeWalletImportFormat( privEncryptionKeyHex = shared.decodeWalletImportFormat(
privEncryptionKeyBase58).encode('hex') privEncryptionKeyBase58).encode('hex')
@ -761,12 +741,10 @@ class singleWorker(threading.Thread):
payload += encodeVarint(len(messageToTransmit)) payload += encodeVarint(len(messageToTransmit))
payload += messageToTransmit payload += messageToTransmit
if shared.config.has_section(toaddress): if shared.config.has_section(toaddress):
with shared.printLock: logger.info('Not bothering to include ackdata because we are sending to ourselves or a chan.')
print 'Not bothering to include ackdata because we are sending to ourselves or a chan.'
fullAckPayload = '' fullAckPayload = ''
elif not shared.isBitSetWithinBitfield(behaviorBitfield,31): elif not shared.isBitSetWithinBitfield(behaviorBitfield,31):
with shared.printLock: logger.info('Not bothering to include ackdata because the receiver said that they won\'t relay it anyway.')
print 'Not bothering to include ackdata because the receiver said that they won\'t relay it anyway.'
fullAckPayload = '' fullAckPayload = ''
else: else:
fullAckPayload = self.generateFullAckMessage( fullAckPayload = self.generateFullAckMessage(
@ -791,18 +769,16 @@ class singleWorker(threading.Thread):
encryptedPayload += encodeVarint(1) # msg version encryptedPayload += encodeVarint(1) # msg version
encryptedPayload += encodeVarint(toStreamNumber) + encrypted encryptedPayload += encodeVarint(toStreamNumber) + encrypted
target = 2 ** 64 / (requiredAverageProofOfWorkNonceTrialsPerByte*(len(encryptedPayload) + 8 + requiredPayloadLengthExtraBytes + ((TTL*(len(encryptedPayload)+8+requiredPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (requiredAverageProofOfWorkNonceTrialsPerByte*(len(encryptedPayload) + 8 + requiredPayloadLengthExtraBytes + ((TTL*(len(encryptedPayload)+8+requiredPayloadLengthExtraBytes))/(2 ** 16))))
with shared.printLock: logger.info('(For msg message) Doing proof of work. Total required difficulty: %f. Required small message difficulty: %f.', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)
print '(For msg message) Doing proof of work. Total required difficulty:', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, 'Required small message difficulty:', float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes
powStartTime = time.time() powStartTime = time.time()
initialHash = hashlib.sha512(encryptedPayload).digest() initialHash = hashlib.sha512(encryptedPayload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
with shared.printLock: logger.info('(For msg message) Found proof of work ' + str(trialValue) + ' Nonce: ' + str(nonce))
print '(For msg message) Found proof of work', trialValue, 'Nonce:', nonce try:
try: logger.info('POW took ' + str(int(time.time() - powStartTime)) + ' seconds. ' + str(nonce / (time.time() - powStartTime)) + ' nonce trials per second.')
print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.' except:
except: pass
pass
encryptedPayload = pack('>Q', nonce) + encryptedPayload encryptedPayload = pack('>Q', nonce) + encryptedPayload
@ -823,7 +799,7 @@ class singleWorker(threading.Thread):
else: else:
# not sending to a chan or one of my addresses # not sending to a chan or one of my addresses
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp())))) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Waiting for acknowledgement. Sent on %1").arg(l10n.formatTimestamp()))))
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex') logger.info('Broadcasting inv for my msg(within sendmsg function):' + inventoryHash.encode('hex'))
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
toStreamNumber, 'advertiseobject', inventoryHash)) toStreamNumber, 'advertiseobject', inventoryHash))
@ -871,8 +847,7 @@ class singleWorker(threading.Thread):
toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress(
toAddress) toAddress)
if toStatus != 'success': if toStatus != 'success':
with shared.printLock: logger.error('Very abnormal error occurred in requestPubKey. toAddress is: ' + repr(
sys.stderr.write('Very abnormal error occurred in requestPubKey. toAddress is: ' + repr(
toAddress) + '. Please report this error to Atheros.') toAddress) + '. Please report this error to Atheros.')
return return
@ -907,12 +882,10 @@ class singleWorker(threading.Thread):
payload += encodeVarint(streamNumber) payload += encodeVarint(streamNumber)
if addressVersionNumber <= 3: if addressVersionNumber <= 3:
payload += ripe payload += ripe
with shared.printLock: logger.info('making request for pubkey with ripe:', ripe.encode('hex'))
print 'making request for pubkey with ripe:', ripe.encode('hex')
else: else:
payload += tag payload += tag
with shared.printLock: logger.info('making request for v4 pubkey with tag:', tag.encode('hex'))
print 'making request for v4 pubkey with tag:', tag.encode('hex')
# print 'trial value', trialValue # print 'trial value', trialValue
statusbar = 'Doing the computations necessary to request the recipient\'s public key.' statusbar = 'Doing the computations necessary to request the recipient\'s public key.'
@ -923,8 +896,7 @@ class singleWorker(threading.Thread):
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
initialHash = hashlib.sha512(payload).digest() initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
with shared.printLock: logger.info('Found proof of work ' + str(trialValue) + ' Nonce: ' + str(nonce))
print 'Found proof of work', trialValue, 'Nonce:', nonce
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
inventoryHash = calculateInventoryHash(payload) inventoryHash = calculateInventoryHash(payload)
@ -932,7 +904,7 @@ class singleWorker(threading.Thread):
shared.inventory[inventoryHash] = ( shared.inventory[inventoryHash] = (
objectType, streamNumber, payload, embeddedTime, '') objectType, streamNumber, payload, embeddedTime, '')
shared.inventorySets[streamNumber].add(inventoryHash) shared.inventorySets[streamNumber].add(inventoryHash)
print 'sending inv (for the getpubkey message)' logger.info('sending inv (for the getpubkey message)')
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
streamNumber, 'advertiseobject', inventoryHash)) streamNumber, 'advertiseobject', inventoryHash))
@ -976,18 +948,16 @@ class singleWorker(threading.Thread):
payload += encodeVarint(toStreamNumber) + ackdata payload += encodeVarint(toStreamNumber) + ackdata
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16)))) target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
with shared.printLock: logger.info('(For ack message) Doing proof of work. TTL set to ' + str(TTL))
print '(For ack message) Doing proof of work. TTL set to', TTL
powStartTime = time.time() powStartTime = time.time()
initialHash = hashlib.sha512(payload).digest() initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash) trialValue, nonce = proofofwork.run(target, initialHash)
with shared.printLock: logger.info('(For ack message) Found proof of work ' + str(trialValue) + ' Nonce: ' + str(nonce))
print '(For ack message) Found proof of work', trialValue, 'Nonce:', nonce try:
try: logger.info('POW took ' + str(time.time() - powStartTime) + ' seconds. ' + str(nonce / (time.time() - powStartTime)) + ' nonce trials per second.')
print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.' except:
except: pass
pass
payload = pack('>Q', nonce) + payload payload = pack('>Q', nonce) + payload
return shared.CreatePacket('object', payload) return shared.CreatePacket('object', payload)

View File

@ -19,7 +19,7 @@ import tr#anslate
class sqlThread(threading.Thread): class sqlThread(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self, name="SQL")
def run(self): def run(self):
self.conn = sqlite3.connect(shared.appdata + 'messages.dat') self.conn = sqlite3.connect(shared.appdata + 'messages.dat')

View File

@ -18,15 +18,33 @@ Use: `from debug import logger` to import this facility into whatever module you
''' '''
import logging import logging
import logging.config import logging.config
import os
import shared import shared
import sys import sys
import traceback
import helper_startup import helper_startup
helper_startup.loadConfig() helper_startup.loadConfig()
# TODO(xj9): Get from a config file. # TODO(xj9): Get from a config file.
log_level = 'DEBUG' log_level = 'DEBUG'
def log_uncaught_exceptions(ex_cls, ex, tb):
logging.critical(''.join(traceback.format_tb(tb)))
logging.critical('{0}: {1}'.format(ex_cls, ex))
def configureLogging(): def configureLogging():
have_logging = False
try:
logging.config.fileConfig(os.path.join (shared.appdata, 'logging.dat'))
have_logging = True
except:
pass
sys.excepthook = log_uncaught_exceptions
if have_logging:
return False
logging.config.dictConfig({ logging.config.dictConfig({
'version': 1, 'version': 1,
'formatters': { 'formatters': {
@ -69,13 +87,17 @@ def configureLogging():
'handlers': ['console'], 'handlers': ['console'],
}, },
}) })
return True
# TODO (xj9): Get from a config file. # TODO (xj9): Get from a config file.
#logger = logging.getLogger('console_only') #logger = logging.getLogger('console_only')
configureLogging() if configureLogging():
if '-c' in sys.argv: if '-c' in sys.argv:
logger = logging.getLogger('file_only') logger = logging.getLogger('file_only')
else:
logger = logging.getLogger('both')
else: else:
logger = logging.getLogger('both') logger = logging.getLogger('default')
def restartLoggingInUpdatedAppdataLocation(): def restartLoggingInUpdatedAppdataLocation():
global logger global logger
@ -83,9 +105,11 @@ def restartLoggingInUpdatedAppdataLocation():
logger.removeHandler(i) logger.removeHandler(i)
i.flush() i.flush()
i.close() i.close()
configureLogging() if configureLogging():
if '-c' in sys.argv: if '-c' in sys.argv:
logger = logging.getLogger('file_only') logger = logging.getLogger('file_only')
else:
logger = logging.getLogger('both')
else: else:
logger = logging.getLogger('both') logger = logging.getLogger('default')

View File

@ -4,6 +4,8 @@ import defaultKnownNodes
import pickle import pickle
import time import time
from debug import logger
def knownNodes(): def knownNodes():
try: try:
# We shouldn't have to use the shared.knownNodesLock because this had # We shouldn't have to use the shared.knownNodesLock because this had
@ -26,7 +28,7 @@ def knownNodes():
except: except:
shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(shared.appdata) shared.knownNodes = defaultKnownNodes.createDefaultKnownNodes(shared.appdata)
if shared.config.getint('bitmessagesettings', 'settingsversion') > 10: if shared.config.getint('bitmessagesettings', 'settingsversion') > 10:
print 'Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.' logger.error('Bitmessage cannot read future versions of the keys file (keys.dat). Run the newer version of Bitmessage.')
raise SystemExit raise SystemExit
def dns(): def dns():
@ -39,16 +41,16 @@ def dns():
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none': if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none':
try: try:
for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80): for item in socket.getaddrinfo('bootstrap8080.bitmessage.org', 80):
print 'Adding', item[4][0], 'to knownNodes based on DNS boostrap method' logger.info('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method')
shared.knownNodes[1][shared.Peer(item[4][0], 8080)] = int(time.time()) shared.knownNodes[1][shared.Peer(item[4][0], 8080)] = int(time.time())
except: except:
print 'bootstrap8080.bitmessage.org DNS bootstrapping failed.' logger.error('bootstrap8080.bitmessage.org DNS bootstrapping failed.')
try: try:
for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80): for item in socket.getaddrinfo('bootstrap8444.bitmessage.org', 80):
print 'Adding', item[4][0], 'to knownNodes based on DNS boostrap method' logger.info ('Adding ' + item[4][0] + ' to knownNodes based on DNS bootstrap method')
shared.knownNodes[1][shared.Peer(item[4][0], 8444)] = int(time.time()) shared.knownNodes[1][shared.Peer(item[4][0], 8444)] = int(time.time())
except: except:
print 'bootstrap8444.bitmessage.org DNS bootstrapping failed.' logger.error('bootstrap8444.bitmessage.org DNS bootstrapping failed.')
else: else:
print 'DNS bootstrap skipped because SOCKS is used.' logger.info('DNS bootstrap skipped because SOCKS is used.')