From e8af4d813dbb1d31d0095f2dae4756b9c562be59 Mon Sep 17 00:00:00 2001 From: f97ada87 Date: Sat, 10 Mar 2018 01:14:14 +1100 Subject: [PATCH] define and enable netcat operating mode --- src/bitmessagemain.py | 166 ++++++++++++++++++++++++++---------------- src/helper_generic.py | 3 +- src/shutdown.py | 6 +- src/state.py | 7 ++ src/std_io.py | 163 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 278 insertions(+), 67 deletions(-) create mode 100644 src/std_io.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index accd5740..9a68be5f 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -66,6 +66,7 @@ from network.downloadthread import DownloadThread import helper_bootstrap import helper_generic import helper_threading +import std_io # 0.6.3+ SPECIALOPMODES def connectToStream(streamNumber): @@ -204,8 +205,9 @@ class Main: daemon = BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon') try: - opts, args = getopt.getopt(sys.argv[1:], "hcd", - ["help", "curses", "daemon"]) + # opts, args + opts, _ = getopt.getopt(sys.argv[1:], "hcdn", + ["help", "curses", "daemon", "mode-netcat"]) except getopt.GetoptError: self.usage() @@ -220,10 +222,19 @@ class Main: elif opt in ("-c", "--curses"): state.curses = True + if opt in ("-n", "--mode-netcat"): + # 0.6.3+ SPECIALOPMODES - reconfigure threads for netcat mode + state.enableNetwork = True # enable networking + state.enableObjProc = False # disable object processing + state.enableGUI = False # headless + state.enableSTDIO = True # enable STDIO + std_io.stdInputMode = 'netcat' + # is the application already running? If yes then exit. shared.thisapp = singleinstance("", daemon) if daemon: + state.enableGUI = False # run without a UI with shared.printLock: print('Running as a daemon. Send TERM signal to end.') self.daemonize() @@ -238,15 +249,18 @@ class Main: state.dandelion = 0 helper_bootstrap.knownNodes() - # Start the address generation thread - addressGeneratorThread = addressGenerator() - addressGeneratorThread.daemon = True # close the main program even if there are threads left - addressGeneratorThread.start() - # Start the thread that calculates POWs - singleWorkerThread = singleWorker() - singleWorkerThread.daemon = True # close the main program even if there are threads left - singleWorkerThread.start() + # 0.6.3+ SPECIALOPMODES - addressGenerator or singleWorker are objproc related + if state.enableObjProc: + # Start the address generation thread + addressGeneratorThread = addressGenerator() + addressGeneratorThread.daemon = True # close the main program even if there are threads left + addressGeneratorThread.start() + + # Start the thread that calculates POWs + singleWorkerThread = singleWorker() + singleWorkerThread.daemon = True # close the main program even if there are threads left + singleWorkerThread.start() # Start the SQL thread sqlLookup = sqlThread() @@ -256,73 +270,96 @@ class Main: Inventory() # init Dandelion() # init, needs to be early because other thread may access it early - # SMTP delivery thread - if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': - smtpDeliveryThread = smtpDeliver() - smtpDeliveryThread.start() + # 0.6.3+ SPECIALOPMODES - enable objectProcessor and SMTP + if state.enableObjProc: + # SMTP delivery thread + if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': + smtpDeliveryThread = smtpDeliver() + smtpDeliveryThread.start() - # SMTP daemon thread - if daemon and BMConfigParser().safeGetBoolean("bitmessagesettings", "smtpd"): - smtpServerThread = smtpServer() - smtpServerThread.start() + # SMTP daemon thread + if daemon and BMConfigParser().safeGetBoolean("bitmessagesettings", "smtpd"): + smtpServerThread = smtpServer() + smtpServerThread.start() - # Start the thread that calculates POWs - objectProcessorThread = objectProcessor() - objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. - objectProcessorThread.start() + # Start the thread that calculates POWs + objectProcessorThread = objectProcessor() + objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. + objectProcessorThread.start() + + elif state.enableSTDIO and std_io.stdInputMode == 'netcat': + # Start the thread that outputs objects to stdout in netcat mode + objectStdOutThread = std_io.objectStdOut() + objectStdOutThread.daemon = False # same as objectProcessorThread + objectStdOutThread.start() # Start the cleanerThread singleCleanerThread = singleCleaner() singleCleanerThread.daemon = True # close the main program even if there are threads left singleCleanerThread.start() - shared.reloadMyAddressHashes() - shared.reloadBroadcastSendersForWhichImWatching() + # 0.6.3+ SPECIALOPMODES + if state.enableObjProc: + shared.reloadMyAddressHashes() + shared.reloadBroadcastSendersForWhichImWatching() - if BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'): - try: - apiNotifyPath = BMConfigParser().get( - 'bitmessagesettings', 'apinotifypath') - except: - apiNotifyPath = '' - if apiNotifyPath != '': - with shared.printLock: - print('Trying to call', apiNotifyPath) + if BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'): + try: + apiNotifyPath = BMConfigParser().get( + 'bitmessagesettings', 'apinotifypath') + except: + apiNotifyPath = '' + if apiNotifyPath != '': + with shared.printLock: + print('Trying to call', apiNotifyPath) - call([apiNotifyPath, "startingUp"]) - singleAPIThread = singleAPI() - singleAPIThread.daemon = True # close the main program even if there are threads left - singleAPIThread.start() + call([apiNotifyPath, "startingUp"]) + singleAPIThread = singleAPI() + singleAPIThread.daemon = True # close the main program even if there are threads left + singleAPIThread.start() - BMConnectionPool() - asyncoreThread = BMNetworkThread() - asyncoreThread.daemon = True - asyncoreThread.start() - for i in range(BMConfigParser().getint("threads", "receive")): - receiveQueueThread = ReceiveQueueThread(i) - receiveQueueThread.daemon = True - receiveQueueThread.start() - announceThread = AnnounceThread() - announceThread.daemon = True - announceThread.start() - state.invThread = InvThread() - state.invThread.daemon = True - state.invThread.start() - state.addrThread = AddrThread() - state.addrThread.daemon = True - state.addrThread.start() - state.downloadThread = DownloadThread() - state.downloadThread.daemon = True - state.downloadThread.start() + # 0.6.3+ SPECIALOPMODES - Start the STDIN thread + if state.enableSTDIO: + stdinThread = std_io.stdInput(sys.stdin) + stdinThread.daemon = True + stdinThread.start() - connectToStream(1) - if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): - import upnp - upnpThread = upnp.uPnPThread() - upnpThread.start() + # 0.6.3+ SPECIALOPMODES - no networking in airgap mode + if state.enableNetwork: + BMConnectionPool() + asyncoreThread = BMNetworkThread() + asyncoreThread.daemon = True + asyncoreThread.start() + for i in range(BMConfigParser().getint("threads", "receive")): + receiveQueueThread = ReceiveQueueThread(i) + receiveQueueThread.daemon = True + receiveQueueThread.start() + announceThread = AnnounceThread() + announceThread.daemon = True + announceThread.start() - if daemon == False and BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon') == False: + state.invThread = InvThread() + state.invThread.daemon = True + state.invThread.start() + state.addrThread = AddrThread() + state.addrThread.daemon = True + state.addrThread.start() + state.downloadThread = DownloadThread() + state.downloadThread.daemon = True + state.downloadThread.start() + + connectToStream(1) + + if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): + import upnp + upnpThread = upnp.uPnPThread() + upnpThread.start() + else: + # Populate with hardcoded value (same as connectToStream above) + state.streamsInWhichIAmParticipating.append(1) + + if state.enableGUI: # includes daemon and netcat modes if state.curses == False: if not depends.check_pyqt(): print('PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon') @@ -408,6 +445,9 @@ Options: -c, --curses use curses (text mode) interface -d, --daemon run in daemon (background) mode +Advanced modes: + -n, --mode-netcat read and write raw objects on STDIO + All parameters are optional. ''' diff --git a/src/helper_generic.py b/src/helper_generic.py index 588ae8f1..9e3d22f4 100644 --- a/src/helper_generic.py +++ b/src/helper_generic.py @@ -9,6 +9,7 @@ import traceback import shared from debug import logger import queues +import state import shutdown def powQueueSize(): @@ -54,7 +55,7 @@ def signal_handler(signal, frame): if current_thread().name not in ("PyBitmessage", "MainThread"): return logger.error("Got signal %i", signal) - if shared.thisapp.daemon: + if shared.thisapp.daemon or not state.enableGUI: shutdown.doCleanShutdown() else: allThreadTraceback(frame) diff --git a/src/shutdown.py b/src/shutdown.py index f447148b..eda8fb4a 100644 --- a/src/shutdown.py +++ b/src/shutdown.py @@ -13,8 +13,8 @@ import shared import state def doCleanShutdown(): - state.shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. - objectProcessorQueue.put(('checkShutdownVariable', 'no data')) + state.shutdown = 1 # Used to tell proof of work worker threads and the objectProcessorThread to exit. + objectProcessorQueue.put(('checkShutdownVariable', 'clean_shutdown')) for thread in threading.enumerate(): if thread.isAlive() and isinstance(thread, StoppableThread): thread.stopThread() @@ -59,7 +59,7 @@ def doCleanShutdown(): except Queue.Empty: break - if shared.thisapp.daemon: + if shared.thisapp.daemon or not state.enableGUI: logger.info('Clean shutdown complete.') shared.thisapp.cleanup() os._exit(0) diff --git a/src/state.py b/src/state.py index 73e4f789..70201b90 100644 --- a/src/state.py +++ b/src/state.py @@ -19,6 +19,13 @@ shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof o curses = False +# 0.6.3+ SPECIALOPMODES - mode selector flags, default to standalone GUI mode +enableNetwork = True # enable network threads +enableObjProc = True # enable object processing threads +enableAPI = True # enable API (if configured) +enableGUI = True # enable GUI (QT or ncurses) +enableSTDIO = False # enable STDIO threads + sqlReady = False # set to true by sqlTread when ready for processing maximumNumberOfHalfOpenConnections = 0 diff --git a/src/std_io.py b/src/std_io.py new file mode 100644 index 00000000..b67c98b3 --- /dev/null +++ b/src/std_io.py @@ -0,0 +1,163 @@ +""" +0.6.3+ SPECIALOPMODES - STDIO handling threads for netcat and airgap modes + +stdInput thread: receives hex-encoded bitmessage objects on STDIN +Supported input formats, format is auto-detected: + - each line a hex-encoded object + - each line formatted: hex_timestamp - tab - hex-encoded_object + (the output format of netcat mode) + +objectStdOut thread: replaces the objectProcessor thread in netcat mode, + outputs to STDOUT in format: hex_timestamp - tab - hex-encoded_object +""" + +import threading +import time +from struct import unpack +from binascii import hexlify, unhexlify +from addresses import decodeVarint, calculateInventoryHash +from debug import logger +# 0.6.2+ imports +import protocol +import queues +import state +import shutdown +import shared # statusIconColor +from inventory import Inventory +from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute + +stdInputMode = 'netcat' # process STDIN in netcat mode by default + +class stdInput(threading.Thread): + """ Standard Input thread """ + + def __init__(self, inputSrc): + threading.Thread.__init__(self, name="stdInput") + self.inputSrc = inputSrc + logger.info('stdInput thread started.') + + def run(self): + while True: + + # read a line in hex encoding + line = self.inputSrc.readline() + if len(line) == 0: + logger.info("STDIN: End of input") + shutdown.doCleanShutdown() + break + + hexObject = line.rstrip() + hexTStamp = '' + + # detect timestamp-tab-object format (as output by netcat mode) + if "\t" in hexObject: + hexTStamp = hexObject.split("\t")[0] + hexObject = hexObject.split("\t")[-1] + + # unhex the input with error rejection + try: + binObject = unhexlify(hexObject) + except Exception: + logger.info("STDIN: Invalid input format") + continue + + # sanity check on object size + if len(binObject) < 22: + logger.info("STDIN: Invalid object size") + continue + + if not state.enableNetwork and state.enableGUI: + # in airgap mode, trick the status icon that we are in fact + # NOT waiting for a connection + # (may be removed after impact analysis) + shared.statusIconColor = 'yellow' + + if stdInputMode == 'airgap': + # airgap mode uses the timestamp + # unhex the timestamp with error rejection + if len(hexTStamp) == 16: + try: + # stdioStamp, = unpack('>Q', unhexlify(hexTStamp)) + _, = unpack('>Q', unhexlify(hexTStamp)) + except Exception: + logger.info("STDIN: Invalid timestamp format: " + hexTStamp) + continue + + # check that proof of work is sufficient. + if not protocol.isProofOfWorkSufficient(binObject): + logger.info('STDIN: Insufficient Proof of Work') + continue + + # extract expiry time, object type + eTime, = unpack('>Q', binObject[8:16]) + objectType, = unpack('>I', binObject[16:20]) + + # extract version number and stream number + readPosition = 20 # bypass the nonce, time, and object type + # versionNumber, versionLength + _, versionLength = decodeVarint(binObject[readPosition:readPosition + 10]) + readPosition += versionLength + streamNumber, streamNumberLength = decodeVarint(binObject[readPosition:readPosition + 10]) + readPosition += streamNumberLength + + # calculate inventory hash + inventoryHash = calculateInventoryHash(binObject) + + # duplicate check on inventory hash (both netcat and airgap) + if inventoryHash in Inventory(): + logger.info("STDIN: Already got object " + hexlify(inventoryHash)) + continue + + # in netcat mode, push object to TX inventory (for broadcasting) + if stdInputMode == 'netcat': + # publish object to inventory and advertise + Inventory()[inventoryHash] = (objectType, streamNumber, binObject, eTime, '') + logger.info("STDIN: Accepted object (type=%u) " % objectType + hexlify(inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) + + # honour global shutdown flag + if state.shutdown != 0: + logger.info('stdInput thread shutdown.') + break + +class objectStdOut(threading.Thread): + """ + The objectStdOut thread receives network objects from the receiveDataThreads. + """ + def __init__(self): + threading.Thread.__init__(self, name="objectStdOut") + + # REFACTOR THIS with objectProcessor into objectProcessorQueue + queryreturn = sqlQuery( + '''SELECT objecttype, data FROM objectprocessorqueue''') + for row in queryreturn: + objectType, data = row + queues.objectProcessorQueue.put((objectType, data)) + sqlExecute('''DELETE FROM objectprocessorqueue''') + logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn))) + # /REFACTOR THIS + + def run(self): + while True: + objectType, data = queues.objectProcessorQueue.get() + + # Output in documented format + print "%016x" % int(time.time()) + '\t' + hexlify(data) + + if state.shutdown: + time.sleep(.5) # Wait just a moment for most of the connections to close + + # REFACTOR THIS with objectProcessor into objectProcessorQueue + numberOfObjectsInObjProcQueue = 0 + with SqlBulkExecute() as sql: + while queues.objectProcessorQueue.curSize > 0: + objectType, data = queues.objectProcessorQueue.get() + sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', + objectType, data) + numberOfObjectsInObjProcQueue += 1 + logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsInObjProcQueue)) + # /REFACTOR THIS + + state.shutdown = 2 + break +