define and enable netcat operating mode

This commit is contained in:
f97ada87 2018-03-10 01:14:14 +11:00
parent 0da0c10995
commit e8af4d813d
No known key found for this signature in database
GPG Key ID: 07B327EEF97ADA87
5 changed files with 278 additions and 67 deletions

View File

@ -66,6 +66,7 @@ from network.downloadthread import DownloadThread
import helper_bootstrap import helper_bootstrap
import helper_generic import helper_generic
import helper_threading import helper_threading
import std_io # 0.6.3+ SPECIALOPMODES
def connectToStream(streamNumber): def connectToStream(streamNumber):
@ -204,8 +205,9 @@ class Main:
daemon = BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon') daemon = BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon')
try: try:
opts, args = getopt.getopt(sys.argv[1:], "hcd", # opts, args
["help", "curses", "daemon"]) opts, _ = getopt.getopt(sys.argv[1:], "hcdn",
["help", "curses", "daemon", "mode-netcat"])
except getopt.GetoptError: except getopt.GetoptError:
self.usage() self.usage()
@ -220,10 +222,19 @@ class Main:
elif opt in ("-c", "--curses"): elif opt in ("-c", "--curses"):
state.curses = True state.curses = True
if opt in ("-n", "--mode-netcat"):
# 0.6.3+ SPECIALOPMODES - reconfigure threads for netcat mode
state.enableNetwork = True # enable networking
state.enableObjProc = False # disable object processing
state.enableGUI = False # headless
state.enableSTDIO = True # enable STDIO
std_io.stdInputMode = 'netcat'
# is the application already running? If yes then exit. # is the application already running? If yes then exit.
shared.thisapp = singleinstance("", daemon) shared.thisapp = singleinstance("", daemon)
if daemon: if daemon:
state.enableGUI = False # run without a UI
with shared.printLock: with shared.printLock:
print('Running as a daemon. Send TERM signal to end.') print('Running as a daemon. Send TERM signal to end.')
self.daemonize() self.daemonize()
@ -238,15 +249,18 @@ class Main:
state.dandelion = 0 state.dandelion = 0
helper_bootstrap.knownNodes() helper_bootstrap.knownNodes()
# Start the address generation thread
addressGeneratorThread = addressGenerator()
addressGeneratorThread.daemon = True # close the main program even if there are threads left
addressGeneratorThread.start()
# Start the thread that calculates POWs # 0.6.3+ SPECIALOPMODES - addressGenerator or singleWorker are objproc related
singleWorkerThread = singleWorker() if state.enableObjProc:
singleWorkerThread.daemon = True # close the main program even if there are threads left # Start the address generation thread
singleWorkerThread.start() addressGeneratorThread = addressGenerator()
addressGeneratorThread.daemon = True # close the main program even if there are threads left
addressGeneratorThread.start()
# Start the thread that calculates POWs
singleWorkerThread = singleWorker()
singleWorkerThread.daemon = True # close the main program even if there are threads left
singleWorkerThread.start()
# Start the SQL thread # Start the SQL thread
sqlLookup = sqlThread() sqlLookup = sqlThread()
@ -256,73 +270,96 @@ class Main:
Inventory() # init Inventory() # init
Dandelion() # init, needs to be early because other thread may access it early Dandelion() # init, needs to be early because other thread may access it early
# SMTP delivery thread # 0.6.3+ SPECIALOPMODES - enable objectProcessor and SMTP
if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '': if state.enableObjProc:
smtpDeliveryThread = smtpDeliver() # SMTP delivery thread
smtpDeliveryThread.start() if daemon and BMConfigParser().safeGet("bitmessagesettings", "smtpdeliver", '') != '':
smtpDeliveryThread = smtpDeliver()
smtpDeliveryThread.start()
# SMTP daemon thread # SMTP daemon thread
if daemon and BMConfigParser().safeGetBoolean("bitmessagesettings", "smtpd"): if daemon and BMConfigParser().safeGetBoolean("bitmessagesettings", "smtpd"):
smtpServerThread = smtpServer() smtpServerThread = smtpServer()
smtpServerThread.start() smtpServerThread.start()
# Start the thread that calculates POWs # Start the thread that calculates POWs
objectProcessorThread = objectProcessor() objectProcessorThread = objectProcessor()
objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object. objectProcessorThread.daemon = False # DON'T close the main program even the thread remains. This thread checks the shutdown variable after processing each object.
objectProcessorThread.start() objectProcessorThread.start()
elif state.enableSTDIO and std_io.stdInputMode == 'netcat':
# Start the thread that outputs objects to stdout in netcat mode
objectStdOutThread = std_io.objectStdOut()
objectStdOutThread.daemon = False # same as objectProcessorThread
objectStdOutThread.start()
# Start the cleanerThread # Start the cleanerThread
singleCleanerThread = singleCleaner() singleCleanerThread = singleCleaner()
singleCleanerThread.daemon = True # close the main program even if there are threads left singleCleanerThread.daemon = True # close the main program even if there are threads left
singleCleanerThread.start() singleCleanerThread.start()
shared.reloadMyAddressHashes() # 0.6.3+ SPECIALOPMODES
shared.reloadBroadcastSendersForWhichImWatching() if state.enableObjProc:
shared.reloadMyAddressHashes()
shared.reloadBroadcastSendersForWhichImWatching()
if BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'): if BMConfigParser().safeGetBoolean('bitmessagesettings', 'apienabled'):
try: try:
apiNotifyPath = BMConfigParser().get( apiNotifyPath = BMConfigParser().get(
'bitmessagesettings', 'apinotifypath') 'bitmessagesettings', 'apinotifypath')
except: except:
apiNotifyPath = '' apiNotifyPath = ''
if apiNotifyPath != '': if apiNotifyPath != '':
with shared.printLock: with shared.printLock:
print('Trying to call', apiNotifyPath) print('Trying to call', apiNotifyPath)
call([apiNotifyPath, "startingUp"]) call([apiNotifyPath, "startingUp"])
singleAPIThread = singleAPI() singleAPIThread = singleAPI()
singleAPIThread.daemon = True # close the main program even if there are threads left singleAPIThread.daemon = True # close the main program even if there are threads left
singleAPIThread.start() singleAPIThread.start()
BMConnectionPool() # 0.6.3+ SPECIALOPMODES - Start the STDIN thread
asyncoreThread = BMNetworkThread() if state.enableSTDIO:
asyncoreThread.daemon = True stdinThread = std_io.stdInput(sys.stdin)
asyncoreThread.start() stdinThread.daemon = True
for i in range(BMConfigParser().getint("threads", "receive")): stdinThread.start()
receiveQueueThread = ReceiveQueueThread(i)
receiveQueueThread.daemon = True
receiveQueueThread.start()
announceThread = AnnounceThread()
announceThread.daemon = True
announceThread.start()
state.invThread = InvThread()
state.invThread.daemon = True
state.invThread.start()
state.addrThread = AddrThread()
state.addrThread.daemon = True
state.addrThread.start()
state.downloadThread = DownloadThread()
state.downloadThread.daemon = True
state.downloadThread.start()
connectToStream(1)
if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'): # 0.6.3+ SPECIALOPMODES - no networking in airgap mode
import upnp if state.enableNetwork:
upnpThread = upnp.uPnPThread() BMConnectionPool()
upnpThread.start() asyncoreThread = BMNetworkThread()
asyncoreThread.daemon = True
asyncoreThread.start()
for i in range(BMConfigParser().getint("threads", "receive")):
receiveQueueThread = ReceiveQueueThread(i)
receiveQueueThread.daemon = True
receiveQueueThread.start()
announceThread = AnnounceThread()
announceThread.daemon = True
announceThread.start()
if daemon == False and BMConfigParser().safeGetBoolean('bitmessagesettings', 'daemon') == False: state.invThread = InvThread()
state.invThread.daemon = True
state.invThread.start()
state.addrThread = AddrThread()
state.addrThread.daemon = True
state.addrThread.start()
state.downloadThread = DownloadThread()
state.downloadThread.daemon = True
state.downloadThread.start()
connectToStream(1)
if BMConfigParser().safeGetBoolean('bitmessagesettings','upnp'):
import upnp
upnpThread = upnp.uPnPThread()
upnpThread.start()
else:
# Populate with hardcoded value (same as connectToStream above)
state.streamsInWhichIAmParticipating.append(1)
if state.enableGUI: # includes daemon and netcat modes
if state.curses == False: if state.curses == False:
if not depends.check_pyqt(): if not depends.check_pyqt():
print('PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon') print('PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon')
@ -408,6 +445,9 @@ Options:
-c, --curses use curses (text mode) interface -c, --curses use curses (text mode) interface
-d, --daemon run in daemon (background) mode -d, --daemon run in daemon (background) mode
Advanced modes:
-n, --mode-netcat read and write raw objects on STDIO
All parameters are optional. All parameters are optional.
''' '''

View File

@ -9,6 +9,7 @@ import traceback
import shared import shared
from debug import logger from debug import logger
import queues import queues
import state
import shutdown import shutdown
def powQueueSize(): def powQueueSize():
@ -54,7 +55,7 @@ def signal_handler(signal, frame):
if current_thread().name not in ("PyBitmessage", "MainThread"): if current_thread().name not in ("PyBitmessage", "MainThread"):
return return
logger.error("Got signal %i", signal) logger.error("Got signal %i", signal)
if shared.thisapp.daemon: if shared.thisapp.daemon or not state.enableGUI:
shutdown.doCleanShutdown() shutdown.doCleanShutdown()
else: else:
allThreadTraceback(frame) allThreadTraceback(frame)

View File

@ -13,8 +13,8 @@ import shared
import state import state
def doCleanShutdown(): def doCleanShutdown():
state.shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. state.shutdown = 1 # Used to tell proof of work worker threads and the objectProcessorThread to exit.
objectProcessorQueue.put(('checkShutdownVariable', 'no data')) objectProcessorQueue.put(('checkShutdownVariable', 'clean_shutdown'))
for thread in threading.enumerate(): for thread in threading.enumerate():
if thread.isAlive() and isinstance(thread, StoppableThread): if thread.isAlive() and isinstance(thread, StoppableThread):
thread.stopThread() thread.stopThread()
@ -59,7 +59,7 @@ def doCleanShutdown():
except Queue.Empty: except Queue.Empty:
break break
if shared.thisapp.daemon: if shared.thisapp.daemon or not state.enableGUI:
logger.info('Clean shutdown complete.') logger.info('Clean shutdown complete.')
shared.thisapp.cleanup() shared.thisapp.cleanup()
os._exit(0) os._exit(0)

View File

@ -19,6 +19,13 @@ shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof o
curses = False curses = False
# 0.6.3+ SPECIALOPMODES - mode selector flags, default to standalone GUI mode
enableNetwork = True # enable network threads
enableObjProc = True # enable object processing threads
enableAPI = True # enable API (if configured)
enableGUI = True # enable GUI (QT or ncurses)
enableSTDIO = False # enable STDIO threads
sqlReady = False # set to true by sqlTread when ready for processing sqlReady = False # set to true by sqlTread when ready for processing
maximumNumberOfHalfOpenConnections = 0 maximumNumberOfHalfOpenConnections = 0

163
src/std_io.py Normal file
View File

@ -0,0 +1,163 @@
"""
0.6.3+ SPECIALOPMODES - STDIO handling threads for netcat and airgap modes
stdInput thread: receives hex-encoded bitmessage objects on STDIN
Supported input formats, format is auto-detected:
- each line a hex-encoded object
- each line formatted: hex_timestamp - tab - hex-encoded_object
(the output format of netcat mode)
objectStdOut thread: replaces the objectProcessor thread in netcat mode,
outputs to STDOUT in format: hex_timestamp - tab - hex-encoded_object
"""
import threading
import time
from struct import unpack
from binascii import hexlify, unhexlify
from addresses import decodeVarint, calculateInventoryHash
from debug import logger
# 0.6.2+ imports
import protocol
import queues
import state
import shutdown
import shared # statusIconColor
from inventory import Inventory
from helper_sql import sqlQuery, sqlExecute, SqlBulkExecute
stdInputMode = 'netcat' # process STDIN in netcat mode by default
class stdInput(threading.Thread):
""" Standard Input thread """
def __init__(self, inputSrc):
threading.Thread.__init__(self, name="stdInput")
self.inputSrc = inputSrc
logger.info('stdInput thread started.')
def run(self):
while True:
# read a line in hex encoding
line = self.inputSrc.readline()
if len(line) == 0:
logger.info("STDIN: End of input")
shutdown.doCleanShutdown()
break
hexObject = line.rstrip()
hexTStamp = ''
# detect timestamp-tab-object format (as output by netcat mode)
if "\t" in hexObject:
hexTStamp = hexObject.split("\t")[0]
hexObject = hexObject.split("\t")[-1]
# unhex the input with error rejection
try:
binObject = unhexlify(hexObject)
except Exception:
logger.info("STDIN: Invalid input format")
continue
# sanity check on object size
if len(binObject) < 22:
logger.info("STDIN: Invalid object size")
continue
if not state.enableNetwork and state.enableGUI:
# in airgap mode, trick the status icon that we are in fact
# NOT waiting for a connection
# (may be removed after impact analysis)
shared.statusIconColor = 'yellow'
if stdInputMode == 'airgap':
# airgap mode uses the timestamp
# unhex the timestamp with error rejection
if len(hexTStamp) == 16:
try:
# stdioStamp, = unpack('>Q', unhexlify(hexTStamp))
_, = unpack('>Q', unhexlify(hexTStamp))
except Exception:
logger.info("STDIN: Invalid timestamp format: " + hexTStamp)
continue
# check that proof of work is sufficient.
if not protocol.isProofOfWorkSufficient(binObject):
logger.info('STDIN: Insufficient Proof of Work')
continue
# extract expiry time, object type
eTime, = unpack('>Q', binObject[8:16])
objectType, = unpack('>I', binObject[16:20])
# extract version number and stream number
readPosition = 20 # bypass the nonce, time, and object type
# versionNumber, versionLength
_, versionLength = decodeVarint(binObject[readPosition:readPosition + 10])
readPosition += versionLength
streamNumber, streamNumberLength = decodeVarint(binObject[readPosition:readPosition + 10])
readPosition += streamNumberLength
# calculate inventory hash
inventoryHash = calculateInventoryHash(binObject)
# duplicate check on inventory hash (both netcat and airgap)
if inventoryHash in Inventory():
logger.info("STDIN: Already got object " + hexlify(inventoryHash))
continue
# in netcat mode, push object to TX inventory (for broadcasting)
if stdInputMode == 'netcat':
# publish object to inventory and advertise
Inventory()[inventoryHash] = (objectType, streamNumber, binObject, eTime, '')
logger.info("STDIN: Accepted object (type=%u) " % objectType + hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
# honour global shutdown flag
if state.shutdown != 0:
logger.info('stdInput thread shutdown.')
break
class objectStdOut(threading.Thread):
"""
The objectStdOut thread receives network objects from the receiveDataThreads.
"""
def __init__(self):
threading.Thread.__init__(self, name="objectStdOut")
# REFACTOR THIS with objectProcessor into objectProcessorQueue
queryreturn = sqlQuery(
'''SELECT objecttype, data FROM objectprocessorqueue''')
for row in queryreturn:
objectType, data = row
queues.objectProcessorQueue.put((objectType, data))
sqlExecute('''DELETE FROM objectprocessorqueue''')
logger.debug('Loaded %s objects from disk into the objectProcessorQueue.' % str(len(queryreturn)))
# /REFACTOR THIS
def run(self):
while True:
objectType, data = queues.objectProcessorQueue.get()
# Output in documented format
print "%016x" % int(time.time()) + '\t' + hexlify(data)
if state.shutdown:
time.sleep(.5) # Wait just a moment for most of the connections to close
# REFACTOR THIS with objectProcessor into objectProcessorQueue
numberOfObjectsInObjProcQueue = 0
with SqlBulkExecute() as sql:
while queues.objectProcessorQueue.curSize > 0:
objectType, data = queues.objectProcessorQueue.get()
sql.execute('''INSERT INTO objectprocessorqueue VALUES (?,?)''',
objectType, data)
numberOfObjectsInObjProcQueue += 1
logger.debug('Saved %s objects from the objectProcessorQueue to disk. objectProcessorThread exiting.' % str(numberOfObjectsInObjProcQueue))
# /REFACTOR THIS
state.shutdown = 2
break