Continued moving code into individual modules

This commit is contained in:
Jonathan Warren 2013-06-24 15:51:01 -04:00
parent 9925d55df2
commit c857f73d0b
11 changed files with 231 additions and 236 deletions

View File

@ -8,19 +8,6 @@
# yet contain logic to expand into further streams.
# The software version variable is now held in shared.py
verbose = 1
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # Equals two days and 12 hours.
lengthOfTimeToLeaveObjectsInInventory = 237600 # Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice.
lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time.
maximumAgeOfObjectsThatIAdvertiseToOthers = 216000 # Equals two days and 12 hours
maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours
useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages.
encryptedBroadcastSwitchoverTime = 1369735200
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {}
neededPubkeys = {}
#import ctypes
import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
@ -41,19 +28,6 @@ from class_addressGenerator import *
import helper_startup
import helper_bootstrap
def isInSqlInventory(hash):
t = (hash,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []:
return False
else:
return True
def connectToStream(streamNumber):
selfInitiatedConnections[streamNumber] = {}
if sys.platform[0:3] == 'win':
@ -66,46 +40,6 @@ def connectToStream(streamNumber):
a.start()
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
shared.softwareVersion
payload = ''
payload += pack('>L', 2) # protocol version.
payload += pack('>q', 1) # bitflags of the services I offer.
payload += pack('>q', int(time.time()))
payload += pack(
'>q', 1) # boolservices of remote connection. How can I even know this for sure? This is probably ignored by the remote host.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(remoteHost)
payload += pack('>H', remotePort) # remote IPv6 and port
payload += pack('>q', 1) # bitflags of the services I offer.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
payload += pack('>H', shared.config.getint(
'bitmessagesettings', 'port')) # my external IPv6 and port
random.seed()
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
userAgent = '/PyBitmessage:' + shared.softwareVersion + \
'/' # Length of userAgent must be less than 253.
payload += pack('>B', len(
userAgent)) # user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work.
payload += userAgent
payload += encodeVarint(
1) # The number of streams about which I care. PyBitmessage currently only supports 1 per connection.
payload += encodeVarint(myStreamNumber)
datatosend = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
datatosend = datatosend + 'version\x00\x00\x00\x00\x00' # version command
datatosend = datatosend + pack('>L', len(payload)) # payload length
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
return datatosend + payload
# This is one of several classes that constitute the API
# This class was written by Vaibhav Bhatia. Modified by Jonathan Warren (Atheros).
# http://code.activestate.com/recipes/501148-xmlrpc-serverclient-which-does-cookie-handling-and/
@ -729,50 +663,14 @@ class singleAPI(threading.Thread):
se.register_introspection_functions()
se.serve_forever()
# This is used so that the translateText function can be used when we are in daemon mode and not using any QT functions.
class translateClass:
def __init__(self, context, text):
self.context = context
self.text = text
def arg(self,argument):
if '%' in self.text:
return translateClass(self.context, self.text.replace('%','',1)) # This doesn't actually do anything with the arguments because we don't have a UI in which to display this information anyway.
else:
return self.text
def _translate(context, text):
return translateText(context, text)
def translateText(context, text):
if not shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'):
try:
from PyQt4 import QtCore, QtGui
except Exception as err:
print 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon'
print 'Error message:', err
os._exit(0)
return QtGui.QApplication.translate(context, text)
else:
if '%' in text:
return translateClass(context, text.replace('%','',1))
else:
return text
selfInitiatedConnections = {}
# This is a list of current connections (the thread pointers at least)
ackdataForWhichImWatching = {}
alreadyAttemptedConnectionsListLock = threading.Lock()
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
'>Q', random.randrange(1, 18446744073709551615))
successfullyDecryptMessageTimings = [
] # A list of the amounts of time it took to successfully decrypt msg messages
apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread.
alreadyAttemptedConnectionsListResetTime = int(
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
if useVeryEasyProofOfWorkForTesting:
if shared.useVeryEasyProofOfWorkForTesting:
shared.networkDefaultProofOfWorkNonceTrialsPerByte = int(
shared.networkDefaultProofOfWorkNonceTrialsPerByte / 16)
shared.networkDefaultPayloadLengthExtraBytes = int(

View File

@ -12,7 +12,6 @@ except Exception as err:
print 'Error message:', err
sys.exit()
try:
_encoding = QtGui.QApplication.UnicodeUTF8
except AttributeError:
@ -21,7 +20,6 @@ except AttributeError:
def _translate(context, text):
return QtGui.QApplication.translate(context, text)
withMessagingMenu = False
try:
from gi.repository import MessagingMenu

View File

@ -1,6 +1,5 @@
import shared
import threading
import bitmessagemain
import time
import sys
from pyelliptic.openssl import OpenSSL
@ -9,6 +8,7 @@ import hashlib
import highlevelcrypto
from addresses import *
from pyelliptic import arithmetic
import tr
class addressGenerator(threading.Thread):
@ -44,7 +44,7 @@ class addressGenerator(threading.Thread):
if addressVersionNumber == 3: # currently the only one supported.
if command == 'createRandomAddress':
shared.UISignalQueue.put((
'updateStatusBar', bitmessagemain.translateText("MainWindow", "Generating one new address")))
'updateStatusBar', tr.translateText("MainWindow", "Generating one new address")))
# This next section is a little bit strange. We're going to generate keys over and over until we
# find one that starts with either \x00 or \x00\x00. Then when we pack them into a Bitmessage address,
# we won't store the \x00 or \x00\x00 bytes thus making the
@ -112,10 +112,10 @@ class addressGenerator(threading.Thread):
# It may be the case that this address is being generated
# as a result of a call to the API. Let us put the result
# in the necessary queue.
bitmessagemain.apiAddressGeneratorReturnQueue.put(address)
shared.apiAddressGeneratorReturnQueue.put(address)
shared.UISignalQueue.put((
'updateStatusBar', bitmessagemain.translateText("MainWindow", "Done generating address. Doing work necessary to broadcast it...")))
'updateStatusBar', tr.translateText("MainWindow", "Done generating address. Doing work necessary to broadcast it...")))
shared.UISignalQueue.put(('writeNewAddressToTable', (
label, address, streamNumber)))
shared.reloadMyAddressHashes()
@ -235,13 +235,13 @@ class addressGenerator(threading.Thread):
# It may be the case that this address is being
# generated as a result of a call to the API. Let us
# put the result in the necessary queue.
bitmessagemain.apiAddressGeneratorReturnQueue.put(
shared.apiAddressGeneratorReturnQueue.put(
listOfNewAddressesToSendOutThroughTheAPI)
shared.UISignalQueue.put((
'updateStatusBar', bitmessagemain.translateText("MainWindow", "Done generating address")))
'updateStatusBar', tr.translateText("MainWindow", "Done generating address")))
# shared.reloadMyAddressHashes()
elif command == 'getDeterministicAddress':
bitmessagemain.apiAddressGeneratorReturnQueue.put(address)
shared.apiAddressGeneratorReturnQueue.put(address)
else:
raise Exception(
"Error in the addressGenerator thread. Thread was given a command it could not understand: " + command)

View File

@ -5,8 +5,9 @@ import shared
import socks
import socket
import sys
import tr
import bitmessagemain
#import bitmessagemain
from class_sendDataThread import *
from class_receiveDataThread import *
@ -33,25 +34,25 @@ class outgoingSynSender(threading.Thread):
shared.knownNodesLock.acquire()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
shared.knownNodesLock.release()
bitmessagemain.alreadyAttemptedConnectionsListLock.acquire()
while HOST in bitmessagemain.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
bitmessagemain.alreadyAttemptedConnectionsListLock.release()
shared.alreadyAttemptedConnectionsListLock.acquire()
while HOST in shared.alreadyAttemptedConnectionsList or HOST in shared.connectedHostsList:
shared.alreadyAttemptedConnectionsListLock.release()
# print 'choosing new sample'
random.seed()
shared.knownNodesLock.acquire()
HOST, = random.sample(shared.knownNodes[self.streamNumber], 1)
shared.knownNodesLock.release()
time.sleep(1)
# Clear out the bitmessagemain.alreadyAttemptedConnectionsList every half
# Clear out the shared.alreadyAttemptedConnectionsList every half
# hour so that this program will again attempt a connection
# to any nodes, even ones it has already tried.
if (time.time() - bitmessagemain.alreadyAttemptedConnectionsListResetTime) > 1800:
bitmessagemain.alreadyAttemptedConnectionsList.clear()
bitmessagemain.alreadyAttemptedConnectionsListResetTime = int(
if (time.time() - shared.alreadyAttemptedConnectionsListResetTime) > 1800:
shared.alreadyAttemptedConnectionsList.clear()
shared.alreadyAttemptedConnectionsListResetTime = int(
time.time())
bitmessagemain.alreadyAttemptedConnectionsListLock.acquire()
bitmessagemain.alreadyAttemptedConnectionsList[HOST] = 0
bitmessagemain.alreadyAttemptedConnectionsListLock.release()
shared.alreadyAttemptedConnectionsListLock.acquire()
shared.alreadyAttemptedConnectionsList[HOST] = 0
shared.alreadyAttemptedConnectionsListLock.release()
PORT, timeNodeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
@ -59,13 +60,13 @@ class outgoingSynSender(threading.Thread):
# can rebind faster
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(20)
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and bitmessagemain.verbose >= 2:
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
shared.printLock.acquire()
print 'Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
if bitmessagemain.verbose >= 2:
if shared.verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
@ -86,7 +87,7 @@ class outgoingSynSender(threading.Thread):
sock.setproxy(
proxytype, sockshostname, socksport, rdns)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
if bitmessagemain.verbose >= 2:
if shared.verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
@ -111,9 +112,9 @@ class outgoingSynSender(threading.Thread):
sock.connect((HOST, PORT))
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
rd.setup(sock, HOST, PORT, self.streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
rd.start()
shared.printLock.acquire()
print self, 'connected to', HOST, 'during an outgoing attempt.'
@ -121,12 +122,12 @@ class outgoingSynSender(threading.Thread):
sd = sendDataThread()
sd.setup(sock, HOST, PORT, self.streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware)
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
sd.sendVersionMessage()
except socks.GeneralProxyError as err:
if bitmessagemain.verbose >= 2:
if shared.verbose >= 2:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
@ -141,7 +142,7 @@ class outgoingSynSender(threading.Thread):
shared.printLock.release()
except socks.Socks5AuthError as err:
shared.UISignalQueue.put((
'updateStatusBar', bitmessagemain.translateText(
'updateStatusBar', tr.translateText(
"MainWindow", "SOCKS5 Authentication problem: %1").arg(str(err))))
except socks.Socks5Error as err:
pass
@ -152,7 +153,7 @@ class outgoingSynSender(threading.Thread):
if shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
else:
if bitmessagemain.verbose >= 1:
if shared.verbose >= 1:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()

View File

@ -17,14 +17,12 @@ import helper_generic
import helper_bitcoin
import helper_inbox
import helper_sent
import bitmessagemain
from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers, numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, neededPubkeys
import tr
#from bitmessagemain import shared.lengthOfTimeToLeaveObjectsInInventory, shared.lengthOfTimeToHoldOnToAllPubkeys, shared.maximumAgeOfAnObjectThatIAmWillingToAccept, shared.maximumAgeOfObjectsThatIAdvertiseToOthers, shared.maximumAgeOfNodesThatIAdvertiseToOthers, shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer, shared.neededPubkeys
# This thread is created either by the synSenderThread(for outgoing
# connections) or the singleListenerThread(for incoming connectiosn).
class receiveDataThread(threading.Thread):
def __init__(self):
@ -39,7 +37,7 @@ class receiveDataThread(threading.Thread):
HOST,
port,
streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
selfInitiatedConnections):
self.sock = sock
self.HOST = HOST
@ -58,7 +56,7 @@ class receiveDataThread(threading.Thread):
self.selfInitiatedConnections[streamNumber][self] = 0
self.ackDataThatWeHaveYetToSend = [
] # When we receive a message bound for us, we store the acknowledgement that we need to send (the ackdata) here until we are done processing all other data received from this peer.
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
def run(self):
shared.printLock.acquire()
@ -101,7 +99,7 @@ class receiveDataThread(threading.Thread):
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
shared.printLock.release()
try:
del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST]
except:
pass
@ -111,14 +109,14 @@ class receiveDataThread(threading.Thread):
shared.printLock.release()
def processData(self):
# if bitmessagemain.verbose >= 3:
# if shared.verbose >= 3:
# shared.printLock.acquire()
# print 'self.data is currently ', repr(self.data)
# shared.printLock.release()
if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length
return
if self.data[0:4] != '\xe9\xbe\xb4\xd9':
if bitmessagemain.verbose >= 1:
if shared.verbose >= 1:
shared.printLock.acquire()
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
shared.printLock.release()
@ -183,8 +181,8 @@ class receiveDataThread(threading.Thread):
shared.printLock.release()
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
objectHash]
elif bitmessagemain.isInSqlInventory(objectHash):
if bitmessagemain.verbose >= 3:
elif shared.isInSqlInventory(objectHash):
if shared.verbose >= 3:
shared.printLock.acquire()
print 'Inventory (SQL on disk) already has object listed in inv message.'
shared.printLock.release()
@ -199,7 +197,7 @@ class receiveDataThread(threading.Thread):
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
try:
del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
except:
pass
@ -209,7 +207,7 @@ class receiveDataThread(threading.Thread):
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
try:
del numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
except:
pass
@ -217,7 +215,7 @@ class receiveDataThread(threading.Thread):
shared.printLock.acquire()
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len(
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len(
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
if len(self.ackDataThatWeHaveYetToSend) > 0:
self.data = self.ackDataThatWeHaveYetToSend.pop()
@ -285,8 +283,8 @@ class receiveDataThread(threading.Thread):
shared.sqlLock.acquire()
# Select all hashes which are younger than two days old and in this
# stream.
t = (int(time.time()) - maximumAgeOfObjectsThatIAdvertiseToOthers, int(
time.time()) - lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber)
t = (int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers, int(
time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys, self.streamNumber)
shared.sqlSubmitQueue.put(
'''SELECT hash FROM inventory WHERE ((receivedtime>? and objecttype<>'pubkey') or (receivedtime>? and objecttype='pubkey')) and streamnumber=?''')
shared.sqlSubmitQueue.put(t)
@ -295,14 +293,14 @@ class receiveDataThread(threading.Thread):
bigInvList = {}
for row in queryreturn:
hash, = row
if hash not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
bigInvList[hash] = 0
# We also have messages in our inventory in memory (which is a python
# dictionary). Let's fetch those too.
for hash, storedValue in shared.inventory.items():
if hash not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
objectType, streamNumber, payload, receivedTime = storedValue
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - maximumAgeOfObjectsThatIAdvertiseToOthers:
if streamNumber == self.streamNumber and receivedTime > int(time.time()) - shared.maximumAgeOfObjectsThatIAdvertiseToOthers:
bigInvList[hash] = 0
numberOfObjectsInInvMessage = 0
payload = ''
@ -360,7 +358,7 @@ class receiveDataThread(threading.Thread):
if embeddedTime > (int(time.time()) + 10800): # prevent funny business
print 'The embedded time in this broadcast message is more than three hours in the future. That doesn\'t make sense. Ignoring message.'
return
if embeddedTime < (int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept):
if embeddedTime < (int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept):
print 'The embedded time in this broadcast message is too old. Ignoring message.'
return
if len(data) < 180:
@ -384,7 +382,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this broadcast object. Ignoring.'
shared.inventoryLock.release()
return
elif bitmessagemain.isInSqlInventory(self.inventoryHash):
elif shared.isInSqlInventory(self.inventoryHash):
print 'We have already received this broadcast object (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release()
return
@ -749,7 +747,7 @@ class receiveDataThread(threading.Thread):
if embeddedTime > int(time.time()) + 10800:
print 'The time in the msg message is too new. Ignoring it. Time:', embeddedTime
return
if embeddedTime < int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept:
if embeddedTime < int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept:
print 'The time in the msg message is too old. Ignoring it. Time:', embeddedTime
return
streamNumberAsClaimedByMsg, streamNumberAsClaimedByMsgLength = decodeVarint(
@ -764,7 +762,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this msg message. Ignoring.'
shared.inventoryLock.release()
return
elif bitmessagemain.isInSqlInventory(self.inventoryHash):
elif shared.isInSqlInventory(self.inventoryHash):
print 'We have already received this msg message (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release()
return
@ -808,11 +806,11 @@ class receiveDataThread(threading.Thread):
def processmsg(self, readPosition, encryptedData):
initialDecryptionSuccessful = False
# Let's check whether this is a message acknowledgement bound for us.
if encryptedData[readPosition:] in bitmessagemain.ackdataForWhichImWatching:
if encryptedData[readPosition:] in shared.ackdataForWhichImWatching:
shared.printLock.acquire()
print 'This msg IS an acknowledgement bound for me.'
shared.printLock.release()
del bitmessagemain.ackdataForWhichImWatching[encryptedData[readPosition:]]
del shared.ackdataForWhichImWatching[encryptedData[readPosition:]]
t = ('ackreceived', encryptedData[readPosition:])
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
@ -821,13 +819,13 @@ class receiveDataThread(threading.Thread):
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], bitmessagemain.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode(
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (encryptedData[readPosition:], tr.translateText("MainWindow",'Acknowledgement of the message received. %1').arg(unicode(
time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8')))))
return
else:
shared.printLock.acquire()
print 'This was NOT an acknowledgement bound for me.'
# print 'bitmessagemain.ackdataForWhichImWatching', bitmessagemain.ackdataForWhichImWatching
# print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching
shared.printLock.release()
# This is not an acknowledgement bound for me. See if it is a message
@ -1076,14 +1074,14 @@ class receiveDataThread(threading.Thread):
# Display timing data
timeRequiredToAttemptToDecryptMessage = time.time(
) - self.messageProcessingStartTime
bitmessagemain.successfullyDecryptMessageTimings.append(
shared.successfullyDecryptMessageTimings.append(
timeRequiredToAttemptToDecryptMessage)
sum = 0
for item in bitmessagemain.successfullyDecryptMessageTimings:
for item in shared.successfullyDecryptMessageTimings:
sum += item
shared.printLock.acquire()
print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage
print 'Average time for all message decryption successes since startup:', sum / len(bitmessagemain.successfullyDecryptMessageTimings)
print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings)
shared.printLock.release()
def isAckDataValid(self, ackData):
@ -1111,9 +1109,9 @@ class receiveDataThread(threading.Thread):
return '[' + mailingListName + '] ' + subject
def possibleNewPubkey(self, toRipe):
if toRipe in neededPubkeys:
if toRipe in shared.neededPubkeys:
print 'We have been awaiting the arrival of this pubkey.'
del neededPubkeys[toRipe]
del shared.neededPubkeys[toRipe]
t = (toRipe,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
@ -1149,7 +1147,7 @@ class receiveDataThread(threading.Thread):
else:
readPosition += 4
if embeddedTime < int(time.time()) - lengthOfTimeToHoldOnToAllPubkeys:
if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys:
shared.printLock.acquire()
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
shared.printLock.release()
@ -1175,7 +1173,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this pubkey. Ignoring it.'
shared.inventoryLock.release()
return
elif bitmessagemain.isInSqlInventory(inventoryHash):
elif shared.isInSqlInventory(inventoryHash):
print 'We have already received this pubkey (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release()
return
@ -1371,7 +1369,7 @@ class receiveDataThread(threading.Thread):
if embeddedTime > int(time.time()) + 10800:
print 'The time in this getpubkey message is too new. Ignoring it. Time:', embeddedTime
return
if embeddedTime < int(time.time()) - maximumAgeOfAnObjectThatIAmWillingToAccept:
if embeddedTime < int(time.time()) - shared.maximumAgeOfAnObjectThatIAmWillingToAccept:
print 'The time in this getpubkey message is too old. Ignoring it. Time:', embeddedTime
return
requestedAddressVersionNumber, addressVersionLength = decodeVarint(
@ -1390,7 +1388,7 @@ class receiveDataThread(threading.Thread):
print 'We have already received this getpubkey request. Ignoring it.'
shared.inventoryLock.release()
return
elif bitmessagemain.isInSqlInventory(inventoryHash):
elif shared.isInSqlInventory(inventoryHash):
print 'We have already received this getpubkey request (it is stored on disk in the SQL inventory). Ignoring it.'
shared.inventoryLock.release()
return
@ -1430,7 +1428,7 @@ class receiveDataThread(threading.Thread):
shared.myAddressesByHash[requestedHash], 'lastpubkeysendtime'))
except:
lastPubkeySendTime = 0
if lastPubkeySendTime < time.time() - lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was 28 days ago
if lastPubkeySendTime < time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was 28 days ago
shared.printLock.acquire()
print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.'
shared.printLock.release()
@ -1452,11 +1450,11 @@ class receiveDataThread(threading.Thread):
# We have received an inv message
def recinv(self, data):
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = 0 # ..from all peers, counting duplicates seperately (because they take up memory)
if len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
for key, value in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value
shared.printLock.acquire()
print 'number of keys(hosts) in numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
shared.printLock.release()
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
@ -1472,13 +1470,13 @@ class receiveDataThread(threading.Thread):
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
shared.printLock.release()
return
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
data[lengthOfVarint:32 + lengthOfVarint]] = 0
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
shared.printLock.acquire()
print 'Inventory (in memory) has inventory item already.'
shared.printLock.release()
elif bitmessagemain.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
print 'Inventory (SQL on disk) has inventory item already.'
else:
self.sendgetdata(data[lengthOfVarint:32 + lengthOfVarint])
@ -1491,11 +1489,11 @@ class receiveDataThread(threading.Thread):
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.'
shared.printLock.release()
break
self.objectsOfWhichThisRemoteNodeIsAlreadyAware[data[
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[
lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST] = len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
# Send a getdata message to our peer to request the object with the given
@ -1602,7 +1600,7 @@ class receiveDataThread(threading.Thread):
numberOfAddressesIncluded, lengthOfNumberOfAddresses = decodeVarint(
data[:10])
if bitmessagemain.verbose >= 1:
if shared.verbose >= 1:
shared.printLock.acquire()
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
shared.printLock.release()
@ -1845,7 +1843,7 @@ class receiveDataThread(threading.Thread):
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
datatosend = datatosend + payload
if bitmessagemain.verbose >= 1:
if shared.verbose >= 1:
shared.printLock.acquire()
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
shared.printLock.release()
@ -1895,7 +1893,7 @@ class receiveDataThread(threading.Thread):
# print 'addrsInMyStream.items()', addrsInMyStream.items()
for HOST, value in addrsInMyStream.items():
PORT, timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1
payload += pack(
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
@ -1907,7 +1905,7 @@ class receiveDataThread(threading.Thread):
payload += pack('>H', PORT) # remote port
for HOST, value in addrsInChildStreamLeft.items():
PORT, timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1
payload += pack(
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
@ -1919,7 +1917,7 @@ class receiveDataThread(threading.Thread):
payload += pack('>H', PORT) # remote port
for HOST, value in addrsInChildStreamRight.items():
PORT, timeLastReceivedMessageFromThisNode = value
if timeLastReceivedMessageFromThisNode > (int(time.time()) - maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
numberOfAddressesInAddrMessage += 1
payload += pack(
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
@ -1937,7 +1935,7 @@ class receiveDataThread(threading.Thread):
datatosend = datatosend + payload
try:
self.sock.sendall(datatosend)
if bitmessagemain.verbose >= 1:
if shared.verbose >= 1:
shared.printLock.acquire()
print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.'
shared.printLock.release()
@ -1991,7 +1989,7 @@ class receiveDataThread(threading.Thread):
if not self.initiatedConnection:
shared.broadcastToSendDataQueues((
0, 'setStreamNumber', (self.HOST, self.streamNumber)))
if data[72:80] == bitmessagemain.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
shared.printLock.acquire()
print 'Closing connection to myself: ', self.HOST
@ -2018,7 +2016,7 @@ class receiveDataThread(threading.Thread):
print 'Sending version message'
shared.printLock.release()
try:
self.sock.sendall(bitmessagemain.assembleVersionMessage(
self.sock.sendall(shared.assembleVersionMessage(
self.HOST, self.PORT, self.streamNumber))
except Exception as err:
# if not 'Bad file descriptor' in err:

View File

@ -8,7 +8,7 @@ import random
import sys
import socket
import bitmessagemain
#import bitmessagemain
# Every connection to a peer has a sendDataThread (and also a
# receiveDataThread).
@ -29,7 +29,7 @@ class sendDataThread(threading.Thread):
HOST,
PORT,
streamNumber,
objectsOfWhichThisRemoteNodeIsAlreadyAware):
someObjectsOfWhichThisRemoteNodeIsAlreadyAware):
self.sock = sock
self.HOST = HOST
self.PORT = PORT
@ -38,13 +38,13 @@ class sendDataThread(threading.Thread):
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue.
self.lastTimeISentData = int(
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
self.objectsOfWhichThisRemoteNodeIsAlreadyAware = objectsOfWhichThisRemoteNodeIsAlreadyAware
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
shared.printLock.acquire()
print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber
shared.printLock.release()
def sendVersionMessage(self):
datatosend = bitmessagemain.assembleVersionMessage(
datatosend = shared.assembleVersionMessage(
self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
shared.printLock.acquire()
@ -123,7 +123,7 @@ class sendDataThread(threading.Thread):
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.HOST
break
elif command == 'sendinv':
if data not in self.objectsOfWhichThisRemoteNodeIsAlreadyAware:
if data not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
payload = '\x01' + data
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
@ -147,6 +147,7 @@ class sendDataThread(threading.Thread):
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.HOST
break
elif command == 'pong':
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
if self.lastTimeISentData < (int(time.time()) - 298):
# Send out a pong message to keep the connection alive.
shared.printLock.acquire()

View File

@ -1,8 +1,6 @@
import threading
import shared
import time
from bitmessagemain import lengthOfTimeToLeaveObjectsInInventory, lengthOfTimeToHoldOnToAllPubkeys, maximumAgeOfAnObjectThatIAmWillingToAccept, maximumAgeOfObjectsThatIAdvertiseToOthers, maximumAgeOfNodesThatIAdvertiseToOthers,\
neededPubkeys
import sys
'''The singleCleaner class is a timer-driven thread that cleans data structures to free memory, resends messages when a remote node doesn't respond, and sends pong messages to keep connections alive if the network isn't busy.
@ -58,15 +56,15 @@ class singleCleaner(threading.Thread):
shared.sqlLock.acquire()
# inventory (clears pubkeys after 28 days and everything else
# after 2 days and 12 hours)
t = (int(time.time()) - lengthOfTimeToLeaveObjectsInInventory, int(
time.time()) - lengthOfTimeToHoldOnToAllPubkeys)
t = (int(time.time()) - shared.lengthOfTimeToLeaveObjectsInInventory, int(
time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys)
shared.sqlSubmitQueue.put(
'''DELETE FROM inventory WHERE (receivedtime<? AND objecttype<>'pubkey') OR (receivedtime<? AND objecttype='pubkey') ''')
shared.sqlSubmitQueue.put(t)
shared.sqlReturnQueue.get()
# pubkeys
t = (int(time.time()) - lengthOfTimeToHoldOnToAllPubkeys,)
t = (int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys,)
shared.sqlSubmitQueue.put(
'''DELETE FROM pubkeys WHERE time<? AND usedpersonally='no' ''')
shared.sqlSubmitQueue.put(t)
@ -88,11 +86,11 @@ class singleCleaner(threading.Thread):
break
toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber = row
if status == 'awaitingpubkey':
if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))):
if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (pubkeyretrynumber))):
print 'It has been a long time and we haven\'t heard a response to our getpubkey request. Sending again.'
try:
del neededPubkeys[
toripe] # We need to take this entry out of the neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently.
del shared.neededPubkeys[
toripe] # We need to take this entry out of the shared.neededPubkeys structure because the shared.workerQueue checks to see whether the entry is already present and will not do the POW and send the message because it assumes that it has already done it recently.
except:
pass
@ -107,7 +105,7 @@ class singleCleaner(threading.Thread):
shared.sqlSubmitQueue.put('commit')
shared.workerQueue.put(('sendmessage', ''))
else: # status == msgsent
if int(time.time()) - lastactiontime > (maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))):
if int(time.time()) - lastactiontime > (shared.maximumAgeOfAnObjectThatIAmWillingToAccept * (2 ** (msgretrynumber))):
print 'It has been a long time and we haven\'t heard an acknowledgement to our msg. Sending again.'
t = (int(
time.time()), msgretrynumber + 1, 'msgqueued', ackdata)

View File

@ -1,8 +1,6 @@
import threading
import shared
import socket
import Queue
from class_sendDataThread import *
from class_receiveDataThread import *
@ -64,18 +62,18 @@ class singleListener(threading.Thread):
shared.printLock.release()
a.close()
a, (HOST, PORT) = sock.accept()
objectsOfWhichThisRemoteNodeIsAlreadyAware = {}
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
a.settimeout(20)
sd = sendDataThread()
sd.setup(
a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware)
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
sd.start()
rd = receiveDataThread()
rd.daemon = True # close the main program even if there are threads left
rd.setup(
a, HOST, PORT, -1, objectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
rd.start()
shared.printLock.acquire()

View File

@ -1,15 +1,13 @@
import threading
import shared
import time
from time import strftime, localtime, gmtime
import random
from addresses import *
import bitmessagemain
import highlevelcrypto
import proofofwork
from bitmessagemain import neededPubkeys, encryptedBroadcastSwitchoverTime
import sys
from class_addressGenerator import pointMult
import tr
# This thread, of which there is only one, does the heavy lifting:
# calculating POWs.
@ -32,7 +30,7 @@ class singleWorker(threading.Thread):
toripe, = row
neededPubkeys[toripe] = 0
# Initialize the bitmessagemain.ackdataForWhichImWatching data structure using data
# Initialize the shared.ackdataForWhichImWatching data structure using data
# from the sql database.
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
@ -43,7 +41,7 @@ class singleWorker(threading.Thread):
for row in queryreturn:
ackdata, = row
print 'Watching for ackdata', ackdata.encode('hex')
bitmessagemain.ackdataForWhichImWatching[ackdata] = 0
shared.ackdataForWhichImWatching[ackdata] = 0
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put(
@ -262,7 +260,7 @@ class singleWorker(threading.Thread):
fromaddress, subject, body, ackdata = row
status, addressVersionNumber, streamNumber, ripe = decodeAddress(
fromaddress)
if addressVersionNumber == 2 and int(time.time()) < encryptedBroadcastSwitchoverTime:
"""if addressVersionNumber == 2 and int(time.time()) < shared.encryptedBroadcastSwitchoverTime:
# We need to convert our private keys to public keys in order
# to include them.
try:
@ -272,7 +270,7 @@ class singleWorker(threading.Thread):
fromaddress, 'privencryptionkey')
except:
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
continue
privSigningKeyHex = shared.decodeWalletImportFormat(
@ -307,7 +305,7 @@ class singleWorker(threading.Thread):
payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
print '(For broadcast message) Doing proof of work...'
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send broadcast..."))))
ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast..."))))
initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash)
print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce
@ -322,7 +320,7 @@ class singleWorker(threading.Thread):
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
# Update the status of the message in the 'sent' table to have
@ -335,8 +333,8 @@ class singleWorker(threading.Thread):
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
elif addressVersionNumber == 3 or int(time.time()) > encryptedBroadcastSwitchoverTime:
shared.sqlLock.release()"""
if addressVersionNumber == 2 or addressVersionNumber == 3:
# We need to convert our private keys to public keys in order
# to include them.
try:
@ -346,7 +344,7 @@ class singleWorker(threading.Thread):
fromaddress, 'privencryptionkey')
except:
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
continue
privSigningKeyHex = shared.decodeWalletImportFormat(
@ -394,7 +392,7 @@ class singleWorker(threading.Thread):
payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
print '(For broadcast message) Doing proof of work...'
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send broadcast..."))))
ackdata, tr.translateText("MainWindow", "Doing work necessary to send broadcast..."))))
initialHash = hashlib.sha512(payload).digest()
trialValue, nonce = proofofwork.run(target, initialHash)
print '(For broadcast message) Found proof of work', trialValue, 'Nonce:', nonce
@ -409,7 +407,7 @@ class singleWorker(threading.Thread):
shared.broadcastToSendDataQueues((
streamNumber, 'sendinv', inventoryHash))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Broadcast sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
# Update the status of the message in the 'sent' table to have
@ -467,7 +465,7 @@ class singleWorker(threading.Thread):
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, bitmessagemain.translateText("MainWindow",'Encryption key was requested earlier.'))))
toripe, tr.translateText("MainWindow",'Encryption key was requested earlier.'))))
else:
# We have not yet sent a request for the pubkey
t = (toaddress,)
@ -479,7 +477,7 @@ class singleWorker(threading.Thread):
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, bitmessagemain.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
self.requestPubKey(toaddress)
shared.sqlLock.acquire()
# Get all messages that are ready to be sent, and also all messages
@ -521,16 +519,16 @@ class singleWorker(threading.Thread):
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
toripe, bitmessagemain.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
toripe, tr.translateText("MainWindow",'Sending a request for the recipient\'s encryption key.'))))
self.requestPubKey(toaddress)
continue
bitmessagemain.ackdataForWhichImWatching[ackdata] = 0
shared.ackdataForWhichImWatching[ackdata] = 0
toStatus, toAddressVersionNumber, toStreamNumber, toHash = decodeAddress(
toaddress)
fromStatus, fromAddressVersionNumber, fromStreamNumber, fromHash = decodeAddress(
fromaddress)
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Looking up the receiver\'s public key"))))
ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key"))))
shared.printLock.acquire()
print 'Found a message in our database that needs to be sent with this pubkey.'
print 'First 150 characters of message:', repr(message[:150])
@ -593,7 +591,7 @@ class singleWorker(threading.Thread):
requiredAverageProofOfWorkNonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte
requiredPayloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send message.\nThere is no required difficulty for version 2 addresses like this."))))
ackdata, tr.translateText("MainWindow", "Doing work necessary to send message.\nThere is no required difficulty for version 2 addresses like this."))))
elif toAddressVersionNumber == 3:
requiredAverageProofOfWorkNonceTrialsPerByte, varintLength = decodeVarint(
pubkeyPayload[readPosition:readPosition + 10])
@ -605,7 +603,7 @@ class singleWorker(threading.Thread):
requiredAverageProofOfWorkNonceTrialsPerByte = shared.networkDefaultProofOfWorkNonceTrialsPerByte
if requiredPayloadLengthExtraBytes < shared.networkDefaultPayloadLengthExtraBytes:
requiredPayloadLengthExtraBytes = shared.networkDefaultPayloadLengthExtraBytes
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Doing work necessary to send message.\nReceiver\'s required difficulty: %1 and %2").arg(str(float(
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Doing work necessary to send message.\nReceiver\'s required difficulty: %1 and %2").arg(str(float(
requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)))))
if status != 'forcepow':
if (requiredAverageProofOfWorkNonceTrialsPerByte > shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') and shared.config.getint('bitmessagesettings', 'maxacceptablenoncetrialsperbyte') != 0) or (requiredPayloadLengthExtraBytes > shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') and shared.config.getint('bitmessagesettings', 'maxacceptablepayloadlengthextrabytes') != 0):
@ -619,7 +617,7 @@ class singleWorker(threading.Thread):
shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Problem: The work demanded by the recipient (%1 and %2) is more difficult than you are willing to do.").arg(str(float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte)).arg(str(float(
requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes)).arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
continue
@ -640,7 +638,7 @@ class singleWorker(threading.Thread):
fromaddress, 'privencryptionkey')
except:
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
continue
privSigningKeyHex = shared.decodeWalletImportFormat(
@ -686,7 +684,7 @@ class singleWorker(threading.Thread):
fromaddress, 'privencryptionkey')
except:
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
ackdata, bitmessagemain.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
ackdata, tr.translateText("MainWindow", "Error! Could not find sender address (your address) in the keys.dat file."))))
continue
privSigningKeyHex = shared.decodeWalletImportFormat(
@ -743,7 +741,7 @@ class singleWorker(threading.Thread):
queryreturn = shared.sqlReturnQueue.get()
shared.sqlSubmitQueue.put('commit')
shared.sqlLock.release()
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,bitmessagemain.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8')))))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata',(ackdata,tr.translateText("MainWindow",'Problem: The recipient\'s encryption key is no good. Could not encrypt message. %1').arg(unicode(strftime(shared.config.get('bitmessagesettings', 'timeformat'),localtime(int(time.time()))),'utf-8')))))
continue
encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted
target = 2**64 / ((len(encryptedPayload)+requiredPayloadLengthExtraBytes+8) * requiredAverageProofOfWorkNonceTrialsPerByte)
@ -766,7 +764,7 @@ class singleWorker(threading.Thread):
objectType = 'msg'
shared.inventory[inventoryHash] = (
objectType, toStreamNumber, encryptedPayload, int(time.time()))
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, bitmessagemain.translateText("MainWindow", "Message sent. Waiting on acknowledgement. Sent on %1").arg(unicode(
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (ackdata, tr.translateText("MainWindow", "Message sent. Waiting on acknowledgement. Sent on %1").arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
print 'Broadcasting inv for my msg(within sendmsg function):', inventoryHash.encode('hex')
shared.broadcastToSendDataQueues((
@ -804,7 +802,7 @@ class singleWorker(threading.Thread):
statusbar = 'Doing the computations necessary to request the recipient\'s public key.'
shared.UISignalQueue.put(('updateStatusBar', statusbar))
shared.UISignalQueue.put(('updateSentItemStatusByHash', (
ripe, bitmessagemain.translateText("MainWindow",'Doing work necessary to request encryption key.'))))
ripe, tr.translateText("MainWindow",'Doing work necessary to request encryption key.'))))
target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes +
8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
initialHash = hashlib.sha512(payload).digest()
@ -832,8 +830,8 @@ class singleWorker(threading.Thread):
shared.sqlLock.release()
shared.UISignalQueue.put((
'updateStatusBar', bitmessagemain.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.')))
shared.UISignalQueue.put(('updateSentItemStatusByHash', (ripe, bitmessagemain.translateText("MainWindow",'Sending public key request. Waiting for reply. Requested at %1').arg(unicode(
'updateStatusBar', tr.translateText("MainWindow",'Broacasting the public key request. This program will auto-retry if they are offline.')))
shared.UISignalQueue.put(('updateSentItemStatusByHash', (ripe, tr.translateText("MainWindow",'Sending public key request. Waiting for reply. Requested at %1').arg(unicode(
strftime(shared.config.get('bitmessagesettings', 'timeformat'), localtime(int(time.time()))), 'utf-8')))))
def generateFullAckMessage(self, ackdata, toStreamNumber, embeddedTime):

View File

@ -1,4 +1,12 @@
softwareVersion = '0.3.3-2'
verbose = 1
maximumAgeOfAnObjectThatIAmWillingToAccept = 216000 # Equals two days and 12 hours.
lengthOfTimeToLeaveObjectsInInventory = 237600 # Equals two days and 18 hours. This should be longer than maximumAgeOfAnObjectThatIAmWillingToAccept so that we don't process messages twice.
lengthOfTimeToHoldOnToAllPubkeys = 2419200 # Equals 4 weeks. You could make this longer if you want but making it shorter would not be advisable because there is a very small possibility that it could keep you from obtaining a needed pubkey for a period of time.
maximumAgeOfObjectsThatIAdvertiseToOthers = 216000 # Equals two days and 12 hours
maximumAgeOfNodesThatIAdvertiseToOthers = 10800 # Equals three hours
useVeryEasyProofOfWorkForTesting = False # If you set this to True while on the normal network, you won't be able to send or sometimes receive messages.
import threading
import sys
@ -9,6 +17,10 @@ import pickle
import os
import time
import ConfigParser
import socket
import random
import highlevelcrypto
import shared
config = ConfigParser.SafeConfigParser()
myECCryptorObjects = {}
@ -31,11 +43,74 @@ appdata = '' #holds the location of the application data storage directory
statusIconColor = 'red'
connectedHostsList = {} #List of hosts to which we are connected. Used to guarantee that the outgoingSynSender threads won't connect to the same remote node twice.
shutdown = 0 #Set to 1 by the doCleanShutdown function. Used to tell the proof of work worker threads to exit.
alreadyAttemptedConnectionsList = {
} # This is a list of nodes to which we have already attempted a connection
alreadyAttemptedConnectionsListLock = threading.Lock()
alreadyAttemptedConnectionsListResetTime = int(
time.time()) # used to clear out the alreadyAttemptedConnectionsList periodically so that we will retry connecting to hosts to which we have already tried to connect.
numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer = {}
neededPubkeys = {}
eightBytesOfRandomDataUsedToDetectConnectionsToSelf = pack(
'>Q', random.randrange(1, 18446744073709551615))
successfullyDecryptMessageTimings = [
] # A list of the amounts of time it took to successfully decrypt msg messages
apiAddressGeneratorReturnQueue = Queue.Queue(
) # The address generator thread uses this queue to get information back to the API thread.
ackdataForWhichImWatching = {}
#If changed, these values will cause particularly unexpected behavior: You won't be able to either send or receive messages because the proof of work you do (or demand) won't match that done or demanded by others. Don't change them!
networkDefaultProofOfWorkNonceTrialsPerByte = 320 #The amount of work that should be performed (and demanded) per byte of the payload. Double this number to double the work.
networkDefaultPayloadLengthExtraBytes = 14000 #To make sending short messages a little more difficult, this value is added to the payload length for use in calculating the proof of work target.
def isInSqlInventory(hash):
t = (hash,)
shared.sqlLock.acquire()
shared.sqlSubmitQueue.put('''select hash from inventory where hash=?''')
shared.sqlSubmitQueue.put(t)
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn == []:
return False
else:
return True
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber):
payload = ''
payload += pack('>L', 2) # protocol version.
payload += pack('>q', 1) # bitflags of the services I offer.
payload += pack('>q', int(time.time()))
payload += pack(
'>q', 1) # boolservices of remote connection. How can I even know this for sure? This is probably ignored by the remote host.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \
socket.inet_aton(remoteHost)
payload += pack('>H', remotePort) # remote IPv6 and port
payload += pack('>q', 1) # bitflags of the services I offer.
payload += '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + pack(
'>L', 2130706433) # = 127.0.0.1. This will be ignored by the remote host. The actual remote connected IP will be used.
payload += pack('>H', shared.config.getint(
'bitmessagesettings', 'port')) # my external IPv6 and port
random.seed()
payload += eightBytesOfRandomDataUsedToDetectConnectionsToSelf
userAgent = '/PyBitmessage:' + shared.softwareVersion + \
'/' # Length of userAgent must be less than 253.
payload += pack('>B', len(
userAgent)) # user agent string length. If the user agent is more than 252 bytes long, this code isn't going to work.
payload += userAgent
payload += encodeVarint(
1) # The number of streams about which I care. PyBitmessage currently only supports 1 per connection.
payload += encodeVarint(myStreamNumber)
datatosend = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
datatosend = datatosend + 'version\x00\x00\x00\x00\x00' # version command
datatosend = datatosend + pack('>L', len(payload)) # payload length
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
return datatosend + payload
def lookupAppdataFolder():
APPNAME = "PyBitmessage"
from os import path, environ

30
src/tr.py Normal file
View File

@ -0,0 +1,30 @@
import shared
# This is used so that the translateText function can be used when we are in daemon mode and not using any QT functions.
class translateClass:
def __init__(self, context, text):
self.context = context
self.text = text
def arg(self,argument):
if '%' in self.text:
return translateClass(self.context, self.text.replace('%','',1)) # This doesn't actually do anything with the arguments because we don't have a UI in which to display this information anyway.
else:
return self.text
def _translate(context, text):
return translateText(context, text)
def translateText(context, text):
if not shared.safeConfigGetBoolean('bitmessagesettings', 'daemon'):
try:
from PyQt4 import QtCore, QtGui
except Exception as err:
print 'PyBitmessage requires PyQt unless you want to run it as a daemon and interact with it using the API. You can download PyQt from http://www.riverbankcomputing.com/software/pyqt/download or by searching Google for \'PyQt Download\'. If you want to run in daemon mode, see https://bitmessage.org/wiki/Daemon'
print 'Error message:', err
os._exit(0)
return QtGui.QApplication.translate(context, text)
else:
if '%' in text:
return translateClass(context, text.replace('%','',1))
else:
return text