diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index fc014b0c..848d9bbe 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -66,6 +66,7 @@ from network.downloadthread import DownloadThread import helper_bootstrap import helper_generic import helper_threading +import std_io # for STDIO modes def connectToStream(streamNumber): @@ -205,8 +206,8 @@ class Main: try: opts, args = getopt.getopt( - sys.argv[1:], "hcdt", - ["help", "curses", "daemon", "test"]) + sys.argv[1:], "hcdtn", + ["help", "curses", "daemon", "test", "mode-netcat"]) except getopt.GetoptError: self.usage() @@ -224,6 +225,14 @@ class Main: elif opt in ("-t", "--test"): state.testmode = daemon = True state.enableGUI = False # run without a UI + elif opt in ("-n", "--mode-netcat"): + # STDIO MODES - reconfigure threads for netcat mode + state.enableNetwork = True # enable networking + state.enableObjProc = False # disable object processing + state.enableGUI = False # headless + state.enableSTDIO = True # enable STDIO + # STDIN to invQueue, STDOUT from objectProcessorQueue + std_io.stdInputMode = 'netcat' # is the application already running? If yes then exit. shared.thisapp = singleinstance("", daemon) @@ -263,7 +272,12 @@ class Main: sqlLookup.start() Inventory() # init - Dandelion() # init, needs to be early because other thread may access it early + + # start network components if networking is enabled + if state.enableNetwork: + Dandelion() # init, needs to be early because other thread may access it early + else: + state.dandelion = 0 # Enable object processor and SMTP only if objproc enabled if state.enableObjProc: @@ -283,6 +297,12 @@ class Main: objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. objectProcessorThread.start() + elif state.enableSTDIO and std_io.stdInputMode == 'netcat': + # Start the thread that outputs objects to stdout in netcat mode + objectStdOutThread = std_io.objectStdOut() + objectStdOutThread.daemon = False # same as objectProcessorThread + objectStdOutThread.start() + # Start the cleanerThread singleCleanerThread = singleCleaner() singleCleanerThread.daemon = True # close the main program even if there are threads left @@ -309,6 +329,12 @@ class Main: singleAPIThread.daemon = True # close the main program even if there are threads left singleAPIThread.start() + # STDIO MODES - Start the STDIN thread + if state.enableSTDIO: + stdinThread = std_io.stdInput(sys.stdin) + stdinThread.daemon = True + stdinThread.start() + # start network components if networking is enabled if state.enableNetwork: BMConnectionPool() @@ -440,7 +466,10 @@ Options: -h, --help show this help message and exit -c, --curses use curses (text mode) interface -d, --daemon run in daemon (background) mode + +Advanced modes: -t, --test dryrun, make testing + -n, --mode-netcat no GUI, read and write raw objects on STDIO All parameters are optional. ''' diff --git a/src/std_io.py b/src/std_io.py new file mode 100644 index 00000000..44f8bd00 --- /dev/null +++ b/src/std_io.py @@ -0,0 +1,167 @@ +""" +STDIO handling threads for netcat and airgap modes + +stdInput thread: receives hex-encoded bitmessage objects on STDIN +Supported input formats, format is auto-detected: + - each line a hex-encoded object + - each line formatted: hex_timestamp - tab - hex-encoded_object + (the output format of netcat mode) + +objectStdOut thread: replaces the objectProcessor thread in netcat mode, + outputs to STDOUT in format: hex_timestamp - tab - hex-encoded_object +""" + +import threading +import time +import protocol +import queues +import state +import shutdown +import shared # statusIconColor + +from struct import unpack +from binascii import hexlify, unhexlify +from addresses import decodeVarint, calculateInventoryHash +from debug import logger +from inventory import Inventory +from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute + +stdInputMode = 'netcat' # process STDIN in netcat mode by default +netcatExitOnEOF = True # exit program if EOF on STDIN + + +class stdInput(threading.Thread): + """ + Standard Input thread reads objects from STDIN, posts them to Inventory + """ + + def __init__(self, inputSrc): + threading.Thread.__init__(self, name="stdInput") + self.inputSrc = inputSrc + logger.info('stdInput thread started.') + + def run(self): + while True: + # read a line in hex encoding + line = self.inputSrc.readline() + if not line: + logger.info("STDIN: End of input") + if netcatExitOnEOF: + shutdown.doCleanShutdown() + break + + hexObject = line.rstrip() + hexTStamp = '' + + # detect timestamp-tab-object format (as output by netcat mode) + if "\t" in hexObject: + hexTStamp = hexObject.split("\t")[0] + hexObject = hexObject.split("\t")[-1] + + # unhex the input with error rejection + try: + binObject = unhexlify(hexObject) + except TypeError: # fix codacy warning + logger.info("STDIN: Invalid input format") + continue + + # sanity check on object size + if len(binObject) < 22: + logger.info("STDIN: Invalid object size") + continue + + if not state.enableNetwork and state.enableGUI: + # in airgap mode, trick the status icon that we are in fact + # NOT waiting for a connection + # (may be removed after impact analysis) + shared.statusIconColor = 'yellow' + + if stdInputMode == 'airgap': + # airgap mode uses the timestamp + # unhex the timestamp with error rejection + if len(hexTStamp) == 16: + try: + # stdioStamp, = unpack('>Q', unhexlify(hexTStamp)) + _, = unpack('>Q', unhexlify(hexTStamp)) + except (struct.error, TypeError): # fix codacy warning + logger.info("STDIN: Invalid timestamp format: " + hexTStamp) + continue + + # check that proof of work is sufficient. + if not protocol.isProofOfWorkSufficient(binObject): + logger.info('STDIN: Insufficient Proof of Work') + continue + + # extract expiry time, object type + eTime, = unpack('>Q', binObject[8:16]) + objectType, = unpack('>I', binObject[16:20]) + + # extract version number and stream number + readPosition = 20 # bypass the nonce, time, and object type + # versionNumber, versionLength + _, versionLength = decodeVarint(binObject[readPosition:readPosition + 10]) + readPosition += versionLength + streamNumber, streamNumberLength = decodeVarint(binObject[readPosition:readPosition + 10]) + readPosition += streamNumberLength + + # calculate inventory hash + inventoryHash = calculateInventoryHash(binObject) + + # duplicate check on inventory hash (both netcat and airgap) + if inventoryHash in Inventory(): + logger.debug("STDIN: Already got object " + hexlify(inventoryHash)) + continue + + # in netcat mode, push object to inventory and id to output queue + if stdInputMode == 'netcat': + Inventory()[inventoryHash] = (objectType, streamNumber, binObject, eTime, '') + logger.debug("STDIN: Accepted object (type=%u) " % objectType + hexlify(inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) + + # honour global shutdown flag + if state.shutdown != 0: + logger.info('stdInput thread shutdown.') + break + + +class objectStdOut(threading.Thread): + """ + The objectStdOut thread receives network objects from the receiveDataThreads. + """ + def __init__(self): + threading.Thread.__init__(self, name="objectStdOut") + + # REFACTOR THIS with objectProcessor into objectProcessorQueue + queryreturn = sqlQuery( + '''SELECT objecttype, data FROM objectprocessorqueue''') + for row in queryreturn: + objectType, data = row + queues.objectProcessorQueue.put((objectType, data)) + sqlExecute('''DELETE FROM objectprocessorqueue''') + logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn))) + # /REFACTOR THIS + + def run(self): + while True: + objectType, data = queues.objectProcessorQueue.get() + + # Output in documented format + print "%016x" % int(time.time()) + '\t' + hexlify(data) + + if state.shutdown: + time.sleep(.5) # Wait just a moment for most of the connections to close + + # REFACTOR THIS with objectProcessor into objectProcessorQueue + numberOfObjectsInObjProcQueue = 0 + with SqlBulkExecute() as sql: + while queues.objectProcessorQueue.curSize > 0: + objectType, data = queues.objectProcessorQueue.get() + sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''', + objectType, data) + numberOfObjectsInObjProcQueue += 1 + logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % + str(numberOfObjectsInObjProcQueue)) + # /REFACTOR THIS + + state.shutdown = 2 + break