Continuing migration from print-based logging.
This commit is contained in:
parent
5e93256be6
commit
87cbfe9434
|
@ -8,7 +8,7 @@
|
|||
# yet contain logic to expand into further streams.
|
||||
|
||||
# The software version variable is now held in shared.py
|
||||
verbose = 1
|
||||
verbose=2
|
||||
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # Equals two days and 12 hours.
|
||||
lengthOfTimeToLeaveObjectsInInventory = 237600 # Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice.
|
||||
lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time.
|
||||
|
@ -48,6 +48,7 @@ from subprocess import call # used when the API must execute an outside program
|
|||
import singleton
|
||||
import proofofwork
|
||||
# Logging
|
||||
# TODO(fiatflux): Add option to suppress extra logging info (e.g. tracebacks) in stdout/stderr.
|
||||
from debug import logger
|
||||
|
||||
# For each stream to which we connect, several outgoingSynSender threads
|
||||
|
@ -65,7 +66,8 @@ class outgoingSynSender(threading.Thread):
|
|||
time.sleep(1)
|
||||
global alreadyAttemptedConnectionsListResetTime
|
||||
while True:
|
||||
while len(selfInitiatedConnections[self.streamNumber]) >= 8: # maximum number of outgoing connections = 8
|
||||
# Maximum number of outgoing connections = 8.
|
||||
while len(selfInitiatedConnections[self.streamNumber]) >= 8:
|
||||
time.sleep(10)
|
||||
if shared.shutdown:
|
||||
break
|
||||
|
@ -74,7 +76,7 @@ class outgoingSynSender(threading.Thread):
|
|||
alreadyAttemptedConnectionsListLock.acquire()
|
||||
while HOST in alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
|
||||
alreadyAttemptedConnectionsListLock.release()
|
||||
# print 'choosing new sample'
|
||||
logger.debug('Choosing new sample.')
|
||||
random.seed()
|
||||
HOST, = random.sample(shared.knownNodes[
|
||||
self.streamNumber], 1)
|
||||
|
@ -96,23 +98,18 @@ class outgoingSynSender(threading.Thread):
|
|||
# can rebind faster
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.settimeout(20)
|
||||
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and verbose >= 2:
|
||||
#shared.printLock.acquire()
|
||||
#print 'Trying an outgoing connection to', HOST, ':', PORT
|
||||
logger.info('Trying an outgoing connection to %s:%s' % (HOST, PORT))
|
||||
#shared.printLock.release()
|
||||
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none':
|
||||
logger.debug('Trying an outgoing connection to %s:%s' % (HOST, PORT))
|
||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
||||
if verbose >= 2:
|
||||
shared.printLock.acquire()
|
||||
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
|
||||
shared.printLock.release()
|
||||
logger.debug('(Using SOCKS4a) Trying an outgoing connection to %s:%s' % (HOST, PORT))
|
||||
proxytype = socks.PROXY_TYPE_SOCKS4
|
||||
sockshostname = shared.config.get(
|
||||
'bitmessagesettings', 'sockshostname')
|
||||
socksport = shared.config.getint(
|
||||
'bitmessagesettings', 'socksport')
|
||||
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
|
||||
# Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be
|
||||
# doing any domain name lookups anyway.
|
||||
rdns = True
|
||||
if shared.config.getboolean('bitmessagesettings', 'socksauthentication'):
|
||||
socksusername = shared.config.get(
|
||||
'bitmessagesettings', 'socksusername')
|
||||
|
@ -124,16 +121,15 @@ class outgoingSynSender(threading.Thread):
|
|||
sock.setproxy(
|
||||
proxytype, sockshostname, socksport, rdns)
|
||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
||||
if verbose >= 2:
|
||||
shared.printLock.acquire()
|
||||
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
|
||||
shared.printLock.release()
|
||||
logger.debug('(Using SOCKS5) Trying an outgoing connection to %s:%s' % (HOST, PORT))
|
||||
proxytype = socks.PROXY_TYPE_SOCKS5
|
||||
sockshostname = shared.config.get(
|
||||
'bitmessagesettings', 'sockshostname')
|
||||
socksport = shared.config.getint(
|
||||
'bitmessagesettings', 'socksport')
|
||||
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
|
||||
# Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be
|
||||
# doing any domain name lookups anyway.
|
||||
rdns = True
|
||||
if shared.config.getboolean('bitmessagesettings', 'socksauthentication'):
|
||||
socksusername = shared.config.get(
|
||||
'bitmessagesettings', 'socksusername')
|
||||
|
@ -153,9 +149,7 @@ class outgoingSynSender(threading.Thread):
|
|||
rd.setup(sock, HOST, PORT, self.streamNumber,
|
||||
objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||
rd.start()
|
||||
shared.printLock.acquire()
|
||||
print self, 'connected to', HOST, 'during an outgoing attempt.'
|
||||
shared.printLock.release()
|
||||
logger.info('%s connected to %s during an outgoing attempt.' % (self, HOST))
|
||||
|
||||
sd = sendDataThread()
|
||||
sd.setup(sock, HOST, PORT, self.streamNumber,
|
||||
|
@ -164,48 +158,43 @@ class outgoingSynSender(threading.Thread):
|
|||
sd.sendVersionMessage()
|
||||
|
||||
except socks.GeneralProxyError as err:
|
||||
if verbose >= 2:
|
||||
shared.printLock.acquire()
|
||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||
shared.printLock.release()
|
||||
# TODO(fiatflux): turn off traceback, but preserve terse exception info.
|
||||
logger.debug('Could NOT connect to %s during outgoing attempt.', HOST, exc_info=True)
|
||||
PORT, timeLastSeen = shared.knownNodes[
|
||||
self.streamNumber][HOST]
|
||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
|
||||
# for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the
|
||||
# shared.knownNodes data-structure.
|
||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000:
|
||||
shared.knownNodesLock.acquire()
|
||||
del shared.knownNodes[self.streamNumber][HOST]
|
||||
shared.knownNodesLock.release()
|
||||
shared.printLock.acquire()
|
||||
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||
shared.printLock.release()
|
||||
logger.debug('deleting %s from shared.knownNodes because it is more than 48 hours old and we '
|
||||
'could not connect to it.' % (HOST))
|
||||
except socks.Socks5AuthError as err:
|
||||
shared.UISignalQueue.put((
|
||||
'updateStatusBar', _translate(
|
||||
"MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err))))
|
||||
except socks.Socks5Error as err:
|
||||
pass
|
||||
print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err)
|
||||
logger.exception('SOCKS5 error. It is possible that the server wants authentication.')
|
||||
except socks.Socks4Error as err:
|
||||
print 'Socks4Error:', err
|
||||
logger.exception('Socks4Error')
|
||||
except socket.error as err:
|
||||
if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
|
||||
logger.exception('Bitmessage MIGHT be having trouble connecting to the SOCKS server.')
|
||||
else:
|
||||
if verbose >= 1:
|
||||
shared.printLock.acquire()
|
||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||
shared.printLock.release()
|
||||
logger.info('Could NOT connect to %s during outgoing attempt.', HOST, exc_info=True)
|
||||
PORT, timeLastSeen = shared.knownNodes[
|
||||
self.streamNumber][HOST]
|
||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
||||
# for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the
|
||||
# knownNodes data-structure.
|
||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000:
|
||||
shared.knownNodesLock.acquire()
|
||||
del shared.knownNodes[self.streamNumber][HOST]
|
||||
shared.knownNodesLock.release()
|
||||
shared.printLock.acquire()
|
||||
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||
shared.printLock.release()
|
||||
logger.info('Deleting %s from knownNodes because it is more than 48 hours old and we '
|
||||
'could not connect to it.' % (HOST))
|
||||
except Exception as err:
|
||||
sys.stderr.write(
|
||||
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: %s\n' % err)
|
||||
logger.exception('An uncaught exception has occurred in the outgoingSynSender thread.')
|
||||
time.sleep(0.1)
|
||||
|
||||
# Only one singleListener thread will ever exist. It creates the
|
||||
|
@ -228,9 +217,7 @@ class singleListener(threading.Thread):
|
|||
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||
time.sleep(300)
|
||||
|
||||
shared.printLock.acquire()
|
||||
print 'Listening for incoming connections.'
|
||||
shared.printLock.release()
|
||||
logger.info('Listening for incoming connections.')
|
||||
HOST = '' # Symbolic name meaning all available interfaces
|
||||
PORT = shared.config.getint('bitmessagesettings', 'port')
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
@ -247,9 +234,7 @@ class singleListener(threading.Thread):
|
|||
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||
time.sleep(10)
|
||||
while len(shared.connectedHostsList) > 220:
|
||||
shared.printLock.acquire()
|
||||
print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.'
|
||||
shared.printLock.release()
|
||||
logger.info('We are connected to too many people. Not accepting further incoming connections for ten seconds.')
|
||||
time.sleep(10)
|
||||
a, (HOST, PORT) = sock.accept()
|
||||
|
||||
|
@ -258,9 +243,7 @@ class singleListener(threading.Thread):
|
|||
# because the two computers will share the same external IP. This
|
||||
# is here to prevent connection flooding.
|
||||
while HOST in shared.connectedHostsList:
|
||||
shared.printLock.acquire()
|
||||
print 'We are already connected to', HOST + '. Ignoring connection.'
|
||||
shared.printLock.release()
|
||||
logger.warning('We are already connected to', HOST + '. Ignoring connection.')
|
||||
a.close()
|
||||
a, (HOST, PORT) = sock.accept()
|
||||
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
|
||||
|
@ -277,9 +260,7 @@ class singleListener(threading.Thread):
|
|||
a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||
rd.start()
|
||||
|
||||
shared.printLock.acquire()
|
||||
print self, 'connected to', HOST, 'during INCOMING request.'
|
||||
shared.printLock.release()
|
||||
logger.info(self, 'connected to %s during INCOMING request.' % (HOST))
|
||||
|
||||
# This thread is created either by the synSenderThread(for outgoing
|
||||
# connections) or the singleListenerThread(for incoming connectiosn).
|
||||
|
@ -304,106 +285,109 @@ class receiveDataThread(threading.Thread):
|
|||
self.HOST = HOST
|
||||
self.PORT = port
|
||||
self.streamNumber = streamNumber
|
||||
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
||||
# This is the protocol payload length thus it does NOT include the 24 byte message header.
|
||||
self.payloadLength = 0
|
||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
|
||||
|
||||
# The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it
|
||||
# to this list so that an outgoingSynSender thread doesn't try to connect to it..
|
||||
shared.connectedHostsList[
|
||||
self.HOST] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
||||
if self.streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
||||
self.HOST] = 0
|
||||
|
||||
# set to true after the remote node and I accept each other's version messages. This is needed to allow the user
|
||||
# interface to accurately reflect the current number of connections.
|
||||
self.connectionIsOrWasFullyEstablished = False
|
||||
|
||||
# This was an incoming connection. Send out a version message if we accept the other node's version message.
|
||||
if self.streamNumber == -1:
|
||||
self.initiatedConnection = False
|
||||
else:
|
||||
self.initiatedConnection = True
|
||||
selfInitiatedConnections[streamNumber][self] = 0
|
||||
|
||||
# When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here
|
||||
# until we are done processing all other data received from this peer.
|
||||
self.ackDataThatWeHaveYetToSend = [
|
||||
] # When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer.
|
||||
]
|
||||
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||
|
||||
def run(self):
|
||||
shared.printLock.acquire()
|
||||
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||
shared.printLock.release()
|
||||
logger.info(
|
||||
'ID of the receiveDataThread is %s. The size of the shared.connectedHostsList is now',
|
||||
str(id(self)), len(shared.connectedHostsList))
|
||||
while True:
|
||||
try:
|
||||
self.data += self.sock.recv(4096)
|
||||
except socket.timeout:
|
||||
shared.printLock.acquire()
|
||||
print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||
shared.printLock.release()
|
||||
logger.info('Timeout occurred waiting for data from %s. Closing receiveData thread. (ID:%s)',
|
||||
self.HOST, id(self))
|
||||
break
|
||||
except Exception as err:
|
||||
shared.printLock.acquire()
|
||||
print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err
|
||||
shared.printLock.release()
|
||||
logger.exception('sock.recv error. Closing receiveData thread (HOST:%s, ID:%s).',
|
||||
self.HOST, id(self))
|
||||
break
|
||||
# print 'Received', repr(self.data)
|
||||
logger.debug('Received data: %s.' % repr(self.data))
|
||||
if self.data == "":
|
||||
shared.printLock.acquire()
|
||||
print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||
shared.printLock.release()
|
||||
logger.info('Connection to %s closed. Closing receiveData thread. (ID:%s)', self.HOST, id(self))
|
||||
break
|
||||
else:
|
||||
self.processData()
|
||||
|
||||
try:
|
||||
del selfInitiatedConnections[self.streamNumber][self]
|
||||
shared.printLock.acquire()
|
||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||
shared.printLock.release()
|
||||
logger.info('Removed self (a receiveDataThread) from selfInitiatedConnections')
|
||||
except:
|
||||
pass
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||
try:
|
||||
del shared.connectedHostsList[self.HOST]
|
||||
except Exception as err:
|
||||
shared.printLock.acquire()
|
||||
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
|
||||
shared.printLock.release()
|
||||
logger.exception('Could not delete from shared.connectedHostsList.' % (self.host))
|
||||
try:
|
||||
del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||
self.HOST]
|
||||
except:
|
||||
pass
|
||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||
shared.printLock.acquire()
|
||||
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
|
||||
shared.printLock.release()
|
||||
logger.info('The size of the connectedHostsList is now: %s' % len(shared.connectedHostsList))
|
||||
|
||||
def processData(self):
|
||||
global verbose
|
||||
# if verbose >= 3:
|
||||
# shared.printLock.acquire()
|
||||
# print 'self.data is currently ', repr(self.data)
|
||||
# shared.printLock.release()
|
||||
logger.debug('self.data is currently %s', self.data)
|
||||
if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length
|
||||
return
|
||||
if self.data[0:4] != '\xe9\xbe\xb4\xd9':
|
||||
if verbose >= 1:
|
||||
shared.printLock.acquire()
|
||||
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
|
||||
shared.printLock.release()
|
||||
logger.debug('The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]))
|
||||
self.data = ""
|
||||
return
|
||||
self.payloadLength, = unpack('>L', self.data[16:20])
|
||||
if len(self.data) < self.payloadLength + 24: # check if the whole message has arrived yet.
|
||||
return
|
||||
if self.data[20:24] != hashlib.sha512(self.data[24:self.payloadLength + 24]).digest()[0:4]: # test the checksum in the message. If it is correct...
|
||||
print 'Checksum incorrect. Clearing this message.'
|
||||
# test the checksum in the message. If it is correct...
|
||||
if self.data[20:24] != hashlib.sha512(self.data[24:self.payloadLength + 24]).digest()[0:4]:
|
||||
logger.info('Checksum incorrect. Clearing this message.')
|
||||
self.data = self.data[self.payloadLength + 24:]
|
||||
self.processData()
|
||||
return
|
||||
|
||||
# The time we've last seen this node is obviously right now since we
|
||||
# just received valid data from it. So update the knownNodes list so
|
||||
# that other peers can be made aware of its existance.
|
||||
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
||||
#
|
||||
# The remote port is only something we should share with others if it is the remote node's incoming port (rather
|
||||
# than some random operating-system-assigned outgoing port).
|
||||
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished:
|
||||
shared.knownNodesLock.acquire()
|
||||
shared.knownNodes[self.streamNumber][
|
||||
self.HOST] = (self.PORT, int(time.time()))
|
||||
shared.knownNodesLock.release()
|
||||
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
|
||||
|
||||
# If the size of the message is greater than 180MB, ignore it. (I get
|
||||
# memory errors when processing messages much larger than this though
|
||||
# it is concievable that this value will have to be lowered if some
|
||||
# systems are less tolarant of large messages.)
|
||||
if self.payloadLength <= 180000000:
|
||||
remoteCommand = self.data[4:16]
|
||||
shared.printLock.acquire()
|
||||
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST
|
||||
shared.printLock.release()
|
||||
logger.warning('remoteCommand %s from %s', remoteCommand.replace('\x00', ''), self.HOST)
|
||||
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
||||
self.recversion(self.data[24:self.payloadLength + 24])
|
||||
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00':
|
||||
|
|
Reference in New Issue
Block a user