Merge branch 'master' of git://github.com/linkerlin/PyBitmessage into linkerlin-master
This commit is contained in:
commit
2012096673
|
@ -10,6 +10,12 @@
|
||||||
# The software version variable is now held in shared.py
|
# The software version variable is now held in shared.py
|
||||||
|
|
||||||
#import ctypes
|
#import ctypes
|
||||||
|
try:
|
||||||
|
from gevent import monkey
|
||||||
|
monkey.patch_all()
|
||||||
|
except ImportError as ex:
|
||||||
|
print "cannot find gevent"
|
||||||
|
|
||||||
import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
|
import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully.
|
||||||
# The next 3 are used for the API
|
# The next 3 are used for the API
|
||||||
from SimpleXMLRPCServer import *
|
from SimpleXMLRPCServer import *
|
||||||
|
@ -459,9 +465,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
status, addressVersionNumber, streamNumber, toRipe = decodeAddress(
|
status, addressVersionNumber, streamNumber, toRipe = decodeAddress(
|
||||||
toAddress)
|
toAddress)
|
||||||
if status != 'success':
|
if status != 'success':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'API Error 0007: Could not decode address:', toAddress, ':', status
|
print 'API Error 0007: Could not decode address:', toAddress, ':', status
|
||||||
shared.printLock.release()
|
|
||||||
if status == 'checksumfailed':
|
if status == 'checksumfailed':
|
||||||
return 'API Error 0008: Checksum failed for address: ' + toAddress
|
return 'API Error 0008: Checksum failed for address: ' + toAddress
|
||||||
if status == 'invalidcharacters':
|
if status == 'invalidcharacters':
|
||||||
|
@ -476,9 +482,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
status, addressVersionNumber, streamNumber, fromRipe = decodeAddress(
|
status, addressVersionNumber, streamNumber, fromRipe = decodeAddress(
|
||||||
fromAddress)
|
fromAddress)
|
||||||
if status != 'success':
|
if status != 'success':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
|
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
|
||||||
shared.printLock.release()
|
|
||||||
if status == 'checksumfailed':
|
if status == 'checksumfailed':
|
||||||
return 'API Error 0008: Checksum failed for address: ' + fromAddress
|
return 'API Error 0008: Checksum failed for address: ' + fromAddress
|
||||||
if status == 'invalidcharacters':
|
if status == 'invalidcharacters':
|
||||||
|
@ -541,9 +547,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
status, addressVersionNumber, streamNumber, fromRipe = decodeAddress(
|
status, addressVersionNumber, streamNumber, fromRipe = decodeAddress(
|
||||||
fromAddress)
|
fromAddress)
|
||||||
if status != 'success':
|
if status != 'success':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
|
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
|
||||||
shared.printLock.release()
|
|
||||||
if status == 'checksumfailed':
|
if status == 'checksumfailed':
|
||||||
return 'API Error 0008: Checksum failed for address: ' + fromAddress
|
return 'API Error 0008: Checksum failed for address: ' + fromAddress
|
||||||
if status == 'invalidcharacters':
|
if status == 'invalidcharacters':
|
||||||
|
@ -612,9 +618,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
|
||||||
status, addressVersionNumber, streamNumber, toRipe = decodeAddress(
|
status, addressVersionNumber, streamNumber, toRipe = decodeAddress(
|
||||||
address)
|
address)
|
||||||
if status != 'success':
|
if status != 'success':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'API Error 0007: Could not decode address:', address, ':', status
|
print 'API Error 0007: Could not decode address:', address, ':', status
|
||||||
shared.printLock.release()
|
|
||||||
if status == 'checksumfailed':
|
if status == 'checksumfailed':
|
||||||
return 'API Error 0008: Checksum failed for address: ' + address
|
return 'API Error 0008: Checksum failed for address: ' + address
|
||||||
if status == 'invalidcharacters':
|
if status == 'invalidcharacters':
|
||||||
|
@ -741,9 +747,9 @@ if __name__ == "__main__":
|
||||||
except:
|
except:
|
||||||
apiNotifyPath = ''
|
apiNotifyPath = ''
|
||||||
if apiNotifyPath != '':
|
if apiNotifyPath != '':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Trying to call', apiNotifyPath
|
print 'Trying to call', apiNotifyPath
|
||||||
shared.printLock.release()
|
|
||||||
call([apiNotifyPath, "startingUp"])
|
call([apiNotifyPath, "startingUp"])
|
||||||
singleAPIThread = singleAPI()
|
singleAPIThread = singleAPI()
|
||||||
singleAPIThread.daemon = True # close the main program even if there are threads left
|
singleAPIThread.daemon = True # close the main program even if there are threads left
|
||||||
|
@ -774,9 +780,9 @@ if __name__ == "__main__":
|
||||||
import bitmessageqt
|
import bitmessageqt
|
||||||
bitmessageqt.run()
|
bitmessageqt.run()
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Running as a daemon. You can use Ctrl+C to exit.'
|
print 'Running as a daemon. You can use Ctrl+C to exit.'
|
||||||
shared.printLock.release()
|
|
||||||
while True:
|
while True:
|
||||||
time.sleep(20)
|
time.sleep(20)
|
||||||
|
|
||||||
|
|
|
@ -1288,9 +1288,9 @@ class MyForm(QtGui.QMainWindow):
|
||||||
status, addressVersionNumber, streamNumber, ripe = decodeAddress(
|
status, addressVersionNumber, streamNumber, ripe = decodeAddress(
|
||||||
toAddress)
|
toAddress)
|
||||||
if status != 'success':
|
if status != 'success':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Error: Could not decode', toAddress, ':', status
|
print 'Error: Could not decode', toAddress, ':', status
|
||||||
shared.printLock.release()
|
|
||||||
if status == 'missingbm':
|
if status == 'missingbm':
|
||||||
self.statusBar().showMessage(_translate(
|
self.statusBar().showMessage(_translate(
|
||||||
"MainWindow", "Error: Bitmessage addresses start with BM- Please check %1").arg(toAddress))
|
"MainWindow", "Error: Bitmessage addresses start with BM- Please check %1").arg(toAddress))
|
||||||
|
@ -2621,9 +2621,9 @@ class MyForm(QtGui.QMainWindow):
|
||||||
|
|
||||||
def updateStatusBar(self, data):
|
def updateStatusBar(self, data):
|
||||||
if data != "":
|
if data != "":
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Status bar:', data
|
print 'Status bar:', data
|
||||||
shared.printLock.release()
|
|
||||||
self.statusBar().showMessage(data)
|
self.statusBar().showMessage(data)
|
||||||
|
|
||||||
|
|
||||||
|
@ -2875,14 +2875,16 @@ class myTableWidgetItem(QTableWidgetItem):
|
||||||
def __lt__(self, other):
|
def __lt__(self, other):
|
||||||
return int(self.data(33).toPyObject()) < int(other.data(33).toPyObject())
|
return int(self.data(33).toPyObject()) < int(other.data(33).toPyObject())
|
||||||
|
|
||||||
|
from threading import Thread
|
||||||
class UISignaler(QThread):
|
class UISignaler(Thread,QThread):
|
||||||
|
|
||||||
def __init__(self, parent=None):
|
def __init__(self, parent=None):
|
||||||
|
Thread.__init__(self, parent)
|
||||||
QThread.__init__(self, parent)
|
QThread.__init__(self, parent)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
|
try:
|
||||||
command, data = shared.UISignalQueue.get()
|
command, data = shared.UISignalQueue.get()
|
||||||
if command == 'writeNewAddressToTable':
|
if command == 'writeNewAddressToTable':
|
||||||
label, address, streamNumber = data
|
label, address, streamNumber = data
|
||||||
|
@ -2927,7 +2929,29 @@ class UISignaler(QThread):
|
||||||
else:
|
else:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'Command sent to UISignaler not recognized: %s\n' % command)
|
'Command sent to UISignaler not recognized: %s\n' % command)
|
||||||
|
except Exception,ex:
|
||||||
|
# uncaught exception will block gevent
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
traceback.print_stack()
|
||||||
|
print ex
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
import gevent
|
||||||
|
except ImportError as ex:
|
||||||
|
print "cannot find gevent"
|
||||||
|
else:
|
||||||
|
def mainloop(app):
|
||||||
|
while True:
|
||||||
|
app.processEvents()
|
||||||
|
while app.hasPendingEvents():
|
||||||
|
app.processEvents()
|
||||||
|
gevent.sleep(0.01)
|
||||||
|
gevent.sleep(0.01) # don't appear to get here but cooperate again
|
||||||
|
def testprint():
|
||||||
|
#print 'this is running'
|
||||||
|
gevent.spawn_later(1, testprint)
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
app = QtGui.QApplication(sys.argv)
|
app = QtGui.QApplication(sys.argv)
|
||||||
|
@ -2946,5 +2970,8 @@ def run():
|
||||||
myapp.appIndicatorInit(app)
|
myapp.appIndicatorInit(app)
|
||||||
myapp.ubuntuMessagingMenuInit()
|
myapp.ubuntuMessagingMenuInit()
|
||||||
myapp.notifierInit()
|
myapp.notifierInit()
|
||||||
|
if gevent is None:
|
||||||
sys.exit(app.exec_())
|
sys.exit(app.exec_())
|
||||||
|
else:
|
||||||
|
gevent.joinall([gevent.spawn(testprint), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app)])
|
||||||
|
print 'done'
|
||||||
|
|
38
src/class_bgWorker.py
Normal file
38
src/class_bgWorker.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
#! /usr/bin/python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# cody by linker.lin@me.com
|
||||||
|
|
||||||
|
__author__ = 'linkerlin'
|
||||||
|
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import Queue
|
||||||
|
import time
|
||||||
|
|
||||||
|
class bgWorker(threading.Thread):
|
||||||
|
def __init__(self):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.q = Queue.Queue()
|
||||||
|
self.setDaemon(True)
|
||||||
|
|
||||||
|
def post(self,job):
|
||||||
|
self.q.put(job)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while 1:
|
||||||
|
job=None
|
||||||
|
try:
|
||||||
|
job = self.q.get(block=True)
|
||||||
|
if job:
|
||||||
|
job()
|
||||||
|
except Exception as ex:
|
||||||
|
print "Error,job exception:",ex.message,type(ex)
|
||||||
|
time.sleep(0.05)
|
||||||
|
else:
|
||||||
|
#print "job: ", job, " done"
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
bgworker = bgWorker()
|
||||||
|
bgworker.start()
|
|
@ -61,15 +61,15 @@ class outgoingSynSender(threading.Thread):
|
||||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
sock.settimeout(20)
|
sock.settimeout(20)
|
||||||
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Trying an outgoing connection to', HOST, ':', PORT
|
print 'Trying an outgoing connection to', HOST, ':', PORT
|
||||||
shared.printLock.release()
|
|
||||||
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
|
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
|
||||||
shared.printLock.release()
|
|
||||||
proxytype = socks.PROXY_TYPE_SOCKS4
|
proxytype = socks.PROXY_TYPE_SOCKS4
|
||||||
sockshostname = shared.config.get(
|
sockshostname = shared.config.get(
|
||||||
'bitmessagesettings', 'sockshostname')
|
'bitmessagesettings', 'sockshostname')
|
||||||
|
@ -88,9 +88,9 @@ class outgoingSynSender(threading.Thread):
|
||||||
proxytype, sockshostname, socksport, rdns)
|
proxytype, sockshostname, socksport, rdns)
|
||||||
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
|
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
|
||||||
shared.printLock.release()
|
|
||||||
proxytype = socks.PROXY_TYPE_SOCKS5
|
proxytype = socks.PROXY_TYPE_SOCKS5
|
||||||
sockshostname = shared.config.get(
|
sockshostname = shared.config.get(
|
||||||
'bitmessagesettings', 'sockshostname')
|
'bitmessagesettings', 'sockshostname')
|
||||||
|
@ -116,9 +116,9 @@ class outgoingSynSender(threading.Thread):
|
||||||
rd.setup(sock, HOST, PORT, self.streamNumber,
|
rd.setup(sock, HOST, PORT, self.streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
||||||
rd.start()
|
rd.start()
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print self, 'connected to', HOST, 'during an outgoing attempt.'
|
print self, 'connected to', HOST, 'during an outgoing attempt.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
sd = sendDataThread()
|
sd = sendDataThread()
|
||||||
sd.setup(sock, HOST, PORT, self.streamNumber,
|
sd.setup(sock, HOST, PORT, self.streamNumber,
|
||||||
|
@ -128,18 +128,18 @@ class outgoingSynSender(threading.Thread):
|
||||||
|
|
||||||
except socks.GeneralProxyError as err:
|
except socks.GeneralProxyError as err:
|
||||||
if shared.verbose >= 2:
|
if shared.verbose >= 2:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||||
shared.printLock.release()
|
|
||||||
PORT, timeLastSeen = shared.knownNodes[
|
PORT, timeLastSeen = shared.knownNodes[
|
||||||
self.streamNumber][HOST]
|
self.streamNumber][HOST]
|
||||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
|
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
del shared.knownNodes[self.streamNumber][HOST]
|
del shared.knownNodes[self.streamNumber][HOST]
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||||
shared.printLock.release()
|
|
||||||
except socks.Socks5AuthError as err:
|
except socks.Socks5AuthError as err:
|
||||||
shared.UISignalQueue.put((
|
shared.UISignalQueue.put((
|
||||||
'updateStatusBar', tr.translateText(
|
'updateStatusBar', tr.translateText(
|
||||||
|
@ -154,18 +154,18 @@ class outgoingSynSender(threading.Thread):
|
||||||
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
|
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
|
||||||
else:
|
else:
|
||||||
if shared.verbose >= 1:
|
if shared.verbose >= 1:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
|
||||||
shared.printLock.release()
|
|
||||||
PORT, timeLastSeen = shared.knownNodes[
|
PORT, timeLastSeen = shared.knownNodes[
|
||||||
self.streamNumber][HOST]
|
self.streamNumber][HOST]
|
||||||
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
del shared.knownNodes[self.streamNumber][HOST]
|
del shared.knownNodes[self.streamNumber][HOST]
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
|
||||||
shared.printLock.release()
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ')
|
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ')
|
||||||
|
|
|
@ -61,9 +61,9 @@ class receiveDataThread(threading.Thread):
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
shared.printLock.release()
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
dataLen = len(self.data)
|
dataLen = len(self.data)
|
||||||
|
@ -71,60 +71,60 @@ class receiveDataThread(threading.Thread):
|
||||||
if len(self.data) == dataLen: # recv returns 0 bytes when the remote closes the connection
|
if len(self.data) == dataLen: # recv returns 0 bytes when the remote closes the connection
|
||||||
raise Exception("Remote closed the connection")
|
raise Exception("Remote closed the connection")
|
||||||
except socket.timeout:
|
except socket.timeout:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||||
shared.printLock.release()
|
|
||||||
break
|
break
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err
|
print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err
|
||||||
shared.printLock.release()
|
|
||||||
break
|
break
|
||||||
# print 'Received', repr(self.data)
|
# print 'Received', repr(self.data)
|
||||||
if self.data == "":
|
if self.data == "":
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
|
||||||
shared.printLock.release()
|
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
self.processData()
|
self.processData()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del self.selfInitiatedConnections[self.streamNumber][self]
|
del self.selfInitiatedConnections[self.streamNumber][self]
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||||
shared.printLock.release()
|
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||||
try:
|
try:
|
||||||
del shared.connectedHostsList[self.HOST]
|
del shared.connectedHostsList[self.HOST]
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
|
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST]
|
self.HOST]
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
|
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
def processData(self):
|
def processData(self):
|
||||||
# if shared.verbose >= 3:
|
# if shared.verbose >= 3:
|
||||||
# shared.printLock.acquire()
|
# with shared.printLock:
|
||||||
# print 'self.data is currently ', repr(self.data)
|
# print 'self.data is currently ', repr(self.data)
|
||||||
# shared.printLock.release()
|
#
|
||||||
if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length
|
if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length
|
||||||
return
|
return
|
||||||
if self.data[0:4] != '\xe9\xbe\xb4\xd9':
|
if self.data[0:4] != '\xe9\xbe\xb4\xd9':
|
||||||
if shared.verbose >= 1:
|
if shared.verbose >= 1:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
|
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
|
||||||
shared.printLock.release()
|
|
||||||
self.data = ""
|
self.data = ""
|
||||||
return
|
return
|
||||||
self.payloadLength, = unpack('>L', self.data[16:20])
|
self.payloadLength, = unpack('>L', self.data[16:20])
|
||||||
|
@ -145,9 +145,9 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
|
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
|
||||||
remoteCommand = self.data[4:16]
|
remoteCommand = self.data[4:16]
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST
|
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST
|
||||||
shared.printLock.release()
|
|
||||||
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
||||||
self.recversion(self.data[24:self.payloadLength + 24])
|
self.recversion(self.data[24:self.payloadLength + 24])
|
||||||
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00':
|
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00':
|
||||||
|
@ -181,16 +181,16 @@ class receiveDataThread(threading.Thread):
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 1)
|
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 1)
|
||||||
if objectHash in shared.inventory:
|
if objectHash in shared.inventory:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Inventory (in memory) already has object listed in inv message.'
|
print 'Inventory (in memory) already has object listed in inv message.'
|
||||||
shared.printLock.release()
|
|
||||||
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
||||||
objectHash]
|
objectHash]
|
||||||
elif shared.isInSqlInventory(objectHash):
|
elif shared.isInSqlInventory(objectHash):
|
||||||
if shared.verbose >= 3:
|
if shared.verbose >= 3:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
||||||
shared.printLock.release()
|
|
||||||
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
||||||
objectHash]
|
objectHash]
|
||||||
else:
|
else:
|
||||||
|
@ -198,9 +198,9 @@ class receiveDataThread(threading.Thread):
|
||||||
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
|
||||||
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
|
@ -208,18 +208,18 @@ class receiveDataThread(threading.Thread):
|
||||||
pass
|
pass
|
||||||
break
|
break
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
|
||||||
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
|
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
|
||||||
shared.printLock.release()
|
|
||||||
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len(
|
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len(
|
||||||
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
if len(self.ackDataThatWeHaveYetToSend) > 0:
|
if len(self.ackDataThatWeHaveYetToSend) > 0:
|
||||||
|
@ -247,9 +247,9 @@ class receiveDataThread(threading.Thread):
|
||||||
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
def recverack(self):
|
def recverack(self):
|
||||||
print 'verack received'
|
print 'verack received'
|
||||||
|
@ -267,19 +267,19 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
remoteNodeIncomingPort, remoteNodeSeenTime = shared.knownNodes[
|
remoteNodeIncomingPort, remoteNodeSeenTime = shared.knownNodes[
|
||||||
self.streamNumber][self.HOST]
|
self.streamNumber][self.HOST]
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Connection fully established with', self.HOST, remoteNodeIncomingPort
|
print 'Connection fully established with', self.HOST, remoteNodeIncomingPort
|
||||||
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
|
||||||
print 'broadcasting addr from within connectionFullyEstablished function.'
|
print 'broadcasting addr from within connectionFullyEstablished function.'
|
||||||
shared.printLock.release()
|
|
||||||
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST,
|
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST,
|
||||||
remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
|
||||||
self.sendaddr() # This is one large addr message to this one peer.
|
self.sendaddr() # This is one large addr message to this one peer.
|
||||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We are connected to too many people. Closing connection.'
|
print 'We are connected to too many people. Closing connection.'
|
||||||
shared.printLock.release()
|
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||||
return
|
return
|
||||||
self.sendBigInv()
|
self.sendBigInv()
|
||||||
|
@ -331,16 +331,16 @@ class receiveDataThread(threading.Thread):
|
||||||
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
headerData += pack('>L', len(payload))
|
headerData += pack('>L', len(payload))
|
||||||
headerData += hashlib.sha512(payload).digest()[:4]
|
headerData += hashlib.sha512(payload).digest()[:4]
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
|
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(headerData + payload)
|
self.sock.sendall(headerData + payload)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# We have received a broadcast message
|
# We have received a broadcast message
|
||||||
def recbroadcast(self, data):
|
def recbroadcast(self, data):
|
||||||
|
@ -419,13 +419,13 @@ class receiveDataThread(threading.Thread):
|
||||||
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
||||||
(time.time() - self.messageProcessingStartTime)
|
(time.time() - self.messageProcessingStartTime)
|
||||||
if sleepTime > 0 and doTimingAttackMitigation:
|
if sleepTime > 0 and doTimingAttackMitigation:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
time.sleep(sleepTime)
|
time.sleep(sleepTime)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# A broadcast message has a valid time and POW and requires processing.
|
# A broadcast message has a valid time and POW and requires processing.
|
||||||
# The recbroadcast function calls this one.
|
# The recbroadcast function calls this one.
|
||||||
|
@ -462,9 +462,9 @@ class receiveDataThread(threading.Thread):
|
||||||
sendersHash = data[readPosition:readPosition + 20]
|
sendersHash = data[readPosition:readPosition + 20]
|
||||||
if sendersHash not in shared.broadcastSendersForWhichImWatching:
|
if sendersHash not in shared.broadcastSendersForWhichImWatching:
|
||||||
# Display timing data
|
# Display timing data
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Time spent deciding that we are not interested in this v1 broadcast:', time.time() - self.messageProcessingStartTime
|
print 'Time spent deciding that we are not interested in this v1 broadcast:', time.time() - self.messageProcessingStartTime
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
# At this point, this message claims to be from sendersHash and
|
# At this point, this message claims to be from sendersHash and
|
||||||
# we are interested in it. We still have to hash the public key
|
# we are interested in it. We still have to hash the public key
|
||||||
|
@ -527,9 +527,9 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
fromAddress = encodeAddress(
|
fromAddress = encodeAddress(
|
||||||
sendersAddressVersion, sendersStream, ripe.digest())
|
sendersAddressVersion, sendersStream, ripe.digest())
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'fromAddress:', fromAddress
|
print 'fromAddress:', fromAddress
|
||||||
shared.printLock.release()
|
|
||||||
if messageEncodingType == 2:
|
if messageEncodingType == 2:
|
||||||
bodyPositionIndex = string.find(message, '\nBody:')
|
bodyPositionIndex = string.find(message, '\nBody:')
|
||||||
if bodyPositionIndex > 1:
|
if bodyPositionIndex > 1:
|
||||||
|
@ -570,9 +570,9 @@ class receiveDataThread(threading.Thread):
|
||||||
call([apiNotifyPath, "newBroadcast"])
|
call([apiNotifyPath, "newBroadcast"])
|
||||||
|
|
||||||
# Display timing data
|
# Display timing data
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
|
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
|
||||||
shared.printLock.release()
|
|
||||||
if broadcastVersion == 2:
|
if broadcastVersion == 2:
|
||||||
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
|
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
|
||||||
data[readPosition:readPosition + 10])
|
data[readPosition:readPosition + 10])
|
||||||
|
@ -590,9 +590,9 @@ class receiveDataThread(threading.Thread):
|
||||||
# print 'cryptorObject.decrypt Exception:', err
|
# print 'cryptorObject.decrypt Exception:', err
|
||||||
if not initialDecryptionSuccessful:
|
if not initialDecryptionSuccessful:
|
||||||
# This is not a broadcast I am interested in.
|
# This is not a broadcast I am interested in.
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Length of time program spent failing to decrypt this v2 broadcast:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
print 'Length of time program spent failing to decrypt this v2 broadcast:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
# At this point this is a broadcast I have decrypted and thus am
|
# At this point this is a broadcast I have decrypted and thus am
|
||||||
# interested in.
|
# interested in.
|
||||||
|
@ -683,9 +683,9 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
fromAddress = encodeAddress(
|
fromAddress = encodeAddress(
|
||||||
sendersAddressVersion, sendersStream, ripe.digest())
|
sendersAddressVersion, sendersStream, ripe.digest())
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'fromAddress:', fromAddress
|
print 'fromAddress:', fromAddress
|
||||||
shared.printLock.release()
|
|
||||||
if messageEncodingType == 2:
|
if messageEncodingType == 2:
|
||||||
bodyPositionIndex = string.find(message, '\nBody:')
|
bodyPositionIndex = string.find(message, '\nBody:')
|
||||||
if bodyPositionIndex > 1:
|
if bodyPositionIndex > 1:
|
||||||
|
@ -726,9 +726,9 @@ class receiveDataThread(threading.Thread):
|
||||||
call([apiNotifyPath, "newBroadcast"])
|
call([apiNotifyPath, "newBroadcast"])
|
||||||
|
|
||||||
# Display timing data
|
# Display timing data
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
|
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# We have received a msg message.
|
# We have received a msg message.
|
||||||
def recmsg(self, data):
|
def recmsg(self, data):
|
||||||
|
@ -798,13 +798,13 @@ class receiveDataThread(threading.Thread):
|
||||||
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
||||||
(time.time() - self.messageProcessingStartTime)
|
(time.time() - self.messageProcessingStartTime)
|
||||||
if sleepTime > 0 and doTimingAttackMitigation:
|
if sleepTime > 0 and doTimingAttackMitigation:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
time.sleep(sleepTime)
|
time.sleep(sleepTime)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# A msg message has a valid time and POW and requires processing. The
|
# A msg message has a valid time and POW and requires processing. The
|
||||||
# recmsg function calls this one.
|
# recmsg function calls this one.
|
||||||
|
@ -812,9 +812,9 @@ class receiveDataThread(threading.Thread):
|
||||||
initialDecryptionSuccessful = False
|
initialDecryptionSuccessful = False
|
||||||
# Let's check whether this is a message acknowledgement bound for us.
|
# Let's check whether this is a message acknowledgement bound for us.
|
||||||
if encryptedData[readPosition:] in shared.ackdataForWhichImWatching:
|
if encryptedData[readPosition:] in shared.ackdataForWhichImWatching:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'This msg IS an acknowledgement bound for me.'
|
print 'This msg IS an acknowledgement bound for me.'
|
||||||
shared.printLock.release()
|
|
||||||
del shared.ackdataForWhichImWatching[encryptedData[readPosition:]]
|
del shared.ackdataForWhichImWatching[encryptedData[readPosition:]]
|
||||||
t = ('ackreceived', encryptedData[readPosition:])
|
t = ('ackreceived', encryptedData[readPosition:])
|
||||||
shared.sqlLock.acquire()
|
shared.sqlLock.acquire()
|
||||||
|
@ -828,10 +828,10 @@ class receiveDataThread(threading.Thread):
|
||||||
time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8')))))
|
time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8')))))
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'This was NOT an acknowledgement bound for me.'
|
print 'This was NOT an acknowledgement bound for me.'
|
||||||
# print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching
|
# print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# This is not an acknowledgement bound for me. See if it is a message
|
# This is not an acknowledgement bound for me. See if it is a message
|
||||||
# bound for me by trying to decrypt it with my private keys.
|
# bound for me by trying to decrypt it with my private keys.
|
||||||
|
@ -841,6 +841,7 @@ class receiveDataThread(threading.Thread):
|
||||||
encryptedData[readPosition:])
|
encryptedData[readPosition:])
|
||||||
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
|
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
|
||||||
initialDecryptionSuccessful = True
|
initialDecryptionSuccessful = True
|
||||||
|
with shared.printLock:
|
||||||
print 'EC decryption successful using key associated with ripe hash:', key.encode('hex')
|
print 'EC decryption successful using key associated with ripe hash:', key.encode('hex')
|
||||||
break
|
break
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
@ -848,9 +849,9 @@ class receiveDataThread(threading.Thread):
|
||||||
# print 'cryptorObject.decrypt Exception:', err
|
# print 'cryptorObject.decrypt Exception:', err
|
||||||
if not initialDecryptionSuccessful:
|
if not initialDecryptionSuccessful:
|
||||||
# This is not a message bound for me.
|
# This is not a message bound for me.
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Length of time program spent failing to decrypt this message:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
print 'Length of time program spent failing to decrypt this message:', time.time() - self.messageProcessingStartTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
else:
|
else:
|
||||||
# This is a message bound for me.
|
# This is a message bound for me.
|
||||||
toAddress = shared.myAddressesByHash[
|
toAddress = shared.myAddressesByHash[
|
||||||
|
@ -899,12 +900,12 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'sender\'s requiredPayloadLengthExtraBytes is', requiredPayloadLengthExtraBytes
|
print 'sender\'s requiredPayloadLengthExtraBytes is', requiredPayloadLengthExtraBytes
|
||||||
endOfThePublicKeyPosition = readPosition # needed for when we store the pubkey in our database of pubkeys for later use.
|
endOfThePublicKeyPosition = readPosition # needed for when we store the pubkey in our database of pubkeys for later use.
|
||||||
if toRipe != decryptedData[readPosition:readPosition + 20]:
|
if toRipe != decryptedData[readPosition:readPosition + 20]:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.'
|
print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.'
|
||||||
print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html'
|
print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html'
|
||||||
print 'your toRipe:', toRipe.encode('hex')
|
print 'your toRipe:', toRipe.encode('hex')
|
||||||
print 'embedded destination toRipe:', decryptedData[readPosition:readPosition + 20].encode('hex')
|
print 'embedded destination toRipe:', decryptedData[readPosition:readPosition + 20].encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
readPosition += 20
|
readPosition += 20
|
||||||
messageEncodingType, messageEncodingTypeLength = decodeVarint(
|
messageEncodingType, messageEncodingTypeLength = decodeVarint(
|
||||||
|
@ -935,9 +936,9 @@ class receiveDataThread(threading.Thread):
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
print 'ECDSA verify failed', err
|
print 'ECDSA verify failed', err
|
||||||
return
|
return
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person:', helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), ' ..and here is the testnet address:', helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey), '. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.'
|
print 'As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person:', helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), ' ..and here is the testnet address:', helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey), '. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.'
|
||||||
shared.printLock.release()
|
|
||||||
# calculate the fromRipe.
|
# calculate the fromRipe.
|
||||||
sha = hashlib.new('sha512')
|
sha = hashlib.new('sha512')
|
||||||
sha.update(pubSigningKey + pubEncryptionKey)
|
sha.update(pubSigningKey + pubEncryptionKey)
|
||||||
|
@ -983,9 +984,9 @@ class receiveDataThread(threading.Thread):
|
||||||
queryreturn = shared.sqlReturnQueue.get()
|
queryreturn = shared.sqlReturnQueue.get()
|
||||||
shared.sqlLock.release()
|
shared.sqlLock.release()
|
||||||
if queryreturn != []:
|
if queryreturn != []:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Message ignored because address is in blacklist.'
|
print 'Message ignored because address is in blacklist.'
|
||||||
shared.printLock.release()
|
|
||||||
blockMessage = True
|
blockMessage = True
|
||||||
else: # We're using a whitelist
|
else: # We're using a whitelist
|
||||||
t = (fromAddress,)
|
t = (fromAddress,)
|
||||||
|
@ -1084,10 +1085,10 @@ class receiveDataThread(threading.Thread):
|
||||||
sum = 0
|
sum = 0
|
||||||
for item in shared.successfullyDecryptMessageTimings:
|
for item in shared.successfullyDecryptMessageTimings:
|
||||||
sum += item
|
sum += item
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage
|
print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage
|
||||||
print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings)
|
print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings)
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
def isAckDataValid(self, ackData):
|
def isAckDataValid(self, ackData):
|
||||||
if len(ackData) < 24:
|
if len(ackData) < 24:
|
||||||
|
@ -1127,9 +1128,9 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.sqlLock.release()
|
shared.sqlLock.release()
|
||||||
shared.workerQueue.put(('sendmessage', ''))
|
shared.workerQueue.put(('sendmessage', ''))
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
|
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# We have received a pubkey
|
# We have received a pubkey
|
||||||
def recpubkey(self, data):
|
def recpubkey(self, data):
|
||||||
|
@ -1153,14 +1154,14 @@ class receiveDataThread(threading.Thread):
|
||||||
readPosition += 4
|
readPosition += 4
|
||||||
|
|
||||||
if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys:
|
if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
|
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
if embeddedTime > int(time.time()) + 10800:
|
if embeddedTime > int(time.time()) + 10800:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
|
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
addressVersion, varintLength = decodeVarint(
|
addressVersion, varintLength = decodeVarint(
|
||||||
data[readPosition:readPosition + 10])
|
data[readPosition:readPosition + 10])
|
||||||
|
@ -1196,13 +1197,13 @@ class receiveDataThread(threading.Thread):
|
||||||
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
|
||||||
(time.time() - self.pubkeyProcessingStartTime)
|
(time.time() - self.pubkeyProcessingStartTime)
|
||||||
if sleepTime > 0 and doTimingAttackMitigation:
|
if sleepTime > 0 and doTimingAttackMitigation:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
time.sleep(sleepTime)
|
time.sleep(sleepTime)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Total pubkey processing time:', time.time() - self.pubkeyProcessingStartTime, 'seconds.'
|
print 'Total pubkey processing time:', time.time() - self.pubkeyProcessingStartTime, 'seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
def processpubkey(self, data):
|
def processpubkey(self, data):
|
||||||
readPosition = 8 # for the nonce
|
readPosition = 8 # for the nonce
|
||||||
|
@ -1226,9 +1227,9 @@ class receiveDataThread(threading.Thread):
|
||||||
print '(Within processpubkey) addressVersion of 0 doesn\'t make sense.'
|
print '(Within processpubkey) addressVersion of 0 doesn\'t make sense.'
|
||||||
return
|
return
|
||||||
if addressVersion >= 4 or addressVersion == 1:
|
if addressVersion >= 4 or addressVersion == 1:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'This version of Bitmessage cannot handle version', addressVersion, 'addresses.'
|
print 'This version of Bitmessage cannot handle version', addressVersion, 'addresses.'
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
if addressVersion == 2:
|
if addressVersion == 2:
|
||||||
if len(data) < 146: # sanity check. This is the minimum possible length.
|
if len(data) < 146: # sanity check. This is the minimum possible length.
|
||||||
|
@ -1252,12 +1253,12 @@ class receiveDataThread(threading.Thread):
|
||||||
ripeHasher.update(sha.digest())
|
ripeHasher.update(sha.digest())
|
||||||
ripe = ripeHasher.digest()
|
ripe = ripeHasher.digest()
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
|
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
|
||||||
print 'ripe', ripe.encode('hex')
|
print 'ripe', ripe.encode('hex')
|
||||||
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
|
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
|
||||||
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
|
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
t = (ripe,)
|
t = (ripe,)
|
||||||
shared.sqlLock.acquire()
|
shared.sqlLock.acquire()
|
||||||
|
@ -1321,12 +1322,12 @@ class receiveDataThread(threading.Thread):
|
||||||
ripeHasher.update(sha.digest())
|
ripeHasher.update(sha.digest())
|
||||||
ripe = ripeHasher.digest()
|
ripe = ripeHasher.digest()
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
|
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
|
||||||
print 'ripe', ripe.encode('hex')
|
print 'ripe', ripe.encode('hex')
|
||||||
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
|
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
|
||||||
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
|
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
t = (ripe,)
|
t = (ripe,)
|
||||||
shared.sqlLock.acquire()
|
shared.sqlLock.acquire()
|
||||||
|
@ -1423,10 +1424,10 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
if requestedHash in shared.myAddressesByHash: # if this address hash is one of mine
|
if requestedHash in shared.myAddressesByHash: # if this address hash is one of mine
|
||||||
if decodeAddress(shared.myAddressesByHash[requestedHash])[1] != requestedAddressVersionNumber:
|
if decodeAddress(shared.myAddressesByHash[requestedHash])[1] != requestedAddressVersionNumber:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. That shouldn\'t have happened. Ignoring.\n')
|
'(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. That shouldn\'t have happened. Ignoring.\n')
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
lastPubkeySendTime = int(shared.config.get(
|
lastPubkeySendTime = int(shared.config.get(
|
||||||
|
@ -1434,9 +1435,9 @@ class receiveDataThread(threading.Thread):
|
||||||
except:
|
except:
|
||||||
lastPubkeySendTime = 0
|
lastPubkeySendTime = 0
|
||||||
if lastPubkeySendTime < time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was at least 28 days ago...
|
if lastPubkeySendTime < time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was at least 28 days ago...
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.'
|
print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.'
|
||||||
shared.printLock.release()
|
|
||||||
if requestedAddressVersionNumber == 2:
|
if requestedAddressVersionNumber == 2:
|
||||||
shared.workerQueue.put((
|
shared.workerQueue.put((
|
||||||
'doPOWForMyV2Pubkey', requestedHash))
|
'doPOWForMyV2Pubkey', requestedHash))
|
||||||
|
@ -1444,13 +1445,13 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.workerQueue.put((
|
shared.workerQueue.put((
|
||||||
'doPOWForMyV3Pubkey', requestedHash))
|
'doPOWForMyV3Pubkey', requestedHash))
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Found getpubkey-requested-hash in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is:', lastPubkeySendTime
|
print 'Found getpubkey-requested-hash in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is:', lastPubkeySendTime
|
||||||
shared.printLock.release()
|
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'This getpubkey request is not for any of my keys.'
|
print 'This getpubkey request is not for any of my keys.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# We have received an inv message
|
# We have received an inv message
|
||||||
def recinv(self, data):
|
def recinv(self, data):
|
||||||
|
@ -1458,10 +1459,10 @@ class receiveDataThread(threading.Thread):
|
||||||
if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
|
if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
|
||||||
for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
|
for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
|
||||||
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value
|
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
|
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
|
||||||
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
|
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
|
||||||
shared.printLock.release()
|
|
||||||
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
|
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
|
||||||
if numberOfItemsInInv > 50000:
|
if numberOfItemsInInv > 50000:
|
||||||
sys.stderr.write('Too many items in inv message!')
|
sys.stderr.write('Too many items in inv message!')
|
||||||
|
@ -1471,16 +1472,16 @@ class receiveDataThread(threading.Thread):
|
||||||
return
|
return
|
||||||
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
|
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
|
||||||
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
|
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
|
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
|
||||||
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
data[lengthOfVarint:32 + lengthOfVarint]] = 0
|
||||||
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Inventory (in memory) has inventory item already.'
|
print 'Inventory (in memory) has inventory item already.'
|
||||||
shared.printLock.release()
|
|
||||||
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
|
||||||
print 'Inventory (SQL on disk) has inventory item already.'
|
print 'Inventory (SQL on disk) has inventory item already.'
|
||||||
else:
|
else:
|
||||||
|
@ -1490,9 +1491,9 @@ class receiveDataThread(threading.Thread):
|
||||||
for i in range(numberOfItemsInInv): # upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
for i in range(numberOfItemsInInv): # upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
|
||||||
if len(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) == 32: # The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
|
if len(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) == 32: # The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
|
||||||
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
|
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.'
|
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.'
|
||||||
shared.printLock.release()
|
|
||||||
break
|
break
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[
|
||||||
lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
|
lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
|
||||||
|
@ -1504,9 +1505,9 @@ class receiveDataThread(threading.Thread):
|
||||||
# Send a getdata message to our peer to request the object with the given
|
# Send a getdata message to our peer to request the object with the given
|
||||||
# hash
|
# hash
|
||||||
def sendgetdata(self, hash):
|
def sendgetdata(self, hash):
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sending getdata to retrieve object with hash:', hash.encode('hex')
|
print 'sending getdata to retrieve object with hash:', hash.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
payload = '\x01' + hash
|
payload = '\x01' + hash
|
||||||
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
||||||
headerData += 'getdata\x00\x00\x00\x00\x00'
|
headerData += 'getdata\x00\x00\x00\x00\x00'
|
||||||
|
@ -1517,9 +1518,9 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sock.sendall(headerData + payload)
|
self.sock.sendall(headerData + payload)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# We have received a getdata request from our peer
|
# We have received a getdata request from our peer
|
||||||
def recgetdata(self, data):
|
def recgetdata(self, data):
|
||||||
|
@ -1531,9 +1532,9 @@ class receiveDataThread(threading.Thread):
|
||||||
for i in xrange(numberOfRequestedInventoryItems):
|
for i in xrange(numberOfRequestedInventoryItems):
|
||||||
hash = data[lengthOfVarint + (
|
hash = data[lengthOfVarint + (
|
||||||
i * 32):32 + lengthOfVarint + (i * 32)]
|
i * 32):32 + lengthOfVarint + (i * 32)]
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'received getdata request for item:', hash.encode('hex')
|
print 'received getdata request for item:', hash.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
# print 'inventory is', shared.inventory
|
# print 'inventory is', shared.inventory
|
||||||
if hash in shared.inventory:
|
if hash in shared.inventory:
|
||||||
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
objectType, streamNumber, payload, receivedTime = shared.inventory[
|
||||||
|
@ -1558,24 +1559,24 @@ class receiveDataThread(threading.Thread):
|
||||||
def sendData(self, objectType, payload):
|
def sendData(self, objectType, payload):
|
||||||
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
||||||
if objectType == 'pubkey':
|
if objectType == 'pubkey':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sending pubkey'
|
print 'sending pubkey'
|
||||||
shared.printLock.release()
|
|
||||||
headerData += 'pubkey\x00\x00\x00\x00\x00\x00'
|
headerData += 'pubkey\x00\x00\x00\x00\x00\x00'
|
||||||
elif objectType == 'getpubkey' or objectType == 'pubkeyrequest':
|
elif objectType == 'getpubkey' or objectType == 'pubkeyrequest':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sending getpubkey'
|
print 'sending getpubkey'
|
||||||
shared.printLock.release()
|
|
||||||
headerData += 'getpubkey\x00\x00\x00'
|
headerData += 'getpubkey\x00\x00\x00'
|
||||||
elif objectType == 'msg':
|
elif objectType == 'msg':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sending msg'
|
print 'sending msg'
|
||||||
shared.printLock.release()
|
|
||||||
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
elif objectType == 'broadcast':
|
elif objectType == 'broadcast':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sending broadcast'
|
print 'sending broadcast'
|
||||||
shared.printLock.release()
|
|
||||||
headerData += 'broadcast\x00\x00\x00'
|
headerData += 'broadcast\x00\x00\x00'
|
||||||
else:
|
else:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
|
@ -1587,15 +1588,15 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sock.sendall(headerData + payload)
|
self.sock.sendall(headerData + payload)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# Send an inv message with just one hash to all of our peers
|
# Send an inv message with just one hash to all of our peers
|
||||||
def broadcastinv(self, hash):
|
def broadcastinv(self, hash):
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'broadcasting inv with hash:', hash.encode('hex')
|
print 'broadcasting inv with hash:', hash.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
|
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
|
||||||
|
|
||||||
# We have received an addr message.
|
# We have received an addr message.
|
||||||
|
@ -1606,9 +1607,9 @@ class receiveDataThread(threading.Thread):
|
||||||
data[:10])
|
data[:10])
|
||||||
|
|
||||||
if shared.verbose >= 1:
|
if shared.verbose >= 1:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
if self.remoteProtocolVersion == 1:
|
if self.remoteProtocolVersion == 1:
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||||
|
@ -1621,25 +1622,25 @@ class receiveDataThread(threading.Thread):
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
for i in range(0, numberOfAddressesIncluded):
|
||||||
try:
|
try:
|
||||||
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
|
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
|
||||||
shared.printLock.release()
|
|
||||||
continue
|
continue
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
|
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
|
||||||
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
|
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
if recaddrStream == 0:
|
if recaddrStream == 0:
|
||||||
continue
|
continue
|
||||||
|
@ -1649,20 +1650,20 @@ class receiveDataThread(threading.Thread):
|
||||||
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
|
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
|
||||||
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
|
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
|
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
|
||||||
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
|
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
||||||
# recaddrPort, ', i', i
|
# recaddrPort, ', i', i
|
||||||
|
@ -1711,9 +1712,9 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
self.broadcastaddr(
|
self.broadcastaddr(
|
||||||
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
|
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
shared.printLock.release()
|
|
||||||
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
|
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
|
||||||
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
|
||||||
return
|
return
|
||||||
|
@ -1725,25 +1726,25 @@ class receiveDataThread(threading.Thread):
|
||||||
for i in range(0, numberOfAddressesIncluded):
|
for i in range(0, numberOfAddressesIncluded):
|
||||||
try:
|
try:
|
||||||
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
|
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
shared.printLock.release()
|
|
||||||
continue
|
continue
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
|
||||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
if recaddrStream == 0:
|
if recaddrStream == 0:
|
||||||
continue
|
continue
|
||||||
|
@ -1753,20 +1754,20 @@ class receiveDataThread(threading.Thread):
|
||||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
|
|
||||||
try:
|
try:
|
||||||
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
|
||||||
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
|
||||||
shared.printLock.release()
|
|
||||||
break # giving up on unpacking any more. We should still be connected however.
|
break # giving up on unpacking any more. We should still be connected however.
|
||||||
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
# print 'Within recaddr(): IP', recaddrIP, ', Port',
|
||||||
# recaddrPort, ', i', i
|
# recaddrPort, ', i', i
|
||||||
|
@ -1794,9 +1795,9 @@ class receiveDataThread(threading.Thread):
|
||||||
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (
|
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (
|
||||||
recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
|
recaddrPort, timeSomeoneElseReceivedMessageFromThisNode)
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream
|
||||||
shared.printLock.release()
|
|
||||||
needToWriteKnownNodesToDisk = True
|
needToWriteKnownNodesToDisk = True
|
||||||
hostDetails = (
|
hostDetails = (
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
timeSomeoneElseReceivedMessageFromThisNode,
|
||||||
|
@ -1820,9 +1821,9 @@ class receiveDataThread(threading.Thread):
|
||||||
output.close()
|
output.close()
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# Function runs when we want to broadcast an addr message to all of our
|
# Function runs when we want to broadcast an addr message to all of our
|
||||||
# peers. Runs when we learn of nodes that we didn't previously know about
|
# peers. Runs when we learn of nodes that we didn't previously know about
|
||||||
|
@ -1849,9 +1850,9 @@ class receiveDataThread(threading.Thread):
|
||||||
datatosend = datatosend + payload
|
datatosend = datatosend + payload
|
||||||
|
|
||||||
if shared.verbose >= 1:
|
if shared.verbose >= 1:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.'
|
||||||
shared.printLock.release()
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
self.streamNumber, 'sendaddr', datatosend))
|
self.streamNumber, 'sendaddr', datatosend))
|
||||||
|
|
||||||
|
@ -1941,14 +1942,14 @@ class receiveDataThread(threading.Thread):
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(datatosend)
|
self.sock.sendall(datatosend)
|
||||||
if shared.verbose >= 1:
|
if shared.verbose >= 1:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.'
|
print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.'
|
||||||
shared.printLock.release()
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# We have received a version message
|
# We have received a version message
|
||||||
def recversion(self, data):
|
def recversion(self, data):
|
||||||
|
@ -1959,9 +1960,9 @@ class receiveDataThread(threading.Thread):
|
||||||
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
||||||
if self.remoteProtocolVersion <= 1:
|
if self.remoteProtocolVersion <= 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Closing connection to old protocol version 1 node: ', self.HOST
|
print 'Closing connection to old protocol version 1 node: ', self.HOST
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
||||||
self.myExternalIP = socket.inet_ntoa(data[40:44])
|
self.myExternalIP = socket.inet_ntoa(data[40:44])
|
||||||
|
@ -1978,14 +1979,14 @@ class receiveDataThread(threading.Thread):
|
||||||
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
||||||
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
||||||
data[readPosition:])
|
data[readPosition:])
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
||||||
shared.printLock.release()
|
|
||||||
if self.streamNumber != 1:
|
if self.streamNumber != 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber, '.'
|
print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber, '.'
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.HOST] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
self.HOST] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
||||||
|
@ -1996,9 +1997,9 @@ class receiveDataThread(threading.Thread):
|
||||||
0, 'setStreamNumber', (self.HOST, self.streamNumber)))
|
0, 'setStreamNumber', (self.HOST, self.streamNumber)))
|
||||||
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Closing connection to myself: ', self.HOST
|
print 'Closing connection to myself: ', self.HOST
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', (
|
shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', (
|
||||||
self.HOST, self.remoteProtocolVersion)))
|
self.HOST, self.remoteProtocolVersion)))
|
||||||
|
@ -2017,31 +2018,31 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
# Sends a version message
|
# Sends a version message
|
||||||
def sendversion(self):
|
def sendversion(self):
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Sending version message'
|
print 'Sending version message'
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(shared.assembleVersionMessage(
|
self.sock.sendall(shared.assembleVersionMessage(
|
||||||
self.HOST, self.PORT, self.streamNumber))
|
self.HOST, self.PORT, self.streamNumber))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# Sends a verack message
|
# Sends a verack message
|
||||||
def sendverack(self):
|
def sendverack(self):
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Sending verack'
|
print 'Sending verack'
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(
|
self.sock.sendall(
|
||||||
'\xE9\xBE\xB4\xD9\x76\x65\x72\x61\x63\x6B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
'\xE9\xBE\xB4\xD9\x76\x65\x72\x61\x63\x6B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sock.sendall error:', err
|
print 'sock.sendall error:', err
|
||||||
shared.printLock.release()
|
|
||||||
# cf
|
# cf
|
||||||
# 83
|
# 83
|
||||||
# e1
|
# e1
|
||||||
|
|
|
@ -18,9 +18,9 @@ class sendDataThread(threading.Thread):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.mailbox = Queue.Queue()
|
self.mailbox = Queue.Queue()
|
||||||
shared.sendDataQueues.append(self.mailbox)
|
shared.sendDataQueues.append(self.mailbox)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
||||||
shared.printLock.release()
|
|
||||||
self.data = ''
|
self.data = ''
|
||||||
|
|
||||||
def setup(
|
def setup(
|
||||||
|
@ -39,48 +39,48 @@ class sendDataThread(threading.Thread):
|
||||||
self.lastTimeISentData = int(
|
self.lastTimeISentData = int(
|
||||||
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber
|
print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
def sendVersionMessage(self):
|
def sendVersionMessage(self):
|
||||||
datatosend = shared.assembleVersionMessage(
|
datatosend = shared.assembleVersionMessage(
|
||||||
self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
|
self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber.
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Sending version packet: ', repr(datatosend)
|
print 'Sending version packet: ', repr(datatosend)
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(datatosend)
|
self.sock.sendall(datatosend)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
# if not 'Bad file descriptor' in err:
|
# if not 'Bad file descriptor' in err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write('sock.sendall error: %s\n' % err)
|
sys.stderr.write('sock.sendall error: %s\n' % err)
|
||||||
shared.printLock.release()
|
|
||||||
self.versionSent = 1
|
self.versionSent = 1
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
deststream, command, data = self.mailbox.get()
|
deststream, command, data = self.mailbox.get()
|
||||||
# shared.printLock.acquire()
|
# with shared.printLock:
|
||||||
# print 'sendDataThread, destream:', deststream, ', Command:', command, ', ID:',id(self), ', HOST:', self.HOST
|
# print 'sendDataThread, destream:', deststream, ', Command:', command, ', ID:',id(self), ', HOST:', self.HOST
|
||||||
# shared.printLock.release()
|
#
|
||||||
|
|
||||||
if deststream == self.streamNumber or deststream == 0:
|
if deststream == self.streamNumber or deststream == 0:
|
||||||
if command == 'shutdown':
|
if command == 'shutdown':
|
||||||
if data == self.HOST or data == 'all':
|
if data == self.HOST or data == 'all':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sendDataThread (associated with', self.HOST, ') ID:', id(self), 'shutting down now.'
|
print 'sendDataThread (associated with', self.HOST, ') ID:', id(self), 'shutting down now.'
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.mailbox)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'len of sendDataQueues', len(shared.sendDataQueues)
|
print 'len of sendDataQueues', len(shared.sendDataQueues)
|
||||||
shared.printLock.release()
|
|
||||||
break
|
break
|
||||||
# When you receive an incoming connection, a sendDataThread is
|
# When you receive an incoming connection, a sendDataThread is
|
||||||
# created even though you don't yet know what stream number the
|
# created even though you don't yet know what stream number the
|
||||||
|
@ -91,16 +91,16 @@ class sendDataThread(threading.Thread):
|
||||||
elif command == 'setStreamNumber':
|
elif command == 'setStreamNumber':
|
||||||
hostInMessage, specifiedStreamNumber = data
|
hostInMessage, specifiedStreamNumber = data
|
||||||
if hostInMessage == self.HOST:
|
if hostInMessage == self.HOST:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber
|
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber
|
||||||
shared.printLock.release()
|
|
||||||
self.streamNumber = specifiedStreamNumber
|
self.streamNumber = specifiedStreamNumber
|
||||||
elif command == 'setRemoteProtocolVersion':
|
elif command == 'setRemoteProtocolVersion':
|
||||||
hostInMessage, specifiedRemoteProtocolVersion = data
|
hostInMessage, specifiedRemoteProtocolVersion = data
|
||||||
if hostInMessage == self.HOST:
|
if hostInMessage == self.HOST:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||||
shared.printLock.release()
|
|
||||||
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
try:
|
try:
|
||||||
|
@ -150,9 +150,9 @@ class sendDataThread(threading.Thread):
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
||||||
if self.lastTimeISentData < (int(time.time()) - 298):
|
if self.lastTimeISentData < (int(time.time()) - 298):
|
||||||
# Send out a pong message to keep the connection alive.
|
# Send out a pong message to keep the connection alive.
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Sending pong to', self.HOST, 'to keep connection alive.'
|
print 'Sending pong to', self.HOST, 'to keep connection alive.'
|
||||||
shared.printLock.release()
|
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(
|
self.sock.sendall(
|
||||||
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
||||||
|
@ -168,7 +168,7 @@ class sendDataThread(threading.Thread):
|
||||||
print 'sendDataThread thread', self, 'ending now. Was connected to', self.HOST
|
print 'sendDataThread thread', self, 'ending now. Was connected to', self.HOST
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
|
|
|
@ -78,11 +78,11 @@ class singleCleaner(threading.Thread):
|
||||||
queryreturn = shared.sqlReturnQueue.get()
|
queryreturn = shared.sqlReturnQueue.get()
|
||||||
for row in queryreturn:
|
for row in queryreturn:
|
||||||
if len(row) < 5:
|
if len(row) < 5:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
|
'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row))
|
||||||
time.sleep(3)
|
time.sleep(3)
|
||||||
shared.printLock.release()
|
|
||||||
break
|
break
|
||||||
toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber = row
|
toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber = row
|
||||||
if status == 'awaitingpubkey':
|
if status == 'awaitingpubkey':
|
||||||
|
|
|
@ -27,9 +27,9 @@ class singleListener(threading.Thread):
|
||||||
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||||
time.sleep(300)
|
time.sleep(300)
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Listening for incoming connections.'
|
print 'Listening for incoming connections.'
|
||||||
shared.printLock.release()
|
|
||||||
HOST = '' # Symbolic name meaning all available interfaces
|
HOST = '' # Symbolic name meaning all available interfaces
|
||||||
PORT = shared.config.getint('bitmessagesettings', 'port')
|
PORT = shared.config.getint('bitmessagesettings', 'port')
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
@ -46,9 +46,9 @@ class singleListener(threading.Thread):
|
||||||
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS':
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
while len(shared.connectedHostsList) > 220:
|
while len(shared.connectedHostsList) > 220:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.'
|
print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.'
|
||||||
shared.printLock.release()
|
|
||||||
time.sleep(10)
|
time.sleep(10)
|
||||||
a, (HOST, PORT) = sock.accept()
|
a, (HOST, PORT) = sock.accept()
|
||||||
|
|
||||||
|
@ -57,9 +57,9 @@ class singleListener(threading.Thread):
|
||||||
# because the two computers will share the same external IP. This
|
# because the two computers will share the same external IP. This
|
||||||
# is here to prevent connection flooding.
|
# is here to prevent connection flooding.
|
||||||
while HOST in shared.connectedHostsList:
|
while HOST in shared.connectedHostsList:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We are already connected to', HOST + '. Ignoring connection.'
|
print 'We are already connected to', HOST + '. Ignoring connection.'
|
||||||
shared.printLock.release()
|
|
||||||
a.close()
|
a.close()
|
||||||
a, (HOST, PORT) = sock.accept()
|
a, (HOST, PORT) = sock.accept()
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
|
@ -76,6 +76,6 @@ class singleListener(threading.Thread):
|
||||||
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
||||||
rd.start()
|
rd.start()
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print self, 'connected to', HOST, 'during INCOMING request.'
|
print self, 'connected to', HOST, 'during INCOMING request.'
|
||||||
shared.printLock.release()
|
|
||||||
|
|
|
@ -88,14 +88,14 @@ class singleWorker(threading.Thread):
|
||||||
shared.sqlLock.release()
|
shared.sqlLock.release()
|
||||||
self.sendMsg()
|
self.sendMsg()
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
|
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
|
||||||
shared.printLock.release()"""
|
"""
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
|
'Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command)
|
||||||
shared.printLock.release()
|
|
||||||
shared.workerQueue.task_done()
|
shared.workerQueue.task_done()
|
||||||
|
|
||||||
def doPOWForMyV2Pubkey(self, hash): # This function also broadcasts out the pubkey message once it is done with the POW
|
def doPOWForMyV2Pubkey(self, hash): # This function also broadcasts out the pubkey message once it is done with the POW
|
||||||
|
@ -123,10 +123,10 @@ class singleWorker(threading.Thread):
|
||||||
privEncryptionKeyBase58 = shared.config.get(
|
privEncryptionKeyBase58 = shared.config.get(
|
||||||
myAddress, 'privencryptionkey')
|
myAddress, 'privencryptionkey')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
privSigningKeyHex = shared.decodeWalletImportFormat(
|
privSigningKeyHex = shared.decodeWalletImportFormat(
|
||||||
|
@ -162,9 +162,9 @@ class singleWorker(threading.Thread):
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime)
|
objectType, streamNumber, payload, embeddedTime)
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'sendinv', inventoryHash))
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
|
@ -190,10 +190,10 @@ class singleWorker(threading.Thread):
|
||||||
privEncryptionKeyBase58 = shared.config.get(
|
privEncryptionKeyBase58 = shared.config.get(
|
||||||
myAddress, 'privencryptionkey')
|
myAddress, 'privencryptionkey')
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'Error within doPOWForMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
'Error within doPOWForMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err)
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
privSigningKeyHex = shared.decodeWalletImportFormat(
|
privSigningKeyHex = shared.decodeWalletImportFormat(
|
||||||
|
@ -238,9 +238,9 @@ class singleWorker(threading.Thread):
|
||||||
shared.inventory[inventoryHash] = (
|
shared.inventory[inventoryHash] = (
|
||||||
objectType, streamNumber, payload, embeddedTime)
|
objectType, streamNumber, payload, embeddedTime)
|
||||||
|
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
print 'broadcasting inv with hash:', inventoryHash.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
shared.broadcastToSendDataQueues((
|
shared.broadcastToSendDataQueues((
|
||||||
streamNumber, 'sendinv', inventoryHash))
|
streamNumber, 'sendinv', inventoryHash))
|
||||||
shared.UISignalQueue.put(('updateStatusBar', ''))
|
shared.UISignalQueue.put(('updateStatusBar', ''))
|
||||||
|
@ -423,10 +423,10 @@ class singleWorker(threading.Thread):
|
||||||
shared.sqlSubmitQueue.put('commit')
|
shared.sqlSubmitQueue.put('commit')
|
||||||
shared.sqlLock.release()
|
shared.sqlLock.release()
|
||||||
else:
|
else:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n')
|
'Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n')
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
def sendMsg(self):
|
def sendMsg(self):
|
||||||
# Check to see if there are any messages queued to be sent
|
# Check to see if there are any messages queued to be sent
|
||||||
|
@ -507,10 +507,10 @@ class singleWorker(threading.Thread):
|
||||||
if queryreturn == [] and toripe not in shared.neededPubkeys:
|
if queryreturn == [] and toripe not in shared.neededPubkeys:
|
||||||
# We no longer have the needed pubkey and we haven't requested
|
# We no longer have the needed pubkey and we haven't requested
|
||||||
# it.
|
# it.
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex'))
|
'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex'))
|
||||||
shared.printLock.release()
|
|
||||||
t = (toaddress,)
|
t = (toaddress,)
|
||||||
shared.sqlLock.acquire()
|
shared.sqlLock.acquire()
|
||||||
shared.sqlSubmitQueue.put(
|
shared.sqlSubmitQueue.put(
|
||||||
|
@ -530,10 +530,10 @@ class singleWorker(threading.Thread):
|
||||||
fromaddress)
|
fromaddress)
|
||||||
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
|
shared.UISignalQueue.put(('updateSentItemStatusByAckdata', (
|
||||||
ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key"))))
|
ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key"))))
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Found a message in our database that needs to be sent with this pubkey.'
|
print 'Found a message in our database that needs to be sent with this pubkey.'
|
||||||
print 'First 150 characters of message:', repr(message[:150])
|
print 'First 150 characters of message:', repr(message[:150])
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
# mark the pubkey as 'usedpersonally' so that we don't ever delete
|
# mark the pubkey as 'usedpersonally' so that we don't ever delete
|
||||||
# it.
|
# it.
|
||||||
|
@ -553,10 +553,10 @@ class singleWorker(threading.Thread):
|
||||||
queryreturn = shared.sqlReturnQueue.get()
|
queryreturn = shared.sqlReturnQueue.get()
|
||||||
shared.sqlLock.release()
|
shared.sqlLock.release()
|
||||||
if queryreturn == []:
|
if queryreturn == []:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'(within sendMsg) The needed pubkey was not found. This should never happen. Aborting send.\n')
|
'(within sendMsg) The needed pubkey was not found. This should never happen. Aborting send.\n')
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
for row in queryreturn:
|
for row in queryreturn:
|
||||||
pubkeyPayload, = row
|
pubkeyPayload, = row
|
||||||
|
@ -746,19 +746,19 @@ class singleWorker(threading.Thread):
|
||||||
continue
|
continue
|
||||||
encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted
|
encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted
|
||||||
target = 2**64 / ((len(encryptedPayload)+requiredPayloadLengthExtraBytes+8) * requiredAverageProofOfWorkNonceTrialsPerByte)
|
target = 2**64 / ((len(encryptedPayload)+requiredPayloadLengthExtraBytes+8) * requiredAverageProofOfWorkNonceTrialsPerByte)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(For msg message) Doing proof of work. Total required difficulty:', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, 'Required small message difficulty:', float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes
|
print '(For msg message) Doing proof of work. Total required difficulty:', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, 'Required small message difficulty:', float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes
|
||||||
shared.printLock.release()
|
|
||||||
powStartTime = time.time()
|
powStartTime = time.time()
|
||||||
initialHash = hashlib.sha512(encryptedPayload).digest()
|
initialHash = hashlib.sha512(encryptedPayload).digest()
|
||||||
trialValue, nonce = proofofwork.run(target, initialHash)
|
trialValue, nonce = proofofwork.run(target, initialHash)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(For msg message) Found proof of work', trialValue, 'Nonce:', nonce
|
print '(For msg message) Found proof of work', trialValue, 'Nonce:', nonce
|
||||||
try:
|
try:
|
||||||
print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.'
|
print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.'
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.printLock.release()
|
|
||||||
encryptedPayload = pack('>Q', nonce) + encryptedPayload
|
encryptedPayload = pack('>Q', nonce) + encryptedPayload
|
||||||
|
|
||||||
inventoryHash = calculateInventoryHash(encryptedPayload)
|
inventoryHash = calculateInventoryHash(encryptedPayload)
|
||||||
|
@ -785,10 +785,10 @@ class singleWorker(threading.Thread):
|
||||||
toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress(
|
toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress(
|
||||||
toAddress)
|
toAddress)
|
||||||
if toStatus != 'success':
|
if toStatus != 'success':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write('Very abnormal error occurred in requestPubKey. toAddress is: ' + repr(
|
sys.stderr.write('Very abnormal error occurred in requestPubKey. toAddress is: ' + repr(
|
||||||
toAddress) + '. Please report this error to Atheros.')
|
toAddress) + '. Please report this error to Atheros.')
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
shared.neededPubkeys[ripe] = 0
|
shared.neededPubkeys[ripe] = 0
|
||||||
payload = pack('>Q', (int(time.time()) + random.randrange(
|
payload = pack('>Q', (int(time.time()) + random.randrange(
|
||||||
|
@ -796,9 +796,9 @@ class singleWorker(threading.Thread):
|
||||||
payload += encodeVarint(addressVersionNumber)
|
payload += encodeVarint(addressVersionNumber)
|
||||||
payload += encodeVarint(streamNumber)
|
payload += encodeVarint(streamNumber)
|
||||||
payload += ripe
|
payload += ripe
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'making request for pubkey with ripe:', ripe.encode('hex')
|
print 'making request for pubkey with ripe:', ripe.encode('hex')
|
||||||
shared.printLock.release()
|
|
||||||
# print 'trial value', trialValue
|
# print 'trial value', trialValue
|
||||||
statusbar = 'Doing the computations necessary to request the recipient\'s public key.'
|
statusbar = 'Doing the computations necessary to request the recipient\'s public key.'
|
||||||
shared.UISignalQueue.put(('updateStatusBar', statusbar))
|
shared.UISignalQueue.put(('updateStatusBar', statusbar))
|
||||||
|
@ -808,9 +808,9 @@ class singleWorker(threading.Thread):
|
||||||
8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
|
8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
|
||||||
initialHash = hashlib.sha512(payload).digest()
|
initialHash = hashlib.sha512(payload).digest()
|
||||||
trialValue, nonce = proofofwork.run(target, initialHash)
|
trialValue, nonce = proofofwork.run(target, initialHash)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Found proof of work', trialValue, 'Nonce:', nonce
|
print 'Found proof of work', trialValue, 'Nonce:', nonce
|
||||||
shared.printLock.release()
|
|
||||||
|
|
||||||
payload = pack('>Q', nonce) + payload
|
payload = pack('>Q', nonce) + payload
|
||||||
inventoryHash = calculateInventoryHash(payload)
|
inventoryHash = calculateInventoryHash(payload)
|
||||||
|
@ -839,19 +839,19 @@ class singleWorker(threading.Thread):
|
||||||
payload = embeddedTime + encodeVarint(toStreamNumber) + ackdata
|
payload = embeddedTime + encodeVarint(toStreamNumber) + ackdata
|
||||||
target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes +
|
target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes +
|
||||||
8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
|
8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(For ack message) Doing proof of work...'
|
print '(For ack message) Doing proof of work...'
|
||||||
shared.printLock.release()
|
|
||||||
powStartTime = time.time()
|
powStartTime = time.time()
|
||||||
initialHash = hashlib.sha512(payload).digest()
|
initialHash = hashlib.sha512(payload).digest()
|
||||||
trialValue, nonce = proofofwork.run(target, initialHash)
|
trialValue, nonce = proofofwork.run(target, initialHash)
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print '(For ack message) Found proof of work', trialValue, 'Nonce:', nonce
|
print '(For ack message) Found proof of work', trialValue, 'Nonce:', nonce
|
||||||
try:
|
try:
|
||||||
print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.'
|
print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.'
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.printLock.release()
|
|
||||||
payload = pack('>Q', nonce) + payload
|
payload = pack('>Q', nonce) + payload
|
||||||
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
||||||
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
|
||||||
|
|
|
@ -61,9 +61,9 @@ class sqlThread(threading.Thread):
|
||||||
print 'Created messages database file'
|
print 'Created messages database file'
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if str(err) == 'table inbox already exists':
|
if str(err) == 'table inbox already exists':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'Database file already exists.'
|
print 'Database file already exists.'
|
||||||
shared.printLock.release()
|
|
||||||
else:
|
else:
|
||||||
sys.stderr.write(
|
sys.stderr.write(
|
||||||
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
|
'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err))
|
||||||
|
@ -225,14 +225,14 @@ class sqlThread(threading.Thread):
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
elif item == 'exit':
|
elif item == 'exit':
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'sqlThread exiting gracefully.'
|
print 'sqlThread exiting gracefully.'
|
||||||
shared.printLock.release()
|
|
||||||
return
|
return
|
||||||
elif item == 'movemessagstoprog':
|
elif item == 'movemessagstoprog':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'the sqlThread is moving the messages.dat file to the local program directory.'
|
print 'the sqlThread is moving the messages.dat file to the local program directory.'
|
||||||
shared.printLock.release()
|
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
shutil.move(
|
shutil.move(
|
||||||
|
@ -241,9 +241,9 @@ class sqlThread(threading.Thread):
|
||||||
self.conn.text_factory = str
|
self.conn.text_factory = str
|
||||||
self.cur = self.conn.cursor()
|
self.cur = self.conn.cursor()
|
||||||
elif item == 'movemessagstoappdata':
|
elif item == 'movemessagstoappdata':
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
print 'the sqlThread is moving the messages.dat file to the Appdata folder.'
|
print 'the sqlThread is moving the messages.dat file to the Appdata folder.'
|
||||||
shared.printLock.release()
|
|
||||||
self.conn.commit()
|
self.conn.commit()
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
shutil.move(
|
shutil.move(
|
||||||
|
@ -263,11 +263,11 @@ class sqlThread(threading.Thread):
|
||||||
try:
|
try:
|
||||||
self.cur.execute(item, parameters)
|
self.cur.execute(item, parameters)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
shared.printLock.acquire()
|
with shared.printLock:
|
||||||
sys.stderr.write('\nMajor error occurred when trying to execute a SQL statement within the sqlThread. Please tell Atheros about this error message or post it in the forum! Error occurred while trying to execute statement: "' + str(
|
sys.stderr.write('\nMajor error occurred when trying to execute a SQL statement within the sqlThread. Please tell Atheros about this error message or post it in the forum! Error occurred while trying to execute statement: "' + str(
|
||||||
item) + '" Here are the parameters; you might want to censor this data with asterisks (***) as it can contain private information: ' + str(repr(parameters)) + '\nHere is the actual error message thrown by the sqlThread: ' + str(err) + '\n')
|
item) + '" Here are the parameters; you might want to censor this data with asterisks (***) as it can contain private information: ' + str(repr(parameters)) + '\nHere is the actual error message thrown by the sqlThread: ' + str(err) + '\n')
|
||||||
sys.stderr.write('This program shall now abruptly exit!\n')
|
sys.stderr.write('This program shall now abruptly exit!\n')
|
||||||
shared.printLock.release()
|
|
||||||
os._exit(0)
|
os._exit(0)
|
||||||
|
|
||||||
shared.sqlReturnQueue.put(self.cur.fetchall())
|
shared.sqlReturnQueue.put(self.cur.fetchall())
|
||||||
|
|
Loading…
Reference in New Issue
Block a user