From c857f73d0bb3d967dcc1d06b963e575b076d7484 Mon Sep 17 00:00:00 2001 From: Jonathan Warren Date: Mon, 24 Jun 2013 15:51:01 -0400 Subject: [PATCH] Continued moving code into individual modules --- src/bitmessagemain.py | 112 ++------------------------------- src/bitmessageqt/__init__.py | 2 - src/class_addressGenerator.py | 14 ++--- src/class_outgoingSynSender.py | 41 ++++++------ src/class_receiveDataThread.py | 102 +++++++++++++++--------------- src/class_sendDataThread.py | 11 ++-- src/class_singleCleaner.py | 16 +++-- src/class_singleListener.py | 8 +-- src/class_singleWorker.py | 56 ++++++++--------- src/shared.py | 75 ++++++++++++++++++++++ src/tr.py | 30 +++++++++ 11 files changed, 231 insertions(+), 236 deletions(-) create mode 100644 src/tr.py diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 3b189445..63f1f8f1 100644 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -8,19 +8,6 @@ # yet contain logic to expand into further streams. # The software version variable is now held in shared.py -verbose = 1 -maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # Equals two days and 12 hours. -lengthOfTimeToLeaveObjectsInInventory = 237600 # Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice. -lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time. -maximumAgeOfObjectsThatIAdvertiseToOthers = 216000 # Equals two days and 12 hours -maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours -useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages. -encryptedBroadcastSwitchoverTime = 1369735200 - -alreadyAttemptedConnectionsList = { -} # This is a list of nodes to which we have already attempted a connection -numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {} -neededPubkeys = {} #import ctypes import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. @@ -41,19 +28,6 @@ from class_addressGenerator import * import helper_startup import helper_bootstrap -def isInSqlInventory(hash): - t = (hash,) - shared.sqlLock.acquire() - shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''') - shared.sqlSubmitQueue.put(t) - queryreturn = shared.sqlReturnQueue.get() - shared.sqlLock.release() - if queryreturn == []: - return False - else: - return True - - def connectToStream(streamNumber): selfInitiatedConnections[streamNumber] = {} if sys.platform[0:3] == 'win': @@ -66,46 +40,6 @@ def connectToStream(streamNumber): a.start() - -def assembleVersionMessage(remoteHost, remotePort, myStreamNumber): - shared.softwareVersion - payload = '' - payload += pack('>L', 2) # protocol version. - payload += pack('>q', 1) # bitflags of the services I offer. - payload += pack('>q', int(time.time())) - - payload += pack( - '>q', 1) # boolservices of remote connection. How can I even know this for sure? This is probably ignored by the remote host. - payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ - socket.inet_aton(remoteHost) - payload += pack('>H', remotePort) # remote IPv6 and port - - payload += pack('>q', 1) # bitflags of the services I offer. - payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack( - '>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used. - payload += pack('>H', shared.config.getint( - 'bitmessagesettings', 'port')) # my external IPv6 and port - - random.seed() - payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf - userAgent = '/PyBitmessage:' + shared.softwareVersion + \ - '/' # Length of userAgent must be less than 253. - payload += pack('>B', len( - userAgent)) # user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work. - payload += userAgent - payload += encodeVarint( - 1) # The number of streams about which I care. PyBitmessage currently only supports 1 per connection. - payload += encodeVarint(myStreamNumber) - - datatosend = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. - datatosend = datatosend + 'version\x00\x00\x00\x00\x00' # version command - datatosend = datatosend + pack('>L', len(payload)) # payload length - datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] - return datatosend + payload - - - - # This is one of several classes that constitute the API # This class was written by Vaibhav Bhatia. Modified by Jonathan Warren (Atheros). # http://code.activestate.com/recipes/501148-xmlrpc-serverclient-which-does-cookie-handling-and/ @@ -729,50 +663,14 @@ class singleAPI(threading.Thread): se.register_introspection_functions() se.serve_forever() -# This is used so that the translateText function can be used when we are in daemon mode and not using any QT functions. -class translateClass: - def __init__(self, context, text): - self.context = context - self.text = text - def arg(self,argument): - if '%' in self.text: - return translateClass(self.context, self.text.replace('%','',1)) # This doesn't actually do anything with the arguments because we don't have a UI in which to display this information anyway. - else: - return self.text - -def _translate(context, text): - return translateText(context, text) - -def translateText(context, text): - if not shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'): - try: - from PyQt4 import QtCore, QtGui - except Exception as err: - print 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon' - print 'Error message:', err - os._exit(0) - return QtGui.QApplication.translate(context, text) - else: - if '%' in text: - return translateClass(context, text.replace('%','',1)) - else: - return text - - selfInitiatedConnections = {} # This is a list of current connections (the thread pointers at least) -ackdataForWhichImWatching = {} -alreadyAttemptedConnectionsListLock = threading.Lock() -eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack( - '>Q', random.randrange(1, 18446744073709551615)) -successfullyDecryptMessageTimings = [ -] # A list of the amounts of time it took to successfully decrypt msg messages -apiAddressGeneratorReturnQueue = Queue.Queue( -) # The address generator thread uses this queue to get information back to the API thread. -alreadyAttemptedConnectionsListResetTime = int( - time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect. -if useVeryEasyProofOfWorkForTesting: + + + + +if shared.useVeryEasyProofOfWorkForTesting: shared.networkDefaultProofOfWorkNonceTrialsPerByte = int( shared.networkDefaultProofOfWorkNonceTrialsPerByte / 16) shared.networkDefaultPayloadLengthExtraBytes = int( diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 3f0c7e6f..9930b318 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -12,7 +12,6 @@ except Exception as err: print 'Error message:', err sys.exit() - try: _encoding = QtGui.QApplication.UnicodeUTF8 except AttributeError: @@ -21,7 +20,6 @@ except AttributeError: def _translate(context, text): return QtGui.QApplication.translate(context, text) - withMessagingMenu = False try: from gi.repository import MessagingMenu diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index c19a294b..38c7d5b9 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -1,6 +1,5 @@ import shared import threading -import bitmessagemain import time import sys from pyelliptic.openssl import OpenSSL @@ -9,6 +8,7 @@ import hashlib import highlevelcrypto from addresses import * from pyelliptic import arithmetic +import tr class addressGenerator(threading.Thread): @@ -44,7 +44,7 @@ class addressGenerator(threading.Thread): if addressVersionNumber == 3: # currently the only one supported. if command == 'createRandomAddress': shared.UISignalQueue.put(( - 'updateStatusBar', bitmessagemain.translateText("MainWindow", "Generating one new address"))) + 'updateStatusBar', tr.translateText("MainWindow", "Generating one new address"))) # This next section is a little bit strange. We're going to generate keys over and over until we # find one that starts with either \x00 or \x00\x00. Then when we pack them into a Bitmessage address, # we won't store the \x00 or \x00\x00 bytes thus making the @@ -112,10 +112,10 @@ class addressGenerator(threading.Thread): # It may be the case that this address is being generated # as a result of a call to the API. Let us put the result # in the necessary queue. - bitmessagemain.apiAddressGeneratorReturnQueue.put(address) + shared.apiAddressGeneratorReturnQueue.put(address) shared.UISignalQueue.put(( - 'updateStatusBar', bitmessagemain.translateText("MainWindow", "Done generating address. Doing work necessary to broadcast it..."))) + 'updateStatusBar', tr.translateText("MainWindow", "Done generating address. Doing work necessary to broadcast it..."))) shared.UISignalQueue.put(('writeNewAddressToTable', ( label, address, streamNumber))) shared.reloadMyAddressHashes() @@ -235,13 +235,13 @@ class addressGenerator(threading.Thread): # It may be the case that this address is being # generated as a result of a call to the API. Let us # put the result in the necessary queue. - bitmessagemain.apiAddressGeneratorReturnQueue.put( + shared.apiAddressGeneratorReturnQueue.put( listOfNewAddressesToSendOutThroughTheAPI) shared.UISignalQueue.put(( - 'updateStatusBar', bitmessagemain.translateText("MainWindow", "Done generating address"))) + 'updateStatusBar', tr.translateText("MainWindow", "Done generating address"))) # shared.reloadMyAddressHashes() elif command == 'getDeterministicAddress': - bitmessagemain.apiAddressGeneratorReturnQueue.put(address) + shared.apiAddressGeneratorReturnQueue.put(address) else: raise Exception( "Error in the addressGenerator thread. Thread was given a command it could not understand: " + command) diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index d3310698..d547b3e3 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -5,8 +5,9 @@ import shared import socks import socket import sys +import tr -import bitmessagemain +#import bitmessagemain from class_sendDataThread import * from class_receiveDataThread import * @@ -33,25 +34,25 @@ class outgoingSynSender(threading.Thread): shared.knownNodesLock.acquire() HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) shared.knownNodesLock.release() - bitmessagemain.alreadyAttemptedConnectionsListLock.acquire() - while HOST in bitmessagemain.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList: - bitmessagemain.alreadyAttemptedConnectionsListLock.release() + shared.alreadyAttemptedConnectionsListLock.acquire() + while HOST in shared.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList: + shared.alreadyAttemptedConnectionsListLock.release() # print 'choosing new sample' random.seed() shared.knownNodesLock.acquire() HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) shared.knownNodesLock.release() time.sleep(1) - # Clear out the bitmessagemain.alreadyAttemptedConnectionsList every half + # Clear out the shared.alreadyAttemptedConnectionsList every half # hour so that this program will again attempt a connection # to any nodes, even ones it has already tried. - if (time.time() - bitmessagemain.alreadyAttemptedConnectionsListResetTime) > 1800: - bitmessagemain.alreadyAttemptedConnectionsList.clear() - bitmessagemain.alreadyAttemptedConnectionsListResetTime = int( + if (time.time() - shared.alreadyAttemptedConnectionsListResetTime) > 1800: + shared.alreadyAttemptedConnectionsList.clear() + shared.alreadyAttemptedConnectionsListResetTime = int( time.time()) - bitmessagemain.alreadyAttemptedConnectionsListLock.acquire() - bitmessagemain.alreadyAttemptedConnectionsList[HOST] = 0 - bitmessagemain.alreadyAttemptedConnectionsListLock.release() + shared.alreadyAttemptedConnectionsListLock.acquire() + shared.alreadyAttemptedConnectionsList[HOST] = 0 + shared.alreadyAttemptedConnectionsListLock.release() PORT, timeNodeLastSeen = shared.knownNodes[ self.streamNumber][HOST] sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) @@ -59,13 +60,13 @@ class outgoingSynSender(threading.Thread): # can rebind faster sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(20) - if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and bitmessagemain.verbose >= 2: + if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2: shared.printLock.acquire() print 'Trying an outgoing connection to', HOST, ':', PORT shared.printLock.release() # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a': - if bitmessagemain.verbose >= 2: + if shared.verbose >= 2: shared.printLock.acquire() print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT shared.printLock.release() @@ -86,7 +87,7 @@ class outgoingSynSender(threading.Thread): sock.setproxy( proxytype, sockshostname, socksport, rdns) elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': - if bitmessagemain.verbose >= 2: + if shared.verbose >= 2: shared.printLock.acquire() print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT shared.printLock.release() @@ -111,9 +112,9 @@ class outgoingSynSender(threading.Thread): sock.connect((HOST, PORT)) rd = receiveDataThread() rd.daemon = True # close the main program even if there are threads left - objectsOfWhichThisRemoteNodeIsAlreadyAware = {} + someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. rd.setup(sock, HOST, PORT, self.streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) + someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) rd.start() shared.printLock.acquire() print self, 'connected to', HOST, 'during an outgoing attempt.' @@ -121,12 +122,12 @@ class outgoingSynSender(threading.Thread): sd = sendDataThread() sd.setup(sock, HOST, PORT, self.streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware) + someObjectsOfWhichThisRemoteNodeIsAlreadyAware) sd.start() sd.sendVersionMessage() except socks.GeneralProxyError as err: - if bitmessagemain.verbose >= 2: + if shared.verbose >= 2: shared.printLock.acquire() print 'Could NOT connect to', HOST, 'during outgoing attempt.', err shared.printLock.release() @@ -141,7 +142,7 @@ class outgoingSynSender(threading.Thread): shared.printLock.release() except socks.Socks5AuthError as err: shared.UISignalQueue.put(( - 'updateStatusBar', bitmessagemain.translateText( + 'updateStatusBar', tr.translateText( "MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err)))) except socks.Socks5Error as err: pass @@ -152,7 +153,7 @@ class outgoingSynSender(threading.Thread): if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err) else: - if bitmessagemain.verbose >= 1: + if shared.verbose >= 1: shared.printLock.acquire() print 'Could NOT connect to', HOST, 'during outgoing attempt.', err shared.printLock.release() diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index c1148667..3d658255 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -17,14 +17,12 @@ import helper_generic import helper_bitcoin import helper_inbox import helper_sent -import bitmessagemain -from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers, numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, neededPubkeys - +import tr +#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys # This thread is created either by the synSenderThread(for outgoing # connections) or the singleListenerThread(for incoming connectiosn). - class receiveDataThread(threading.Thread): def __init__(self): @@ -39,7 +37,7 @@ class receiveDataThread(threading.Thread): HOST, port, streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware, + someObjectsOfWhichThisRemoteNodeIsAlreadyAware, selfInitiatedConnections): self.sock = sock self.HOST = HOST @@ -58,7 +56,7 @@ class receiveDataThread(threading.Thread): self.selfInitiatedConnections[streamNumber][self] = 0 self.ackDataThatWeHaveYetToSend = [ ] # When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer. - self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware + self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware def run(self): shared.printLock.acquire() @@ -101,7 +99,7 @@ class receiveDataThread(threading.Thread): print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err shared.printLock.release() try: - del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ + del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] except: pass @@ -111,14 +109,14 @@ class receiveDataThread(threading.Thread): shared.printLock.release() def processData(self): - # if bitmessagemain.verbose >= 3: + # if shared.verbose >= 3: # shared.printLock.acquire() # print 'self.data is currently ', repr(self.data) # shared.printLock.release() if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length return if self.data[0:4] != '\xe9\xbe\xb4\xd9': - if bitmessagemain.verbose >= 1: + if shared.verbose >= 1: shared.printLock.acquire() print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]) shared.printLock.release() @@ -183,8 +181,8 @@ class receiveDataThread(threading.Thread): shared.printLock.release() del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ objectHash] - elif bitmessagemain.isInSqlInventory(objectHash): - if bitmessagemain.verbose >= 3: + elif shared.isInSqlInventory(objectHash): + if shared.verbose >= 3: shared.printLock.acquire() print 'Inventory (SQL on disk) already has object listed in inv message.' shared.printLock.release() @@ -199,7 +197,7 @@ class receiveDataThread(threading.Thread): print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) shared.printLock.release() try: - del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ + del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. except: pass @@ -209,7 +207,7 @@ class receiveDataThread(threading.Thread): print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) shared.printLock.release() try: - del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ + del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. except: pass @@ -217,7 +215,7 @@ class receiveDataThread(threading.Thread): shared.printLock.acquire() print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) shared.printLock.release() - numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len( + shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len( self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. if len(self.ackDataThatWeHaveYetToSend) > 0: self.data = self.ackDataThatWeHaveYetToSend.pop() @@ -285,8 +283,8 @@ class receiveDataThread(threading.Thread): shared.sqlLock.acquire() # Select all hashes which are younger than two days old and in this # stream. - t = (int(time.time()) - maximumAgeOfObjectsThatIAdvertiseToOthers, int( - time.time()) - lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber) + t = (int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, int( + time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber) shared.sqlSubmitQueue.put( '''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''') shared.sqlSubmitQueue.put(t) @@ -295,14 +293,14 @@ class receiveDataThread(threading.Thread): bigInvList = {} for row in queryreturn: hash, = row - if hash not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware: + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: bigInvList[hash] = 0 # We also have messages in our inventory in memory (which is a python # dictionary). Let's fetch those too. for hash, storedValue in shared.inventory.items(): - if hash not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware: + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: objectType, streamNumber, payload, receivedTime = storedValue - if streamNumber == self.streamNumber and receivedTime > int(time.time()) - maximumAgeOfObjectsThatIAdvertiseToOthers: + if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers: bigInvList[hash] = 0 numberOfObjectsInInvMessage = 0 payload = '' @@ -360,7 +358,7 @@ class receiveDataThread(threading.Thread): if embeddedTime > (int(time.time()) + 10800): # prevent funny business print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.' return - if embeddedTime < (int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept): + if embeddedTime < (int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept): print 'The embedded time in this broadcast message is too old. Ignoring message.' return if len(data) < 180: @@ -384,7 +382,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this broadcast object. Ignoring.' shared.inventoryLock.release() return - elif bitmessagemain.isInSqlInventory(self.inventoryHash): + elif shared.isInSqlInventory(self.inventoryHash): print 'We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -749,7 +747,7 @@ class receiveDataThread(threading.Thread): if embeddedTime > int(time.time()) + 10800: print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime return - if embeddedTime < int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept: + if embeddedTime < int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept: print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime return streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint( @@ -764,7 +762,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this msg message. Ignoring.' shared.inventoryLock.release() return - elif bitmessagemain.isInSqlInventory(self.inventoryHash): + elif shared.isInSqlInventory(self.inventoryHash): print 'We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -808,11 +806,11 @@ class receiveDataThread(threading.Thread): def processmsg(self, readPosition, encryptedData): initialDecryptionSuccessful = False # Let's check whether this is a message acknowledgement bound for us. - if encryptedData[readPosition:] in bitmessagemain.ackdataForWhichImWatching: + if encryptedData[readPosition:] in shared.ackdataForWhichImWatching: shared.printLock.acquire() print 'This msg IS an acknowledgement bound for me.' shared.printLock.release() - del bitmessagemain.ackdataForWhichImWatching[encryptedData[readPosition:]] + del shared.ackdataForWhichImWatching[encryptedData[readPosition:]] t = ('ackreceived', encryptedData[readPosition:]) shared.sqlLock.acquire() shared.sqlSubmitQueue.put( @@ -821,13 +819,13 @@ class receiveDataThread(threading.Thread): shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], bitmessagemain.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode( + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode( time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8'))))) return else: shared.printLock.acquire() print 'This was NOT an acknowledgement bound for me.' - # print 'bitmessagemain.ackdataForWhichImWatching', bitmessagemain.ackdataForWhichImWatching + # print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching shared.printLock.release() # This is not an acknowledgement bound for me. See if it is a message @@ -1076,14 +1074,14 @@ class receiveDataThread(threading.Thread): # Display timing data timeRequiredToAttemptToDecryptMessage = time.time( ) - self.messageProcessingStartTime - bitmessagemain.successfullyDecryptMessageTimings.append( + shared.successfullyDecryptMessageTimings.append( timeRequiredToAttemptToDecryptMessage) sum = 0 - for item in bitmessagemain.successfullyDecryptMessageTimings: + for item in shared.successfullyDecryptMessageTimings: sum += item shared.printLock.acquire() print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage - print 'Average time for all message decryption successes since startup:', sum / len(bitmessagemain.successfullyDecryptMessageTimings) + print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings) shared.printLock.release() def isAckDataValid(self, ackData): @@ -1111,9 +1109,9 @@ class receiveDataThread(threading.Thread): return '[' + mailingListName + '] ' + subject def possibleNewPubkey(self, toRipe): - if toRipe in neededPubkeys: + if toRipe in shared.neededPubkeys: print 'We have been awaiting the arrival of this pubkey.' - del neededPubkeys[toRipe] + del shared.neededPubkeys[toRipe] t = (toRipe,) shared.sqlLock.acquire() shared.sqlSubmitQueue.put( @@ -1149,7 +1147,7 @@ class receiveDataThread(threading.Thread): else: readPosition += 4 - if embeddedTime < int(time.time()) - lengthOfTimeToHoldOnToAllPubkeys: + if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys: shared.printLock.acquire() print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime shared.printLock.release() @@ -1175,7 +1173,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this pubkey. Ignoring it.' shared.inventoryLock.release() return - elif bitmessagemain.isInSqlInventory(inventoryHash): + elif shared.isInSqlInventory(inventoryHash): print 'We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -1371,7 +1369,7 @@ class receiveDataThread(threading.Thread): if embeddedTime > int(time.time()) + 10800: print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime return - if embeddedTime < int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept: + if embeddedTime < int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept: print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime return requestedAddressVersionNumber, addressVersionLength = decodeVarint( @@ -1390,7 +1388,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this getpubkey request. Ignoring it.' shared.inventoryLock.release() return - elif bitmessagemain.isInSqlInventory(inventoryHash): + elif shared.isInSqlInventory(inventoryHash): print 'We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -1430,7 +1428,7 @@ class receiveDataThread(threading.Thread): shared.myAddressesByHash[requestedHash], 'lastpubkeysendtime')) except: lastPubkeySendTime = 0 - if lastPubkeySendTime < time.time() - lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was 28 days ago + if lastPubkeySendTime < time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was 28 days ago shared.printLock.acquire() print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.' shared.printLock.release() @@ -1452,11 +1450,11 @@ class receiveDataThread(threading.Thread): # We have received an inv message def recinv(self, data): totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 # ..from all peers, counting duplicates seperately (because they take up memory) - if len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0: - for key, value in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items(): + if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0: + for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items(): totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value shared.printLock.acquire() - print 'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) + print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave shared.printLock.release() numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10]) @@ -1472,13 +1470,13 @@ class receiveDataThread(threading.Thread): print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.' shared.printLock.release() return - self.objectsOfWhichThisRemoteNodeIsAlreadyAware[ + self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ data[lengthOfVarint:32 + lengthOfVarint]] = 0 if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory: shared.printLock.acquire() print 'Inventory (in memory) has inventory item already.' shared.printLock.release() - elif bitmessagemain.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): + elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): print 'Inventory (SQL on disk) has inventory item already.' else: self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) @@ -1491,11 +1489,11 @@ class receiveDataThread(threading.Thread): print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.' shared.printLock.release() break - self.objectsOfWhichThisRemoteNodeIsAlreadyAware[data[ + self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[ lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0 self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0 - numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ + shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] = len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # Send a getdata message to our peer to request the object with the given @@ -1602,7 +1600,7 @@ class receiveDataThread(threading.Thread): numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint( data[:10]) - if bitmessagemain.verbose >= 1: + if shared.verbose >= 1: shared.printLock.acquire() print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' shared.printLock.release() @@ -1845,7 +1843,7 @@ class receiveDataThread(threading.Thread): datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] datatosend = datatosend + payload - if bitmessagemain.verbose >= 1: + if shared.verbose >= 1: shared.printLock.acquire() print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' shared.printLock.release() @@ -1895,7 +1893,7 @@ class receiveDataThread(threading.Thread): # print 'addrsInMyStream.items()', addrsInMyStream.items() for HOST, value in addrsInMyStream.items(): PORT, timeLastReceivedMessageFromThisNode = value - if timeLastReceivedMessageFromThisNode > (int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. + if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. numberOfAddressesInAddrMessage += 1 payload += pack( '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time @@ -1907,7 +1905,7 @@ class receiveDataThread(threading.Thread): payload += pack('>H', PORT) # remote port for HOST, value in addrsInChildStreamLeft.items(): PORT, timeLastReceivedMessageFromThisNode = value - if timeLastReceivedMessageFromThisNode > (int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. + if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. numberOfAddressesInAddrMessage += 1 payload += pack( '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time @@ -1919,7 +1917,7 @@ class receiveDataThread(threading.Thread): payload += pack('>H', PORT) # remote port for HOST, value in addrsInChildStreamRight.items(): PORT, timeLastReceivedMessageFromThisNode = value - if timeLastReceivedMessageFromThisNode > (int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. + if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old.. numberOfAddressesInAddrMessage += 1 payload += pack( '>Q', timeLastReceivedMessageFromThisNode) # 64-bit time @@ -1937,7 +1935,7 @@ class receiveDataThread(threading.Thread): datatosend = datatosend + payload try: self.sock.sendall(datatosend) - if bitmessagemain.verbose >= 1: + if shared.verbose >= 1: shared.printLock.acquire() print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.' shared.printLock.release() @@ -1991,7 +1989,7 @@ class receiveDataThread(threading.Thread): if not self.initiatedConnection: shared.broadcastToSendDataQueues(( 0, 'setStreamNumber', (self.HOST, self.streamNumber))) - if data[72:80] == bitmessagemain.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: + if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) shared.printLock.acquire() print 'Closing connection to myself: ', self.HOST @@ -2018,7 +2016,7 @@ class receiveDataThread(threading.Thread): print 'Sending version message' shared.printLock.release() try: - self.sock.sendall(bitmessagemain.assembleVersionMessage( + self.sock.sendall(shared.assembleVersionMessage( self.HOST, self.PORT, self.streamNumber)) except Exception as err: # if not 'Bad file descriptor' in err: diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 1e3528dd..c1992067 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -8,7 +8,7 @@ import random import sys import socket -import bitmessagemain +#import bitmessagemain # Every connection to a peer has a sendDataThread (and also a # receiveDataThread). @@ -29,7 +29,7 @@ class sendDataThread(threading.Thread): HOST, PORT, streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware): + someObjectsOfWhichThisRemoteNodeIsAlreadyAware): self.sock = sock self.HOST = HOST self.PORT = PORT @@ -38,13 +38,13 @@ class sendDataThread(threading.Thread): 1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue. self.lastTimeISentData = int( time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive. - self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware + self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware shared.printLock.acquire() print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber shared.printLock.release() def sendVersionMessage(self): - datatosend = bitmessagemain.assembleVersionMessage( + datatosend = shared.assembleVersionMessage( self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber. shared.printLock.acquire() @@ -123,7 +123,7 @@ class sendDataThread(threading.Thread): print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.HOST break elif command == 'sendinv': - if data not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware: + if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: payload = '\x01' + data headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' @@ -147,6 +147,7 @@ class sendDataThread(threading.Thread): print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.HOST break elif command == 'pong': + self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): # Send out a pong message to keep the connection alive. shared.printLock.acquire() diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index f04a5507..5b77fdd4 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -1,8 +1,6 @@ import threading import shared import time -from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers,\ - neededPubkeys import sys '''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy. @@ -58,15 +56,15 @@ class singleCleaner(threading.Thread): shared.sqlLock.acquire() # inventory (clears pubkeys after 28 days and everything else # after 2 days and 12 hours) - t = (int(time.time()) - lengthOfTimeToLeaveObjectsInInventory, int( - time.time()) - lengthOfTimeToHoldOnToAllPubkeys) + t = (int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory, int( + time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys) shared.sqlSubmitQueue.put( '''DELETE FROM inventory WHERE (receivedtime'pubkey') OR (receivedtime (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))): + if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))): print 'It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.' try: - del neededPubkeys[ - toripe] # We need to take this entry out of the neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently. + del shared.neededPubkeys[ + toripe] # We need to take this entry out of the shared.neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently. except: pass @@ -107,7 +105,7 @@ class singleCleaner(threading.Thread): shared.sqlSubmitQueue.put('commit') shared.workerQueue.put(('sendmessage', '')) else: # status == msgsent - if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): + if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))): print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.' t = (int( time.time()), msgretrynumber + 1, 'msgqueued', ackdata) diff --git a/src/class_singleListener.py b/src/class_singleListener.py index 12f954a4..fff351bf 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -1,8 +1,6 @@ import threading import shared import socket -import Queue - from class_sendDataThread import * from class_receiveDataThread import * @@ -64,18 +62,18 @@ class singleListener(threading.Thread): shared.printLock.release() a.close() a, (HOST, PORT) = sock.accept() - objectsOfWhichThisRemoteNodeIsAlreadyAware = {} + someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. a.settimeout(20) sd = sendDataThread() sd.setup( - a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) + a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware) sd.start() rd = receiveDataThread() rd.daemon = True # close the main program even if there are threads left rd.setup( - a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) + a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) rd.start() shared.printLock.acquire() diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index e0d54c37..8b659247 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -1,15 +1,13 @@ import threading import shared import time -from time import strftime, localtime, gmtime import random from addresses import * -import bitmessagemain import highlevelcrypto import proofofwork -from bitmessagemain import neededPubkeys, encryptedBroadcastSwitchoverTime import sys from class_addressGenerator import pointMult +import tr # This thread, of which there is only one, does the heavy lifting: # calculating POWs. @@ -32,7 +30,7 @@ class singleWorker(threading.Thread): toripe, = row neededPubkeys[toripe] = 0 - # Initialize the bitmessagemain.ackdataForWhichImWatching data structure using data + # Initialize the shared.ackdataForWhichImWatching data structure using data # from the sql database. shared.sqlLock.acquire() shared.sqlSubmitQueue.put( @@ -43,7 +41,7 @@ class singleWorker(threading.Thread): for row in queryreturn: ackdata, = row print 'Watching for ackdata', ackdata.encode('hex') - bitmessagemain.ackdataForWhichImWatching[ackdata] = 0 + shared.ackdataForWhichImWatching[ackdata] = 0 shared.sqlLock.acquire() shared.sqlSubmitQueue.put( @@ -262,7 +260,7 @@ class singleWorker(threading.Thread): fromaddress, subject, body, ackdata = row status, addressVersionNumber, streamNumber, ripe = decodeAddress( fromaddress) - if addressVersionNumber == 2 and int(time.time()) < encryptedBroadcastSwitchoverTime: + """if addressVersionNumber == 2 and int(time.time()) < shared.encryptedBroadcastSwitchoverTime: # We need to convert our private keys to public keys in order # to include them. try: @@ -272,7 +270,7 @@ class singleWorker(threading.Thread): fromaddress, 'privencryptionkey') except: shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) + ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) continue privSigningKeyHex = shared.decodeWalletImportFormat( @@ -307,7 +305,7 @@ class singleWorker(threading.Thread): payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) print '(For broadcast message) Doing proof of work...' shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send broadcast...")))) + ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast...")))) initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce @@ -322,7 +320,7 @@ class singleWorker(threading.Thread): shared.broadcastToSendDataQueues(( streamNumber, 'sendinv', inventoryHash)) - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) # Update the status of the message in the 'sent' table to have @@ -335,8 +333,8 @@ class singleWorker(threading.Thread): shared.sqlSubmitQueue.put(t) queryreturn = shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') - shared.sqlLock.release() - elif addressVersionNumber == 3 or int(time.time()) > encryptedBroadcastSwitchoverTime: + shared.sqlLock.release()""" + if addressVersionNumber == 2 or addressVersionNumber == 3: # We need to convert our private keys to public keys in order # to include them. try: @@ -346,7 +344,7 @@ class singleWorker(threading.Thread): fromaddress, 'privencryptionkey') except: shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) + ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) continue privSigningKeyHex = shared.decodeWalletImportFormat( @@ -394,7 +392,7 @@ class singleWorker(threading.Thread): payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) print '(For broadcast message) Doing proof of work...' shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send broadcast...")))) + ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast...")))) initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce @@ -409,7 +407,7 @@ class singleWorker(threading.Thread): shared.broadcastToSendDataQueues(( streamNumber, 'sendinv', inventoryHash)) - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) # Update the status of the message in the 'sent' table to have @@ -467,7 +465,7 @@ class singleWorker(threading.Thread): shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - toripe, bitmessagemain.translateText("MainWindow",'Encryption key was requested earlier.')))) + toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.')))) else: # We have not yet sent a request for the pubkey t = (toaddress,) @@ -479,7 +477,7 @@ class singleWorker(threading.Thread): shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - toripe, bitmessagemain.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) + toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) self.requestPubKey(toaddress) shared.sqlLock.acquire() # Get all messages that are ready to be sent, and also all messages @@ -521,16 +519,16 @@ class singleWorker(threading.Thread): shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - toripe, bitmessagemain.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) + toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.')))) self.requestPubKey(toaddress) continue - bitmessagemain.ackdataForWhichImWatching[ackdata] = 0 + shared.ackdataForWhichImWatching[ackdata] = 0 toStatus, toAddressVersionNumber, toStreamNumber, toHash = decodeAddress( toaddress) fromStatus, fromAddressVersionNumber, fromStreamNumber, fromHash = decodeAddress( fromaddress) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Looking up the receiver\'s public key")))) + ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key")))) shared.printLock.acquire() print 'Found a message in our database that needs to be sent with this pubkey.' print 'First 150 characters of message:', repr(message[:150]) @@ -593,7 +591,7 @@ class singleWorker(threading.Thread): requiredAverageProofOfWorkNonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte requiredPayloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send message.\nThere is no required difficulty for version 2 addresses like this.")))) + ackdata, tr.translateText("MainWindow", "Doing work necessary to send message.\nThere is no required difficulty for version 2 addresses like this.")))) elif toAddressVersionNumber == 3: requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = decodeVarint( pubkeyPayload[readPosition:readPosition + 10]) @@ -605,7 +603,7 @@ class singleWorker(threading.Thread): requiredAverageProofOfWorkNonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte if requiredPayloadLengthExtraBytes < shared.networkDefaultPayloadLengthExtraBytes: requiredPayloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send message.\nReceiver\'s required difficulty: %1 and %2").arg(str(float( + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Doing work necessary to send message.\nReceiver\'s required difficulty: %1 and %2").arg(str(float( requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes))))) if status != 'forcepow': if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0): @@ -619,7 +617,7 @@ class singleWorker(threading.Thread): shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float( + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float( requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) continue @@ -640,7 +638,7 @@ class singleWorker(threading.Thread): fromaddress, 'privencryptionkey') except: shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) + ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) continue privSigningKeyHex = shared.decodeWalletImportFormat( @@ -686,7 +684,7 @@ class singleWorker(threading.Thread): fromaddress, 'privencryptionkey') except: shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( - ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) + ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file.")))) continue privSigningKeyHex = shared.decodeWalletImportFormat( @@ -743,7 +741,7 @@ class singleWorker(threading.Thread): queryreturn = shared.sqlReturnQueue.get() shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() - shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,bitmessagemain.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))))) + shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8'))))) continue encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted target = 2**64 / ((len(encryptedPayload)+requiredPayloadLengthExtraBytes+8) * requiredAverageProofOfWorkNonceTrialsPerByte) @@ -766,7 +764,7 @@ class singleWorker(threading.Thread): objectType = 'msg' shared.inventory[inventoryHash] = ( objectType, toStreamNumber, encryptedPayload, int(time.time())) - shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Message sent. Waiting on acknowledgement. Sent on %1").arg(unicode( + shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Waiting on acknowledgement. Sent on %1").arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex') shared.broadcastToSendDataQueues(( @@ -804,7 +802,7 @@ class singleWorker(threading.Thread): statusbar = 'Doing the computations necessary to request the recipient\'s public key.' shared.UISignalQueue.put(('updateStatusBar', statusbar)) shared.UISignalQueue.put(('updateSentItemStatusByHash', ( - ripe, bitmessagemain.translateText("MainWindow",'Doing work necessary to request encryption key.')))) + ripe, tr.translateText("MainWindow",'Doing work necessary to request encryption key.')))) target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) initialHash = hashlib.sha512(payload).digest() @@ -832,8 +830,8 @@ class singleWorker(threading.Thread): shared.sqlLock.release() shared.UISignalQueue.put(( - 'updateStatusBar', bitmessagemain.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.'))) - shared.UISignalQueue.put(('updateSentItemStatusByHash', (ripe, bitmessagemain.translateText("MainWindow",'Sending public key request. Waiting for reply. Requested at %1').arg(unicode( + 'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.'))) + shared.UISignalQueue.put(('updateSentItemStatusByHash', (ripe, tr.translateText("MainWindow",'Sending public key request. Waiting for reply. Requested at %1').arg(unicode( strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8'))))) def generateFullAckMessage(self, ackdata, toStreamNumber, embeddedTime): diff --git a/src/shared.py b/src/shared.py index b225012c..95be2a94 100644 --- a/src/shared.py +++ b/src/shared.py @@ -1,4 +1,12 @@ softwareVersion = '0.3.3-2' +verbose = 1 +maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # Equals two days and 12 hours. +lengthOfTimeToLeaveObjectsInInventory = 237600 # Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice. +lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time. +maximumAgeOfObjectsThatIAdvertiseToOthers = 216000 # Equals two days and 12 hours +maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours +useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages. + import threading import sys @@ -9,6 +17,10 @@ import pickle import os import time import ConfigParser +import socket +import random +import highlevelcrypto +import shared config = ConfigParser.SafeConfigParser() myECCryptorObjects = {} @@ -31,11 +43,74 @@ appdata = '' #holds the location of the application data storage directory statusIconColor = 'red' connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice. shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit. +alreadyAttemptedConnectionsList = { +} # This is a list of nodes to which we have already attempted a connection +alreadyAttemptedConnectionsListLock = threading.Lock() +alreadyAttemptedConnectionsListResetTime = int( + time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect. +numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {} +neededPubkeys = {} +eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack( + '>Q', random.randrange(1, 18446744073709551615)) +successfullyDecryptMessageTimings = [ + ] # A list of the amounts of time it took to successfully decrypt msg messages +apiAddressGeneratorReturnQueue = Queue.Queue( + ) # The address generator thread uses this queue to get information back to the API thread. +ackdataForWhichImWatching = {} #If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them! networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work. networkDefaultPayloadLengthExtraBytes = 14000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target. + + +def isInSqlInventory(hash): + t = (hash,) + shared.sqlLock.acquire() + shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''') + shared.sqlSubmitQueue.put(t) + queryreturn = shared.sqlReturnQueue.get() + shared.sqlLock.release() + if queryreturn == []: + return False + else: + return True + +def assembleVersionMessage(remoteHost, remotePort, myStreamNumber): + payload = '' + payload += pack('>L', 2) # protocol version. + payload += pack('>q', 1) # bitflags of the services I offer. + payload += pack('>q', int(time.time())) + + payload += pack( + '>q', 1) # boolservices of remote connection. How can I even know this for sure? This is probably ignored by the remote host. + payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ + socket.inet_aton(remoteHost) + payload += pack('>H', remotePort) # remote IPv6 and port + + payload += pack('>q', 1) # bitflags of the services I offer. + payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack( + '>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used. + payload += pack('>H', shared.config.getint( + 'bitmessagesettings', 'port')) # my external IPv6 and port + + random.seed() + payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf + userAgent = '/PyBitmessage:' + shared.softwareVersion + \ + '/' # Length of userAgent must be less than 253. + payload += pack('>B', len( + userAgent)) # user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work. + payload += userAgent + payload += encodeVarint( + 1) # The number of streams about which I care. PyBitmessage currently only supports 1 per connection. + payload += encodeVarint(myStreamNumber) + + datatosend = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. + datatosend = datatosend + 'version\x00\x00\x00\x00\x00' # version command + datatosend = datatosend + pack('>L', len(payload)) # payload length + datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] + return datatosend + payload + def lookupAppdataFolder(): APPNAME = "PyBitmessage" from os import path, environ diff --git a/src/tr.py b/src/tr.py new file mode 100644 index 00000000..1f3ef9b8 --- /dev/null +++ b/src/tr.py @@ -0,0 +1,30 @@ +import shared + +# This is used so that the translateText function can be used when we are in daemon mode and not using any QT functions. +class translateClass: + def __init__(self, context, text): + self.context = context + self.text = text + def arg(self,argument): + if '%' in self.text: + return translateClass(self.context, self.text.replace('%','',1)) # This doesn't actually do anything with the arguments because we don't have a UI in which to display this information anyway. + else: + return self.text + +def _translate(context, text): + return translateText(context, text) + +def translateText(context, text): + if not shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'): + try: + from PyQt4 import QtCore, QtGui + except Exception as err: + print 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon' + print 'Error message:', err + os._exit(0) + return QtGui.QApplication.translate(context, text) + else: + if '%' in text: + return translateClass(context, text.replace('%','',1)) + else: + return text \ No newline at end of file