This commit is contained in:
Dmitri Bogomolov 2017-09-27 18:25:14 +03:00
parent 6548999a49
commit 42c46a2422
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
5 changed files with 225 additions and 102 deletions

View File

@ -19,7 +19,8 @@ sys.path.insert(0, app_dir)
import depends import depends
depends.check_dependencies() depends.check_dependencies()
import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
import signal
# The next 3 are used for the API # The next 3 are used for the API
from singleinstance import singleinstance from singleinstance import singleinstance
import errno import errno
@ -32,7 +33,9 @@ from random import randint
import getopt import getopt
from api import MySimpleXMLRPCRequestHandler, StoppableXMLRPCServer from api import MySimpleXMLRPCRequestHandler, StoppableXMLRPCServer
from helper_startup import isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections from helper_startup import (
isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections
)
import defaults import defaults
import shared import shared
@ -73,13 +76,15 @@ def connectToStream(streamNumber):
selfInitiatedConnections[streamNumber] = {} selfInitiatedConnections[streamNumber] = {}
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections(): if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
# Some XP and Vista systems can only have 10 outgoing connections at a time. # Some XP and Vista systems can only have 10 outgoing connections
# at a time.
state.maximumNumberOfHalfOpenConnections = 9 state.maximumNumberOfHalfOpenConnections = 9
else: else:
state.maximumNumberOfHalfOpenConnections = 64 state.maximumNumberOfHalfOpenConnections = 64
try: try:
# don't overload Tor # don't overload Tor
if BMConfigParser().get('bitmessagesettings', 'socksproxytype') != 'none': if BMConfigParser().get(
'bitmessagesettings', 'socksproxytype') != 'none':
state.maximumNumberOfHalfOpenConnections = 4 state.maximumNumberOfHalfOpenConnections = 4
except: except:
pass pass
@ -94,6 +99,7 @@ def connectToStream(streamNumber):
BMConnectionPool().connectToStream(streamNumber) BMConnectionPool().connectToStream(streamNumber)
def _fixSocket(): def _fixSocket():
if sys.platform.startswith('linux'): if sys.platform.startswith('linux'):
socket.SO_BINDTODEVICE = 25 socket.SO_BINDTODEVICE = 25
@ -105,6 +111,7 @@ def _fixSocket():
# socket.inet_ntop but we can make one ourselves using ctypes # socket.inet_ntop but we can make one ourselves using ctypes
if not hasattr(socket, 'inet_ntop'): if not hasattr(socket, 'inet_ntop'):
addressToString = ctypes.windll.ws2_32.WSAAddressToStringA addressToString = ctypes.windll.ws2_32.WSAAddressToStringA
def inet_ntop(family, host): def inet_ntop(family, host):
if family == socket.AF_INET: if family == socket.AF_INET:
if len(host) != 4: if len(host) != 4:
@ -125,6 +132,7 @@ def _fixSocket():
# Same for inet_pton # Same for inet_pton
if not hasattr(socket, 'inet_pton'): if not hasattr(socket, 'inet_pton'):
stringToAddress = ctypes.windll.ws2_32.WSAStringToAddressA stringToAddress = ctypes.windll.ws2_32.WSAStringToAddressA
def inet_pton(family, host): def inet_pton(family, host):
buf = "\0" * 28 buf = "\0" * 28
lengthBuf = pack("I", len(buf)) lengthBuf = pack("I", len(buf))
@ -148,6 +156,7 @@ def _fixSocket():
if not hasattr(socket, 'IPV6_V6ONLY'): if not hasattr(socket, 'IPV6_V6ONLY'):
socket.IPV6_V6ONLY = 27 socket.IPV6_V6ONLY = 27
# This thread, of which there is only one, runs the API. # This thread, of which there is only one, runs the API.
class singleAPI(threading.Thread, helper_threading.StoppableThread): class singleAPI(threading.Thread, helper_threading.StoppableThread):
def __init__(self): def __init__(self):
@ -158,8 +167,10 @@ class singleAPI(threading.Thread, helper_threading.StoppableThread):
super(singleAPI, self).stopThread() super(singleAPI, self).stopThread()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try: try:
s.connect((BMConfigParser().get('bitmessagesettings', 'apiinterface'), BMConfigParser().getint( s.connect((
'bitmessagesettings', 'apiport'))) BMConfigParser().get('bitmessagesettings', 'apiinterface'),
BMConfigParser().getint('bitmessagesettings', 'apiport')
))
s.shutdown(socket.SHUT_RDWR) s.shutdown(socket.SHUT_RDWR)
s.close() s.close()
except: except:
@ -175,14 +186,18 @@ class singleAPI(threading.Thread, helper_threading.StoppableThread):
try: try:
if attempt > 0: if attempt > 0:
port = randint(32767, 65535) port = randint(32767, 65535)
se = StoppableXMLRPCServer((BMConfigParser().get('bitmessagesettings', 'apiinterface'), port), se = StoppableXMLRPCServer(
(BMConfigParser().get(
'bitmessagesettings', 'apiinterface'),
port),
MySimpleXMLRPCRequestHandler, True, True) MySimpleXMLRPCRequestHandler, True, True)
except socket.error as e: except socket.error as e:
if e.errno in (errno.EADDRINUSE, errno.WSAEADDRINUSE): if e.errno in (errno.EADDRINUSE, errno.WSAEADDRINUSE):
continue continue
else: else:
if attempt > 0: if attempt > 0:
BMConfigParser().set("bitmessagesettings", "apiport", str(port)) BMConfigParser().set(
"bitmessagesettings", "apiport", str(port))
BMConfigParser().save() BMConfigParser().save()
break break
se.register_introspection_functions() se.register_introspection_functions()
@ -197,14 +212,17 @@ if shared.useVeryEasyProofOfWorkForTesting:
defaults.networkDefaultPayloadLengthExtraBytes = int( defaults.networkDefaultPayloadLengthExtraBytes = int(
defaults.networkDefaultPayloadLengthExtraBytes / 100) defaults.networkDefaultPayloadLengthExtraBytes / 100)
class Main: class Main:
def start(self): def start(self):
_fixSocket() _fixSocket()
daemon = BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon') daemon = BMConfigParser().safeGetBoolean(
'bitmessagesettings', 'daemon')
try: try:
opts, args = getopt.getopt(sys.argv[1:], "hcd", opts, args = getopt.getopt(
sys.argv[1:], "hcd",
["help", "curses", "daemon"]) ["help", "curses", "daemon"])
except getopt.GetoptError: except getopt.GetoptError:
@ -237,40 +255,51 @@ class Main:
helper_bootstrap.knownNodes() helper_bootstrap.knownNodes()
# Start the address generation thread # Start the address generation thread
addressGeneratorThread = addressGenerator() addressGeneratorThread = addressGenerator()
addressGeneratorThread.daemon = True # close the main program even if there are threads left # close the main program even if there are threads left
addressGeneratorThread.daemon = True
addressGeneratorThread.start() addressGeneratorThread.start()
# Start the thread that calculates POWs # Start the thread that calculates POWs
singleWorkerThread = singleWorker() singleWorkerThread = singleWorker()
singleWorkerThread.daemon = True # close the main program even if there are threads left # close the main program even if there are threads left
singleWorkerThread.daemon = True
singleWorkerThread.start() singleWorkerThread.start()
# Start the SQL thread # Start the SQL thread
sqlLookup = sqlThread() sqlLookup = sqlThread()
sqlLookup.daemon = False # DON'T close the main program even if there are threads left. The closeEvent should command this thread to exit gracefully. # DON'T close the main program even if there are threads left.
# The closeEvent should command this thread to exit gracefully.
sqlLookup.daemon = False
sqlLookup.start() sqlLookup.start()
Inventory() # init Inventory() # init
DandelionStems() # init, needs to be early because other thread may access it early # init, needs to be early because other thread may access it early
DandelionStems()
# SMTP delivery thread # SMTP delivery thread
if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': if daemon and BMConfigParser().safeGet(
"bitmessagesettings", "smtpdeliver", ''):
smtpDeliveryThread = smtpDeliver() smtpDeliveryThread = smtpDeliver()
smtpDeliveryThread.start() smtpDeliveryThread.start()
# SMTP daemon thread # SMTP daemon thread
if daemon and BMConfigParser().safeGetBoolean("bitmessagesettings", "smtpd"): if daemon and BMConfigParser().safeGetBoolean(
"bitmessagesettings", "smtpd"):
smtpServerThread = smtpServer() smtpServerThread = smtpServer()
smtpServerThread.start() smtpServerThread.start()
# Start the thread that calculates POWs # Start the thread that calculates POWs
objectProcessorThread = objectProcessor() objectProcessorThread = objectProcessor()
objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. # DON'T close the main program even the thread remains.
# This thread checks the shutdown variable after processing
# each object.
objectProcessorThread.daemon = False
objectProcessorThread.start() objectProcessorThread.start()
# Start the cleanerThread # Start the cleanerThread
singleCleanerThread = singleCleaner() singleCleanerThread = singleCleaner()
singleCleanerThread.daemon = True # close the main program even if there are threads left # close the main program even if there are threads left
singleCleanerThread.daemon = True
singleCleanerThread.start() singleCleanerThread.start()
shared.reloadMyAddressHashes() shared.reloadMyAddressHashes()
@ -288,7 +317,8 @@ class Main:
call([apiNotifyPath, "startingUp"]) call([apiNotifyPath, "startingUp"])
singleAPIThread = singleAPI() singleAPIThread = singleAPI()
singleAPIThread.daemon = True # close the main program even if there are threads left # close the main program even if there are threads left
singleAPIThread.daemon = True
singleAPIThread.start() singleAPIThread.start()
BMConnectionPool() BMConnectionPool()
@ -314,23 +344,36 @@ class Main:
connectToStream(1) connectToStream(1)
if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): if BMConfigParser().safeGetBoolean('bitmessagesettings', 'upnp'):
import upnp import upnp
upnpThread = upnp.uPnPThread() upnpThread = upnp.uPnPThread()
upnpThread.start() upnpThread.start()
if daemon == False and BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon') == False: if daemon is False and \
if state.curses == False: BMConfigParser().safeGetBoolean(
'bitmessagesettings', 'daemon') is False:
if state.curses is False:
if not depends.check_pyqt(): if not depends.check_pyqt():
print('PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon') print(
print('You can also run PyBitmessage with the new curses interface by providing \'-c\' as a commandline argument.') 'PyBitmessage requires PyQt unless you want'
' to run it as a daemon and interact with it'
' using the API. You can download PyQt from '
'http://www.riverbankcomputing.com/software/pyqt/download'
' or by searching Google for \'PyQt Download\'.'
' If you want to run in daemon mode, see '
'https://bitmessage.org/wiki/Daemon'
)
print(
'You can also run PyBitmessage with'
' the new curses interface by providing'
' \'-c\' as a commandline argument.'
)
sys.exit() sys.exit()
import bitmessageqt import bitmessageqt
bitmessageqt.run() bitmessageqt.run()
else: else:
if True: # if depends.check_curses():
# if depends.check_curses():
print('Running with curses') print('Running with curses')
import bitmessagecurses import bitmessagecurses
bitmessagecurses.runwrapper() bitmessagecurses.runwrapper()
@ -396,14 +439,14 @@ All parameters are optional.
print('Stopping Bitmessage Deamon.') print('Stopping Bitmessage Deamon.')
shutdown.doCleanShutdown() shutdown.doCleanShutdown()
# TODO: nice function but no one is using this
#TODO: nice function but no one is using this
def getApiAddress(self): def getApiAddress(self):
if not BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'): if not BMConfigParser().safeGetBoolean(
'bitmessagesettings', 'apienabled'):
return None return None
address = BMConfigParser().get('bitmessagesettings', 'apiinterface') address = BMConfigParser().get('bitmessagesettings', 'apiinterface')
port = BMConfigParser().getint('bitmessagesettings', 'apiport') port = BMConfigParser().getint('bitmessagesettings', 'apiport')
return {'address':address,'port':port} return {'address': address, 'port': port}
def main(): def main():

View File

@ -1,13 +1,12 @@
import threading import threading
import shared import shared
import time import time
import sys
import os import os
import tr#anslate import tr
from bmconfigparser import BMConfigParser from bmconfigparser import BMConfigParser
from helper_sql import * from helper_sql import sqlQuery, sqlExecute
from helper_threading import * from helper_threading import StoppableThread
from inventory import Inventory from inventory import Inventory
from network.connectionpool import BMConnectionPool from network.connectionpool import BMConnectionPool
from network.dandelion import DandelionStems from network.dandelion import DandelionStems
@ -48,9 +47,17 @@ class singleCleaner(threading.Thread, StoppableThread):
def run(self): def run(self):
timeWeLastClearedInventoryAndPubkeysTables = 0 timeWeLastClearedInventoryAndPubkeysTables = 0
try: try:
shared.maximumLengthOfTimeToBotherResendingMessages = (float(BMConfigParser().get('bitmessagesettings', 'stopresendingafterxdays')) * 24 * 60 * 60) + (float(BMConfigParser().get('bitmessagesettings', 'stopresendingafterxmonths')) * (60 * 60 * 24 *365)/12) shared.maximumLengthOfTimeToBotherResendingMessages = (
float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxdays')) *
24 * 60 * 60) + (
float(BMConfigParser().get(
'bitmessagesettings', 'stopresendingafterxmonths')) *
(60 * 60 * 24 * 365)/12)
except: except:
# Either the user hasn't set stopresendingafterxdays and stopresendingafterxmonths yet or the options are missing from the config file. # Either the user hasn't set stopresendingafterxdays and
# stopresendingafterxmonths yet or the options are missing
# from the config file.
shared.maximumLengthOfTimeToBotherResendingMessages = float('inf') shared.maximumLengthOfTimeToBotherResendingMessages = float('inf')
# initial wait # initial wait
@ -59,18 +66,23 @@ class singleCleaner(threading.Thread, StoppableThread):
while state.shutdown == 0: while state.shutdown == 0:
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateStatusBar', 'Doing housekeeping (Flushing inventory in memory to disk...)')) 'updateStatusBar',
'Doing housekeeping (Flushing inventory in memory to disk...)'
))
Inventory().flush() Inventory().flush()
queues.UISignalQueue.put(('updateStatusBar', '')) queues.UISignalQueue.put(('updateStatusBar', ''))
protocol.broadcastToSendDataQueues(( # commands the sendData threads to send out a pong message
0, 'pong', 'no data')) # commands the sendData threads to send out a pong message if they haven't sent anything else in the last five minutes. The socket timeout-time is 10 minutes. # if they haven't sent anything else in the last five minutes.
# The socket timeout-time is 10 minutes.
protocol.broadcastToSendDataQueues((0, 'pong', 'no data'))
# If we are running as a daemon then we are going to fill up the UI # If we are running as a daemon then we are going to fill up the UI
# queue which will never be handled by a UI. We should clear it to # queue which will never be handled by a UI. We should clear it to
# save memory. # save memory.
if BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon'): if BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon'):
queues.UISignalQueue.queue.clear() queues.UISignalQueue.queue.clear()
if timeWeLastClearedInventoryAndPubkeysTables < int(time.time()) - 7380: if timeWeLastClearedInventoryAndPubkeysTables < \
int(time.time()) - 7380:
timeWeLastClearedInventoryAndPubkeysTables = int(time.time()) timeWeLastClearedInventoryAndPubkeysTables = int(time.time())
Inventory().clean() Inventory().clean()
# pubkeys # pubkeys
@ -78,14 +90,20 @@ class singleCleaner(threading.Thread, StoppableThread):
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''', '''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''',
int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys) int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
# Let us resend getpubkey objects if we have not yet heard a pubkey, and also msg objects if we have not yet heard an acknowledgement # Let us resend getpubkey objects if we have not yet heard
# a pubkey, and also msg objects if we have not yet heard
# an acknowledgement
queryreturn = sqlQuery( queryreturn = sqlQuery(
'''select toaddress, ackdata, status FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent' AND sleeptill<? AND senttime>?) ''', '''select toaddress, ackdata, status FROM sent WHERE ((status='awaitingpubkey' OR status='msgsent') AND folder='sent' AND sleeptill<? AND senttime>?) ''',
int(time.time()), int(time.time()),
int(time.time()) - shared.maximumLengthOfTimeToBotherResendingMessages) int(time.time()) - shared.maximumLengthOfTimeToBotherResendingMessages)
for row in queryreturn: for row in queryreturn:
if len(row) < 2: if len(row) < 2:
logger.error('Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row)) logger.error(
'Something went wrong in the singleCleaner thread:'
' a query did not return the requested fields. %r',
repr(row)
)
self.stop.wait(3) self.stop.wait(3)
break break
toAddress, ackData, status = row toAddress, ackData, status = row
@ -96,7 +114,7 @@ class singleCleaner(threading.Thread, StoppableThread):
# cleanup old nodes # cleanup old nodes
now = int(time.time()) now = int(time.time())
toDelete = [] # toDelete = []
with knownnodes.knownNodesLock: with knownnodes.knownNodesLock:
for stream in knownnodes.knownNodes: for stream in knownnodes.knownNodes:
for node in knownnodes.knownNodes[stream].keys(): for node in knownnodes.knownNodes[stream].keys():
@ -107,14 +125,24 @@ class singleCleaner(threading.Thread, StoppableThread):
except TypeError: except TypeError:
print "Error in %s" % (str(node)) print "Error in %s" % (str(node))
# Let us write out the knowNodes to disk if there is anything new to write out. # Let us write out the knowNodes to disk
# if there is anything new to write out.
if shared.needToWriteKnownNodesToDisk: if shared.needToWriteKnownNodesToDisk:
try: try:
knownnodes.saveKnownNodes() knownnodes.saveKnownNodes()
except Exception as err: except Exception as err:
if "Errno 28" in str(err): if "Errno 28" in str(err):
logger.fatal('(while receiveDataThread knownnodes.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ') logger.fatal(
queues.UISignalQueue.put(('alert', (tr._translate("MainWindow", "Disk full"), tr._translate("MainWindow", 'Alert: Your disk or data storage volume is full. Bitmessage will now exit.'), True))) '(while receiveDataThread knownnodes.needToWriteKnownNodesToDisk) Alert: Your disk or data storage volume is full. ')
queues.UISignalQueue.put((
'alert',
(tr._translate("MainWindow", "Disk full"),
tr._translate(
"MainWindow",
'Alert: Your disk or data storage volume'
' is full. Bitmessage will now exit.'),
True)
))
if shared.daemon: if shared.daemon:
os._exit(0) os._exit(0)
shared.needToWriteKnownNodesToDisk = False shared.needToWriteKnownNodesToDisk = False
@ -125,7 +153,8 @@ class singleCleaner(threading.Thread, StoppableThread):
thread.downloadQueue.clear() thread.downloadQueue.clear()
# inv/object tracking # inv/object tracking
for connection in BMConnectionPool().inboundConnections.values() + BMConnectionPool().outboundConnections.values(): for connection in BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
connection.clean() connection.clean()
# dandelion fluff trigger by expiration # dandelion fluff trigger by expiration
for h, t in DandelionStems().timeouts: for h, t in DandelionStems().timeouts:
@ -147,25 +176,38 @@ class singleCleaner(threading.Thread, StoppableThread):
def resendPubkeyRequest(address): def resendPubkeyRequest(address):
logger.debug('It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.') logger.debug(
'It has been a long time and we haven\'t heard a response to our'
' getpubkey request. Sending again.'
)
try: try:
del state.neededPubkeys[ # We need to take this entry out of the neededPubkeys structure
address] # We need to take this entry out of the neededPubkeys structure because the queues.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently. # because the queues.workerQueue checks to see whether the entry
# is already present and will not do the POW and send the message
# because it assumes that it has already done it recently.
del state.neededPubkeys[address]
except: except:
pass pass
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to request a public key...')) 'updateStatusBar',
'Doing work necessary to again attempt to request a public key...'))
sqlExecute( sqlExecute(
'''UPDATE sent SET status='msgqueued' WHERE toaddress=?''', '''UPDATE sent SET status='msgqueued' WHERE toaddress=?''',
address) address)
queues.workerQueue.put(('sendmessage', '')) queues.workerQueue.put(('sendmessage', ''))
def resendMsg(ackdata): def resendMsg(ackdata):
logger.debug('It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.') logger.debug(
'It has been a long time and we haven\'t heard an acknowledgement'
' to our msg. Sending again.'
)
sqlExecute( sqlExecute(
'''UPDATE sent SET status='msgqueued' WHERE ackdata=?''', '''UPDATE sent SET status='msgqueued' WHERE ackdata=?''',
ackdata) ackdata)
queues.workerQueue.put(('sendmessage', '')) queues.workerQueue.put(('sendmessage', ''))
queues.UISignalQueue.put(( queues.UISignalQueue.put((
'updateStatusBar', 'Doing work necessary to again attempt to deliver a message...')) 'updateStatusBar',
'Doing work necessary to again attempt to deliver a message...'
))

View File

@ -1,4 +1,3 @@
import os
import socket import socket
import sys import sys
from binascii import hexlify, unhexlify from binascii import hexlify, unhexlify
@ -11,6 +10,7 @@ from debug import logger
import queues import queues
import shutdown import shutdown
def powQueueSize(): def powQueueSize():
curWorkerQueue = queues.workerQueue.qsize() curWorkerQueue = queues.workerQueue.qsize()
for thread in enumerate(): for thread in enumerate():
@ -21,6 +21,7 @@ def powQueueSize():
pass pass
return curWorkerQueue return curWorkerQueue
def convertIntToString(n): def convertIntToString(n):
a = __builtins__.hex(n) a = __builtins__.hex(n)
if a[-1:] == 'L': if a[-1:] == 'L':
@ -30,24 +31,33 @@ def convertIntToString(n):
else: else:
return unhexlify('0' + a[2:]) return unhexlify('0' + a[2:])
def convertStringToInt(s): def convertStringToInt(s):
return int(hexlify(s), 16) return int(hexlify(s), 16)
def allThreadTraceback(frame): def allThreadTraceback(frame):
id2name = dict([(th.ident, th.name) for th in enumerate()]) id2name = dict([(th.ident, th.name) for th in enumerate()])
code = [] code = []
for threadId, stack in sys._current_frames().items(): for threadId, stack in sys._current_frames().items():
code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId)) code.append("\n# Thread: %s(%d)" %
(id2name.get(threadId, ""), threadId))
for filename, lineno, name, line in traceback.extract_stack(stack): for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name)) code.append('File: "%s", line %d, in %s' %
(filename, lineno, name))
if line: if line:
code.append(" %s" % (line.strip())) code.append(" %s" % (line.strip()))
print "\n".join(code) print "\n".join(code)
def signal_handler(signal, frame): def signal_handler(signal, frame):
logger.error("Got signal %i in %s/%s", signal, current_process().name, current_thread().name) logger.error(
"Got signal %i in %s/%s",
signal, current_process().name, current_thread().name
)
if current_process().name == "RegExParser": if current_process().name == "RegExParser":
# on Windows this isn't triggered, but it's fine, it has its own process termination thing # on Windows this isn't triggered, but it's fine,
# it has its own process termination thing
raise SystemExit raise SystemExit
if "PoolWorker" in current_process().name: if "PoolWorker" in current_process().name:
raise SystemExit raise SystemExit
@ -58,10 +68,12 @@ def signal_handler(signal, frame):
shutdown.doCleanShutdown() shutdown.doCleanShutdown()
else: else:
allThreadTraceback(frame) allThreadTraceback(frame)
print 'Unfortunately you cannot use Ctrl+C when running the UI because the UI captures the signal.' print('Unfortunately you cannot use Ctrl+C when running the UI'
' because the UI captures the signal.')
def isHostInPrivateIPRange(host): def isHostInPrivateIPRange(host):
if ":" in host: #IPv6 if ":" in host: # IPv6
hostAddr = socket.inet_pton(socket.AF_INET6, host) hostAddr = socket.inet_pton(socket.AF_INET6, host)
if hostAddr == ('\x00' * 15) + '\x01': if hostAddr == ('\x00' * 15) + '\x01':
return False return False
@ -84,5 +96,6 @@ def isHostInPrivateIPRange(host):
return True return True
return False return False
def addDataPadding(data, desiredMsgLength = 12, paddingChar = '\x00'):
def addDataPadding(data, desiredMsgLength=12, paddingChar='\x00'):
return data + paddingChar * (desiredMsgLength - len(data)) return data + paddingChar * (desiredMsgLength - len(data))

View File

@ -12,53 +12,68 @@ from helper_threading import StoppableThread
from knownnodes import saveKnownNodes from knownnodes import saveKnownNodes
from inventory import Inventory from inventory import Inventory
import protocol import protocol
from queues import addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue from queues import (
addressGeneratorQueue, objectProcessorQueue, UISignalQueue, workerQueue)
import shared import shared
import state import state
def doCleanShutdown(): def doCleanShutdown():
state.shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. # Used to tell proof of work worker threads and the objectProcessorThread
# to exit.
state.shutdown = 1
protocol.broadcastToSendDataQueues((0, 'shutdown', 'no data')) protocol.broadcastToSendDataQueues((0, 'shutdown', 'no data'))
objectProcessorQueue.put(('checkShutdownVariable', 'no data')) objectProcessorQueue.put(('checkShutdownVariable', 'no data'))
for thread in threading.enumerate(): for thread in threading.enumerate():
if thread.isAlive() and isinstance(thread, StoppableThread): if thread.isAlive() and isinstance(thread, StoppableThread):
thread.stopThread() thread.stopThread()
UISignalQueue.put(('updateStatusBar','Saving the knownNodes list of peers to disk...')) UISignalQueue.put((
'updateStatusBar',
'Saving the knownNodes list of peers to disk...'))
logger.info('Saving knownNodes list of peers to disk') logger.info('Saving knownNodes list of peers to disk')
saveKnownNodes() saveKnownNodes()
logger.info('Done saving knownNodes list of peers to disk') logger.info('Done saving knownNodes list of peers to disk')
UISignalQueue.put(('updateStatusBar','Done saving the knownNodes list of peers to disk.')) UISignalQueue.put((
'updateStatusBar',
'Done saving the knownNodes list of peers to disk.'))
logger.info('Flushing inventory in memory out to disk...') logger.info('Flushing inventory in memory out to disk...')
UISignalQueue.put(( UISignalQueue.put((
'updateStatusBar', 'updateStatusBar',
'Flushing inventory in memory out to disk. This should normally only take a second...')) 'Flushing inventory in memory out to disk.'
' This should normally only take a second...'))
Inventory().flush() Inventory().flush()
# Verify that the objectProcessor has finished exiting. It should have incremented the # Verify that the objectProcessor has finished exiting. It should have
# shutdown variable from 1 to 2. This must finish before we command the sqlThread to exit. # incremented the shutdown variable from 1 to 2. This must finish before
# we command the sqlThread to exit.
while state.shutdown == 1: while state.shutdown == 1:
time.sleep(.1) time.sleep(.1)
# This one last useless query will guarantee that the previous flush committed and that the # This one last useless query will guarantee that the previous flush
# committed and that the
# objectProcessorThread committed before we close the program. # objectProcessorThread committed before we close the program.
sqlQuery('SELECT address FROM subscriptions') sqlQuery('SELECT address FROM subscriptions')
logger.info('Finished flushing inventory.') logger.info('Finished flushing inventory.')
sqlStoredProcedure('exit') sqlStoredProcedure('exit')
# Wait long enough to guarantee that any running proof of work worker threads will check the # Wait long enough to guarantee that any running proof of work worker
# shutdown variable and exit. If the main thread closes before they do then they won't stop. # threads will check the shutdown variable and exit. If the main thread
# closes before they do then they won't stop.
time.sleep(.25) time.sleep(.25)
for thread in threading.enumerate(): for thread in threading.enumerate():
if isinstance(thread, sendDataThread): if isinstance(thread, sendDataThread):
thread.sendDataThreadQueue.put((0, 'shutdown','no data')) thread.sendDataThreadQueue.put((0, 'shutdown', 'no data'))
if thread is not threading.currentThread() and isinstance(thread, StoppableThread) and not isinstance(thread, outgoingSynSender): if thread is not threading.currentThread() \
and isinstance(thread, StoppableThread) \
and not isinstance(thread, outgoingSynSender):
logger.debug("Waiting for thread %s", thread.name) logger.debug("Waiting for thread %s", thread.name)
thread.join() thread.join()
# flush queued # flush queued
for queue in (workerQueue, UISignalQueue, addressGeneratorQueue, objectProcessorQueue): for queue in (workerQueue, UISignalQueue, addressGeneratorQueue,
objectProcessorQueue):
while True: while True:
try: try:
queue.get(False) queue.get(False)

View File

@ -1,22 +1,23 @@
#! /usr/bin/env python #! /usr/bin/env python
import atexit import atexit
import errno
from multiprocessing import Process
import os import os
import sys import sys
import state import state
try: try:
import fcntl # @UnresolvedImport import fcntl # @UnresolvedImport
except: except ImportError:
pass pass
class singleinstance: class singleinstance:
""" """
Implements a single instance application by creating a lock file at appdata. Implements a single instance application by creating a lock file
at appdata.
This is based upon the singleton class from tendo https://github.com/pycontribs/tendo This is based upon the singleton class from tendo
https://github.com/pycontribs/tendo
which is under the Python Software Foundation License version 2 which is under the Python Software Foundation License version 2
""" """
def __init__(self, flavor_id="", daemon=False): def __init__(self, flavor_id="", daemon=False):
@ -24,7 +25,8 @@ class singleinstance:
self.counter = 0 self.counter = 0
self.daemon = daemon self.daemon = daemon
self.lockPid = None self.lockPid = None
self.lockfile = os.path.normpath(os.path.join(state.appdata, 'singleton%s.lock' % flavor_id)) self.lockfile = os.path.normpath(
os.path.join(state.appdata, 'singleton%s.lock' % flavor_id))
if not self.daemon and not state.curses: if not self.daemon and not state.curses:
# Tells the already running (if any) application to get focus. # Tells the already running (if any) application to get focus.
@ -41,14 +43,21 @@ class singleinstance:
self.lockPid = os.getpid() self.lockPid = os.getpid()
if sys.platform == 'win32': if sys.platform == 'win32':
try: try:
# file already exists, we try to remove (in case previous execution was interrupted) # file already exists, we try to remove
# (in case previous execution was interrupted)
if os.path.exists(self.lockfile): if os.path.exists(self.lockfile):
os.unlink(self.lockfile) os.unlink(self.lockfile)
self.fd = os.open(self.lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR | os.O_TRUNC) self.fd = os.open(
self.lockfile,
os.O_CREAT | os.O_EXCL | os.O_RDWR | os.O_TRUNC
)
except OSError: except OSError:
type, e, tb = sys.exc_info() type, e, tb = sys.exc_info()
if e.errno == 13: if e.errno == 13:
print 'Another instance of this application is already running' print(
'Another instance of this application'
' is already running'
)
sys.exit(-1) sys.exit(-1)
print(e.errno) print(e.errno)
raise raise
@ -59,7 +68,8 @@ class singleinstance:
self.fp = open(self.lockfile, 'a+') self.fp = open(self.lockfile, 'a+')
try: try:
if self.daemon and self.lockPid != os.getpid(): if self.daemon and self.lockPid != os.getpid():
fcntl.lockf(self.fp, fcntl.LOCK_EX) # wait for parent to finish # wait for parent to finish
fcntl.lockf(self.fp, fcntl.LOCK_EX)
else: else:
fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB) fcntl.lockf(self.fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
self.lockPid = os.getpid() self.lockPid = os.getpid()
@ -88,5 +98,5 @@ class singleinstance:
fcntl.lockf(self.fp, fcntl.LOCK_UN) fcntl.lockf(self.fp, fcntl.LOCK_UN)
if os.path.isfile(self.lockfile): if os.path.isfile(self.lockfile):
os.unlink(self.lockfile) os.unlink(self.lockfile)
except Exception, e: except Exception:
pass pass