diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index d25efbc7..b14d0edd 100644 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -18,8 +18,12 @@ storeConfigFilesInSameDirectoryAsProgramByDefault = False # The user may de-sel useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages. encryptedBroadcastSwitchoverTime = 1369735200 +alreadyAttemptedConnectionsList = { +} # This is a list of nodes to which we have already attempted a connection +numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {} +neededPubkeys = {} + import sys -import ConfigParser import Queue from addresses import * import shared @@ -33,13 +37,11 @@ import pickle import random import sqlite3 from time import strftime, localtime, gmtime -import shutil # used for moving the messages.dat file import string import socks import highlevelcrypto from pyelliptic.openssl import OpenSSL -import ctypes -from pyelliptic import arithmetic +#import ctypes import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. # The next 3 are used for the API from SimpleXMLRPCServer import * @@ -49,13 +51,12 @@ import singleton import proofofwork # Classes -from class_singleListener import * from class_sqlThread import * from class_singleCleaner import * from class_singleWorker import * +from class_outgoingSynSender import * +from class_singleListener import * from class_addressGenerator import * -from class_sendDataThread import * -from class_receiveDataThread import * # Helper Functions import helper_startup @@ -65,167 +66,6 @@ import helper_sent import helper_generic import helper_bitcoin -# For each stream to which we connect, several outgoingSynSender threads -# will exist and will collectively create 8 connections with peers. - -class outgoingSynSender(threading.Thread): - - def __init__(self): - threading.Thread.__init__(self) - - def setup(self, streamNumber): - self.streamNumber = streamNumber - - def run(self): - time.sleep(1) - global alreadyAttemptedConnectionsListResetTime - while True: - while len(selfInitiatedConnections[self.streamNumber]) >= 8: # maximum number of outgoing connections = 8 - time.sleep(10) - if shared.shutdown: - break - random.seed() - shared.knownNodesLock.acquire() - HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) - shared.knownNodesLock.release() - alreadyAttemptedConnectionsListLock.acquire() - while HOST in alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList: - alreadyAttemptedConnectionsListLock.release() - # print 'choosing new sample' - random.seed() - shared.knownNodesLock.acquire() - HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) - shared.knownNodesLock.release() - time.sleep(1) - # Clear out the alreadyAttemptedConnectionsList every half - # hour so that this program will again attempt a connection - # to any nodes, even ones it has already tried. - if (time.time() - alreadyAttemptedConnectionsListResetTime) > 1800: - alreadyAttemptedConnectionsList.clear() - alreadyAttemptedConnectionsListResetTime = int( - time.time()) - alreadyAttemptedConnectionsListLock.acquire() - alreadyAttemptedConnectionsList[HOST] = 0 - alreadyAttemptedConnectionsListLock.release() - PORT, timeNodeLastSeen = shared.knownNodes[ - self.streamNumber][HOST] - sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) - # This option apparently avoids the TIME_WAIT state so that we - # can rebind faster - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.settimeout(20) - if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and verbose >= 2: - shared.printLock.acquire() - print 'Trying an outgoing connection to', HOST, ':', PORT - shared.printLock.release() - # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a': - if verbose >= 2: - shared.printLock.acquire() - print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT - shared.printLock.release() - proxytype = socks.PROXY_TYPE_SOCKS4 - sockshostname = shared.config.get( - 'bitmessagesettings', 'sockshostname') - socksport = shared.config.getint( - 'bitmessagesettings', 'socksport') - rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway. - if shared.config.getboolean('bitmessagesettings', 'socksauthentication'): - socksusername = shared.config.get( - 'bitmessagesettings', 'socksusername') - sockspassword = shared.config.get( - 'bitmessagesettings', 'sockspassword') - sock.setproxy( - proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) - else: - sock.setproxy( - proxytype, sockshostname, socksport, rdns) - elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': - if verbose >= 2: - shared.printLock.acquire() - print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT - shared.printLock.release() - proxytype = socks.PROXY_TYPE_SOCKS5 - sockshostname = shared.config.get( - 'bitmessagesettings', 'sockshostname') - socksport = shared.config.getint( - 'bitmessagesettings', 'socksport') - rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway. - if shared.config.getboolean('bitmessagesettings', 'socksauthentication'): - socksusername = shared.config.get( - 'bitmessagesettings', 'socksusername') - sockspassword = shared.config.get( - 'bitmessagesettings', 'sockspassword') - sock.setproxy( - proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) - else: - sock.setproxy( - proxytype, sockshostname, socksport, rdns) - - try: - sock.connect((HOST, PORT)) - rd = receiveDataThread() - rd.daemon = True # close the main program even if there are threads left - objectsOfWhichThisRemoteNodeIsAlreadyAware = {} - rd.setup(sock, HOST, PORT, self.streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware) - rd.start() - shared.printLock.acquire() - print self, 'connected to', HOST, 'during an outgoing attempt.' - shared.printLock.release() - - sd = sendDataThread() - sd.setup(sock, HOST, PORT, self.streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware) - sd.start() - sd.sendVersionMessage() - - except socks.GeneralProxyError as err: - if verbose >= 2: - shared.printLock.acquire() - print 'Could NOT connect to', HOST, 'during outgoing attempt.', err - shared.printLock.release() - PORT, timeLastSeen = shared.knownNodes[ - self.streamNumber][HOST] - if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure. - shared.knownNodesLock.acquire() - del shared.knownNodes[self.streamNumber][HOST] - shared.knownNodesLock.release() - shared.printLock.acquire() - print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.' - shared.printLock.release() - except socks.Socks5AuthError as err: - shared.UISignalQueue.put(( - 'updateStatusBar', translateText( - "MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err)))) - except socks.Socks5Error as err: - pass - print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err) - except socks.Socks4Error as err: - print 'Socks4Error:', err - except socket.error as err: - if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': - print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err) - else: - if verbose >= 1: - shared.printLock.acquire() - print 'Could NOT connect to', HOST, 'during outgoing attempt.', err - shared.printLock.release() - PORT, timeLastSeen = shared.knownNodes[ - self.streamNumber][HOST] - if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. - shared.knownNodesLock.acquire() - del shared.knownNodes[self.streamNumber][HOST] - shared.knownNodesLock.release() - shared.printLock.acquire() - print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' - shared.printLock.release() - except Exception as err: - sys.stderr.write( - 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: %s\n' % err) - time.sleep(0.1) - - def isInSqlInventory(hash): t = (hash,) shared.sqlLock.acquire() @@ -247,7 +87,7 @@ def connectToStream(streamNumber): maximumNumberOfHalfOpenConnections = 32 for i in range(maximumNumberOfHalfOpenConnections): a = outgoingSynSender() - a.setup(streamNumber) + a.setup(streamNumber, selfInitiatedConnections) a.start() @@ -946,20 +786,16 @@ def translateText(context, text): selfInitiatedConnections = {} # This is a list of current connections (the thread pointers at least) -alreadyAttemptedConnectionsList = { -} # This is a list of nodes to which we have already attempted a connection ackdataForWhichImWatching = {} alreadyAttemptedConnectionsListLock = threading.Lock() eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack( '>Q', random.randrange(1, 18446744073709551615)) -neededPubkeys = {} successfullyDecryptMessageTimings = [ ] # A list of the amounts of time it took to successfully decrypt msg messages apiAddressGeneratorReturnQueue = Queue.Queue( ) # The address generator thread uses this queue to get information back to the API thread. alreadyAttemptedConnectionsListResetTime = int( time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect. -numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {} if useVeryEasyProofOfWorkForTesting: shared.networkDefaultProofOfWorkNonceTrialsPerByte = int( @@ -1032,6 +868,7 @@ if __name__ == "__main__": connectToStream(1) singleListenerThread = singleListener() + singleListenerThread.setup(selfInitiatedConnections) singleListenerThread.daemon = True # close the main program even if there are threads left singleListenerThread.start() diff --git a/src/class_addressGenerator.py b/src/class_addressGenerator.py index 3059bc90..e22fdd51 100644 --- a/src/class_addressGenerator.py +++ b/src/class_addressGenerator.py @@ -6,6 +6,7 @@ from pyelliptic.openssl import OpenSSL import ctypes import hashlib from addresses import * +from pyelliptic import arithmetic class addressGenerator(threading.Thread): diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py new file mode 100644 index 00000000..ee8c5640 --- /dev/null +++ b/src/class_outgoingSynSender.py @@ -0,0 +1,173 @@ +import threading +import time +import random +import shared +import socks +import socket +import sys + +import bitmessagemain +from class_sendDataThread import * +from class_receiveDataThread import * + +# For each stream to which we connect, several outgoingSynSender threads +# will exist and will collectively create 8 connections with peers. + +class outgoingSynSender(threading.Thread): + + def __init__(self): + threading.Thread.__init__(self) + + def setup(self, streamNumber, selfInitiatedConnections): + self.streamNumber = streamNumber + self.selfInitiatedConnections = selfInitiatedConnections + + def run(self): + time.sleep(1) + while True: + while len(self.selfInitiatedConnections[self.streamNumber]) >= 8: # maximum number of outgoing connections = 8 + time.sleep(10) + if shared.shutdown: + break + random.seed() + shared.knownNodesLock.acquire() + HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) + shared.knownNodesLock.release() + bitmessagemain.alreadyAttemptedConnectionsListLock.acquire() + while HOST in bitmessagemain.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList: + bitmessagemain.alreadyAttemptedConnectionsListLock.release() + # print 'choosing new sample' + random.seed() + shared.knownNodesLock.acquire() + HOST, = random.sample(shared.knownNodes[self.streamNumber], 1) + shared.knownNodesLock.release() + time.sleep(1) + # Clear out the bitmessagemain.alreadyAttemptedConnectionsList every half + # hour so that this program will again attempt a connection + # to any nodes, even ones it has already tried. + if (time.time() - bitmessagemain.alreadyAttemptedConnectionsListResetTime) > 1800: + bitmessagemain.alreadyAttemptedConnectionsList.clear() + bitmessagemain.alreadyAttemptedConnectionsListResetTime = int( + time.time()) + bitmessagemain.alreadyAttemptedConnectionsListLock.acquire() + bitmessagemain.alreadyAttemptedConnectionsList[HOST] = 0 + bitmessagemain.alreadyAttemptedConnectionsListLock.release() + PORT, timeNodeLastSeen = shared.knownNodes[ + self.streamNumber][HOST] + sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) + # This option apparently avoids the TIME_WAIT state so that we + # can rebind faster + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout(20) + if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and bitmessagemain.verbose >= 2: + shared.printLock.acquire() + print 'Trying an outgoing connection to', HOST, ':', PORT + shared.printLock.release() + # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a': + if bitmessagemain.verbose >= 2: + shared.printLock.acquire() + print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT + shared.printLock.release() + proxytype = socks.PROXY_TYPE_SOCKS4 + sockshostname = shared.config.get( + 'bitmessagesettings', 'sockshostname') + socksport = shared.config.getint( + 'bitmessagesettings', 'socksport') + rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway. + if shared.config.getboolean('bitmessagesettings', 'socksauthentication'): + socksusername = shared.config.get( + 'bitmessagesettings', 'socksusername') + sockspassword = shared.config.get( + 'bitmessagesettings', 'sockspassword') + sock.setproxy( + proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) + else: + sock.setproxy( + proxytype, sockshostname, socksport, rdns) + elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': + if bitmessagemain.verbose >= 2: + shared.printLock.acquire() + print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT + shared.printLock.release() + proxytype = socks.PROXY_TYPE_SOCKS5 + sockshostname = shared.config.get( + 'bitmessagesettings', 'sockshostname') + socksport = shared.config.getint( + 'bitmessagesettings', 'socksport') + rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway. + if shared.config.getboolean('bitmessagesettings', 'socksauthentication'): + socksusername = shared.config.get( + 'bitmessagesettings', 'socksusername') + sockspassword = shared.config.get( + 'bitmessagesettings', 'sockspassword') + sock.setproxy( + proxytype, sockshostname, socksport, rdns, socksusername, sockspassword) + else: + sock.setproxy( + proxytype, sockshostname, socksport, rdns) + + try: + sock.connect((HOST, PORT)) + rd = receiveDataThread() + rd.daemon = True # close the main program even if there are threads left + objectsOfWhichThisRemoteNodeIsAlreadyAware = {} + rd.setup(sock, HOST, PORT, self.streamNumber, + objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) + rd.start() + shared.printLock.acquire() + print self, 'connected to', HOST, 'during an outgoing attempt.' + shared.printLock.release() + + sd = sendDataThread() + sd.setup(sock, HOST, PORT, self.streamNumber, + objectsOfWhichThisRemoteNodeIsAlreadyAware) + sd.start() + sd.sendVersionMessage() + + except socks.GeneralProxyError as err: + if bitmessagemain.verbose >= 2: + shared.printLock.acquire() + print 'Could NOT connect to', HOST, 'during outgoing attempt.', err + shared.printLock.release() + PORT, timeLastSeen = shared.knownNodes[ + self.streamNumber][HOST] + if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure. + shared.knownNodesLock.acquire() + del shared.knownNodes[self.streamNumber][HOST] + shared.knownNodesLock.release() + shared.printLock.acquire() + print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.' + shared.printLock.release() + except socks.Socks5AuthError as err: + shared.UISignalQueue.put(( + 'updateStatusBar', translateText( + "MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err)))) + except socks.Socks5Error as err: + pass + print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err) + except socks.Socks4Error as err: + print 'Socks4Error:', err + except socket.error as err: + if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': + print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err) + else: + if bitmessagemain.verbose >= 1: + shared.printLock.acquire() + print 'Could NOT connect to', HOST, 'during outgoing attempt.', err + shared.printLock.release() + PORT, timeLastSeen = shared.knownNodes[ + self.streamNumber][HOST] + if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. + shared.knownNodesLock.acquire() + del shared.knownNodes[self.streamNumber][HOST] + shared.knownNodesLock.release() + shared.printLock.acquire() + print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' + shared.printLock.release() + except Exception as err: + sys.stderr.write( + 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ') + import traceback + traceback.print_exc() + time.sleep(0.1) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index 2b7b0288..decd9618 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -1,6 +1,18 @@ import time import threading import shared +import hashlib +import socket +import pickle +import random +from struct import unpack, pack +import sys + +from addresses import * +import helper_generic +import bitmessagemain +from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers, numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, neededPubkeys + # This thread is created either by the synSenderThread(for outgoing # connections) or the singleListenerThread(for incoming connectiosn). @@ -20,13 +32,15 @@ class receiveDataThread(threading.Thread): HOST, port, streamNumber, - objectsOfWhichThisRemoteNodeIsAlreadyAware): + objectsOfWhichThisRemoteNodeIsAlreadyAware, + selfInitiatedConnections): self.sock = sock self.HOST = HOST self.PORT = port self.streamNumber = streamNumber self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {} + self.selfInitiatedConnections = selfInitiatedConnections shared.connectedHostsList[ self.HOST] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it. self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections. @@ -34,7 +48,7 @@ class receiveDataThread(threading.Thread): self.initiatedConnection = False else: self.initiatedConnection = True - selfInitiatedConnections[streamNumber][self] = 0 + self.selfInitiatedConnections[streamNumber][self] = 0 self.ackDataThatWeHaveYetToSend = [ ] # When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer. self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware @@ -66,7 +80,7 @@ class receiveDataThread(threading.Thread): self.processData() try: - del selfInitiatedConnections[self.streamNumber][self] + del self.selfInitiatedConnections[self.streamNumber][self] shared.printLock.acquire() print 'removed self (a receiveDataThread) from selfInitiatedConnections' shared.printLock.release() @@ -90,15 +104,14 @@ class receiveDataThread(threading.Thread): shared.printLock.release() def processData(self): - global verbose - # if verbose >= 3: + # if bitmessagemain.verbose >= 3: # shared.printLock.acquire() # print 'self.data is currently ', repr(self.data) # shared.printLock.release() if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length return if self.data[0:4] != '\xe9\xbe\xb4\xd9': - if verbose >= 1: + if bitmessagemain.verbose >= 1: shared.printLock.acquire() print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]) shared.printLock.release() @@ -163,8 +176,8 @@ class receiveDataThread(threading.Thread): shared.printLock.release() del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ objectHash] - elif isInSqlInventory(objectHash): - if verbose >= 3: + elif bitmessagemain.isInSqlInventory(objectHash): + if bitmessagemain.verbose >= 3: shared.printLock.acquire() print 'Inventory (SQL on disk) already has object listed in inv message.' shared.printLock.release() @@ -364,7 +377,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this broadcast object. Ignoring.' shared.inventoryLock.release() return - elif isInSqlInventory(self.inventoryHash): + elif bitmessagemain.isInSqlInventory(self.inventoryHash): print 'We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -744,7 +757,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this msg message. Ignoring.' shared.inventoryLock.release() return - elif isInSqlInventory(self.inventoryHash): + elif bitmessagemain.isInSqlInventory(self.inventoryHash): print 'We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -788,11 +801,11 @@ class receiveDataThread(threading.Thread): def processmsg(self, readPosition, encryptedData): initialDecryptionSuccessful = False # Let's check whether this is a message acknowledgement bound for us. - if encryptedData[readPosition:] in ackdataForWhichImWatching: + if encryptedData[readPosition:] in bitmessagemain.ackdataForWhichImWatching: shared.printLock.acquire() print 'This msg IS an acknowledgement bound for me.' shared.printLock.release() - del ackdataForWhichImWatching[encryptedData[readPosition:]] + del bitmessagemain.ackdataForWhichImWatching[encryptedData[readPosition:]] t = ('ackreceived', encryptedData[readPosition:]) shared.sqlLock.acquire() shared.sqlSubmitQueue.put( @@ -807,7 +820,7 @@ class receiveDataThread(threading.Thread): else: shared.printLock.acquire() print 'This was NOT an acknowledgement bound for me.' - # print 'ackdataForWhichImWatching', ackdataForWhichImWatching + # print 'bitmessagemain.ackdataForWhichImWatching', bitmessagemain.ackdataForWhichImWatching shared.printLock.release() # This is not an acknowledgement bound for me. See if it is a message @@ -1155,7 +1168,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this pubkey. Ignoring it.' shared.inventoryLock.release() return - elif isInSqlInventory(inventoryHash): + elif bitmessagemain.isInSqlInventory(inventoryHash): print 'We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -1370,7 +1383,7 @@ class receiveDataThread(threading.Thread): print 'We have already received this getpubkey request. Ignoring it.' shared.inventoryLock.release() return - elif isInSqlInventory(inventoryHash): + elif bitmessagemain.isInSqlInventory(inventoryHash): print 'We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.' shared.inventoryLock.release() return @@ -1458,7 +1471,7 @@ class receiveDataThread(threading.Thread): shared.printLock.acquire() print 'Inventory (in memory) has inventory item already.' shared.printLock.release() - elif isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): + elif bitmessagemain.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): print 'Inventory (SQL on disk) has inventory item already.' else: self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) @@ -1582,7 +1595,7 @@ class receiveDataThread(threading.Thread): numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint( data[:10]) - if verbose >= 1: + if bitmessagemain.verbose >= 1: shared.printLock.acquire() print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' shared.printLock.release() @@ -1825,7 +1838,7 @@ class receiveDataThread(threading.Thread): datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] datatosend = datatosend + payload - if verbose >= 1: + if bitmessagemain.verbose >= 1: shared.printLock.acquire() print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' shared.printLock.release() @@ -1917,7 +1930,7 @@ class receiveDataThread(threading.Thread): datatosend = datatosend + payload try: self.sock.sendall(datatosend) - if verbose >= 1: + if bitmessagemain.verbose >= 1: shared.printLock.acquire() print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.' shared.printLock.release() @@ -1971,7 +1984,7 @@ class receiveDataThread(threading.Thread): if not self.initiatedConnection: shared.broadcastToSendDataQueues(( 0, 'setStreamNumber', (self.HOST, self.streamNumber))) - if data[72:80] == eightBytesOfRandomDataUsedToDetectConnectionsToSelf: + if data[72:80] == bitmessagemain.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) shared.printLock.acquire() print 'Closing connection to myself: ', self.HOST @@ -1998,7 +2011,7 @@ class receiveDataThread(threading.Thread): print 'Sending version message' shared.printLock.release() try: - self.sock.sendall(assembleVersionMessage( + self.sock.sendall(bitmessagemain.assembleVersionMessage( self.HOST, self.PORT, self.streamNumber)) except Exception as err: # if not 'Bad file descriptor' in err: diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index 68791198..fd0ca786 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -1,6 +1,10 @@ import time import threading import shared +import Queue +from struct import unpack, pack + +import bitmessagemain # Every connection to a peer has a sendDataThread (and also a # receiveDataThread). @@ -36,7 +40,7 @@ class sendDataThread(threading.Thread): shared.printLock.release() def sendVersionMessage(self): - datatosend = assembleVersionMessage( + datatosend = bitmessagemain.assembleVersionMessage( self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber. shared.printLock.acquire() diff --git a/src/class_singleListener.py b/src/class_singleListener.py index c050907c..12f954a4 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -1,6 +1,7 @@ import threading import shared import socket +import Queue from class_sendDataThread import * from class_receiveDataThread import * @@ -18,6 +19,9 @@ class singleListener(threading.Thread): def __init__(self): threading.Thread.__init__(self) + def setup(self, selfInitiatedConnections): + self.selfInitiatedConnections = selfInitiatedConnections + def run(self): # We don't want to accept incoming connections if the user is using a # SOCKS proxy. If they eventually select proxy 'none' then this will @@ -71,7 +75,7 @@ class singleListener(threading.Thread): rd = receiveDataThread() rd.daemon = True # close the main program even if there are threads left rd.setup( - a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) + a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) rd.start() shared.printLock.acquire() diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index e483284f..210a78e3 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -2,6 +2,7 @@ import threading import shared import sqlite3 import time +import shutil # used for moving the messages.dat file # This thread exists because SQLITE3 is so un-threadsafe that we must # submit queries to it and it puts results back in a different queue. They diff --git a/src/helper_bootstrap.py b/src/helper_bootstrap.py index 2dcd285c..296dda6b 100644 --- a/src/helper_bootstrap.py +++ b/src/helper_bootstrap.py @@ -2,6 +2,7 @@ import shared import socket import defaultKnownNodes import pickle +import time def knownNodes(): try: