Compare commits
1 Commits
v0.6
...
f97ada87/o
Author | SHA1 | Date | |
---|---|---|---|
|
5fd8990ce8 |
|
@ -66,6 +66,7 @@ from network.downloadthread import DownloadThread
|
|||
import helper_bootstrap
|
||||
import helper_generic
|
||||
import helper_threading
|
||||
import std_io # for STDIO modes
|
||||
|
||||
|
||||
def connectToStream(streamNumber):
|
||||
|
@ -205,8 +206,8 @@ class Main:
|
|||
|
||||
try:
|
||||
opts, args = getopt.getopt(
|
||||
sys.argv[1:], "hcdt",
|
||||
["help", "curses", "daemon", "test"])
|
||||
sys.argv[1:], "hcdtn",
|
||||
["help", "curses", "daemon", "test", "mode-netcat"])
|
||||
|
||||
except getopt.GetoptError:
|
||||
self.usage()
|
||||
|
@ -224,6 +225,14 @@ class Main:
|
|||
elif opt in ("-t", "--test"):
|
||||
state.testmode = daemon = True
|
||||
state.enableGUI = False # run without a UI
|
||||
elif opt in ("-n", "--mode-netcat"):
|
||||
# STDIO MODES - reconfigure threads for netcat mode
|
||||
state.enableNetwork = True # enable networking
|
||||
state.enableObjProc = False # disable object processing
|
||||
state.enableGUI = False # headless
|
||||
state.enableSTDIO = True # enable STDIO
|
||||
# STDIN to invQueue, STDOUT from objectProcessorQueue
|
||||
std_io.stdInputMode = 'netcat'
|
||||
|
||||
# is the application already running? If yes then exit.
|
||||
shared.thisapp = singleinstance("", daemon)
|
||||
|
@ -263,7 +272,12 @@ class Main:
|
|||
sqlLookup.start()
|
||||
|
||||
Inventory() # init
|
||||
Dandelion() # init, needs to be early because other thread may access it early
|
||||
|
||||
# start network components if networking is enabled
|
||||
if state.enableNetwork:
|
||||
Dandelion() # init, needs to be early because other thread may access it early
|
||||
else:
|
||||
state.dandelion = 0
|
||||
|
||||
# Enable object processor and SMTP only if objproc enabled
|
||||
if state.enableObjProc:
|
||||
|
@ -283,6 +297,12 @@ class Main:
|
|||
objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object.
|
||||
objectProcessorThread.start()
|
||||
|
||||
elif state.enableSTDIO and std_io.stdInputMode == 'netcat':
|
||||
# Start the thread that outputs objects to stdout in netcat mode
|
||||
objectStdOutThread = std_io.objectStdOut()
|
||||
objectStdOutThread.daemon = False # same as objectProcessorThread
|
||||
objectStdOutThread.start()
|
||||
|
||||
# Start the cleanerThread
|
||||
singleCleanerThread = singleCleaner()
|
||||
singleCleanerThread.daemon = True # close the main program even if there are threads left
|
||||
|
@ -309,6 +329,12 @@ class Main:
|
|||
singleAPIThread.daemon = True # close the main program even if there are threads left
|
||||
singleAPIThread.start()
|
||||
|
||||
# STDIO MODES - Start the STDIN thread
|
||||
if state.enableSTDIO:
|
||||
stdinThread = std_io.stdInput(sys.stdin)
|
||||
stdinThread.daemon = True
|
||||
stdinThread.start()
|
||||
|
||||
# start network components if networking is enabled
|
||||
if state.enableNetwork:
|
||||
BMConnectionPool()
|
||||
|
@ -440,7 +466,10 @@ Options:
|
|||
-h, --help show this help message and exit
|
||||
-c, --curses use curses (text mode) interface
|
||||
-d, --daemon run in daemon (background) mode
|
||||
|
||||
Advanced modes:
|
||||
-t, --test dryrun, make testing
|
||||
-n, --mode-netcat no GUI, read and write raw objects on STDIO
|
||||
|
||||
All parameters are optional.
|
||||
'''
|
||||
|
|
167
src/std_io.py
Normal file
167
src/std_io.py
Normal file
|
@ -0,0 +1,167 @@
|
|||
"""
|
||||
STDIO handling threads for netcat and airgap modes
|
||||
|
||||
stdInput thread: receives hex-encoded bitmessage objects on STDIN
|
||||
Supported input formats, format is auto-detected:
|
||||
- each line a hex-encoded object
|
||||
- each line formatted: hex_timestamp - tab - hex-encoded_object
|
||||
(the output format of netcat mode)
|
||||
|
||||
objectStdOut thread: replaces the objectProcessor thread in netcat mode,
|
||||
outputs to STDOUT in format: hex_timestamp - tab - hex-encoded_object
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
import protocol
|
||||
import queues
|
||||
import state
|
||||
import shutdown
|
||||
import shared # statusIconColor
|
||||
|
||||
from struct import unpack
|
||||
from binascii import hexlify, unhexlify
|
||||
from addresses import decodeVarint, calculateInventoryHash
|
||||
from debug import logger
|
||||
from inventory import Inventory
|
||||
from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute
|
||||
|
||||
stdInputMode = 'netcat' # process STDIN in netcat mode by default
|
||||
netcatExitOnEOF = True # exit program if EOF on STDIN
|
||||
|
||||
|
||||
class stdInput(threading.Thread):
|
||||
"""
|
||||
Standard Input thread reads objects from STDIN, posts them to Inventory
|
||||
"""
|
||||
|
||||
def __init__(self, inputSrc):
|
||||
threading.Thread.__init__(self, name="stdInput")
|
||||
self.inputSrc = inputSrc
|
||||
logger.info('stdInput thread started.')
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
# read a line in hex encoding
|
||||
line = self.inputSrc.readline()
|
||||
if not line:
|
||||
logger.info("STDIN: End of input")
|
||||
if netcatExitOnEOF:
|
||||
shutdown.doCleanShutdown()
|
||||
break
|
||||
|
||||
hexObject = line.rstrip()
|
||||
hexTStamp = ''
|
||||
|
||||
# detect timestamp-tab-object format (as output by netcat mode)
|
||||
if "\t" in hexObject:
|
||||
hexTStamp = hexObject.split("\t")[0]
|
||||
hexObject = hexObject.split("\t")[-1]
|
||||
|
||||
# unhex the input with error rejection
|
||||
try:
|
||||
binObject = unhexlify(hexObject)
|
||||
except TypeError: # fix codacy warning
|
||||
logger.info("STDIN: Invalid input format")
|
||||
continue
|
||||
|
||||
# sanity check on object size
|
||||
if len(binObject) < 22:
|
||||
logger.info("STDIN: Invalid object size")
|
||||
continue
|
||||
|
||||
if not state.enableNetwork and state.enableGUI:
|
||||
# in airgap mode, trick the status icon that we are in fact
|
||||
# NOT waiting for a connection
|
||||
# (may be removed after impact analysis)
|
||||
shared.statusIconColor = 'yellow'
|
||||
|
||||
if stdInputMode == 'airgap':
|
||||
# airgap mode uses the timestamp
|
||||
# unhex the timestamp with error rejection
|
||||
if len(hexTStamp) == 16:
|
||||
try:
|
||||
# stdioStamp, = unpack('>Q', unhexlify(hexTStamp))
|
||||
_, = unpack('>Q', unhexlify(hexTStamp))
|
||||
except (struct.error, TypeError): # fix codacy warning
|
||||
logger.info("STDIN: Invalid timestamp format: " + hexTStamp)
|
||||
continue
|
||||
|
||||
# check that proof of work is sufficient.
|
||||
if not protocol.isProofOfWorkSufficient(binObject):
|
||||
logger.info('STDIN: Insufficient Proof of Work')
|
||||
continue
|
||||
|
||||
# extract expiry time, object type
|
||||
eTime, = unpack('>Q', binObject[8:16])
|
||||
objectType, = unpack('>I', binObject[16:20])
|
||||
|
||||
# extract version number and stream number
|
||||
readPosition = 20 # bypass the nonce, time, and object type
|
||||
# versionNumber, versionLength
|
||||
_, versionLength = decodeVarint(binObject[readPosition:readPosition + 10])
|
||||
readPosition += versionLength
|
||||
streamNumber, streamNumberLength = decodeVarint(binObject[readPosition:readPosition + 10])
|
||||
readPosition += streamNumberLength
|
||||
|
||||
# calculate inventory hash
|
||||
inventoryHash = calculateInventoryHash(binObject)
|
||||
|
||||
# duplicate check on inventory hash (both netcat and airgap)
|
||||
if inventoryHash in Inventory():
|
||||
logger.debug("STDIN: Already got object " + hexlify(inventoryHash))
|
||||
continue
|
||||
|
||||
# in netcat mode, push object to inventory and id to output queue
|
||||
if stdInputMode == 'netcat':
|
||||
Inventory()[inventoryHash] = (objectType, streamNumber, binObject, eTime, '')
|
||||
logger.debug("STDIN: Accepted object (type=%u) " % objectType + hexlify(inventoryHash))
|
||||
queues.invQueue.put((streamNumber, inventoryHash))
|
||||
|
||||
# honour global shutdown flag
|
||||
if state.shutdown != 0:
|
||||
logger.info('stdInput thread shutdown.')
|
||||
break
|
||||
|
||||
|
||||
class objectStdOut(threading.Thread):
|
||||
"""
|
||||
The objectStdOut thread receives network objects from the receiveDataThreads.
|
||||
"""
|
||||
def __init__(self):
|
||||
threading.Thread.__init__(self, name="objectStdOut")
|
||||
|
||||
# REFACTOR THIS with objectProcessor into objectProcessorQueue
|
||||
queryreturn = sqlQuery(
|
||||
'''SELECT objecttype, data FROM objectprocessorqueue''')
|
||||
for row in queryreturn:
|
||||
objectType, data = row
|
||||
queues.objectProcessorQueue.put((objectType, data))
|
||||
sqlExecute('''DELETE FROM objectprocessorqueue''')
|
||||
logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
|
||||
# /REFACTOR THIS
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
objectType, data = queues.objectProcessorQueue.get()
|
||||
|
||||
# Output in documented format
|
||||
print "%016x" % int(time.time()) + '\t' + hexlify(data)
|
||||
|
||||
if state.shutdown:
|
||||
time.sleep(.5) # Wait just a moment for most of the connections to close
|
||||
|
||||
# REFACTOR THIS with objectProcessor into objectProcessorQueue
|
||||
numberOfObjectsInObjProcQueue = 0
|
||||
with SqlBulkExecute() as sql:
|
||||
while queues.objectProcessorQueue.curSize > 0:
|
||||
objectType, data = queues.objectProcessorQueue.get()
|
||||
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
|
||||
objectType, data)
|
||||
numberOfObjectsInObjProcQueue += 1
|
||||
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' %
|
||||
str(numberOfObjectsInObjProcQueue))
|
||||
# /REFACTOR THIS
|
||||
|
||||
state.shutdown = 2
|
||||
break
|
Loading…
Reference in New Issue
Block a user