From 7a89109fc917c5af530c9643aa8faca32ab99cf8 Mon Sep 17 00:00:00 2001 From: Dmitri Bogomolov <4glitch@gmail.com> Date: Tue, 6 Aug 2019 14:04:33 +0300 Subject: [PATCH] New logging approach in order to reduce imports from submodules and use logging without risk of circular import. Only subpackage that imports from debug is bitmessageqt - because it also uses debug.resetLogging(). Instead of from debug import logger is now recommended to use: import logging logger = logging.getLogger('default') All subclasses of StoppableThread now have a logger attribute. All threading related stuff except for set_thread_name() was moved from helper_threading to network.threads. Fixed two my mistakes from previous edit of debug in a1a8d3a: - logger.handlers is not dict but iterable - sys.excepthook should be set unconditionally --- src/api.py | 2 +- src/bitmessagemain.py | 2 +- src/class_addressGenerator.py | 21 ++++--- src/class_singleCleaner.py | 93 +++++++++++++++---------------- src/class_singleWorker.py | 84 ++++++++++++++-------------- src/class_smtpDeliver.py | 14 ++--- src/class_smtpServer.py | 46 ++++++++------- src/debug.py | 86 ++++++++++++++++------------ src/helper_threading.py | 39 +------------ src/messagetypes/__init__.py | 10 ++-- src/messagetypes/message.py | 9 ++- src/messagetypes/vote.py | 9 ++- src/network/addrthread.py | 4 +- src/network/advanceddispatcher.py | 6 +- src/network/announcethread.py | 10 ++-- src/network/bmobject.py | 4 +- src/network/bmproto.py | 4 +- src/network/connectionchooser.py | 4 +- src/network/connectionpool.py | 4 +- src/network/dandelion.py | 6 +- src/network/downloadthread.py | 9 +-- src/network/invthread.py | 2 +- src/network/networkthread.py | 11 +--- src/network/proxy.py | 7 ++- src/network/receivequeuethread.py | 19 +++---- src/network/tcp.py | 4 +- src/network/threads.py | 49 ++++++++++++++++ src/network/tls.py | 3 +- src/network/udp.py | 12 ++-- src/network/uploadthread.py | 31 ++++++----- src/shutdown.py | 14 ++--- src/upnp.py | 2 +- 32 files changed, 328 insertions(+), 292 deletions(-) create mode 100644 src/network/threads.py diff --git a/src/api.py b/src/api.py index f92abeb4..b7f5c62d 100644 --- a/src/api.py +++ b/src/api.py @@ -38,8 +38,8 @@ from bmconfigparser import BMConfigParser from debug import logger from helper_ackPayload import genAckPayload from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery, sqlStoredProcedure -from helper_threading import StoppableThread from inventory import Inventory +from network.threads import StoppableThread str_chan = '[chan]' diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 1dd2f271..4ad9311f 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -41,7 +41,7 @@ import shared import knownnodes import state import shutdown -from debug import logger +from debug import logger # this should go before any threads # Classes from class_sqlThread import sqlThread diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index d930fc99..fa268377 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -12,10 +12,9 @@ import shared import defaults import highlevelcrypto from bmconfigparser import BMConfigParser -from debug import logger from addresses import decodeAddress, encodeAddress, encodeVarint from fallback import RIPEMD160Hash -from helper_threading import StoppableThread +from network.threads import StoppableThread class addressGenerator(StoppableThread): @@ -85,12 +84,12 @@ class addressGenerator(StoppableThread): elif queueValue[0] == 'stopThread': break else: - logger.error( + self.logger.error( 'Programming error: A structure with the wrong number' ' of values was passed into the addressGeneratorQueue.' ' Here is the queueValue: %r\n', queueValue) if addressVersionNumber < 3 or addressVersionNumber > 4: - logger.error( + self.logger.error( 'Program error: For some reason the address generator' ' queue has been given a request to create at least' ' one version %s address which it cannot do.\n', @@ -139,10 +138,10 @@ class addressGenerator(StoppableThread): '\x00' * numberOfNullBytesDemandedOnFrontOfRipeHash ): break - logger.info( + self.logger.info( 'Generated address with ripe digest: %s', hexlify(ripe)) try: - logger.info( + self.logger.info( 'Address generator calculated %s addresses at %s' ' addresses per second before finding one with' ' the correct ripe-prefix.', @@ -210,7 +209,7 @@ class addressGenerator(StoppableThread): or command == 'getDeterministicAddress' \ or command == 'createChan' or command == 'joinChan': if len(deterministicPassphrase) == 0: - logger.warning( + self.logger.warning( 'You are creating deterministic' ' address(es) using a blank passphrase.' ' Bitmessage will do it but it is rather stupid.') @@ -263,10 +262,10 @@ class addressGenerator(StoppableThread): ): break - logger.info( + self.logger.info( 'Generated address with ripe digest: %s', hexlify(ripe)) try: - logger.info( + self.logger.info( 'Address generator calculated %s addresses' ' at %s addresses per second before finding' ' one with the correct ripe-prefix.', @@ -316,7 +315,7 @@ class addressGenerator(StoppableThread): addressAlreadyExists = True if addressAlreadyExists: - logger.info( + self.logger.info( '%s already exists. Not adding it again.', address ) @@ -329,7 +328,7 @@ class addressGenerator(StoppableThread): ).arg(address) )) else: - logger.debug('label: %s', label) + self.logger.debug('label: %s', label) BMConfigParser().set(address, 'label', label) BMConfigParser().set(address, 'enabled', 'true') BMConfigParser().set(address, 'decoy', 'false') diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 49e15f49..fc53a5b0 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -24,16 +24,15 @@ import os import shared import time -import tr -from bmconfigparser import BMConfigParser -from helper_sql import sqlQuery, sqlExecute -from helper_threading import StoppableThread -from inventory import Inventory -from network.connectionpool import BMConnectionPool -from debug import logger import knownnodes import queues import state +import tr +from bmconfigparser import BMConfigParser +from helper_sql import sqlQuery, sqlExecute +from inventory import Inventory +from network.connectionpool import BMConnectionPool +from network.threads import StoppableThread class singleCleaner(StoppableThread): @@ -99,7 +98,7 @@ class singleCleaner(StoppableThread): ) for row in queryreturn: if len(row) < 2: - logger.error( + self.logger.error( 'Something went wrong in the singleCleaner thread:' ' a query did not return the requested fields. %r', row @@ -108,9 +107,9 @@ class singleCleaner(StoppableThread): break toAddress, ackData, status = row if status == 'awaitingpubkey': - resendPubkeyRequest(toAddress) + self.resendPubkeyRequest(toAddress) elif status == 'msgsent': - resendMsg(ackData) + self.resendMsg(ackData) try: # Cleanup knownnodes and handle possible severe exception @@ -118,7 +117,7 @@ class singleCleaner(StoppableThread): knownnodes.cleanupKnownNodes() except Exception as err: if "Errno 28" in str(err): - logger.fatal( + self.logger.fatal( '(while writing knownnodes to disk)' ' Alert: Your disk or data storage volume is full.' ) @@ -161,41 +160,41 @@ class singleCleaner(StoppableThread): if state.shutdown == 0: self.stop.wait(singleCleaner.cycleLength) + def resendPubkeyRequest(self, address): + """Resend pubkey request for address""" + self.logger.debug( + 'It has been a long time and we haven\'t heard a response to our' + ' getpubkey request. Sending again.' + ) + try: + # We need to take this entry out of the neededPubkeys structure + # because the queues.workerQueue checks to see whether the entry + # is already present and will not do the POW and send the message + # because it assumes that it has already done it recently. + del state.neededPubkeys[address] + except: + pass -def resendPubkeyRequest(address): - logger.debug( - 'It has been a long time and we haven\'t heard a response to our' - ' getpubkey request. Sending again.' - ) - try: - # We need to take this entry out of the neededPubkeys structure - # because the queues.workerQueue checks to see whether the entry - # is already present and will not do the POW and send the message - # because it assumes that it has already done it recently. - del state.neededPubkeys[address] - except: - pass + queues.UISignalQueue.put(( + 'updateStatusBar', + 'Doing work necessary to again attempt to request a public key...' + )) + sqlExecute( + '''UPDATE sent SET status='msgqueued' WHERE toaddress=?''', + address) + queues.workerQueue.put(('sendmessage', '')) - queues.UISignalQueue.put(( - 'updateStatusBar', - 'Doing work necessary to again attempt to request a public key...' - )) - sqlExecute( - '''UPDATE sent SET status='msgqueued' WHERE toaddress=?''', - address) - queues.workerQueue.put(('sendmessage', '')) - - -def resendMsg(ackdata): - logger.debug( - 'It has been a long time and we haven\'t heard an acknowledgement' - ' to our msg. Sending again.' - ) - sqlExecute( - '''UPDATE sent SET status='msgqueued' WHERE ackdata=?''', - ackdata) - queues.workerQueue.put(('sendmessage', '')) - queues.UISignalQueue.put(( - 'updateStatusBar', - 'Doing work necessary to again attempt to deliver a message...' - )) + def resendMsg(self, ackdata): + """Resend message by ackdata""" + self.logger.debug( + 'It has been a long time and we haven\'t heard an acknowledgement' + ' to our msg. Sending again.' + ) + sqlExecute( + '''UPDATE sent SET status='msgqueued' WHERE ackdata=?''', + ackdata) + queues.workerQueue.put(('sendmessage', '')) + queues.UISignalQueue.put(( + 'updateStatusBar', + 'Doing work necessary to again attempt to deliver a message...' + )) diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 0798296e..77fa18c0 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -26,10 +26,9 @@ import state import tr from addresses import calculateInventoryHash, decodeAddress, decodeVarint, encodeVarint from bmconfigparser import BMConfigParser -from debug import logger from helper_sql import sqlExecute, sqlQuery -from helper_threading import StoppableThread from inventory import Inventory +from network.threads import StoppableThread def sizeof_fmt(num, suffix='h/s'): @@ -98,7 +97,7 @@ class singleWorker(StoppableThread): '''SELECT ackdata FROM sent WHERE status = 'msgsent' ''') for row in queryreturn: ackdata, = row - logger.info('Watching for ackdata %s', hexlify(ackdata)) + self.logger.info('Watching for ackdata %s', hexlify(ackdata)) shared.ackdataForWhichImWatching[ackdata] = 0 # Fix legacy (headerless) watched ackdata to include header @@ -173,14 +172,14 @@ class singleWorker(StoppableThread): self.busy = 0 return else: - logger.error( + self.logger.error( 'Probable programming error: The command sent' ' to the workerThread is weird. It is: %s\n', command ) queues.workerQueue.task_done() - logger.info("Quitting...") + self.logger.info("Quitting...") def _getKeysForAddress(self, address): privSigningKeyBase58 = BMConfigParser().get( @@ -217,25 +216,24 @@ class singleWorker(StoppableThread): )) / (2 ** 16)) )) initialHash = hashlib.sha512(payload).digest() - logger.info( + self.logger.info( '%s Doing proof of work... TTL set to %s', log_prefix, TTL) if log_time: start_time = time.time() trialValue, nonce = proofofwork.run(target, initialHash) - logger.info( + self.logger.info( '%s Found proof of work %s Nonce: %s', log_prefix, trialValue, nonce ) try: delta = time.time() - start_time - logger.info( + self.logger.info( 'PoW took %.1f seconds, speed %s.', delta, sizeof_fmt(nonce / delta) ) except: # NameError pass payload = pack('>Q', nonce) + payload - # inventoryHash = calculateInventoryHash(payload) return payload def doPOWForMyV2Pubkey(self, adressHash): @@ -260,7 +258,7 @@ class singleWorker(StoppableThread): _, _, pubSigningKey, pubEncryptionKey = \ self._getKeysForAddress(myAddress) except Exception as err: - logger.error( + self.logger.error( 'Error within doPOWForMyV2Pubkey. Could not read' ' the keys from the keys.dat file for a requested' ' address. %s\n', err @@ -278,7 +276,8 @@ class singleWorker(StoppableThread): Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') - logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash)) + self.logger.info( + 'broadcasting inv with hash: %s', hexlify(inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) @@ -303,7 +302,7 @@ class singleWorker(StoppableThread): # The address has been deleted. return if BMConfigParser().safeGetBoolean(myAddress, 'chan'): - logger.info('This is a chan address. Not sending pubkey.') + self.logger.info('This is a chan address. Not sending pubkey.') return _, addressVersionNumber, streamNumber, adressHash = decodeAddress( myAddress) @@ -333,7 +332,7 @@ class singleWorker(StoppableThread): privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \ self._getKeysForAddress(myAddress) except Exception as err: - logger.error( + self.logger.error( 'Error within sendOutOrStoreMyV3Pubkey. Could not read' ' the keys from the keys.dat file for a requested' ' address. %s\n', err @@ -360,7 +359,8 @@ class singleWorker(StoppableThread): Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') - logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash)) + self.logger.info( + 'broadcasting inv with hash: %s', hexlify(inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) @@ -383,7 +383,7 @@ class singleWorker(StoppableThread): # The address has been deleted. return if shared.BMConfigParser().safeGetBoolean(myAddress, 'chan'): - logger.info('This is a chan address. Not sending pubkey.') + self.logger.info('This is a chan address. Not sending pubkey.') return _, addressVersionNumber, streamNumber, addressHash = decodeAddress( myAddress) @@ -402,7 +402,7 @@ class singleWorker(StoppableThread): privSigningKeyHex, _, pubSigningKey, pubEncryptionKey = \ self._getKeysForAddress(myAddress) except Exception as err: - logger.error( + self.logger.error( 'Error within sendOutOrStoreMyV4Pubkey. Could not read' ' the keys from the keys.dat file for a requested' ' address. %s\n', err @@ -450,7 +450,8 @@ class singleWorker(StoppableThread): doubleHashOfAddressData[32:] ) - logger.info('broadcasting inv with hash: %s', hexlify(inventoryHash)) + self.logger.info( + 'broadcasting inv with hash: %s', hexlify(inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash)) queues.UISignalQueue.put(('updateStatusBar', '')) @@ -459,7 +460,7 @@ class singleWorker(StoppableThread): myAddress, 'lastpubkeysendtime', str(int(time.time()))) BMConfigParser().save() except Exception as err: - logger.error( + self.logger.error( 'Error: Couldn\'t add the lastpubkeysendtime' ' to the keys.dat file. Error message: %s', err ) @@ -497,7 +498,7 @@ class singleWorker(StoppableThread): objectType, streamNumber, buffer(payload), embeddedTime, buffer(tag) ) - logger.info( + self.logger.info( 'sending inv (within sendOnionPeerObj function) for object: %s', hexlify(inventoryHash)) queues.invQueue.put((streamNumber, inventoryHash)) @@ -520,7 +521,7 @@ class singleWorker(StoppableThread): _, addressVersionNumber, streamNumber, ripe = \ decodeAddress(fromaddress) if addressVersionNumber <= 1: - logger.error( + self.logger.error( 'Error: In the singleWorker thread, the ' ' sendBroadcast function doesn\'t understand' ' the address version.\n') @@ -636,7 +637,7 @@ class singleWorker(StoppableThread): # to not let the user try to send a message this large # until we implement message continuation. if len(payload) > 2 ** 18: # 256 KiB - logger.critical( + self.logger.critical( 'This broadcast object is too large to send.' ' This should never happen. Object size: %s', len(payload) @@ -647,7 +648,7 @@ class singleWorker(StoppableThread): objectType = 3 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, tag) - logger.info( + self.logger.info( 'sending inv (within sendBroadcast function)' ' for object: %s', hexlify(inventoryHash) @@ -867,8 +868,8 @@ class singleWorker(StoppableThread): "MainWindow", "Looking up the receiver\'s public key")) )) - logger.info('Sending a message.') - logger.debug( + self.logger.info('Sending a message.') + self.logger.debug( 'First 150 characters of message: %s', repr(message[:150]) ) @@ -912,7 +913,7 @@ class singleWorker(StoppableThread): if not shared.BMConfigParser().safeGetBoolean( 'bitmessagesettings', 'willinglysendtomobile' ): - logger.info( + self.logger.info( 'The receiver is a mobile user but the' ' sender (you) has not selected that you' ' are willing to send to mobiles. Aborting' @@ -978,7 +979,7 @@ class singleWorker(StoppableThread): defaults.networkDefaultPayloadLengthExtraBytes: requiredPayloadLengthExtraBytes = \ defaults.networkDefaultPayloadLengthExtraBytes - logger.debug( + self.logger.debug( 'Using averageProofOfWorkNonceTrialsPerByte: %s' ' and payloadLengthExtraBytes: %s.', requiredAverageProofOfWorkNonceTrialsPerByte, @@ -1043,8 +1044,9 @@ class singleWorker(StoppableThread): l10n.formatTimestamp())))) continue else: # if we are sending a message to ourselves or a chan.. - logger.info('Sending a message.') - logger.debug('First 150 characters of message: %r', message[:150]) + self.logger.info('Sending a message.') + self.logger.debug( + 'First 150 characters of message: %r', message[:150]) behaviorBitfield = protocol.getBitfield(fromaddress) try: @@ -1063,7 +1065,7 @@ class singleWorker(StoppableThread): " message. %1" ).arg(l10n.formatTimestamp())) )) - logger.error( + self.logger.error( 'Error within sendMsg. Could not read the keys' ' from the keys.dat file for our own address. %s\n', err) @@ -1139,14 +1141,14 @@ class singleWorker(StoppableThread): payload += encodeVarint(encodedMessage.length) payload += encodedMessage.data if BMConfigParser().has_section(toaddress): - logger.info( + self.logger.info( 'Not bothering to include ackdata because we are' ' sending to ourselves or a chan.' ) fullAckPayload = '' elif not protocol.checkBitfield( behaviorBitfield, protocol.BITFIELD_DOESACK): - logger.info( + self.logger.info( 'Not bothering to include ackdata because' ' the receiver said that they won\'t relay it anyway.' ) @@ -1199,7 +1201,7 @@ class singleWorker(StoppableThread): requiredPayloadLengthExtraBytes )) / (2 ** 16)) )) - logger.info( + self.logger.info( '(For msg message) Doing proof of work. Total required' ' difficulty: %f. Required small message difficulty: %f.', float(requiredAverageProofOfWorkNonceTrialsPerByte) / @@ -1211,12 +1213,12 @@ class singleWorker(StoppableThread): powStartTime = time.time() initialHash = hashlib.sha512(encryptedPayload).digest() trialValue, nonce = proofofwork.run(target, initialHash) - logger.info( + self.logger.info( '(For msg message) Found proof of work %s Nonce: %s', trialValue, nonce ) try: - logger.info( + self.logger.info( 'PoW took %.1f seconds, speed %s.', time.time() - powStartTime, sizeof_fmt(nonce / (time.time() - powStartTime)) @@ -1231,7 +1233,7 @@ class singleWorker(StoppableThread): # in the code to not let the user try to send a message # this large until we implement message continuation. if len(encryptedPayload) > 2 ** 18: # 256 KiB - logger.critical( + self.logger.critical( 'This msg object is too large to send. This should' ' never happen. Object size: %i', len(encryptedPayload) @@ -1262,7 +1264,7 @@ class singleWorker(StoppableThread): " Sent on %1" ).arg(l10n.formatTimestamp())) )) - logger.info( + self.logger.info( 'Broadcasting inv for my msg(within sendmsg function): %s', hexlify(inventoryHash) ) @@ -1315,7 +1317,7 @@ class singleWorker(StoppableThread): toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( toAddress) if toStatus != 'success': - logger.error( + self.logger.error( 'Very abnormal error occurred in requestPubKey.' ' toAddress is: %r. Please report this error to Atheros.', toAddress @@ -1329,7 +1331,7 @@ class singleWorker(StoppableThread): toAddress ) if not queryReturn: - logger.critical( + self.logger.critical( 'BUG: Why are we requesting the pubkey for %s' ' if there are no messages in the sent folder' ' to that address?', toAddress @@ -1377,11 +1379,11 @@ class singleWorker(StoppableThread): payload += encodeVarint(streamNumber) if addressVersionNumber <= 3: payload += ripe - logger.info( + self.logger.info( 'making request for pubkey with ripe: %s', hexlify(ripe)) else: payload += tag - logger.info( + self.logger.info( 'making request for v4 pubkey with tag: %s', hexlify(tag)) # print 'trial value', trialValue @@ -1402,7 +1404,7 @@ class singleWorker(StoppableThread): objectType = 1 Inventory()[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime, '') - logger.info('sending inv (for the getpubkey message)') + self.logger.info('sending inv (for the getpubkey message)') queues.invQueue.put((streamNumber, inventoryHash)) # wait 10% past expiration diff --git a/src/class_smtpDeliver.py b/src/class_smtpDeliver.py index fa607220..58cd4631 100644 --- a/src/class_smtpDeliver.py +++ b/src/class_smtpDeliver.py @@ -5,7 +5,6 @@ src/class_smtpDeliver.py # pylint: disable=unused-variable import smtplib -import sys import urlparse from email.header import Header from email.mime.text import MIMEText @@ -13,8 +12,7 @@ from email.mime.text import MIMEText import queues import state from bmconfigparser import BMConfigParser -from debug import logger -from helper_threading import StoppableThread +from network.threads import StoppableThread SMTPDOMAIN = "bmaddr.lan" @@ -75,10 +73,12 @@ class smtpDeliver(StoppableThread): client.starttls() client.ehlo() client.sendmail(msg['From'], [to], msg.as_string()) - logger.info("Delivered via SMTP to %s through %s:%i ...", to, u.hostname, u.port) + self.logger.info( + 'Delivered via SMTP to %s through %s:%i ...', + to, u.hostname, u.port) client.quit() except: - logger.error("smtp delivery error", exc_info=True) + self.logger.error('smtp delivery error', exc_info=True) elif command == 'displayNewSentMessage': toAddress, fromLabel, fromAddress, subject, message, ackdata = data elif command == 'updateNetworkStatusTab': @@ -112,5 +112,5 @@ class smtpDeliver(StoppableThread): elif command == 'stopThread': break else: - sys.stderr.write( - 'Command sent to smtpDeliver not recognized: %s\n' % command) + self.logger.warning( + 'Command sent to smtpDeliver not recognized: %s', command) diff --git a/src/class_smtpServer.py b/src/class_smtpServer.py index 99d9c4b3..924333a6 100644 --- a/src/class_smtpServer.py +++ b/src/class_smtpServer.py @@ -1,26 +1,28 @@ import asyncore import base64 import email -from email.parser import Parser -from email.header import decode_header +import logging import re import signal import smtpd import threading import time +from email.header import decode_header +from email.parser import Parser +import queues from addresses import decodeAddress from bmconfigparser import BMConfigParser -from debug import logger -from helper_sql import sqlExecute from helper_ackPayload import genAckPayload -from helper_threading import StoppableThread -import queues +from helper_sql import sqlExecute +from network.threads import StoppableThread from version import softwareVersion SMTPDOMAIN = "bmaddr.lan" LISTENPORT = 8425 +logger = logging.getLogger('default') + class smtpServerChannel(smtpd.SMTPChannel): def smtp_EHLO(self, arg): @@ -39,7 +41,7 @@ class smtpServerChannel(smtpd.SMTPChannel): decoded = base64.b64decode(authstring) correctauth = "\x00" + BMConfigParser().safeGet("bitmessagesettings", "smtpdusername", "") + \ "\x00" + BMConfigParser().safeGet("bitmessagesettings", "smtpdpassword", "") - logger.debug("authstring: %s / %s", correctauth, decoded) + logger.debug('authstring: %s / %s', correctauth, decoded) if correctauth == decoded: self.auth = True self.push('235 2.7.0 Authentication successful') @@ -50,7 +52,7 @@ class smtpServerChannel(smtpd.SMTPChannel): def smtp_DATA(self, arg): if not hasattr(self, "auth") or not self.auth: - self.push ("530 Authentication required") + self.push('530 Authentication required') return smtpd.SMTPChannel.smtp_DATA(self, arg) @@ -98,17 +100,15 @@ class smtpServerPyBitmessage(smtpd.SMTPServer): return ret - def process_message(self, peer, mailfrom, rcpttos, data): -# print 'Receiving message from:', peer p = re.compile(".*<([^>]+)>") if not hasattr(self.channel, "auth") or not self.channel.auth: - logger.error("Missing or invalid auth") + logger.error('Missing or invalid auth') return try: self.msg_headers = Parser().parsestr(data) except: - logger.error("Invalid headers") + logger.error('Invalid headers') return try: @@ -118,7 +118,7 @@ class smtpServerPyBitmessage(smtpd.SMTPServer): if sender not in BMConfigParser().addresses(): raise Exception("Nonexisting user %s" % sender) except Exception as err: - logger.debug("Bad envelope from %s: %s", mailfrom, repr(err)) + logger.debug('Bad envelope from %s: %r', mailfrom, err) msg_from = self.decode_header("from") try: msg_from = p.sub(r'\1', self.decode_header("from")[0]) @@ -128,7 +128,7 @@ class smtpServerPyBitmessage(smtpd.SMTPServer): if sender not in BMConfigParser().addresses(): raise Exception("Nonexisting user %s" % sender) except Exception as err: - logger.error("Bad headers from %s: %s", msg_from, repr(err)) + logger.error('Bad headers from %s: %r', msg_from, err) return try: @@ -147,11 +147,12 @@ class smtpServerPyBitmessage(smtpd.SMTPServer): rcpt, domain = p.sub(r'\1', to).split("@") if domain != SMTPDOMAIN: raise Exception("Bad domain %s" % domain) - logger.debug("Sending %s to %s about %s", sender, rcpt, msg_subject) + logger.debug( + 'Sending %s to %s about %s', sender, rcpt, msg_subject) self.send(sender, rcpt, msg_subject, body) - logger.info("Relayed %s to %s", sender, rcpt) + logger.info('Relayed %s to %s', sender, rcpt) except Exception as err: - logger.error( "Bad to %s: %s", to, repr(err)) + logger.error('Bad to %s: %r', to, err) continue return @@ -169,21 +170,24 @@ class smtpServer(StoppableThread): def run(self): asyncore.loop(1) + def signals(signal, frame): - print "Got signal, terminating" + logger.warning('Got signal, terminating') for thread in threading.enumerate(): if thread.isAlive() and isinstance(thread, StoppableThread): thread.stopThread() + def runServer(): - print "Running SMTPd thread" + logger.warning('Running SMTPd thread') smtpThread = smtpServer() smtpThread.start() signal.signal(signal.SIGINT, signals) signal.signal(signal.SIGTERM, signals) - print "Processing" + logger.warning('Processing') smtpThread.join() - print "The end" + logger.warning('The end') + if __name__ == "__main__": runServer() diff --git a/src/debug.py b/src/debug.py index d3730d7f..7d523b3c 100644 --- a/src/debug.py +++ b/src/debug.py @@ -1,26 +1,38 @@ """ Logging and debuging facility -============================= +----------------------------- Levels: - DEBUG - Detailed information, typically of interest only when diagnosing problems. - INFO - Confirmation that things are working as expected. - WARNING - An indication that something unexpected happened, or indicative of some problem in the - near future (e.g. 'disk space low'). The software is still working as expected. - ERROR - Due to a more serious problem, the software has not been able to perform some function. - CRITICAL - A serious error, indicating that the program itself may be unable to continue running. + DEBUG + Detailed information, typically of interest only when diagnosing problems. + INFO + Confirmation that things are working as expected. + WARNING + An indication that something unexpected happened, or indicative of + some problem in the near future (e.g. 'disk space low'). The software + is still working as expected. + ERROR + Due to a more serious problem, the software has not been able to + perform some function. + CRITICAL + A serious error, indicating that the program itself may be unable to + continue running. -There are three loggers: `console_only`, `file_only` and `both`. +There are three loggers by default: `console_only`, `file_only` and `both`. +You can configure logging in the logging.dat in the appdata dir. +It's format is described in the :func:`logging.config.fileConfig` doc. -Use: `from debug import logger` to import this facility into whatever module you wish to log messages from. - Logging is thread-safe so you don't have to worry about locks, just import and log. +Use: +>>> import logging +>>> logger = logging.getLogger('default') + +The old form: ``from debug import logger`` is also may be used, +but only in the top level modules. + +Logging is thread-safe so you don't have to worry about locks, +just import and log. """ import ConfigParser @@ -28,6 +40,7 @@ import logging import logging.config import os import sys + import helper_startup import state @@ -41,10 +54,17 @@ log_level = 'WARNING' def log_uncaught_exceptions(ex_cls, ex, tb): + """The last resort logging function used for sys.excepthook""" logging.critical('Unhandled exception', exc_info=(ex_cls, ex, tb)) def configureLogging(): + """ + Configure logging, + using either logging.dat file in the state.appdata dir + or dictionary with hardcoded settings. + """ + sys.excepthook = log_uncaught_exceptions fail_msg = '' try: logging_config = os.path.join(state.appdata, 'logging.dat') @@ -63,9 +83,7 @@ def configureLogging(): # no need to confuse the user if the logger config is missing entirely fail_msg = 'Using default logger configuration' - sys.excepthook = log_uncaught_exceptions - - logging.config.dictConfig({ + logging_config = { 'version': 1, 'formatters': { 'default': { @@ -107,34 +125,28 @@ def configureLogging(): 'level': log_level, 'handlers': ['console'], }, - }) + } + + logging_config['loggers']['default'] = logging_config['loggers'][ + 'file_only' if '-c' in sys.argv else 'both'] + logging.config.dictConfig(logging_config) return True, fail_msg -def initLogging(): - preconfigured, msg = configureLogging() - if preconfigured: - if '-c' in sys.argv: - logger = logging.getLogger('file_only') - else: - logger = logging.getLogger('both') - else: - logger = logging.getLogger('default') - - if msg: - logger.log(logging.WARNING if preconfigured else logging.INFO, msg) - return logger - - def resetLogging(): + """Reconfigure logging in runtime when state.appdata dir changed""" global logger - for i in logger.handlers.iterkeys(): + for i in logger.handlers: logger.removeHandler(i) i.flush() i.close() - logger = initLogging() + configureLogging() + logger = logging.getLogger('default') # ! -logger = initLogging() +preconfigured, msg = configureLogging() +logger = logging.getLogger('default') +if msg: + logger.log(logging.WARNING if preconfigured else logging.INFO, msg) diff --git a/src/helper_threading.py b/src/helper_threading.py index 4b0a074e..e4fbe940 100644 --- a/src/helper_threading.py +++ b/src/helper_threading.py @@ -1,9 +1,6 @@ -"""Helper threading perform all the threading operations.""" +"""set_thread_name for threads that don't use StoppableThread""" import threading -from contextlib import contextmanager - -import helper_random try: import prctl @@ -22,37 +19,3 @@ else: threading.Thread.__bootstrap_original__ = threading.Thread._Thread__bootstrap threading.Thread._Thread__bootstrap = _thread_name_hack - - -class StoppableThread(threading.Thread): - name = None - - def __init__(self, name=None): - if name: - self.name = name - super(StoppableThread, self).__init__(name=self.name) - self.initStop() - helper_random.seed() - - def initStop(self): - self.stop = threading.Event() - self._stopped = False - - def stopThread(self): - self._stopped = True - self.stop.set() - - -class BusyError(threading.ThreadError): - pass - - -@contextmanager -def nonBlocking(lock): - locked = lock.acquire(False) - if not locked: - raise BusyError - try: - yield - finally: - lock.release() diff --git a/src/messagetypes/__init__.py b/src/messagetypes/__init__.py index 7319dfd5..af6bcdaa 100644 --- a/src/messagetypes/__init__.py +++ b/src/messagetypes/__init__.py @@ -1,17 +1,15 @@ -""" -src/messagetypes/__init__.py -============================ -""" +import logging from importlib import import_module from os import path, listdir from string import lower -from debug import logger import messagetypes import paths +logger = logging.getLogger('default') -class MsgBase(object): # pylint: disable=too-few-public-methods + +class MsgBase(object): # pylint: disable=too-few-public-methods """Base class for message types""" def __init__(self): self.data = {"": lower(type(self).__name__)} diff --git a/src/messagetypes/message.py b/src/messagetypes/message.py index cd5bf762..573732d4 100644 --- a/src/messagetypes/message.py +++ b/src/messagetypes/message.py @@ -1,10 +1,9 @@ -""" -src/messagetypes/message.py -=========================== -""" -from debug import logger +import logging + from messagetypes import MsgBase +logger = logging.getLogger('default') + class Message(MsgBase): """Encapsulate a message""" diff --git a/src/messagetypes/vote.py b/src/messagetypes/vote.py index e128e9ba..b559c256 100644 --- a/src/messagetypes/vote.py +++ b/src/messagetypes/vote.py @@ -1,10 +1,9 @@ -""" -src/messagetypes/vote.py -======================== -""" -from debug import logger +import logging + from messagetypes import MsgBase +logger = logging.getLogger('default') + class Vote(MsgBase): """Module used to vote""" diff --git a/src/network/addrthread.py b/src/network/addrthread.py index 9f516e80..d5d21599 100644 --- a/src/network/addrthread.py +++ b/src/network/addrthread.py @@ -1,9 +1,9 @@ import Queue -from helper_threading import StoppableThread +import state from network.connectionpool import BMConnectionPool from queues import addrQueue -import state +from threads import StoppableThread class AddrThread(StoppableThread): diff --git a/src/network/advanceddispatcher.py b/src/network/advanceddispatcher.py index c8f125f0..eeb50bdf 100644 --- a/src/network/advanceddispatcher.py +++ b/src/network/advanceddispatcher.py @@ -10,8 +10,7 @@ import time import network.asyncore_pollchoose as asyncore import state -from debug import logger -from helper_threading import BusyError, nonBlocking +from threads import BusyError, nonBlocking class ProcessingError(Exception): @@ -84,7 +83,8 @@ class AdvancedDispatcher(asyncore.dispatcher): try: cmd = getattr(self, "state_" + str(self.state)) except AttributeError: - logger.error("Unknown state %s", self.state, exc_info=True) + self.logger.error( + 'Unknown state %s', self.state, exc_info=True) raise UnknownStateError(self.state) if not cmd(): break diff --git a/src/network/announcethread.py b/src/network/announcethread.py index 59fad128..5cd27ede 100644 --- a/src/network/announcethread.py +++ b/src/network/announcethread.py @@ -2,22 +2,20 @@ src/network/announcethread.py ================================= """ + import time +import state from bmconfigparser import BMConfigParser -from debug import logger -from helper_threading import StoppableThread from network.bmproto import BMProto from network.connectionpool import BMConnectionPool from network.udp import UDPSocket -import state +from threads import StoppableThread class AnnounceThread(StoppableThread): """A thread to manage regular announcing of this node""" - def __init__(self): - super(AnnounceThread, self).__init__(name="Announcer") - logger.info("init announce thread") + name = "Announcer" def run(self): lastSelfAnnounced = 0 diff --git a/src/network/bmobject.py b/src/network/bmobject.py index e19eaac9..ac6429e4 100644 --- a/src/network/bmobject.py +++ b/src/network/bmobject.py @@ -2,15 +2,17 @@ BMObject and it's exceptions. """ +import logging import time import protocol import state from addresses import calculateInventoryHash -from debug import logger from inventory import Inventory from network.dandelion import Dandelion +logger = logging.getLogger('default') + class BMObjectInsufficientPOWError(Exception): """Exception indicating the object doesn't have sufficient proof of work.""" diff --git a/src/network/bmproto.py b/src/network/bmproto.py index 0a2cdc7e..839630d8 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -5,6 +5,7 @@ src/network/bmproto.py # pylint: disable=attribute-defined-outside-init import base64 import hashlib +import logging import socket import struct import time @@ -16,7 +17,6 @@ import knownnodes import protocol import state from bmconfigparser import BMConfigParser -from debug import logger from inventory import Inventory from network.advanceddispatcher import AdvancedDispatcher from network.dandelion import Dandelion @@ -30,6 +30,8 @@ from objectracker import missingObjects, ObjectTracker from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue from randomtrackingdict import RandomTrackingDict +logger = logging.getLogger('default') + class BMProtoError(ProxyError): """A Bitmessage Protocol Base Error""" diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index 53ce30b7..ead8b31b 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -1,13 +1,15 @@ # pylint: disable=too-many-branches +import logging import random # nosec import knownnodes import protocol import state from bmconfigparser import BMConfigParser -from debug import logger from queues import Queue, portCheckerQueue +logger = logging.getLogger('default') + def getDiscoveredPeer(): try: diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 4d16df49..1267522a 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -3,6 +3,7 @@ src/network/connectionpool.py ================================== """ import errno +import logging import re import socket import time @@ -14,7 +15,6 @@ import protocol import state from bmconfigparser import BMConfigParser from connectionchooser import chooseConnection -from debug import logger from proxy import Proxy from singleton import Singleton from tcp import ( @@ -22,6 +22,8 @@ from tcp import ( TCPConnection, TCPServer) from udp import UDPSocket +logger = logging.getLogger('default') + @Singleton # pylint: disable=too-many-instance-attributes diff --git a/src/network/dandelion.py b/src/network/dandelion.py index fa9081cb..eed3c6ff 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -2,6 +2,7 @@ src/network/dandelion.py ======================== """ +import logging from collections import namedtuple from random import choice, sample, expovariate from threading import RLock @@ -9,7 +10,6 @@ from time import time import connectionpool import state -from debug import logging from queues import invQueue from singleton import Singleton @@ -24,6 +24,8 @@ MAX_STEMS = 2 Stem = namedtuple('Stem', ['child', 'stream', 'timeout']) +logger = logging.getLogger('default') + @Singleton class Dandelion(): # pylint: disable=old-style-class @@ -72,7 +74,7 @@ class Dandelion(): # pylint: disable=old-style-class def removeHash(self, hashId, reason="no reason specified"): """Switch inventory vector from stem to fluff mode""" - logging.debug( + logger.debug( "%s entering fluff mode due to %s.", ''.join('%02x' % ord(i) for i in hashId), reason) with self.lock: diff --git a/src/network/downloadthread.py b/src/network/downloadthread.py index a4b58862..472b32c0 100644 --- a/src/network/downloadthread.py +++ b/src/network/downloadthread.py @@ -2,17 +2,17 @@ src/network/downloadthread.py ============================= """ + import time import addresses import helper_random import protocol from dandelion import Dandelion -from debug import logger -from helper_threading import StoppableThread from inventory import Inventory from network.connectionpool import BMConnectionPool from objectracker import missingObjects +from threads import StoppableThread class DownloadThread(StoppableThread): @@ -25,7 +25,6 @@ class DownloadThread(StoppableThread): def __init__(self): super(DownloadThread, self).__init__(name="Downloader") - logger.info("init download thread") self.lastCleaned = time.time() def cleanPending(self): @@ -78,7 +77,9 @@ class DownloadThread(StoppableThread): continue payload[0:0] = addresses.encodeVarint(chunkCount) i.append_write_buf(protocol.CreatePacket('getdata', payload)) - logger.debug("%s:%i Requesting %i objects", i.destination.host, i.destination.port, chunkCount) + self.logger.debug( + '%s:%i Requesting %i objects', + i.destination.host, i.destination.port, chunkCount) requested += chunkCount if time.time() >= self.lastCleaned + DownloadThread.cleanInterval: self.cleanPending() diff --git a/src/network/invthread.py b/src/network/invthread.py index ad3a0764..bffa6ecb 100644 --- a/src/network/invthread.py +++ b/src/network/invthread.py @@ -9,10 +9,10 @@ from time import time import addresses import protocol import state -from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool from network.dandelion import Dandelion from queues import invQueue +from threads import StoppableThread def handleExpiredDandelion(expired): diff --git a/src/network/networkthread.py b/src/network/networkthread.py index 2a22367f..ba560906 100644 --- a/src/network/networkthread.py +++ b/src/network/networkthread.py @@ -1,20 +1,13 @@ -""" -src/network/networkthread.py -============================ -""" import network.asyncore_pollchoose as asyncore import state -from debug import logger -from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool from queues import excQueue +from threads import StoppableThread class BMNetworkThread(StoppableThread): """A thread to handle network concerns""" - def __init__(self): - super(BMNetworkThread, self).__init__(name="Asyncore") - logger.info("init asyncore thread") + name = "Asyncore" def run(self): try: diff --git a/src/network/proxy.py b/src/network/proxy.py index 479663d3..e65ac6a7 100644 --- a/src/network/proxy.py +++ b/src/network/proxy.py @@ -3,6 +3,7 @@ src/network/proxy.py ==================== """ # pylint: disable=protected-access +import logging import socket import time @@ -10,7 +11,8 @@ import asyncore_pollchoose as asyncore import state from advanceddispatcher import AdvancedDispatcher from bmconfigparser import BMConfigParser -from debug import logger + +logger = logging.getLogger('default') class ProxyError(Exception): @@ -144,5 +146,6 @@ class Proxy(AdvancedDispatcher): def state_proxy_handshake_done(self): """Handshake is complete at this point""" - self.connectedAt = time.time() # pylint: disable=attribute-defined-outside-init + # pylint: disable=attribute-defined-outside-init + self.connectedAt = time.time() return False diff --git a/src/network/receivequeuethread.py b/src/network/receivequeuethread.py index 5d8cbd37..13c12ce2 100644 --- a/src/network/receivequeuethread.py +++ b/src/network/receivequeuethread.py @@ -2,18 +2,16 @@ import errno import Queue import socket -from debug import logger -from helper_threading import StoppableThread +import state from network.connectionpool import BMConnectionPool from network.advanceddispatcher import UnknownStateError from queues import receiveDataQueue -import state +from threads import StoppableThread class ReceiveQueueThread(StoppableThread): def __init__(self, num=0): super(ReceiveQueueThread, self).__init__(name="ReceiveQueue_%i" % num) - logger.info("init receive queue thread %i", num) def run(self): while not self._stopped and state.shutdown == 0: @@ -26,11 +24,12 @@ class ReceiveQueueThread(StoppableThread): break # cycle as long as there is data - # methods should return False if there isn't enough data, or the connection is to be aborted - - # state_* methods should return False if there isn't enough data, + # methods should return False if there isn't enough data, # or the connection is to be aborted + # state_* methods should return False if there isn't + # enough data, or the connection is to be aborted + try: connection = BMConnectionPool().getConnectionByAddr(dest) # KeyError = connection object not found @@ -40,13 +39,13 @@ class ReceiveQueueThread(StoppableThread): try: connection.process() # UnknownStateError = state isn't implemented - except (UnknownStateError): + except UnknownStateError: pass except socket.error as err: if err.errno == errno.EBADF: connection.set_state("close", 0) else: - logger.error("Socket error: %s", str(err)) + self.logger.error('Socket error: %s', err) except: - logger.error("Error processing", exc_info=True) + self.logger.error('Error processing', exc_info=True) receiveDataQueue.task_done() diff --git a/src/network/tcp.py b/src/network/tcp.py index da02df2f..368ca5e0 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -4,6 +4,7 @@ src/network/tcp.py ================== """ +import logging import math import random import socket @@ -18,7 +19,6 @@ import protocol import shared import state from bmconfigparser import BMConfigParser -from debug import logger from helper_random import randomBytes from inventory import Inventory from network.advanceddispatcher import AdvancedDispatcher @@ -30,6 +30,8 @@ from network.socks5 import Socks5Connection from network.tls import TLSDispatcher from queues import UISignalQueue, invQueue, receiveDataQueue +logger = logging.getLogger('default') + class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instance-attributes diff --git a/src/network/threads.py b/src/network/threads.py new file mode 100644 index 00000000..9bdaa85d --- /dev/null +++ b/src/network/threads.py @@ -0,0 +1,49 @@ +"""Threading primitives for the network package""" + +import logging +import random +import threading +from contextlib import contextmanager + + +class StoppableThread(threading.Thread): + """Base class for application threads with stopThread method""" + name = None + logger = logging.getLogger('default') + + def __init__(self, name=None): + if name: + self.name = name + super(StoppableThread, self).__init__(name=self.name) + self.stop = threading.Event() + self._stopped = False + random.seed() + self.logger.info('Init thread %s', self.name) + + def stopThread(self): + """Stop the thread""" + self._stopped = True + self.stop.set() + + +class BusyError(threading.ThreadError): + """ + Thread error raised when another connection holds the lock + we are trying to acquire. + """ + pass + + +@contextmanager +def nonBlocking(lock): + """ + A context manager which acquires given lock non-blocking + and raises BusyError if failed to acquire. + """ + locked = lock.acquire(False) + if not locked: + raise BusyError + try: + yield + finally: + lock.release() diff --git a/src/network/tls.py b/src/network/tls.py index 17b1ee1f..52f17c29 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -2,17 +2,18 @@ SSL/TLS negotiation. """ +import logging import os import socket import ssl import sys -from debug import logger from network.advanceddispatcher import AdvancedDispatcher import network.asyncore_pollchoose as asyncore from queues import receiveDataQueue import paths +logger = logging.getLogger('default') _DISCONNECTED_SSL = frozenset((ssl.SSL_ERROR_EOF,)) diff --git a/src/network/udp.py b/src/network/udp.py index 01dc1f7b..97c6aee5 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -2,24 +2,27 @@ src/network/udp.py ================== """ +import logging import time import socket import state import protocol from bmproto import BMProto -from debug import logger from objectracker import ObjectTracker from queues import receiveDataQueue +logger = logging.getLogger('default') -class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes + +class UDPSocket(BMProto): # pylint: disable=too-many-instance-attributes """Bitmessage protocol over UDP (class)""" port = 8444 announceInterval = 60 def __init__(self, host=None, sock=None, announcing=False): - super(BMProto, self).__init__(sock=sock) # pylint: disable=bad-super-call + # pylint: disable=bad-super-call + super(BMProto, self).__init__(sock=sock) self.verackReceived = True self.verackSent = True # .. todo:: sort out streams @@ -79,7 +82,8 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attribut decodedIP = protocol.checkIPAddress(str(ip)) if stream not in state.streamsInWhichIAmParticipating: continue - if (seenTime < time.time() - self.maxTimeOffset or seenTime > time.time() + self.maxTimeOffset): + if (seenTime < time.time() - self.maxTimeOffset + or seenTime > time.time() + self.maxTimeOffset): continue if decodedIP is False: # if the address isn't local, interpret it as diff --git a/src/network/uploadthread.py b/src/network/uploadthread.py index 9b29ef0a..1b57bd9a 100644 --- a/src/network/uploadthread.py +++ b/src/network/uploadthread.py @@ -1,26 +1,23 @@ """ src/network/uploadthread.py """ -# pylint: disable=unsubscriptable-object import time import helper_random import protocol -from debug import logger -from helper_threading import StoppableThread from inventory import Inventory from network.connectionpool import BMConnectionPool from network.dandelion import Dandelion from randomtrackingdict import RandomTrackingDict +from threads import StoppableThread class UploadThread(StoppableThread): - """This is a thread that uploads the objects that the peers requested from me """ + """ + This is a thread that uploads the objects that the peers requested from me + """ maxBufSize = 2097152 # 2MB - - def __init__(self): - super(UploadThread, self).__init__(name="Uploader") - logger.info("init upload thread") + name = "Uploader" def run(self): while not self._stopped: @@ -47,22 +44,26 @@ class UploadThread(StoppableThread): if Dandelion().hasHash(chunk) and \ i != Dandelion().objectChildStem(chunk): i.antiIntersectionDelay() - logger.info('%s asked for a stem object we didn\'t offer to it.', - i.destination) + self.logger.info( + '%s asked for a stem object we didn\'t offer to it.', + i.destination) break try: - payload.extend(protocol.CreatePacket('object', - Inventory()[chunk].payload)) + payload.extend(protocol.CreatePacket( + 'object', Inventory()[chunk].payload)) chunk_count += 1 except KeyError: i.antiIntersectionDelay() - logger.info('%s asked for an object we don\'t have.', i.destination) + self.logger.info( + '%s asked for an object we don\'t have.', + i.destination) break if not chunk_count: continue i.append_write_buf(payload) - logger.debug("%s:%i Uploading %i objects", - i.destination.host, i.destination.port, chunk_count) + self.logger.debug( + '%s:%i Uploading %i objects', + i.destination.host, i.destination.port, chunk_count) uploaded += chunk_count if not uploaded: self.stop.wait(1) diff --git a/src/shutdown.py b/src/shutdown.py index f136ac75..85d11d67 100644 --- a/src/shutdown.py +++ b/src/shutdown.py @@ -3,15 +3,15 @@ import Queue import threading import time -from debug import logger -from helper_sql import sqlQuery, sqlStoredProcedure -from helper_threading import StoppableThread -from knownnodes import saveKnownNodes -from inventory import Inventory -from queues import ( - addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue) import shared import state +from debug import logger +from helper_sql import sqlQuery, sqlStoredProcedure +from inventory import Inventory +from knownnodes import saveKnownNodes +from network.threads import StoppableThread +from queues import ( + addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue) def doCleanShutdown(): diff --git a/src/upnp.py b/src/upnp.py index fdc4bc1d..b1ee2e7b 100644 --- a/src/upnp.py +++ b/src/upnp.py @@ -21,8 +21,8 @@ import state import tr from bmconfigparser import BMConfigParser from debug import logger -from helper_threading import StoppableThread from network.connectionpool import BMConnectionPool +from network.threads import StoppableThread def createRequestXML(service, action, arguments=None):