Separated code in to many individual modules #253

Merged
Atheros1 merged 16 commits from master into master 2013-06-25 05:24:50 +02:00
8 changed files with 230 additions and 196 deletions
Showing only changes of commit fe8998ca3a - Show all commits

View File

@ -18,8 +18,12 @@ storeConfigFilesInSameDirectoryAsProgramByDefault = False # The user may de-sel
useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages. useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages.
encryptedBroadcastSwitchoverTime = 1369735200 encryptedBroadcastSwitchoverTime = 1369735200
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {}
neededPubkeys = {}
import sys import sys
import ConfigParser
import Queue import Queue
from addresses import * from addresses import *
import shared import shared
@ -33,13 +37,11 @@ import pickle
import random import random
import sqlite3 import sqlite3
from time import strftime, localtime, gmtime from time import strftime, localtime, gmtime
import shutil # used for moving the messages.dat file
import string import string
import socks import socks
import highlevelcrypto import highlevelcrypto
from pyelliptic.openssl import OpenSSL from pyelliptic.openssl import OpenSSL
import ctypes #import ctypes
from pyelliptic import arithmetic
import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
# The next 3 are used for the API # The next 3 are used for the API
from SimpleXMLRPCServer import * from SimpleXMLRPCServer import *
@ -49,13 +51,12 @@ import singleton
import proofofwork import proofofwork
# Classes # Classes
from class_singleListener import *
from class_sqlThread import * from class_sqlThread import *
from class_singleCleaner import * from class_singleCleaner import *
from class_singleWorker import * from class_singleWorker import *
from class_outgoingSynSender import *
from class_singleListener import *
from class_addressGenerator import * from class_addressGenerator import *
from class_sendDataThread import *
from class_receiveDataThread import *
# Helper Functions # Helper Functions
import helper_startup import helper_startup
@ -65,167 +66,6 @@ import helper_sent
import helper_generic import helper_generic
import helper_bitcoin import helper_bitcoin
# For each stream to which we connect, several outgoingSynSender threads
# will exist and will collectively create 8 connections with peers.
class outgoingSynSender(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def setup(self, streamNumber):
self.streamNumber = streamNumber
def run(self):
time.sleep(1)
global alreadyAttemptedConnectionsListResetTime
while True:
while len(selfInitiatedConnections[self.streamNumber]) >= 8: # maximum number of outgoing connections = 8
time.sleep(10)
if shared.shutdown:
break
random.seed()
shared.knownNodesLock.acquire()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
shared.knownNodesLock.release()
alreadyAttemptedConnectionsListLock.acquire()
while HOST in alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
alreadyAttemptedConnectionsListLock.release()
# print 'choosing new sample'
random.seed()
shared.knownNodesLock.acquire()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
shared.knownNodesLock.release()
time.sleep(1)
# Clear out the alreadyAttemptedConnectionsList every half
# hour so that this program will again attempt a connection
# to any nodes, even ones it has already tried.
if (time.time() - alreadyAttemptedConnectionsListResetTime) > 1800:
alreadyAttemptedConnectionsList.clear()
alreadyAttemptedConnectionsListResetTime = int(
time.time())
alreadyAttemptedConnectionsListLock.acquire()
alreadyAttemptedConnectionsList[HOST] = 0
alreadyAttemptedConnectionsListLock.release()
PORT, timeNodeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
# This option apparently avoids the TIME_WAIT state so that we
# can rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(20)
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and verbose >= 2:
shared.printLock.acquire()
print 'Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
if verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
proxytype = socks.PROXY_TYPE_SOCKS4
sockshostname = shared.config.get(
'bitmessagesettings', 'sockshostname')
socksport = shared.config.getint(
'bitmessagesettings', 'socksport')
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
if shared.config.getboolean('bitmessagesettings', 'socksauthentication'):
socksusername = shared.config.get(
'bitmessagesettings', 'socksusername')
sockspassword = shared.config.get(
'bitmessagesettings', 'sockspassword')
sock.setproxy(
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
else:
sock.setproxy(
proxytype, sockshostname, socksport, rdns)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
if verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
proxytype = socks.PROXY_TYPE_SOCKS5
sockshostname = shared.config.get(
'bitmessagesettings', 'sockshostname')
socksport = shared.config.getint(
'bitmessagesettings', 'socksport')
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
if shared.config.getboolean('bitmessagesettings', 'socksauthentication'):
socksusername = shared.config.get(
'bitmessagesettings', 'socksusername')
sockspassword = shared.config.get(
'bitmessagesettings', 'sockspassword')
sock.setproxy(
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
else:
sock.setproxy(
proxytype, sockshostname, socksport, rdns)
try:
sock.connect((HOST, PORT))
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
rd.setup(sock, HOST, PORT, self.streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware)
rd.start()
shared.printLock.acquire()
print self, 'connected to', HOST, 'during an outgoing attempt.'
shared.printLock.release()
sd = sendDataThread()
sd.setup(sock, HOST, PORT, self.streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
sd.sendVersionMessage()
except socks.GeneralProxyError as err:
if verbose >= 2:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
PORT, timeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][HOST]
shared.knownNodesLock.release()
shared.printLock.acquire()
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
shared.printLock.release()
except socks.Socks5AuthError as err:
shared.UISignalQueue.put((
'updateStatusBar', translateText(
"MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err))))
except socks.Socks5Error as err:
pass
print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err)
except socks.Socks4Error as err:
print 'Socks4Error:', err
except socket.error as err:
if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
else:
if verbose >= 1:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
PORT, timeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][HOST]
shared.knownNodesLock.release()
shared.printLock.acquire()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
shared.printLock.release()
except Exception as err:
sys.stderr.write(
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: %s\n' % err)
time.sleep(0.1)
def isInSqlInventory(hash): def isInSqlInventory(hash):
t = (hash,) t = (hash,)
shared.sqlLock.acquire() shared.sqlLock.acquire()
@ -247,7 +87,7 @@ def connectToStream(streamNumber):
maximumNumberOfHalfOpenConnections = 32 maximumNumberOfHalfOpenConnections = 32
for i in range(maximumNumberOfHalfOpenConnections): for i in range(maximumNumberOfHalfOpenConnections):
a = outgoingSynSender() a = outgoingSynSender()
a.setup(streamNumber) a.setup(streamNumber, selfInitiatedConnections)
a.start() a.start()
@ -946,20 +786,16 @@ def translateText(context, text):
selfInitiatedConnections = {} selfInitiatedConnections = {}
# This is a list of current connections (the thread pointers at least) # This is a list of current connections (the thread pointers at least)
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
ackdataForWhichImWatching = {} ackdataForWhichImWatching = {}
alreadyAttemptedConnectionsListLock = threading.Lock() alreadyAttemptedConnectionsListLock = threading.Lock()
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack( eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
'>Q', random.randrange(1, 18446744073709551615)) '>Q', random.randrange(1, 18446744073709551615))
neededPubkeys = {}
successfullyDecryptMessageTimings = [ successfullyDecryptMessageTimings = [
] # A list of the amounts of time it took to successfully decrypt msg messages ] # A list of the amounts of time it took to successfully decrypt msg messages
apiAddressGeneratorReturnQueue = Queue.Queue( apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread. ) # The address generator thread uses this queue to get information back to the API thread.
alreadyAttemptedConnectionsListResetTime = int( alreadyAttemptedConnectionsListResetTime = int(
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect. time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {}
if useVeryEasyProofOfWorkForTesting: if useVeryEasyProofOfWorkForTesting:
shared.networkDefaultProofOfWorkNonceTrialsPerByte = int( shared.networkDefaultProofOfWorkNonceTrialsPerByte = int(
@ -1032,6 +868,7 @@ if __name__ == "__main__":
connectToStream(1) connectToStream(1)
singleListenerThread = singleListener() singleListenerThread = singleListener()
singleListenerThread.setup(selfInitiatedConnections)
singleListenerThread.daemon = True # close the main program even if there are threads left singleListenerThread.daemon = True # close the main program even if there are threads left
singleListenerThread.start() singleListenerThread.start()

View File

@ -6,6 +6,7 @@ from pyelliptic.openssl import OpenSSL
import ctypes import ctypes
import hashlib import hashlib
from addresses import * from addresses import *
from pyelliptic import arithmetic
class addressGenerator(threading.Thread): class addressGenerator(threading.Thread):

View File

@ -0,0 +1,173 @@
import threading
import time
import random
import shared
import socks
import socket
import sys
import bitmessagemain
from class_sendDataThread import *
from class_receiveDataThread import *
# For each stream to which we connect, several outgoingSynSender threads
# will exist and will collectively create 8 connections with peers.
class outgoingSynSender(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def setup(self, streamNumber, selfInitiatedConnections):
self.streamNumber = streamNumber
self.selfInitiatedConnections = selfInitiatedConnections
def run(self):
time.sleep(1)
while True:
while len(self.selfInitiatedConnections[self.streamNumber]) >= 8: # maximum number of outgoing connections = 8
time.sleep(10)
if shared.shutdown:
break
random.seed()
shared.knownNodesLock.acquire()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
shared.knownNodesLock.release()
bitmessagemain.alreadyAttemptedConnectionsListLock.acquire()
while HOST in bitmessagemain.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
bitmessagemain.alreadyAttemptedConnectionsListLock.release()
# print 'choosing new sample'
random.seed()
shared.knownNodesLock.acquire()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
shared.knownNodesLock.release()
time.sleep(1)
# Clear out the bitmessagemain.alreadyAttemptedConnectionsList every half
# hour so that this program will again attempt a connection
# to any nodes, even ones it has already tried.
if (time.time() - bitmessagemain.alreadyAttemptedConnectionsListResetTime) > 1800:
bitmessagemain.alreadyAttemptedConnectionsList.clear()
bitmessagemain.alreadyAttemptedConnectionsListResetTime = int(
time.time())
bitmessagemain.alreadyAttemptedConnectionsListLock.acquire()
bitmessagemain.alreadyAttemptedConnectionsList[HOST] = 0
bitmessagemain.alreadyAttemptedConnectionsListLock.release()
PORT, timeNodeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
# This option apparently avoids the TIME_WAIT state so that we
# can rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(20)
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and bitmessagemain.verbose >= 2:
shared.printLock.acquire()
print 'Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
if bitmessagemain.verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
proxytype = socks.PROXY_TYPE_SOCKS4
sockshostname = shared.config.get(
'bitmessagesettings', 'sockshostname')
socksport = shared.config.getint(
'bitmessagesettings', 'socksport')
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
if shared.config.getboolean('bitmessagesettings', 'socksauthentication'):
socksusername = shared.config.get(
'bitmessagesettings', 'socksusername')
sockspassword = shared.config.get(
'bitmessagesettings', 'sockspassword')
sock.setproxy(
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
else:
sock.setproxy(
proxytype, sockshostname, socksport, rdns)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
if bitmessagemain.verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
proxytype = socks.PROXY_TYPE_SOCKS5
sockshostname = shared.config.get(
'bitmessagesettings', 'sockshostname')
socksport = shared.config.getint(
'bitmessagesettings', 'socksport')
rdns = True # Do domain name lookups through the proxy; though this setting doesn't really matter since we won't be doing any domain name lookups anyway.
if shared.config.getboolean('bitmessagesettings', 'socksauthentication'):
socksusername = shared.config.get(
'bitmessagesettings', 'socksusername')
sockspassword = shared.config.get(
'bitmessagesettings', 'sockspassword')
sock.setproxy(
proxytype, sockshostname, socksport, rdns, socksusername, sockspassword)
else:
sock.setproxy(
proxytype, sockshostname, socksport, rdns)
try:
sock.connect((HOST, PORT))
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
rd.setup(sock, HOST, PORT, self.streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
rd.start()
shared.printLock.acquire()
print self, 'connected to', HOST, 'during an outgoing attempt.'
shared.printLock.release()
sd = sendDataThread()
sd.setup(sock, HOST, PORT, self.streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
sd.sendVersionMessage()
except socks.GeneralProxyError as err:
if bitmessagemain.verbose >= 2:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
PORT, timeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][HOST]
shared.knownNodesLock.release()
shared.printLock.acquire()
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
shared.printLock.release()
except socks.Socks5AuthError as err:
shared.UISignalQueue.put((
'updateStatusBar', translateText(
"MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err))))
except socks.Socks5Error as err:
pass
print 'SOCKS5 error. (It is possible that the server wants authentication).)', str(err)
except socks.Socks4Error as err:
print 'Socks4Error:', err
except socket.error as err:
if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
else:
if bitmessagemain.verbose >= 1:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
PORT, timeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][HOST]
shared.knownNodesLock.release()
shared.printLock.acquire()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
shared.printLock.release()
except Exception as err:
sys.stderr.write(
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ')
import traceback
traceback.print_exc()
time.sleep(0.1)

View File

@ -1,6 +1,18 @@
import time import time
import threading import threading
import shared import shared
import hashlib
import socket
import pickle
import random
from struct import unpack, pack
import sys
from addresses import *
import helper_generic
import bitmessagemain
from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers, numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, neededPubkeys
# This thread is created either by the synSenderThread(for outgoing # This thread is created either by the synSenderThread(for outgoing
# connections) or the singleListenerThread(for incoming connectiosn). # connections) or the singleListenerThread(for incoming connectiosn).
@ -20,13 +32,15 @@ class receiveDataThread(threading.Thread):
HOST, HOST,
port, port,
streamNumber, streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware): objectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections):
self.sock = sock self.sock = sock
self.HOST = HOST self.HOST = HOST
self.PORT = port self.PORT = port
self.streamNumber = streamNumber self.streamNumber = streamNumber
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {} self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = {}
self.selfInitiatedConnections = selfInitiatedConnections
shared.connectedHostsList[ shared.connectedHostsList[
self.HOST] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it. self.HOST] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections. self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
@ -34,7 +48,7 @@ class receiveDataThread(threading.Thread):
self.initiatedConnection = False self.initiatedConnection = False
else: else:
self.initiatedConnection = True self.initiatedConnection = True
selfInitiatedConnections[streamNumber][self] = 0 self.selfInitiatedConnections[streamNumber][self] = 0
self.ackDataThatWeHaveYetToSend = [ self.ackDataThatWeHaveYetToSend = [
] # When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer. ] # When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer.
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
@ -66,7 +80,7 @@ class receiveDataThread(threading.Thread):
self.processData() self.processData()
try: try:
del selfInitiatedConnections[self.streamNumber][self] del self.selfInitiatedConnections[self.streamNumber][self]
shared.printLock.acquire() shared.printLock.acquire()
print 'removed self (a receiveDataThread) from selfInitiatedConnections' print 'removed self (a receiveDataThread) from selfInitiatedConnections'
shared.printLock.release() shared.printLock.release()
@ -90,15 +104,14 @@ class receiveDataThread(threading.Thread):
shared.printLock.release() shared.printLock.release()
def processData(self): def processData(self):
global verbose # if bitmessagemain.verbose >= 3:
# if verbose >= 3:
# shared.printLock.acquire() # shared.printLock.acquire()
# print 'self.data is currently ', repr(self.data) # print 'self.data is currently ', repr(self.data)
# shared.printLock.release() # shared.printLock.release()
if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length
return return
if self.data[0:4] != '\xe9\xbe\xb4\xd9': if self.data[0:4] != '\xe9\xbe\xb4\xd9':
if verbose >= 1: if bitmessagemain.verbose >= 1:
shared.printLock.acquire() shared.printLock.acquire()
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]) print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
shared.printLock.release() shared.printLock.release()
@ -163,8 +176,8 @@ class receiveDataThread(threading.Thread):
shared.printLock.release() shared.printLock.release()
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
objectHash] objectHash]
elif isInSqlInventory(objectHash): elif bitmessagemain.isInSqlInventory(objectHash):
if verbose >= 3: if bitmessagemain.verbose >= 3:
shared.printLock.acquire() shared.printLock.acquire()
print 'Inventory (SQL on disk) already has object listed in inv message.' print 'Inventory (SQL on disk) already has object listed in inv message.'
shared.printLock.release() shared.printLock.release()
@ -364,7 +377,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this broadcast object. Ignoring.' print 'We have already received this broadcast object. Ignoring.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
elif isInSqlInventory(self.inventoryHash): elif bitmessagemain.isInSqlInventory(self.inventoryHash):
print 'We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.' print 'We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
@ -744,7 +757,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this msg message. Ignoring.' print 'We have already received this msg message. Ignoring.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
elif isInSqlInventory(self.inventoryHash): elif bitmessagemain.isInSqlInventory(self.inventoryHash):
print 'We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.' print 'We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
@ -788,11 +801,11 @@ class receiveDataThread(threading.Thread):
def processmsg(self, readPosition, encryptedData): def processmsg(self, readPosition, encryptedData):
initialDecryptionSuccessful = False initialDecryptionSuccessful = False
# Let's check whether this is a message acknowledgement bound for us. # Let's check whether this is a message acknowledgement bound for us.
if encryptedData[readPosition:] in ackdataForWhichImWatching: if encryptedData[readPosition:] in bitmessagemain.ackdataForWhichImWatching:
shared.printLock.acquire() shared.printLock.acquire()
print 'This msg IS an acknowledgement bound for me.' print 'This msg IS an acknowledgement bound for me.'
shared.printLock.release() shared.printLock.release()
del ackdataForWhichImWatching[encryptedData[readPosition:]] del bitmessagemain.ackdataForWhichImWatching[encryptedData[readPosition:]]
t = ('ackreceived', encryptedData[readPosition:]) t = ('ackreceived', encryptedData[readPosition:])
shared.sqlLock.acquire() shared.sqlLock.acquire()
shared.sqlSubmitQueue.put( shared.sqlSubmitQueue.put(
@ -807,7 +820,7 @@ class receiveDataThread(threading.Thread):
else: else:
shared.printLock.acquire() shared.printLock.acquire()
print 'This was NOT an acknowledgement bound for me.' print 'This was NOT an acknowledgement bound for me.'
# print 'ackdataForWhichImWatching', ackdataForWhichImWatching # print 'bitmessagemain.ackdataForWhichImWatching', bitmessagemain.ackdataForWhichImWatching
shared.printLock.release() shared.printLock.release()
# This is not an acknowledgement bound for me. See if it is a message # This is not an acknowledgement bound for me. See if it is a message
@ -1155,7 +1168,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this pubkey. Ignoring it.' print 'We have already received this pubkey. Ignoring it.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
elif isInSqlInventory(inventoryHash): elif bitmessagemain.isInSqlInventory(inventoryHash):
print 'We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.' print 'We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
@ -1370,7 +1383,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this getpubkey request. Ignoring it.' print 'We have already received this getpubkey request. Ignoring it.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
elif isInSqlInventory(inventoryHash): elif bitmessagemain.isInSqlInventory(inventoryHash):
print 'We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.' print 'We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release() shared.inventoryLock.release()
return return
@ -1458,7 +1471,7 @@ class receiveDataThread(threading.Thread):
shared.printLock.acquire() shared.printLock.acquire()
print 'Inventory (in memory) has inventory item already.' print 'Inventory (in memory) has inventory item already.'
shared.printLock.release() shared.printLock.release()
elif isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): elif bitmessagemain.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
print 'Inventory (SQL on disk) has inventory item already.' print 'Inventory (SQL on disk) has inventory item already.'
else: else:
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint]) self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
@ -1582,7 +1595,7 @@ class receiveDataThread(threading.Thread):
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint( numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
data[:10]) data[:10])
if verbose >= 1: if bitmessagemain.verbose >= 1:
shared.printLock.acquire() shared.printLock.acquire()
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
shared.printLock.release() shared.printLock.release()
@ -1825,7 +1838,7 @@ class receiveDataThread(threading.Thread):
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4] datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
datatosend = datatosend + payload datatosend = datatosend + payload
if verbose >= 1: if bitmessagemain.verbose >= 1:
shared.printLock.acquire() shared.printLock.acquire()
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
shared.printLock.release() shared.printLock.release()
@ -1917,7 +1930,7 @@ class receiveDataThread(threading.Thread):
datatosend = datatosend + payload datatosend = datatosend + payload
try: try:
self.sock.sendall(datatosend) self.sock.sendall(datatosend)
if verbose >= 1: if bitmessagemain.verbose >= 1:
shared.printLock.acquire() shared.printLock.acquire()
print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.' print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.'
shared.printLock.release() shared.printLock.release()
@ -1971,7 +1984,7 @@ class receiveDataThread(threading.Thread):
if not self.initiatedConnection: if not self.initiatedConnection:
shared.broadcastToSendDataQueues(( shared.broadcastToSendDataQueues((
0, 'setStreamNumber', (self.HOST, self.streamNumber))) 0, 'setStreamNumber', (self.HOST, self.streamNumber)))
if data[72:80] == eightBytesOfRandomDataUsedToDetectConnectionsToSelf: if data[72:80] == bitmessagemain.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
shared.printLock.acquire() shared.printLock.acquire()
print 'Closing connection to myself: ', self.HOST print 'Closing connection to myself: ', self.HOST
@ -1998,7 +2011,7 @@ class receiveDataThread(threading.Thread):
print 'Sending version message' print 'Sending version message'
shared.printLock.release() shared.printLock.release()
try: try:
self.sock.sendall(assembleVersionMessage( self.sock.sendall(bitmessagemain.assembleVersionMessage(
self.HOST, self.PORT, self.streamNumber)) self.HOST, self.PORT, self.streamNumber))
except Exception as err: except Exception as err:
# if not 'Bad file descriptor' in err: # if not 'Bad file descriptor' in err:

View File

@ -1,6 +1,10 @@
import time import time
import threading import threading
import shared import shared
import Queue
from struct import unpack, pack
import bitmessagemain
# Every connection to a peer has a sendDataThread (and also a # Every connection to a peer has a sendDataThread (and also a
# receiveDataThread). # receiveDataThread).
@ -36,7 +40,7 @@ class sendDataThread(threading.Thread):
shared.printLock.release() shared.printLock.release()
def sendVersionMessage(self): def sendVersionMessage(self):
datatosend = assembleVersionMessage( datatosend = bitmessagemain.assembleVersionMessage(
self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber. self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
shared.printLock.acquire() shared.printLock.acquire()

View File

@ -1,6 +1,7 @@
import threading import threading
import shared import shared
import socket import socket
import Queue
from class_sendDataThread import * from class_sendDataThread import *
from class_receiveDataThread import * from class_receiveDataThread import *
@ -18,6 +19,9 @@ class singleListener(threading.Thread):
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self)
def setup(self, selfInitiatedConnections):
self.selfInitiatedConnections = selfInitiatedConnections
def run(self): def run(self):
# We don't want to accept incoming connections if the user is using a # We don't want to accept incoming connections if the user is using a
# SOCKS proxy. If they eventually select proxy 'none' then this will # SOCKS proxy. If they eventually select proxy 'none' then this will
@ -71,7 +75,7 @@ class singleListener(threading.Thread):
rd = receiveDataThread() rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left rd.daemon = True # close the main program even if there are threads left
rd.setup( rd.setup(
a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware) a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
rd.start() rd.start()
shared.printLock.acquire() shared.printLock.acquire()

View File

@ -2,6 +2,7 @@ import threading
import shared import shared
import sqlite3 import sqlite3
import time import time
import shutil # used for moving the messages.dat file
# This thread exists because SQLITE3 is so un-threadsafe that we must # This thread exists because SQLITE3 is so un-threadsafe that we must
# submit queries to it and it puts results back in a different queue. They # submit queries to it and it puts results back in a different queue. They

View File

@ -2,6 +2,7 @@ import shared
import socket import socket
import defaultKnownNodes import defaultKnownNodes
import pickle import pickle
import time
def knownNodes(): def knownNodes():
try: try: