Improved logging
Fixes #118 - changed almost all "print" into logger - threads have nicer names - logger can have configuration in "logger.dat" in the same directory as "keys.dat", and the logger will pick the one named "default" to replace the "console" and "file" that are in PyBitmessage otherwise Example file for logging to syslog: [loggers] keys = root,syslog [logger_root] level=NOTSET handlers=syslog [logger_syslog] level=DEBUG handlers=syslog qualname=default [handlers] keys = syslog [handler_syslog] class = handlers.SysLogHandler formatter = syslog level = DEBUG args=(('localhost', handlers.SYSLOG_UDP_PORT), handlers.SysLogHandler.LOG_LOCAL7) [formatters] keys = syslog [formatter_syslog] format=%(asctime)s %(threadName)s %(filename)s@%(lineno)d %(message)s datefmt=%b %d %H:%M:%S
This commit is contained in:
parent
28f46cb8b3
commit
231219a193
|
@ -55,7 +55,7 @@ def decodeBase58(string, alphabet=ALPHABET):
|
|||
|
||||
def encodeVarint(integer):
|
||||
if integer < 0:
|
||||
print 'varint cannot be < 0'
|
||||
logger.error('varint cannot be < 0')
|
||||
raise SystemExit
|
||||
if integer < 253:
|
||||
return pack('>B',integer)
|
||||
|
@ -66,7 +66,7 @@ def encodeVarint(integer):
|
|||
if integer >= 4294967296 and integer < 18446744073709551616:
|
||||
return pack('>B',255) + pack('>Q',integer)
|
||||
if integer >= 18446744073709551616:
|
||||
print 'varint cannot be >= 18446744073709551616'
|
||||
logger.error('varint cannot be >= 18446744073709551616')
|
||||
raise SystemExit
|
||||
|
||||
class varintDecodeError(Exception):
|
||||
|
@ -185,25 +185,25 @@ def decodeAddress(address):
|
|||
try:
|
||||
addressVersionNumber, bytesUsedByVersionNumber = decodeVarint(data[:9])
|
||||
except varintDecodeError as e:
|
||||
print e
|
||||
logger.error(str(e))
|
||||
status = 'varintmalformed'
|
||||
return status,0,0,""
|
||||
#print 'addressVersionNumber', addressVersionNumber
|
||||
#print 'bytesUsedByVersionNumber', bytesUsedByVersionNumber
|
||||
|
||||
if addressVersionNumber > 4:
|
||||
print 'cannot decode address version numbers this high'
|
||||
logger.error('cannot decode address version numbers this high')
|
||||
status = 'versiontoohigh'
|
||||
return status,0,0,""
|
||||
elif addressVersionNumber == 0:
|
||||
print 'cannot decode address version numbers of zero.'
|
||||
logger.error('cannot decode address version numbers of zero.')
|
||||
status = 'versiontoohigh'
|
||||
return status,0,0,""
|
||||
|
||||
try:
|
||||
streamNumber, bytesUsedByStreamNumber = decodeVarint(data[bytesUsedByVersionNumber:])
|
||||
except varintDecodeError as e:
|
||||
print e
|
||||
logger.error(str(e))
|
||||
status = 'varintmalformed'
|
||||
return status,0,0,""
|
||||
#print streamNumber
|
||||
|
|
|
@ -12,14 +12,14 @@ try:
|
|||
from PyQt4.QtGui import *
|
||||
|
||||
except Exception as err:
|
||||
print 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download it from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\' (without quotes).'
|
||||
print 'Error message:', err
|
||||
logger.error( 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download it from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\' (without quotes).')
|
||||
logger.error('Error message: ' + str(err))
|
||||
sys.exit()
|
||||
|
||||
try:
|
||||
_encoding = QtGui.QApplication.UnicodeUTF8
|
||||
except AttributeError:
|
||||
print 'QtGui.QApplication.UnicodeUTF8 error:', err
|
||||
logger.error('QtGui.QApplication.UnicodeUTF8 error: ' + str(err))
|
||||
|
||||
from addresses import *
|
||||
import shared
|
||||
|
@ -503,7 +503,6 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
if len(db[toAddress]) > 0:
|
||||
j = 0
|
||||
for f, c in db[toAddress].iteritems():
|
||||
print "adding %s, %i" % (f, c)
|
||||
subwidget = Ui_FolderWidget(widget, j, toAddress, f, c)
|
||||
j += 1
|
||||
widget.setUnreadCount(unread)
|
||||
|
@ -742,7 +741,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
if nc.test()[0] == 'failed':
|
||||
self.ui.pushButtonFetchNamecoinID.hide()
|
||||
except:
|
||||
print 'There was a problem testing for a Namecoin daemon. Hiding the Fetch Namecoin ID button'
|
||||
logger.error('There was a problem testing for a Namecoin daemon. Hiding the Fetch Namecoin ID button')
|
||||
self.ui.pushButtonFetchNamecoinID.hide()
|
||||
|
||||
def updateTTL(self, sliderPosition):
|
||||
|
@ -1282,7 +1281,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
|
||||
# has messageing menu been installed
|
||||
if not withMessagingMenu:
|
||||
print 'WARNING: MessagingMenu is not available. Is libmessaging-menu-dev installed?'
|
||||
logger.warning('WARNING: MessagingMenu is not available. Is libmessaging-menu-dev installed?')
|
||||
return
|
||||
|
||||
# create the menu server
|
||||
|
@ -1295,7 +1294,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
self.ubuntuMessagingMenuUnread(True)
|
||||
except Exception:
|
||||
withMessagingMenu = False
|
||||
print 'WARNING: messaging menu disabled'
|
||||
logger.warning('WARNING: messaging menu disabled')
|
||||
|
||||
# update the Ubuntu messaging menu
|
||||
def ubuntuMessagingMenuUpdate(self, drawAttention, newItem, toLabel):
|
||||
|
@ -1307,7 +1306,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
|
||||
# has messageing menu been installed
|
||||
if not withMessagingMenu:
|
||||
print 'WARNING: messaging menu disabled or libmessaging-menu-dev not installed'
|
||||
logger.warning('WARNING: messaging menu disabled or libmessaging-menu-dev not installed')
|
||||
return
|
||||
|
||||
# remember this item to that the messaging menu can find it
|
||||
|
@ -1401,7 +1400,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
stdout=subprocess.PIPE)
|
||||
gst_available=True
|
||||
except:
|
||||
print "WARNING: gst123 must be installed in order to play mp3 sounds"
|
||||
logger.warning("WARNING: gst123 must be installed in order to play mp3 sounds")
|
||||
if not gst_available:
|
||||
try:
|
||||
subprocess.call(["mpg123", soundFilename],
|
||||
|
@ -1409,14 +1408,14 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
stdout=subprocess.PIPE)
|
||||
gst_available=True
|
||||
except:
|
||||
print "WARNING: mpg123 must be installed in order to play mp3 sounds"
|
||||
logger.warning("WARNING: mpg123 must be installed in order to play mp3 sounds")
|
||||
else:
|
||||
try:
|
||||
subprocess.call(["aplay", soundFilename],
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE)
|
||||
except:
|
||||
print "WARNING: aplay must be installed in order to play WAV sounds"
|
||||
logger.warning("WARNING: aplay must be installed in order to play WAV sounds")
|
||||
elif sys.platform[0:3] == 'win':
|
||||
# use winsound on Windows
|
||||
import winsound
|
||||
|
@ -1533,7 +1532,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
shared.apiAddressGeneratorReturnQueue.queue.clear()
|
||||
shared.addressGeneratorQueue.put(('createChan', 4, 1, self.str_chan + ' ' + str(self.newChanDialogInstance.ui.lineEditChanNameCreate.text().toUtf8()), self.newChanDialogInstance.ui.lineEditChanNameCreate.text().toUtf8()))
|
||||
addressGeneratorReturnValue = shared.apiAddressGeneratorReturnQueue.get()
|
||||
print 'addressGeneratorReturnValue', addressGeneratorReturnValue
|
||||
logger.debug('addressGeneratorReturnValue ' + addressGeneratorReturnValue)
|
||||
if len(addressGeneratorReturnValue) == 0:
|
||||
QMessageBox.about(self, _translate("MainWindow", "Address already present"), _translate(
|
||||
"MainWindow", "Could not add chan because it appears to already be one of your identities."))
|
||||
|
@ -1558,7 +1557,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
shared.apiAddressGeneratorReturnQueue.queue.clear()
|
||||
shared.addressGeneratorQueue.put(('joinChan', addBMIfNotPresent(self.newChanDialogInstance.ui.lineEditChanBitmessageAddress.text()), self.str_chan + ' ' + str(self.newChanDialogInstance.ui.lineEditChanNameJoin.text().toUtf8()), self.newChanDialogInstance.ui.lineEditChanNameJoin.text().toUtf8()))
|
||||
addressGeneratorReturnValue = shared.apiAddressGeneratorReturnQueue.get()
|
||||
print 'addressGeneratorReturnValue', addressGeneratorReturnValue
|
||||
logger.debug('addressGeneratorReturnValue ' + addressGeneratorReturnValue)
|
||||
if addressGeneratorReturnValue == 'chan name does not match address':
|
||||
QMessageBox.about(self, _translate("MainWindow", "Address does not match chan name"), _translate(
|
||||
"MainWindow", "Although the Bitmessage address you entered was valid, it doesn\'t match the chan name."))
|
||||
|
@ -2125,13 +2124,12 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
acct.createMessage(toAddress, fromAddress, subject, message)
|
||||
subject = acct.subject
|
||||
toAddress = acct.toAddress
|
||||
print "Subject: %s" % (subject)
|
||||
print "address: %s" % (toAddress)
|
||||
logger.debug("Subject: %s" % (subject))
|
||||
logger.debug("address: %s" % (toAddress))
|
||||
status, addressVersionNumber, streamNumber, ripe = decodeAddress(
|
||||
toAddress)
|
||||
if status != 'success':
|
||||
with shared.printLock:
|
||||
print 'Error: Could not decode', toAddress, ':', status
|
||||
logger.error('Error: Could not decode ' + toAddress + ':' + status)
|
||||
|
||||
if status == 'missingbm':
|
||||
self.statusBar().showMessage(_translate(
|
||||
|
@ -2499,7 +2497,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
shared.objectProcessorQueue.put((objectType,payload))
|
||||
|
||||
def click_pushButtonStatusIcon(self):
|
||||
print 'click_pushButtonStatusIcon'
|
||||
logger.debug('click_pushButtonStatusIcon')
|
||||
self.iconGlossaryInstance = iconGlossaryDialog(self)
|
||||
if self.iconGlossaryInstance.exec_():
|
||||
pass
|
||||
|
@ -2810,12 +2808,10 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
if acct.type != 'normal':
|
||||
return
|
||||
if self.dialog.ui.radioButtonUnregister.isChecked() and isinstance(acct, GatewayAccount):
|
||||
print "unregister"
|
||||
acct.unregister()
|
||||
shared.config.remove_option(addressAtCurrentRow, 'gateway')
|
||||
shared.writeKeysFile()
|
||||
elif self.dialog.ui.radioButtonRegister.isChecked():
|
||||
print "register"
|
||||
email = str(self.dialog.ui.lineEditEmail.text().toUtf8())
|
||||
acct = MailchuckAccount(addressAtCurrentRow)
|
||||
acct.register(email)
|
||||
|
@ -2864,7 +2860,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
shared.addressGeneratorQueue.put(('createDeterministicAddresses', 4, streamNumberForAddress, "unused deterministic address", self.dialog.ui.spinBoxNumberOfAddressesToMake.value(
|
||||
), self.dialog.ui.lineEditPassphrase.text().toUtf8(), self.dialog.ui.checkBoxEighteenByteRipe.isChecked()))
|
||||
else:
|
||||
print 'new address dialog box rejected'
|
||||
logger.debug('new address dialog box rejected')
|
||||
|
||||
# Quit selected from menu or application indicator
|
||||
def quit(self):
|
||||
|
@ -3052,7 +3048,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
# If the previous message was to a chan then we should send our reply to the chan rather than to the particular person who sent the message.
|
||||
if shared.config.has_section(toAddressAtCurrentInboxRow):
|
||||
if shared.safeConfigGetBoolean(toAddressAtCurrentInboxRow, 'chan'):
|
||||
print 'original sent to a chan. Setting the to address in the reply to the chan address.'
|
||||
logger.debug('original sent to a chan. Setting the to address in the reply to the chan address.')
|
||||
self.ui.lineEditTo.setText(str(toAddressAtCurrentInboxRow))
|
||||
|
||||
listOfAddressesInComboBoxSendFrom = [str(widget['from'].itemData(i).toPyObject()) for i in range(widget['from'].count())]
|
||||
|
@ -3727,7 +3723,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
if sourcefile != '':
|
||||
copied = QtCore.QFile.copy(sourcefile, destination)
|
||||
if not copied:
|
||||
print 'couldn\'t copy :('
|
||||
logger.error('couldn\'t copy :(')
|
||||
# set the icon
|
||||
self.rerenderTabTreeMessages()
|
||||
self.rerenderTabTreeSubscriptions()
|
||||
|
@ -3965,8 +3961,7 @@ class MyForm(settingsmixin.SMainWindow):
|
|||
|
||||
def updateStatusBar(self, data):
|
||||
if data != "":
|
||||
with shared.printLock:
|
||||
print 'Status bar:', data
|
||||
logger.info('Status bar: ' + data)
|
||||
|
||||
self.statusBar().showMessage(data)
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ class objectProcessor(threading.Thread):
|
|||
objecs (msg, broadcast, pubkey, getpubkey) from the receiveDataThreads.
|
||||
"""
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="objectProcessor")
|
||||
"""
|
||||
It may be the case that the last time Bitmessage was running, the user
|
||||
closed it before it finished processing everything in the
|
||||
|
@ -741,8 +741,7 @@ class objectProcessor(threading.Thread):
|
|||
|
||||
fromAddress = encodeAddress(
|
||||
sendersAddressVersion, sendersStream, calculatedRipe)
|
||||
with shared.printLock:
|
||||
print 'fromAddress:', fromAddress
|
||||
logger.debug('fromAddress: ' + fromAddress)
|
||||
|
||||
if messageEncodingType == 2:
|
||||
subject, body = self.decodeType2Message(message)
|
||||
|
|
|
@ -16,7 +16,7 @@ from class_receiveDataThread import *
|
|||
class outgoingSynSender(threading.Thread):
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="outgoingSynSender")
|
||||
|
||||
def setup(self, streamNumber, selfInitiatedConnections):
|
||||
self.streamNumber = streamNumber
|
||||
|
@ -40,6 +40,7 @@ class outgoingSynSender(threading.Thread):
|
|||
while shared.safeConfigGetBoolean('bitmessagesettings', 'dontconnect'):
|
||||
time.sleep(2)
|
||||
while shared.safeConfigGetBoolean('bitmessagesettings', 'sendoutgoingconnections'):
|
||||
self.name = "outgoingSynSender"
|
||||
maximumConnections = 1 if shared.trustedPeer else 8 # maximum number of outgoing connections = 8
|
||||
while len(self.selfInitiatedConnections[self.streamNumber]) >= maximumConnections:
|
||||
time.sleep(10)
|
||||
|
@ -64,6 +65,7 @@ class outgoingSynSender(threading.Thread):
|
|||
shared.alreadyAttemptedConnectionsListLock.acquire()
|
||||
shared.alreadyAttemptedConnectionsList[peer] = 0
|
||||
shared.alreadyAttemptedConnectionsListLock.release()
|
||||
self.name = "outgoingSynSender-" + peer.host
|
||||
if peer.host.find(':') == -1:
|
||||
address_family = socket.AF_INET
|
||||
else:
|
||||
|
@ -86,22 +88,19 @@ class outgoingSynSender(threading.Thread):
|
|||
except:
|
||||
pass
|
||||
shared.knownNodesLock.release()
|
||||
with shared.printLock:
|
||||
print 'deleting ', peer, 'from shared.knownNodes because it caused a socks.socksocket exception. We must not be 64-bit compatible.'
|
||||
logger.debug('deleting ' + str(peer) + ' from shared.knownNodes because it caused a socks.socksocket exception. We must not be 64-bit compatible.')
|
||||
continue
|
||||
# This option apparently avoids the TIME_WAIT state so that we
|
||||
# can rebind faster
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.settimeout(20)
|
||||
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
||||
with shared.printLock:
|
||||
print 'Trying an outgoing connection to', peer
|
||||
logger.debug('Trying an outgoing connection to ' + str(peer))
|
||||
|
||||
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
||||
if shared.verbose >= 2:
|
||||
with shared.printLock:
|
||||
print '(Using SOCKS4a) Trying an outgoing connection to', peer
|
||||
logger.debug ('(Using SOCKS4a) Trying an outgoing connection to ' + str(peer))
|
||||
|
||||
proxytype = socks.PROXY_TYPE_SOCKS4
|
||||
sockshostname = shared.config.get(
|
||||
|
@ -121,8 +120,7 @@ class outgoingSynSender(threading.Thread):
|
|||
proxytype, sockshostname, socksport, rdns)
|
||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
||||
if shared.verbose >= 2:
|
||||
with shared.printLock:
|
||||
print '(Using SOCKS5) Trying an outgoing connection to', peer
|
||||
logger.debug ('(Using SOCKS5) Trying an outgoing connection to ' + str(peer))
|
||||
|
||||
proxytype = socks.PROXY_TYPE_SOCKS5
|
||||
sockshostname = shared.config.get(
|
||||
|
@ -155,8 +153,7 @@ class outgoingSynSender(threading.Thread):
|
|||
self.selfInitiatedConnections,
|
||||
sendDataThreadQueue)
|
||||
rd.start()
|
||||
with shared.printLock:
|
||||
print self, 'connected to', peer, 'during an outgoing attempt.'
|
||||
logger.debug(str(self) + ' connected to ' + str(peer) + ' during an outgoing attempt.')
|
||||
|
||||
|
||||
sd = sendDataThread(sendDataThreadQueue)
|
||||
|
@ -167,8 +164,7 @@ class outgoingSynSender(threading.Thread):
|
|||
|
||||
except socks.GeneralProxyError as err:
|
||||
if shared.verbose >= 2:
|
||||
with shared.printLock:
|
||||
print 'Could NOT connect to', peer, 'during outgoing attempt.', err
|
||||
logger.debug('Could NOT connect to ' + str(peer) + ' during outgoing attempt. ' + str(err))
|
||||
|
||||
deletedPeer = None
|
||||
with shared.knownNodesLock:
|
||||
|
@ -186,8 +182,7 @@ class outgoingSynSender(threading.Thread):
|
|||
del shared.knownNodes[self.streamNumber][peer]
|
||||
deletedPeer = peer
|
||||
if deletedPeer:
|
||||
with shared.printLock:
|
||||
print 'deleting', peer, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||
str ('deleting ' + str(peer) + ' from shared.knownNodes because it is more than 48 hours old and we could not connect to it.')
|
||||
|
||||
except socks.Socks5AuthError as err:
|
||||
shared.UISignalQueue.put((
|
||||
|
@ -195,16 +190,15 @@ class outgoingSynSender(threading.Thread):
|
|||
"MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err))))
|
||||
except socks.Socks5Error as err:
|
||||
pass
|
||||
print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err)
|
||||
logger.error('SOCKS5 error. (It is possible that the server wants authentication).) ' + str(err))
|
||||
except socks.Socks4Error as err:
|
||||
print 'Socks4Error:', err
|
||||
logger.error('Socks4Error: ' + str(err))
|
||||
except socket.error as err:
|
||||
if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
|
||||
logger.error('Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err))
|
||||
else:
|
||||
if shared.verbose >= 1:
|
||||
with shared.printLock:
|
||||
print 'Could NOT connect to', peer, 'during outgoing attempt.', err
|
||||
logger.error('Could NOT connect to ' + str(peer) + 'during outgoing attempt. ' + str(err))
|
||||
|
||||
deletedPeer = None
|
||||
with shared.knownNodesLock:
|
||||
|
@ -222,12 +216,9 @@ class outgoingSynSender(threading.Thread):
|
|||
del shared.knownNodes[self.streamNumber][peer]
|
||||
deletedPeer = peer
|
||||
if deletedPeer:
|
||||
with shared.printLock:
|
||||
print 'deleting', peer, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||
logger.debug('deleting ' + str(peer) + ' from shared.knownNodes because it is more than 48 hours old and we could not connect to it.')
|
||||
|
||||
except Exception as err:
|
||||
sys.stderr.write(
|
||||
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ')
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
logger.exception('An exception has occurred in the outgoingSynSender thread that was not caught by other exception types:')
|
||||
time.sleep(0.1)
|
||||
|
|
|
@ -29,7 +29,7 @@ from debug import logger
|
|||
class receiveDataThread(threading.Thread):
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="receiveData")
|
||||
self.data = ''
|
||||
self.verackSent = False
|
||||
self.verackReceived = False
|
||||
|
@ -46,6 +46,7 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
self.sock = sock
|
||||
self.peer = shared.Peer(HOST, port)
|
||||
self.name = "receiveData-" + self.peer.host
|
||||
self.streamNumber = streamNumber
|
||||
self.objectsThatWeHaveYetToGetFromThisPeer = {}
|
||||
self.selfInitiatedConnections = selfInitiatedConnections
|
||||
|
@ -62,8 +63,7 @@ class receiveDataThread(threading.Thread):
|
|||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||
|
||||
def run(self):
|
||||
with shared.printLock:
|
||||
print 'receiveDataThread starting. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||
logger.debug('receiveDataThread starting. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
|
||||
|
||||
while True:
|
||||
if shared.config.getint('bitmessagesettings', 'maxdownloadrate') == 0:
|
||||
|
@ -89,36 +89,31 @@ class receiveDataThread(threading.Thread):
|
|||
shared.numberOfBytesReceived += len(dataRecv) # for the 'network status' UI tab. The UI clears this value whenever it updates.
|
||||
shared.numberOfBytesReceivedLastSecond += len(dataRecv) # for the download rate limit
|
||||
except socket.timeout:
|
||||
with shared.printLock:
|
||||
print 'Timeout occurred waiting for data from', self.peer, '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||
logger.error ('Timeout occurred waiting for data from ' + str(self.peer) + '. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
|
||||
break
|
||||
except Exception as err:
|
||||
if (sys.platform == 'win32' and err.errno in ([2, 10035])) or (sys.platform != 'win32' and err.errno == errno.EWOULDBLOCK):
|
||||
select.select([self.sslSock], [], [])
|
||||
continue
|
||||
with shared.printLock:
|
||||
print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', str(err.errno), "/", err
|
||||
logger.error('sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID: ' + str(id(self)) + ').' + str(err.errno) + "/" + str(err))
|
||||
break
|
||||
# print 'Received', repr(self.data)
|
||||
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
||||
with shared.printLock:
|
||||
print 'Connection to', self.peer, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||
logger.debug('Connection to ' + str(self.peer) + ' closed. Closing receiveData thread. (ID: ' + str(id(self)) + ')')
|
||||
break
|
||||
else:
|
||||
self.processData()
|
||||
|
||||
try:
|
||||
del self.selfInitiatedConnections[self.streamNumber][self]
|
||||
with shared.printLock:
|
||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||
logger.debug('removed self (a receiveDataThread) from selfInitiatedConnections')
|
||||
except:
|
||||
pass
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data')) # commands the corresponding sendDataThread to shut itself down.
|
||||
try:
|
||||
del shared.connectedHostsList[self.peer.host]
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
print 'Could not delete', self.peer.host, 'from shared.connectedHostsList.', err
|
||||
logger.error('Could not delete ' + str(self.peer.host) + ' from shared.connectedHostsList.' + str(err))
|
||||
|
||||
try:
|
||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||
|
@ -126,8 +121,7 @@ class receiveDataThread(threading.Thread):
|
|||
except:
|
||||
pass
|
||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||
with shared.printLock:
|
||||
print 'receiveDataThread ending. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||
logger.debug('receiveDataThread ending. ID ' + str(id(self)) + '. The size of the shared.connectedHostsList is now ' + str(len(shared.connectedHostsList)))
|
||||
|
||||
|
||||
def processData(self):
|
||||
|
@ -148,7 +142,7 @@ class receiveDataThread(threading.Thread):
|
|||
return
|
||||
payload = self.data[shared.Header.size:payloadLength + shared.Header.size]
|
||||
if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message.
|
||||
print 'Checksum incorrect. Clearing this message.'
|
||||
logger.error('Checksum incorrect. Clearing this message.')
|
||||
self.data = self.data[payloadLength + shared.Header.size:]
|
||||
del magic,command,payloadLength,checksum,payload # better to clean up before the recursive call
|
||||
self.processData()
|
||||
|
@ -163,8 +157,7 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
#Strip the nulls
|
||||
command = command.rstrip('\x00')
|
||||
with shared.printLock:
|
||||
print 'remoteCommand', repr(command), ' from', self.peer
|
||||
logger.debug('remoteCommand ' + repr(command) + ' from ' + str(self.peer))
|
||||
|
||||
try:
|
||||
#TODO: Use a dispatcher here
|
||||
|
@ -202,14 +195,12 @@ class receiveDataThread(threading.Thread):
|
|||
objectHash, = random.sample(
|
||||
self.objectsThatWeHaveYetToGetFromThisPeer, 1)
|
||||
if objectHash in shared.inventory:
|
||||
with shared.printLock:
|
||||
print 'Inventory (in memory) already has object listed in inv message.'
|
||||
logger.debug('Inventory (in memory) already has object listed in inv message.')
|
||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||
objectHash]
|
||||
elif shared.isInSqlInventory(objectHash):
|
||||
if shared.verbose >= 3:
|
||||
with shared.printLock:
|
||||
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
||||
logger.debug('Inventory (SQL on disk) already has object listed in inv message.')
|
||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||
objectHash]
|
||||
else:
|
||||
|
@ -218,8 +209,7 @@ class receiveDataThread(threading.Thread):
|
|||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||
objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
|
||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||
with shared.printLock:
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
|
||||
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
|
||||
try:
|
||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||
|
@ -228,16 +218,14 @@ class receiveDataThread(threading.Thread):
|
|||
break
|
||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
|
||||
with shared.printLock:
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
|
||||
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now 0')
|
||||
try:
|
||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||
except:
|
||||
pass
|
||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||
with shared.printLock:
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||
logger.debug('(concerning' + str(self.peer) + ') number of objectsThatWeHaveYetToGetFromThisPeer is now ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)))
|
||||
|
||||
shared.numberOfObjectsThatWeHaveYetToGetPerPeer[self.peer] = len(
|
||||
self.objectsThatWeHaveYetToGetFromThisPeer) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||
|
@ -245,14 +233,12 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
|
||||
def sendpong(self):
|
||||
with shared.printLock:
|
||||
print 'Sending pong'
|
||||
logger.debug('Sending pong')
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong')))
|
||||
|
||||
|
||||
def recverack(self):
|
||||
with shared.printLock:
|
||||
print 'verack received'
|
||||
logger.debug('verack received')
|
||||
self.verackReceived = True
|
||||
if self.verackSent:
|
||||
# We have thus both sent and received a verack.
|
||||
|
@ -289,11 +275,10 @@ class receiveDataThread(threading.Thread):
|
|||
self.sock.settimeout(
|
||||
600) # We'll send out a pong every 5 minutes to make sure the connection stays alive if there has been no other traffic to send lately.
|
||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||
with shared.printLock:
|
||||
print 'Connection fully established with', self.peer
|
||||
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
||||
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
||||
print 'broadcasting addr from within connectionFullyEstablished function.'
|
||||
logger.debug('Connection fully established with ' + str(self.peer) + "\n" + \
|
||||
'The size of the connectedHostsList is now ' + str(len(shared.connectedHostsList)) + "\n" + \
|
||||
'The length of sendDataQueues is now: ' + str(len(shared.sendDataQueues)) + "\n" + \
|
||||
'broadcasting addr from within connectionFullyEstablished function.')
|
||||
|
||||
# Let all of our peers know about this new node.
|
||||
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
|
||||
|
@ -302,8 +287,7 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
self.sendaddr() # This is one large addr message to this one peer.
|
||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||
with shared.printLock:
|
||||
print 'We are connected to too many people. Closing connection.'
|
||||
logger.info ('We are connected to too many people. Closing connection.')
|
||||
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
return
|
||||
|
@ -349,8 +333,7 @@ class receiveDataThread(threading.Thread):
|
|||
# function for broadcasting invs to everyone in our stream.
|
||||
def sendinvMessageToJustThisOnePeer(self, numberOfObjects, payload):
|
||||
payload = encodeVarint(numberOfObjects) + payload
|
||||
with shared.printLock:
|
||||
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
|
||||
logger.debug('Sending huge inv message with ' + str(numberOfObjects) + ' objects to just this one peer')
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('inv', payload)))
|
||||
|
||||
def _sleepForTimingAttackMitigation(self, sleepTime):
|
||||
|
@ -358,8 +341,7 @@ class receiveDataThread(threading.Thread):
|
|||
# only connected to the trusted peer because we can trust the
|
||||
# peer not to attack
|
||||
if sleepTime > 0 and doTimingAttackMitigation and shared.trustedPeer == None:
|
||||
with shared.printLock:
|
||||
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
||||
logger.debug('Timing attack mitigation: Sleeping for ' + str(sleepTime) + ' seconds.')
|
||||
time.sleep(sleepTime)
|
||||
|
||||
def recerror(self, data):
|
||||
|
@ -417,30 +399,27 @@ class receiveDataThread(threading.Thread):
|
|||
if len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer) > 0:
|
||||
for key, value in shared.numberOfObjectsThatWeHaveYetToGetPerPeer.items():
|
||||
totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers += value
|
||||
with shared.printLock:
|
||||
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer:', len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer)
|
||||
print 'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers
|
||||
logger.debug('number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToGetPerPeer: ' + str(len(shared.numberOfObjectsThatWeHaveYetToGetPerPeer)) + "\n" + \
|
||||
'totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers = ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers))
|
||||
|
||||
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
|
||||
if numberOfItemsInInv > 50000:
|
||||
sys.stderr.write('Too many items in inv message!')
|
||||
return
|
||||
if len(data) < lengthOfVarint + (numberOfItemsInInv * 32):
|
||||
print 'inv message doesn\'t contain enough data. Ignoring.'
|
||||
logger.info('inv message doesn\'t contain enough data. Ignoring.')
|
||||
return
|
||||
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
|
||||
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
||||
with shared.printLock:
|
||||
print 'We already have', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
|
||||
logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.')
|
||||
return
|
||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
|
||||
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
||||
shared.numberOfInventoryLookupsPerformed += 1
|
||||
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
||||
with shared.printLock:
|
||||
print 'Inventory (in memory) has inventory item already.'
|
||||
logger.debug('Inventory (in memory) has inventory item already.')
|
||||
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
||||
print 'Inventory (SQL on disk) has inventory item already.'
|
||||
logger.debug('Inventory (SQL on disk) has inventory item already.')
|
||||
else:
|
||||
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
|
||||
else:
|
||||
|
@ -455,8 +434,7 @@ class receiveDataThread(threading.Thread):
|
|||
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
||||
for item in objectsNewToMe:
|
||||
if totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers > 200000 and len(self.objectsThatWeHaveYetToGetFromThisPeer) > 1000: # inv flooding attack mitigation
|
||||
with shared.printLock:
|
||||
print 'We already have', totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToGetFromThisPeer), 'from this node in particular. Ignoring the rest of this inv message.'
|
||||
logger.debug('We already have ' + str(totalNumberOfobjectsThatWeHaveYetToGetFromAllPeers) + ' items yet to retrieve from peers and over ' + str(len(self.objectsThatWeHaveYetToGetFromThisPeer)), ' from this node in particular. Ignoring the rest of this inv message.')
|
||||
break
|
||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[item] = 0 # helps us keep from sending inv messages to peers that already know about the objects listed therein
|
||||
self.objectsThatWeHaveYetToGetFromThisPeer[item] = 0 # upon finishing dealing with an incoming message, the receiveDataThread will request a random object of from peer out of this data structure. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
||||
|
@ -467,8 +445,7 @@ class receiveDataThread(threading.Thread):
|
|||
# Send a getdata message to our peer to request the object with the given
|
||||
# hash
|
||||
def sendgetdata(self, hash):
|
||||
with shared.printLock:
|
||||
print 'sending getdata to retrieve object with hash:', hash.encode('hex')
|
||||
logger.debug('sending getdata to retrieve object with hash: ' + hash.encode('hex'))
|
||||
payload = '\x01' + hash
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('getdata', payload)))
|
||||
|
||||
|
@ -478,13 +455,12 @@ class receiveDataThread(threading.Thread):
|
|||
numberOfRequestedInventoryItems, lengthOfVarint = decodeVarint(
|
||||
data[:10])
|
||||
if len(data) < lengthOfVarint + (32 * numberOfRequestedInventoryItems):
|
||||
print 'getdata message does not contain enough data. Ignoring.'
|
||||
logger.debug('getdata message does not contain enough data. Ignoring.')
|
||||
return
|
||||
for i in xrange(numberOfRequestedInventoryItems):
|
||||
hash = data[lengthOfVarint + (
|
||||
i * 32):32 + lengthOfVarint + (i * 32)]
|
||||
with shared.printLock:
|
||||
print 'received getdata request for item:', hash.encode('hex')
|
||||
logger.debug('received getdata request for item:' + hash.encode('hex'))
|
||||
|
||||
shared.numberOfInventoryLookupsPerformed += 1
|
||||
shared.inventoryLock.acquire()
|
||||
|
@ -507,36 +483,35 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
# Our peer has requested (in a getdata message) that we send an object.
|
||||
def sendObject(self, payload):
|
||||
with shared.printLock:
|
||||
print 'sending an object.'
|
||||
logger.debug('sending an object.')
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('object',payload)))
|
||||
|
||||
|
||||
def _checkIPv4Address(self, host, hostStandardFormat):
|
||||
# print 'hostStandardFormat', hostStandardFormat
|
||||
if host[0] == '\x7F': # 127/8
|
||||
print 'Ignoring IP address in loopback range:', hostStandardFormat
|
||||
logger.debug('Ignoring IP address in loopback range: ' + hostStandardFormat)
|
||||
return False
|
||||
if host[0] == '\x0A': # 10/8
|
||||
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
|
||||
return False
|
||||
if host[0:2] == '\xC0\xA8': # 192.168/16
|
||||
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||
logger.debug('Ignoring IP address in private range: ' + hostStandardFormat)
|
||||
return False
|
||||
if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12
|
||||
print 'Ignoring IP address in private range:', hostStandardFormat
|
||||
logger.debug('Ignoring IP address in private range:' + hostStandardFormat)
|
||||
return False
|
||||
return True
|
||||
|
||||
def _checkIPv6Address(self, host, hostStandardFormat):
|
||||
if host == ('\x00' * 15) + '\x01':
|
||||
print 'Ignoring loopback address:', hostStandardFormat
|
||||
logger.debug('Ignoring loopback address: ' + hostStandardFormat)
|
||||
return False
|
||||
if host[0] == '\xFE' and (ord(host[1]) & 0xc0) == 0x80:
|
||||
print 'Ignoring local address:', hostStandardFormat
|
||||
logger.debug ('Ignoring local address: ' + hostStandardFormat)
|
||||
return False
|
||||
if (ord(host[0]) & 0xfe) == 0xfc:
|
||||
print 'Ignoring unique local address:', hostStandardFormat
|
||||
logger.debug ('Ignoring unique local address: ' + hostStandardFormat)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
@ -546,13 +521,12 @@ class receiveDataThread(threading.Thread):
|
|||
data[:10])
|
||||
|
||||
if shared.verbose >= 1:
|
||||
with shared.printLock:
|
||||
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
||||
logger.debug('addr message contains ' + str(numberOfAddressesIncluded) + ' IP addresses.')
|
||||
|
||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||
return
|
||||
if len(data) != lengthOfNumberOfAddresses + (38 * numberOfAddressesIncluded):
|
||||
print 'addr message does not contain the correct amount of data. Ignoring.'
|
||||
logger.debug('addr message does not contain the correct amount of data. Ignoring.')
|
||||
return
|
||||
|
||||
for i in range(0, numberOfAddressesIncluded):
|
||||
|
@ -590,8 +564,7 @@ class receiveDataThread(threading.Thread):
|
|||
if len(shared.knownNodes[recaddrStream]) < 20000 and timeSomeoneElseReceivedMessageFromThisNode > (int(time.time()) - 10800) and timeSomeoneElseReceivedMessageFromThisNode < (int(time.time()) + 10800): # If we have more than 20000 nodes in our list already then just forget about adding more. Also, make sure that the time that someone else received a message from this node is within three hours from now.
|
||||
with shared.knownNodesLock:
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
with shared.printLock:
|
||||
print 'added new node', peerFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||
logger.debug('added new node ' + str(peerFromAddrMessage) + ' to knownNodes in stream ' + str(recaddrStream))
|
||||
|
||||
shared.needToWriteKnownNodesToDisk = True
|
||||
hostDetails = (
|
||||
|
@ -606,8 +579,7 @@ class receiveDataThread(threading.Thread):
|
|||
with shared.knownNodesLock:
|
||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||
|
||||
with shared.printLock:
|
||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||
logger.debug('knownNodes currently has ' + str(len(shared.knownNodes[self.streamNumber])) + ' nodes for this stream.')
|
||||
|
||||
|
||||
# Send a huge addr message to our peer. This is only used
|
||||
|
@ -706,8 +678,7 @@ class receiveDataThread(threading.Thread):
|
|||
self.services, = unpack('>q', data[4:12])
|
||||
if self.remoteProtocolVersion < 3:
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
with shared.printLock:
|
||||
print 'Closing connection to old protocol version', self.remoteProtocolVersion, 'node: ', self.peer
|
||||
logger.debug ('Closing connection to old protocol version ' + str(self.remoteProtocolVersion) + ' node: ' + str(self.peer))
|
||||
return
|
||||
timestamp, = unpack('>Q', data[12:20])
|
||||
timeOffset = timestamp - int(time.time())
|
||||
|
@ -748,13 +719,11 @@ class receiveDataThread(threading.Thread):
|
|||
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
||||
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
||||
data[readPosition:])
|
||||
with shared.printLock:
|
||||
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber, ' time offset:', timeOffset, 'seconds.'
|
||||
logger.debug('Remote node useragent: ' + useragent + ' stream number:' + str(self.streamNumber) + ' time offset: ' + str(timeOffset) + ' seconds.')
|
||||
|
||||
if self.streamNumber != 1:
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
with shared.printLock:
|
||||
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
||||
logger.debug ('Closed connection to ' + str(self.peer) + ' because they are interested in stream ' + str(self.streamNumber) + '.')
|
||||
return
|
||||
shared.connectedHostsList[
|
||||
self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
||||
|
@ -764,8 +733,7 @@ class receiveDataThread(threading.Thread):
|
|||
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
|
||||
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
with shared.printLock:
|
||||
print 'Closing connection to myself: ', self.peer
|
||||
logger.debug('Closing connection to myself: ' + str(self.peer))
|
||||
return
|
||||
|
||||
# The other peer's protocol version is of interest to the sendDataThread but we learn of it
|
||||
|
@ -782,16 +750,14 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
# Sends a version message
|
||||
def sendversion(self):
|
||||
with shared.printLock:
|
||||
print 'Sending version message'
|
||||
logger.debug('Sending version message')
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.assembleVersionMessage(
|
||||
self.peer.host, self.peer.port, self.streamNumber)))
|
||||
|
||||
|
||||
# Sends a verack message
|
||||
def sendverack(self):
|
||||
with shared.printLock:
|
||||
print 'Sending verack'
|
||||
logger.debug('Sending verack')
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('verack')))
|
||||
self.verackSent = True
|
||||
if self.verackReceived:
|
||||
|
|
|
@ -11,13 +11,14 @@ import socket
|
|||
from helper_generic import addDataPadding
|
||||
from class_objectHashHolder import *
|
||||
from addresses import *
|
||||
from debug import logger
|
||||
|
||||
# Every connection to a peer has a sendDataThread (and also a
|
||||
# receiveDataThread).
|
||||
class sendDataThread(threading.Thread):
|
||||
|
||||
def __init__(self, sendDataThreadQueue):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="sendData")
|
||||
self.sendDataThreadQueue = sendDataThreadQueue
|
||||
shared.sendDataQueues.append(self.sendDataThreadQueue)
|
||||
self.data = ''
|
||||
|
@ -35,6 +36,7 @@ class sendDataThread(threading.Thread):
|
|||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
|
||||
self.sock = sock
|
||||
self.peer = shared.Peer(HOST, PORT)
|
||||
self.name = "sendData-" + self.peer.host
|
||||
self.streamNumber = streamNumber
|
||||
self.services = 0
|
||||
self.remoteProtocolVersion = - \
|
||||
|
@ -42,23 +44,20 @@ class sendDataThread(threading.Thread):
|
|||
self.lastTimeISentData = int(
|
||||
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||
with shared.printLock:
|
||||
print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber
|
||||
logger.debug('The streamNumber of this sendDataThread (ID: ' + str(id(self)) + ') at setup() is' + str(self.streamNumber))
|
||||
|
||||
|
||||
def sendVersionMessage(self):
|
||||
datatosend = shared.assembleVersionMessage(
|
||||
self.peer.host, self.peer.port, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
|
||||
|
||||
with shared.printLock:
|
||||
print 'Sending version packet: ', repr(datatosend)
|
||||
logger.debug('Sending version packet: ' + repr(datatosend))
|
||||
|
||||
try:
|
||||
self.sendBytes(datatosend)
|
||||
except Exception as err:
|
||||
# if not 'Bad file descriptor' in err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write('sock.sendall error: %s\n' % err)
|
||||
logger.error('sock.sendall error: %s\n' % err)
|
||||
|
||||
self.versionSent = 1
|
||||
|
||||
|
@ -94,15 +93,13 @@ class sendDataThread(threading.Thread):
|
|||
|
||||
|
||||
def run(self):
|
||||
with shared.printLock:
|
||||
print 'sendDataThread starting. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
|
||||
logger.debug('sendDataThread starting. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(shared.sendDataQueues)))
|
||||
while True:
|
||||
deststream, command, data = self.sendDataThreadQueue.get()
|
||||
|
||||
if deststream == self.streamNumber or deststream == 0:
|
||||
if command == 'shutdown':
|
||||
with shared.printLock:
|
||||
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
||||
logger.debug('sendDataThread (associated with ' + str(self.peer) + ') ID: ' + str(id(self)) + ' shutting down now.')
|
||||
break
|
||||
# When you receive an incoming connection, a sendDataThread is
|
||||
# created even though you don't yet know what stream number the
|
||||
|
@ -112,12 +109,10 @@ class sendDataThread(threading.Thread):
|
|||
# streamNumber of this send data thread here:
|
||||
elif command == 'setStreamNumber':
|
||||
self.streamNumber = data
|
||||
with shared.printLock:
|
||||
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', self.streamNumber
|
||||
logger.debug('setting the stream number in the sendData thread (ID: ' + str(id(self)) + ') to ' + str(self.streamNumber))
|
||||
elif command == 'setRemoteProtocolVersion':
|
||||
specifiedRemoteProtocolVersion = data
|
||||
with shared.printLock:
|
||||
print 'setting the remote node\'s protocol version in the sendDataThread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||
logger.debug('setting the remote node\'s protocol version in the sendDataThread (ID: ' + str(id(self)) + ') to ' + str(specifiedRemoteProtocolVersion))
|
||||
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||
elif command == 'advertisepeer':
|
||||
self.objectHashHolderInstance.holdPeer(data)
|
||||
|
@ -140,8 +135,7 @@ class sendDataThread(threading.Thread):
|
|||
try:
|
||||
self.sendBytes(packet)
|
||||
except:
|
||||
with shared.printLock:
|
||||
print 'sendaddr: self.sock.sendall failed'
|
||||
logger.error('sendaddr: self.sock.sendall failed')
|
||||
break
|
||||
elif command == 'advertiseobject':
|
||||
self.objectHashHolderInstance.holdHash(data)
|
||||
|
@ -157,35 +151,30 @@ class sendDataThread(threading.Thread):
|
|||
try:
|
||||
self.sendBytes(packet)
|
||||
except:
|
||||
with shared.printLock:
|
||||
print 'sendinv: self.sock.sendall failed'
|
||||
logger.error('sendinv: self.sock.sendall failed')
|
||||
break
|
||||
elif command == 'pong':
|
||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
||||
if self.lastTimeISentData < (int(time.time()) - 298):
|
||||
# Send out a pong message to keep the connection alive.
|
||||
with shared.printLock:
|
||||
print 'Sending pong to', self.peer, 'to keep connection alive.'
|
||||
logger.debug('Sending pong to ' + str(self.peer) + ' to keep connection alive.')
|
||||
packet = shared.CreatePacket('pong')
|
||||
try:
|
||||
self.sendBytes(packet)
|
||||
except:
|
||||
with shared.printLock:
|
||||
print 'send pong failed'
|
||||
logger.error('send pong failed')
|
||||
break
|
||||
elif command == 'sendRawData':
|
||||
try:
|
||||
self.sendBytes(data)
|
||||
except:
|
||||
with shared.printLock:
|
||||
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
||||
logger.error('Sending of data to ' + str(self.peer) + ' failed. sendDataThread thread ' + str(self) + ' ending now.')
|
||||
break
|
||||
elif command == 'connectionIsOrWasFullyEstablished':
|
||||
self.connectionIsOrWasFullyEstablished = True
|
||||
self.services, self.sslSock = data
|
||||
else:
|
||||
with shared.printLock:
|
||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||
logger.error('sendDataThread ID: ' + id(self) + ' ignoring command ' + command + ' because the thread is not in stream' + deststream)
|
||||
|
||||
try:
|
||||
self.sock.shutdown(socket.SHUT_RDWR)
|
||||
|
@ -193,6 +182,5 @@ class sendDataThread(threading.Thread):
|
|||
except:
|
||||
pass
|
||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||
with shared.printLock:
|
||||
print 'sendDataThread ending. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
|
||||
logger.info('sendDataThread ending. ID: ' + str(id(self)) + '. Number of queues in sendDataQueues: ' + str(len(shared.sendDataQueues)))
|
||||
self.objectHashHolderInstance.close()
|
||||
|
|
|
@ -31,7 +31,7 @@ resends msg messages in 5 days (then 10 days, then 20 days, etc...)
|
|||
class singleCleaner(threading.Thread):
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="singleCleaner")
|
||||
|
||||
def run(self):
|
||||
timeWeLastClearedInventoryAndPubkeysTables = 0
|
||||
|
@ -83,9 +83,7 @@ class singleCleaner(threading.Thread):
|
|||
int(time.time()) - shared.maximumLengthOfTimeToBotherResendingMessages)
|
||||
for row in queryreturn:
|
||||
if len(row) < 2:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
|
||||
logger.error('Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
|
||||
time.sleep(3)
|
||||
break
|
||||
toAddress, ackData, status = row
|
||||
|
|
|
@ -18,7 +18,7 @@ import re
|
|||
class singleListener(threading.Thread):
|
||||
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="singleListener")
|
||||
|
||||
def setup(self, selfInitiatedConnections):
|
||||
self.selfInitiatedConnections = selfInitiatedConnections
|
||||
|
@ -54,8 +54,7 @@ class singleListener(threading.Thread):
|
|||
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'):
|
||||
time.sleep(5)
|
||||
|
||||
with shared.printLock:
|
||||
print 'Listening for incoming connections.'
|
||||
logger.info('Listening for incoming connections.')
|
||||
|
||||
# First try listening on an IPv6 socket. This should also be
|
||||
# able to accept connections on IPv4. If that's not available
|
||||
|
@ -82,8 +81,7 @@ class singleListener(threading.Thread):
|
|||
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS' and not shared.config.getboolean('bitmessagesettings', 'sockslisten'):
|
||||
time.sleep(10)
|
||||
while len(shared.connectedHostsList) > 220:
|
||||
with shared.printLock:
|
||||
print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.'
|
||||
logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
|
||||
|
||||
time.sleep(10)
|
||||
|
||||
|
@ -104,8 +102,7 @@ class singleListener(threading.Thread):
|
|||
# connection flooding.
|
||||
if HOST in shared.connectedHostsList:
|
||||
socketObject.close()
|
||||
with shared.printLock:
|
||||
print 'We are already connected to', HOST + '. Ignoring connection.'
|
||||
logger.info('We are already connected to ' + str(HOST) + '. Ignoring connection.')
|
||||
else:
|
||||
break
|
||||
|
||||
|
@ -124,6 +121,5 @@ class singleListener(threading.Thread):
|
|||
socketObject, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue)
|
||||
rd.start()
|
||||
|
||||
with shared.printLock:
|
||||
print self, 'connected to', HOST, 'during INCOMING request.'
|
||||
logger.info('connected to ' + HOST + ' during INCOMING request.')
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ class singleWorker(threading.Thread):
|
|||
|
||||
def __init__(self):
|
||||
# QThread.__init__(self, parent)
|
||||
threading.Thread.__init__(self)
|
||||
threading.Thread.__init__(self, name="singleWorker")
|
||||
|
||||
def run(self):
|
||||
|
||||
|
@ -49,7 +49,7 @@ class singleWorker(threading.Thread):
|
|||
'''SELECT ackdata FROM sent where (status='msgsent' OR status='doingmsgpow')''')
|
||||
for row in queryreturn:
|
||||
ackdata, = row
|
||||
print 'Watching for ackdata', ackdata.encode('hex')
|
||||
logger.info('Watching for ackdata ' + ackdata.encode('hex'))
|
||||
shared.ackdataForWhichImWatching[ackdata] = 0
|
||||
|
||||
time.sleep(
|
||||
|
@ -81,9 +81,7 @@ class singleWorker(threading.Thread):
|
|||
elif command == 'sendOutOrStoreMyV4Pubkey':
|
||||
self.sendOutOrStoreMyV4Pubkey(data)
|
||||
else:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
|
||||
logger.error('Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
|
||||
|
||||
shared.workerQueue.task_done()
|
||||
|
||||
|
@ -114,9 +112,7 @@ class singleWorker(threading.Thread):
|
|||
privEncryptionKeyBase58 = shared.config.get(
|
||||
myAddress, 'privencryptionkey')
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
||||
logger.error('Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
||||
return
|
||||
|
||||
privSigningKeyHex = shared.decodeWalletImportFormat(
|
||||
|
@ -133,10 +129,10 @@ class singleWorker(threading.Thread):
|
|||
|
||||
# Do the POW for this pubkey message
|
||||
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
|
||||
print '(For pubkey message) Doing proof of work...'
|
||||
logger.info('(For pubkey message) Doing proof of work...')
|
||||
initialHash = hashlib.sha512(payload).digest()
|
||||
trialValue, nonce = proofofwork.run(target, initialHash)
|
||||
print '(For pubkey message) Found proof of work', trialValue, 'Nonce:', nonce
|
||||
logger.info('(For pubkey message) Found proof of work ' + str(trialValue), ' Nonce: ', str(nonce))
|
||||
payload = pack('>Q', nonce) + payload
|
||||
|
||||
inventoryHash = calculateInventoryHash(payload)
|
||||
|
@ -145,8 +141,7 @@ class singleWorker(threading.Thread):
|
|||
objectType, streamNumber, payload, embeddedTime,'')
|
||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||
|
||||
with shared.printLock:
|
||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
||||
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
|
@ -171,8 +166,7 @@ class singleWorker(threading.Thread):
|
|||
#The address has been deleted.
|
||||
return
|
||||
if shared.safeConfigGetBoolean(myAddress, 'chan'):
|
||||
with shared.printLock:
|
||||
print 'This is a chan address. Not sending pubkey.'
|
||||
logger.info('This is a chan address. Not sending pubkey.')
|
||||
return
|
||||
status, addressVersionNumber, streamNumber, hash = decodeAddress(
|
||||
myAddress)
|
||||
|
@ -199,9 +193,7 @@ class singleWorker(threading.Thread):
|
|||
privEncryptionKeyBase58 = shared.config.get(
|
||||
myAddress, 'privencryptionkey')
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
sys.stderr.write(
|
||||
'Error within sendOutOrStoreMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
||||
logger.error('Error within sendOutOrStoreMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
||||
|
||||
return
|
||||
|
||||
|
@ -228,12 +220,10 @@ class singleWorker(threading.Thread):
|
|||
|
||||
# Do the POW for this pubkey message
|
||||
target = 2 ** 64 / (shared.networkDefaultProofOfWorkNonceTrialsPerByte*(len(payload) + 8 + shared.networkDefaultPayloadLengthExtraBytes + ((TTL*(len(payload)+8+shared.networkDefaultPayloadLengthExtraBytes))/(2 ** 16))))
|
||||
with shared.printLock:
|
||||
print '(For pubkey message) Doing proof of work...'
|
||||
logger.info('(For pubkey message) Doing proof of work...')
|
||||
initialHash = hashlib.sha512(payload).digest()
|
||||
trialValue, nonce = proofofwork.run(target, initialHash)
|
||||
with shared.printLock:
|
||||
print '(For pubkey message) Found proof of work. Nonce:', nonce
|
||||
logger.info('(For pubkey message) Found proof of work. Nonce: ' + str(nonce))
|
||||
|
||||
payload = pack('>Q', nonce) + payload
|
||||
inventoryHash = calculateInventoryHash(payload)
|
||||
|
@ -242,8 +232,7 @@ class singleWorker(threading.Thread):
|
|||
objectType, streamNumber, payload, embeddedTime,'')
|
||||
shared.inventorySets[streamNumber].add(inventoryHash)
|
||||
|
||||
with shared.printLock:
|
||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||
logger.info('broadcasting inv with hash: ' + inventoryHash.encode('hex'))
|
||||
|
||||
shared.broadcastToSendDataQueues((
|
||||
streamNumber, 'advertiseobject', inventoryHash))
|
||||
|
@ -264,8 +253,7 @@ class singleWorker(threading.Thread):
|
|||