From 6facca4cb39d2fe2653b0078b95b470e6a961721 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 15:25:31 +0800 Subject: [PATCH 01/10] Added a class for background working --- src/class_bgWorker.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 src/class_bgWorker.py diff --git a/src/class_bgWorker.py b/src/class_bgWorker.py new file mode 100644 index 00000000..9e374ee6 --- /dev/null +++ b/src/class_bgWorker.py @@ -0,0 +1,38 @@ +#! /usr/bin/python +# -*- coding: utf-8 -*- +# cody by linker.lin@me.com + +__author__ = 'linkerlin' + + +import threading +import Queue +import time + +class BGWorker(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + self.q = Queue.Queue() + + def post(self,job): + self.q.put(job) + + def run(self): + while 1: + job=None + try: + job = self.q.get(block=True) + if job: + job() + except Exception as ex: + print "Error,job exception:",ex.message,type(ex) + time.sleep(0.05) + else: + #print "job: ", job, " done" + pass + finally: + time.sleep(0.05) + +bgworker = BGWorker() +bgworker.setDaemon(True) +bgworker.start() From e47d35769b4e4fe5b893789c9bbc44156fe40c18 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 15:36:34 +0800 Subject: [PATCH 02/10] renamed class BGWorker to bgWorker --- src/class_bgWorker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/class_bgWorker.py b/src/class_bgWorker.py index 9e374ee6..c163bcc0 100644 --- a/src/class_bgWorker.py +++ b/src/class_bgWorker.py @@ -9,7 +9,7 @@ import threading import Queue import time -class BGWorker(threading.Thread): +class bgWorker(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.q = Queue.Queue() @@ -33,6 +33,6 @@ class BGWorker(threading.Thread): finally: time.sleep(0.05) -bgworker = BGWorker() +bgworker = bgWorker() bgworker.setDaemon(True) bgworker.start() From 0aa7efab340cbf8f98c3c4b861ce485ff5adc968 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 15:43:24 +0800 Subject: [PATCH 03/10] renamed class BGWorker to bgWorker --- src/bitmessagemain.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index c4c2b6db..6e88e3bd 100644 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -10,6 +10,12 @@ # The software version variable is now held in shared.py #import ctypes +try: + from gevent import monkey + monkey.patch_all() +except ImportError as ex: + print "cannot find gevent" + import signal # Used to capture a Ctrl-C keypress so that Bitmessage can shutdown gracefully. # The next 3 are used for the API from SimpleXMLRPCServer import * From 5df22b4181dea5da0d401c9b2ffd965a29d1c973 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 16:45:03 +0800 Subject: [PATCH 04/10] Made gevent happy with PyQt. --- src/bitmessageqt/__init__.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index ce477836..49395ed9 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2927,7 +2927,22 @@ class UISignaler(QThread): else: sys.stderr.write( 'Command sent to UISignaler not recognized: %s\n' % command) +try: + import gevent +except ImportError as ex: + print "cannot find gevent" +else: + def mainloop(app): + while True: + app.processEvents() + while app.hasPendingEvents(): + app.processEvents() + gevent.sleep() + gevent.sleep() # don't appear to get here but cooperate again + def testprint(): + #print 'this is running' + gevent.spawn_later(1, testprint) def run(): app = QtGui.QApplication(sys.argv) @@ -2946,5 +2961,8 @@ def run(): myapp.appIndicatorInit(app) myapp.ubuntuMessagingMenuInit() myapp.notifierInit() - - sys.exit(app.exec_()) + if gevent is None: + sys.exit(app.exec_()) + else: + gevent.joinall([gevent.spawn(testprint), gevent.spawn(mainloop, app)]) + print 'done' From 3eea6d6a884b26d40fe8f3badd7fdff9bbd02361 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 17:28:17 +0800 Subject: [PATCH 05/10] Removed a blank line. --- src/bitmessageqt/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 49395ed9..d0b71155 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2939,7 +2939,6 @@ else: app.processEvents() gevent.sleep() gevent.sleep() # don't appear to get here but cooperate again - def testprint(): #print 'this is running' gevent.spawn_later(1, testprint) From e8eaf65f07a7cdc5cce52acafe19d98214f46c4a Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 17:48:32 +0800 Subject: [PATCH 06/10] Sleep more , save more. --- src/bitmessageqt/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index d0b71155..64cc8419 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2938,7 +2938,7 @@ else: while app.hasPendingEvents(): app.processEvents() gevent.sleep() - gevent.sleep() # don't appear to get here but cooperate again + gevent.sleep(0.01) # don't appear to get here but cooperate again def testprint(): #print 'this is running' gevent.spawn_later(1, testprint) From 9fa90ccc3fc937884a3cf659b5534b9d891fc280 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 17:50:43 +0800 Subject: [PATCH 07/10] Sleep more , save more. --- src/bitmessageqt/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 64cc8419..cc65a602 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2937,7 +2937,7 @@ else: app.processEvents() while app.hasPendingEvents(): app.processEvents() - gevent.sleep() + gevent.sleep(0.01) gevent.sleep(0.01) # don't appear to get here but cooperate again def testprint(): #print 'this is running' From 284b3a24f7e3b88afee218de3cddecdab925b7e5 Mon Sep 17 00:00:00 2001 From: "miao.lin" Date: Fri, 28 Jun 2013 18:22:10 +0800 Subject: [PATCH 08/10] Put setDaemon inside init. --- src/class_bgWorker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/class_bgWorker.py b/src/class_bgWorker.py index c163bcc0..c01e26ef 100644 --- a/src/class_bgWorker.py +++ b/src/class_bgWorker.py @@ -13,6 +13,7 @@ class bgWorker(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.q = Queue.Queue() + self.setDaemon(True) def post(self,job): self.q.put(job) @@ -34,5 +35,4 @@ class bgWorker(threading.Thread): time.sleep(0.05) bgworker = bgWorker() -bgworker.setDaemon(True) bgworker.start() From 80e5adad8ce3924bfa3df247a2623314e2b21785 Mon Sep 17 00:00:00 2001 From: linkerlin Date: Fri, 28 Jun 2013 22:26:31 +0800 Subject: [PATCH 09/10] Made it compatible with gevent 1.0dev version. --- src/bitmessageqt/__init__.py | 102 +++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index cc65a602..9d8c7e59 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -2875,58 +2875,68 @@ class myTableWidgetItem(QTableWidgetItem): def __lt__(self, other): return int(self.data(33).toPyObject()) < int(other.data(33).toPyObject()) - -class UISignaler(QThread): +from threading import Thread +class UISignaler(Thread,QThread): def __init__(self, parent=None): + Thread.__init__(self, parent) QThread.__init__(self, parent) def run(self): while True: - command, data = shared.UISignalQueue.get() - if command == 'writeNewAddressToTable': - label, address, streamNumber = data - self.emit(SIGNAL( - "writeNewAddressToTable(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), label, address, str(streamNumber)) - elif command == 'updateStatusBar': - self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"), data) - elif command == 'updateSentItemStatusByHash': - hash, message = data - self.emit(SIGNAL( - "updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"), hash, message) - elif command == 'updateSentItemStatusByAckdata': - ackData, message = data - self.emit(SIGNAL( - "updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"), ackData, message) - elif command == 'displayNewInboxMessage': - inventoryHash, toAddress, fromAddress, subject, body = data - self.emit(SIGNAL( - "displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), - inventoryHash, toAddress, fromAddress, subject, body) - elif command == 'displayNewSentMessage': - toAddress, fromLabel, fromAddress, subject, message, ackdata = data - self.emit(SIGNAL( - "displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), - toAddress, fromLabel, fromAddress, subject, message, ackdata) - elif command == 'updateNetworkStatusTab': - self.emit(SIGNAL("updateNetworkStatusTab()")) - elif command == 'incrementNumberOfMessagesProcessed': - self.emit(SIGNAL("incrementNumberOfMessagesProcessed()")) - elif command == 'incrementNumberOfPubkeysProcessed': - self.emit(SIGNAL("incrementNumberOfPubkeysProcessed()")) - elif command == 'incrementNumberOfBroadcastsProcessed': - self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()")) - elif command == 'setStatusIcon': - self.emit(SIGNAL("setStatusIcon(PyQt_PyObject)"), data) - elif command == 'rerenderInboxFromLabels': - self.emit(SIGNAL("rerenderInboxFromLabels()")) - elif command == 'rerenderSubscriptions': - self.emit(SIGNAL("rerenderSubscriptions()")) - elif command == 'removeInboxRowByMsgid': - self.emit(SIGNAL("removeInboxRowByMsgid(PyQt_PyObject)"), data) - else: - sys.stderr.write( - 'Command sent to UISignaler not recognized: %s\n' % command) + try: + command, data = shared.UISignalQueue.get() + if command == 'writeNewAddressToTable': + label, address, streamNumber = data + self.emit(SIGNAL( + "writeNewAddressToTable(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), label, address, str(streamNumber)) + elif command == 'updateStatusBar': + self.emit(SIGNAL("updateStatusBar(PyQt_PyObject)"), data) + elif command == 'updateSentItemStatusByHash': + hash, message = data + self.emit(SIGNAL( + "updateSentItemStatusByHash(PyQt_PyObject,PyQt_PyObject)"), hash, message) + elif command == 'updateSentItemStatusByAckdata': + ackData, message = data + self.emit(SIGNAL( + "updateSentItemStatusByAckdata(PyQt_PyObject,PyQt_PyObject)"), ackData, message) + elif command == 'displayNewInboxMessage': + inventoryHash, toAddress, fromAddress, subject, body = data + self.emit(SIGNAL( + "displayNewInboxMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), + inventoryHash, toAddress, fromAddress, subject, body) + elif command == 'displayNewSentMessage': + toAddress, fromLabel, fromAddress, subject, message, ackdata = data + self.emit(SIGNAL( + "displayNewSentMessage(PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject,PyQt_PyObject)"), + toAddress, fromLabel, fromAddress, subject, message, ackdata) + elif command == 'updateNetworkStatusTab': + self.emit(SIGNAL("updateNetworkStatusTab()")) + elif command == 'incrementNumberOfMessagesProcessed': + self.emit(SIGNAL("incrementNumberOfMessagesProcessed()")) + elif command == 'incrementNumberOfPubkeysProcessed': + self.emit(SIGNAL("incrementNumberOfPubkeysProcessed()")) + elif command == 'incrementNumberOfBroadcastsProcessed': + self.emit(SIGNAL("incrementNumberOfBroadcastsProcessed()")) + elif command == 'setStatusIcon': + self.emit(SIGNAL("setStatusIcon(PyQt_PyObject)"), data) + elif command == 'rerenderInboxFromLabels': + self.emit(SIGNAL("rerenderInboxFromLabels()")) + elif command == 'rerenderSubscriptions': + self.emit(SIGNAL("rerenderSubscriptions()")) + elif command == 'removeInboxRowByMsgid': + self.emit(SIGNAL("removeInboxRowByMsgid(PyQt_PyObject)"), data) + else: + sys.stderr.write( + 'Command sent to UISignaler not recognized: %s\n' % command) + except Exception,ex: + # uncaught exception will block gevent + import traceback + traceback.print_exc() + traceback.print_stack() + print ex + pass + try: import gevent except ImportError as ex: From 4a84a30fc6509686422a2b491b79164367d8e321 Mon Sep 17 00:00:00 2001 From: Linker Lin Date: Sun, 30 Jun 2013 01:29:35 +0800 Subject: [PATCH 10/10] replace acquire lock by 'with' statement --- src/bitmessagemain.py | 36 +-- src/bitmessageqt/__init__.py | 14 +- src/class_outgoingSynSender.py | 48 +-- src/class_receiveDataThread.py | 571 +++++++++++++++++---------------- src/class_sendDataThread.py | 66 ++-- src/class_singleCleaner.py | 8 +- src/class_singleListener.py | 24 +- src/class_singleWorker.py | 134 ++++---- src/class_sqlThread.py | 34 +- 9 files changed, 468 insertions(+), 467 deletions(-) diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 6e88e3bd..1b7dc6ff 100644 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -465,9 +465,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): status, addressVersionNumber, streamNumber, toRipe = decodeAddress( toAddress) if status != 'success': - shared.printLock.acquire() - print 'API Error 0007: Could not decode address:', toAddress, ':', status - shared.printLock.release() + with shared.printLock: + print 'API Error 0007: Could not decode address:', toAddress, ':', status + if status == 'checksumfailed': return 'API Error 0008: Checksum failed for address: ' + toAddress if status == 'invalidcharacters': @@ -482,9 +482,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): status, addressVersionNumber, streamNumber, fromRipe = decodeAddress( fromAddress) if status != 'success': - shared.printLock.acquire() - print 'API Error 0007: Could not decode address:', fromAddress, ':', status - shared.printLock.release() + with shared.printLock: + print 'API Error 0007: Could not decode address:', fromAddress, ':', status + if status == 'checksumfailed': return 'API Error 0008: Checksum failed for address: ' + fromAddress if status == 'invalidcharacters': @@ -547,9 +547,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): status, addressVersionNumber, streamNumber, fromRipe = decodeAddress( fromAddress) if status != 'success': - shared.printLock.acquire() - print 'API Error 0007: Could not decode address:', fromAddress, ':', status - shared.printLock.release() + with shared.printLock: + print 'API Error 0007: Could not decode address:', fromAddress, ':', status + if status == 'checksumfailed': return 'API Error 0008: Checksum failed for address: ' + fromAddress if status == 'invalidcharacters': @@ -618,9 +618,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler): status, addressVersionNumber, streamNumber, toRipe = decodeAddress( address) if status != 'success': - shared.printLock.acquire() - print 'API Error 0007: Could not decode address:', address, ':', status - shared.printLock.release() + with shared.printLock: + print 'API Error 0007: Could not decode address:', address, ':', status + if status == 'checksumfailed': return 'API Error 0008: Checksum failed for address: ' + address if status == 'invalidcharacters': @@ -747,9 +747,9 @@ if __name__ == "__main__": except: apiNotifyPath = '' if apiNotifyPath != '': - shared.printLock.acquire() - print 'Trying to call', apiNotifyPath - shared.printLock.release() + with shared.printLock: + print 'Trying to call', apiNotifyPath + call([apiNotifyPath, "startingUp"]) singleAPIThread = singleAPI() singleAPIThread.daemon = True # close the main program even if there are threads left @@ -780,9 +780,9 @@ if __name__ == "__main__": import bitmessageqt bitmessageqt.run() else: - shared.printLock.acquire() - print 'Running as a daemon. You can use Ctrl+C to exit.' - shared.printLock.release() + with shared.printLock: + print 'Running as a daemon. You can use Ctrl+C to exit.' + while True: time.sleep(20) diff --git a/src/bitmessageqt/__init__.py b/src/bitmessageqt/__init__.py index 9d8c7e59..4eefdbf2 100644 --- a/src/bitmessageqt/__init__.py +++ b/src/bitmessageqt/__init__.py @@ -1288,9 +1288,9 @@ class MyForm(QtGui.QMainWindow): status, addressVersionNumber, streamNumber, ripe = decodeAddress( toAddress) if status != 'success': - shared.printLock.acquire() - print 'Error: Could not decode', toAddress, ':', status - shared.printLock.release() + with shared.printLock: + print 'Error: Could not decode', toAddress, ':', status + if status == 'missingbm': self.statusBar().showMessage(_translate( "MainWindow", "Error: Bitmessage addresses start with BM- Please check %1").arg(toAddress)) @@ -2621,9 +2621,9 @@ class MyForm(QtGui.QMainWindow): def updateStatusBar(self, data): if data != "": - shared.printLock.acquire() - print 'Status bar:', data - shared.printLock.release() + with shared.printLock: + print 'Status bar:', data + self.statusBar().showMessage(data) @@ -2973,5 +2973,5 @@ def run(): if gevent is None: sys.exit(app.exec_()) else: - gevent.joinall([gevent.spawn(testprint), gevent.spawn(mainloop, app)]) + gevent.joinall([gevent.spawn(testprint), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app)]) print 'done' diff --git a/src/class_outgoingSynSender.py b/src/class_outgoingSynSender.py index d547b3e3..2c4766f4 100644 --- a/src/class_outgoingSynSender.py +++ b/src/class_outgoingSynSender.py @@ -61,15 +61,15 @@ class outgoingSynSender(threading.Thread): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(20) if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2: - shared.printLock.acquire() - print 'Trying an outgoing connection to', HOST, ':', PORT - shared.printLock.release() + with shared.printLock: + print 'Trying an outgoing connection to', HOST, ':', PORT + # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a': if shared.verbose >= 2: - shared.printLock.acquire() - print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT - shared.printLock.release() + with shared.printLock: + print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT + proxytype = socks.PROXY_TYPE_SOCKS4 sockshostname = shared.config.get( 'bitmessagesettings', 'sockshostname') @@ -88,9 +88,9 @@ class outgoingSynSender(threading.Thread): proxytype, sockshostname, socksport, rdns) elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5': if shared.verbose >= 2: - shared.printLock.acquire() - print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT - shared.printLock.release() + with shared.printLock: + print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT + proxytype = socks.PROXY_TYPE_SOCKS5 sockshostname = shared.config.get( 'bitmessagesettings', 'sockshostname') @@ -116,9 +116,9 @@ class outgoingSynSender(threading.Thread): rd.setup(sock, HOST, PORT, self.streamNumber, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) rd.start() - shared.printLock.acquire() - print self, 'connected to', HOST, 'during an outgoing attempt.' - shared.printLock.release() + with shared.printLock: + print self, 'connected to', HOST, 'during an outgoing attempt.' + sd = sendDataThread() sd.setup(sock, HOST, PORT, self.streamNumber, @@ -128,18 +128,18 @@ class outgoingSynSender(threading.Thread): except socks.GeneralProxyError as err: if shared.verbose >= 2: - shared.printLock.acquire() - print 'Could NOT connect to', HOST, 'during outgoing attempt.', err - shared.printLock.release() + with shared.printLock: + print 'Could NOT connect to', HOST, 'during outgoing attempt.', err + PORT, timeLastSeen = shared.knownNodes[ self.streamNumber][HOST] if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure. shared.knownNodesLock.acquire() del shared.knownNodes[self.streamNumber][HOST] shared.knownNodesLock.release() - shared.printLock.acquire() - print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.' - shared.printLock.release() + with shared.printLock: + print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.' + except socks.Socks5AuthError as err: shared.UISignalQueue.put(( 'updateStatusBar', tr.translateText( @@ -154,18 +154,18 @@ class outgoingSynSender(threading.Thread): print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err) else: if shared.verbose >= 1: - shared.printLock.acquire() - print 'Could NOT connect to', HOST, 'during outgoing attempt.', err - shared.printLock.release() + with shared.printLock: + print 'Could NOT connect to', HOST, 'during outgoing attempt.', err + PORT, timeLastSeen = shared.knownNodes[ self.streamNumber][HOST] if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure. shared.knownNodesLock.acquire() del shared.knownNodes[self.streamNumber][HOST] shared.knownNodesLock.release() - shared.printLock.acquire() - print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' - shared.printLock.release() + with shared.printLock: + print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.' + except Exception as err: sys.stderr.write( 'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ') diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index e5293fe8..5abc7b43 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -61,67 +61,67 @@ class receiveDataThread(threading.Thread): self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware def run(self): - shared.printLock.acquire() - print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList) - shared.printLock.release() + with shared.printLock: + print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList) + while True: try: self.data += self.sock.recv(4096) except socket.timeout: - shared.printLock.acquire() - print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')' - shared.printLock.release() + with shared.printLock: + print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')' + break except Exception as err: - shared.printLock.acquire() - print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err - shared.printLock.release() + with shared.printLock: + print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err + break # print 'Received', repr(self.data) if self.data == "": - shared.printLock.acquire() - print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')' - shared.printLock.release() + with shared.printLock: + print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')' + break else: self.processData() try: del self.selfInitiatedConnections[self.streamNumber][self] - shared.printLock.acquire() - print 'removed self (a receiveDataThread) from selfInitiatedConnections' - shared.printLock.release() + with shared.printLock: + print 'removed self (a receiveDataThread) from selfInitiatedConnections' + except: pass shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) try: del shared.connectedHostsList[self.HOST] except Exception as err: - shared.printLock.acquire() - print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err - shared.printLock.release() + with shared.printLock: + print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err + try: del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] except: pass shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) - shared.printLock.acquire() - print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList) - shared.printLock.release() + with shared.printLock: + print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList) + def processData(self): # if shared.verbose >= 3: - # shared.printLock.acquire() - # print 'self.data is currently ', repr(self.data) - # shared.printLock.release() + # with shared.printLock: + # print 'self.data is currently ', repr(self.data) + # if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length return if self.data[0:4] != '\xe9\xbe\xb4\xd9': if shared.verbose >= 1: - shared.printLock.acquire() - print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]) - shared.printLock.release() + with shared.printLock: + print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]) + self.data = "" return self.payloadLength, = unpack('>L', self.data[16:20]) @@ -142,9 +142,9 @@ class receiveDataThread(threading.Thread): shared.knownNodesLock.release() if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.) remoteCommand = self.data[4:16] - shared.printLock.acquire() - print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST - shared.printLock.release() + with shared.printLock: + print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST + if remoteCommand == 'version\x00\x00\x00\x00\x00': self.recversion(self.data[24:self.payloadLength + 24]) elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00': @@ -178,16 +178,16 @@ class receiveDataThread(threading.Thread): objectHash, = random.sample( self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 1) if objectHash in shared.inventory: - shared.printLock.acquire() - print 'Inventory (in memory) already has object listed in inv message.' - shared.printLock.release() + with shared.printLock: + print 'Inventory (in memory) already has object listed in inv message.' + del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ objectHash] elif shared.isInSqlInventory(objectHash): if shared.verbose >= 3: - shared.printLock.acquire() - print 'Inventory (SQL on disk) already has object listed in inv message.' - shared.printLock.release() + with shared.printLock: + print 'Inventory (SQL on disk) already has object listed in inv message.' + del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ objectHash] else: @@ -195,9 +195,9 @@ class receiveDataThread(threading.Thread): del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[ objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway. if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0: - shared.printLock.acquire() - print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) - shared.printLock.release() + with shared.printLock: + print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) + try: del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. @@ -205,18 +205,18 @@ class receiveDataThread(threading.Thread): pass break if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0: - shared.printLock.acquire() - print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) - shared.printLock.release() + with shared.printLock: + print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) + try: del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[ self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. except: pass if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0: - shared.printLock.acquire() - print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) - shared.printLock.release() + with shared.printLock: + print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) + shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len( self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. if len(self.ackDataThatWeHaveYetToSend) > 0: @@ -244,9 +244,9 @@ class receiveDataThread(threading.Thread): '\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35') except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + def recverack(self): print 'verack received' @@ -264,19 +264,19 @@ class receiveDataThread(threading.Thread): shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) remoteNodeIncomingPort, remoteNodeSeenTime = shared.knownNodes[ self.streamNumber][self.HOST] - shared.printLock.acquire() - print 'Connection fully established with', self.HOST, remoteNodeIncomingPort - print 'The size of the connectedHostsList is now', len(shared.connectedHostsList) - print 'The length of sendDataQueues is now:', len(shared.sendDataQueues) - print 'broadcasting addr from within connectionFullyEstablished function.' - shared.printLock.release() + with shared.printLock: + print 'Connection fully established with', self.HOST, remoteNodeIncomingPort + print 'The size of the connectedHostsList is now', len(shared.connectedHostsList) + print 'The length of sendDataQueues is now:', len(shared.sendDataQueues) + print 'broadcasting addr from within connectionFullyEstablished function.' + self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST, remoteNodeIncomingPort)]) # This lets all of our peers know about this new node. self.sendaddr() # This is one large addr message to this one peer. if not self.initiatedConnection and len(shared.connectedHostsList) > 200: - shared.printLock.acquire() - print 'We are connected to too many people. Closing connection.' - shared.printLock.release() + with shared.printLock: + print 'We are connected to too many people. Closing connection.' + shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) return self.sendBigInv() @@ -328,16 +328,16 @@ class receiveDataThread(threading.Thread): headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00' headerData += pack('>L', len(payload)) headerData += hashlib.sha512(payload).digest()[:4] - shared.printLock.acquire() - print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer' - shared.printLock.release() + with shared.printLock: + print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer' + try: self.sock.sendall(headerData + payload) except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + # We have received a broadcast message def recbroadcast(self, data): @@ -416,13 +416,13 @@ class receiveDataThread(threading.Thread): sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \ (time.time() - self.messageProcessingStartTime) if sleepTime > 0 and doTimingAttackMitigation: - shared.printLock.acquire() - print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' + time.sleep(sleepTime) - shared.printLock.acquire() - print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.' + # A broadcast message has a valid time and POW and requires processing. # The recbroadcast function calls this one. @@ -459,9 +459,9 @@ class receiveDataThread(threading.Thread): sendersHash = data[readPosition:readPosition + 20] if sendersHash not in shared.broadcastSendersForWhichImWatching: # Display timing data - shared.printLock.acquire() - print 'Time spent deciding that we are not interested in this v1 broadcast:', time.time() - self.messageProcessingStartTime - shared.printLock.release() + with shared.printLock: + print 'Time spent deciding that we are not interested in this v1 broadcast:', time.time() - self.messageProcessingStartTime + return # At this point, this message claims to be from sendersHash and # we are interested in it. We still have to hash the public key @@ -524,9 +524,9 @@ class receiveDataThread(threading.Thread): fromAddress = encodeAddress( sendersAddressVersion, sendersStream, ripe.digest()) - shared.printLock.acquire() - print 'fromAddress:', fromAddress - shared.printLock.release() + with shared.printLock: + print 'fromAddress:', fromAddress + if messageEncodingType == 2: bodyPositionIndex = string.find(message, '\nBody:') if bodyPositionIndex > 1: @@ -567,9 +567,9 @@ class receiveDataThread(threading.Thread): call([apiNotifyPath, "newBroadcast"]) # Display timing data - shared.printLock.acquire() - print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime - shared.printLock.release() + with shared.printLock: + print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime + if broadcastVersion == 2: cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint( data[readPosition:readPosition + 10]) @@ -587,9 +587,9 @@ class receiveDataThread(threading.Thread): # print 'cryptorObject.decrypt Exception:', err if not initialDecryptionSuccessful: # This is not a broadcast I am interested in. - shared.printLock.acquire() - print 'Length of time program spent failing to decrypt this v2 broadcast:', time.time() - self.messageProcessingStartTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Length of time program spent failing to decrypt this v2 broadcast:', time.time() - self.messageProcessingStartTime, 'seconds.' + return # At this point this is a broadcast I have decrypted and thus am # interested in. @@ -680,9 +680,9 @@ class receiveDataThread(threading.Thread): fromAddress = encodeAddress( sendersAddressVersion, sendersStream, ripe.digest()) - shared.printLock.acquire() - print 'fromAddress:', fromAddress - shared.printLock.release() + with shared.printLock: + print 'fromAddress:', fromAddress + if messageEncodingType == 2: bodyPositionIndex = string.find(message, '\nBody:') if bodyPositionIndex > 1: @@ -723,9 +723,9 @@ class receiveDataThread(threading.Thread): call([apiNotifyPath, "newBroadcast"]) # Display timing data - shared.printLock.acquire() - print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime - shared.printLock.release() + with shared.printLock: + print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime + # We have received a msg message. def recmsg(self, data): @@ -795,13 +795,13 @@ class receiveDataThread(threading.Thread): sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \ (time.time() - self.messageProcessingStartTime) if sleepTime > 0 and doTimingAttackMitigation: - shared.printLock.acquire() - print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' + time.sleep(sleepTime) - shared.printLock.acquire() - print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.' + # A msg message has a valid time and POW and requires processing. The # recmsg function calls this one. @@ -809,9 +809,9 @@ class receiveDataThread(threading.Thread): initialDecryptionSuccessful = False # Let's check whether this is a message acknowledgement bound for us. if encryptedData[readPosition:] in shared.ackdataForWhichImWatching: - shared.printLock.acquire() - print 'This msg IS an acknowledgement bound for me.' - shared.printLock.release() + with shared.printLock: + print 'This msg IS an acknowledgement bound for me.' + del shared.ackdataForWhichImWatching[encryptedData[readPosition:]] t = ('ackreceived', encryptedData[readPosition:]) shared.sqlLock.acquire() @@ -825,10 +825,10 @@ class receiveDataThread(threading.Thread): time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8'))))) return else: - shared.printLock.acquire() - print 'This was NOT an acknowledgement bound for me.' + with shared.printLock: + print 'This was NOT an acknowledgement bound for me.' # print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching - shared.printLock.release() + # This is not an acknowledgement bound for me. See if it is a message # bound for me by trying to decrypt it with my private keys. @@ -838,16 +838,17 @@ class receiveDataThread(threading.Thread): encryptedData[readPosition:]) toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data. initialDecryptionSuccessful = True - print 'EC decryption successful using key associated with ripe hash:', key.encode('hex') + with shared.printLock: + print 'EC decryption successful using key associated with ripe hash:', key.encode('hex') break except Exception as err: pass # print 'cryptorObject.decrypt Exception:', err if not initialDecryptionSuccessful: # This is not a message bound for me. - shared.printLock.acquire() - print 'Length of time program spent failing to decrypt this message:', time.time() - self.messageProcessingStartTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Length of time program spent failing to decrypt this message:', time.time() - self.messageProcessingStartTime, 'seconds.' + else: # This is a message bound for me. toAddress = shared.myAddressesByHash[ @@ -896,12 +897,12 @@ class receiveDataThread(threading.Thread): print 'sender\'s requiredPayloadLengthExtraBytes is', requiredPayloadLengthExtraBytes endOfThePublicKeyPosition = readPosition # needed for when we store the pubkey in our database of pubkeys for later use. if toRipe != decryptedData[readPosition:readPosition + 20]: - shared.printLock.acquire() - print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.' - print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html' - print 'your toRipe:', toRipe.encode('hex') - print 'embedded destination toRipe:', decryptedData[readPosition:readPosition + 20].encode('hex') - shared.printLock.release() + with shared.printLock: + print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.' + print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html' + print 'your toRipe:', toRipe.encode('hex') + print 'embedded destination toRipe:', decryptedData[readPosition:readPosition + 20].encode('hex') + return readPosition += 20 messageEncodingType, messageEncodingTypeLength = decodeVarint( @@ -932,9 +933,9 @@ class receiveDataThread(threading.Thread): except Exception as err: print 'ECDSA verify failed', err return - shared.printLock.acquire() - print 'As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person:', helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), ' ..and here is the testnet address:', helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey), '. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' - shared.printLock.release() + with shared.printLock: + print 'As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person:', helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), ' ..and here is the testnet address:', helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey), '. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.' + # calculate the fromRipe. sha = hashlib.new('sha512') sha.update(pubSigningKey + pubEncryptionKey) @@ -980,9 +981,9 @@ class receiveDataThread(threading.Thread): queryreturn = shared.sqlReturnQueue.get() shared.sqlLock.release() if queryreturn != []: - shared.printLock.acquire() - print 'Message ignored because address is in blacklist.' - shared.printLock.release() + with shared.printLock: + print 'Message ignored because address is in blacklist.' + blockMessage = True else: # We're using a whitelist t = (fromAddress,) @@ -1081,10 +1082,10 @@ class receiveDataThread(threading.Thread): sum = 0 for item in shared.successfullyDecryptMessageTimings: sum += item - shared.printLock.acquire() - print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage - print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings) - shared.printLock.release() + with shared.printLock: + print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage + print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings) + def isAckDataValid(self, ackData): if len(ackData) < 24: @@ -1124,9 +1125,9 @@ class receiveDataThread(threading.Thread): shared.sqlLock.release() shared.workerQueue.put(('sendmessage', '')) else: - shared.printLock.acquire() - print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') + # We have received a pubkey def recpubkey(self, data): @@ -1150,14 +1151,14 @@ class receiveDataThread(threading.Thread): readPosition += 4 if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys: - shared.printLock.acquire() - print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime - shared.printLock.release() + with shared.printLock: + print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime + return if embeddedTime > int(time.time()) + 10800: - shared.printLock.acquire() - print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.' - shared.printLock.release() + with shared.printLock: + print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.' + return addressVersion, varintLength = decodeVarint( data[readPosition:readPosition + 10]) @@ -1193,13 +1194,13 @@ class receiveDataThread(threading.Thread): sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \ (time.time() - self.pubkeyProcessingStartTime) if sleepTime > 0 and doTimingAttackMitigation: - shared.printLock.acquire() - print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.' + time.sleep(sleepTime) - shared.printLock.acquire() - print 'Total pubkey processing time:', time.time() - self.pubkeyProcessingStartTime, 'seconds.' - shared.printLock.release() + with shared.printLock: + print 'Total pubkey processing time:', time.time() - self.pubkeyProcessingStartTime, 'seconds.' + def processpubkey(self, data): readPosition = 8 # for the nonce @@ -1223,9 +1224,9 @@ class receiveDataThread(threading.Thread): print '(Within processpubkey) addressVersion of 0 doesn\'t make sense.' return if addressVersion >= 4 or addressVersion == 1: - shared.printLock.acquire() - print 'This version of Bitmessage cannot handle version', addressVersion, 'addresses.' - shared.printLock.release() + with shared.printLock: + print 'This version of Bitmessage cannot handle version', addressVersion, 'addresses.' + return if addressVersion == 2: if len(data) < 146: # sanity check. This is the minimum possible length. @@ -1249,12 +1250,12 @@ class receiveDataThread(threading.Thread): ripeHasher.update(sha.digest()) ripe = ripeHasher.digest() - shared.printLock.acquire() - print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber - print 'ripe', ripe.encode('hex') - print 'publicSigningKey in hex:', publicSigningKey.encode('hex') - print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber + print 'ripe', ripe.encode('hex') + print 'publicSigningKey in hex:', publicSigningKey.encode('hex') + print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') + t = (ripe,) shared.sqlLock.acquire() @@ -1318,12 +1319,12 @@ class receiveDataThread(threading.Thread): ripeHasher.update(sha.digest()) ripe = ripeHasher.digest() - shared.printLock.acquire() - print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber - print 'ripe', ripe.encode('hex') - print 'publicSigningKey in hex:', publicSigningKey.encode('hex') - print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber + print 'ripe', ripe.encode('hex') + print 'publicSigningKey in hex:', publicSigningKey.encode('hex') + print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex') + t = (ripe,) shared.sqlLock.acquire() @@ -1420,10 +1421,10 @@ class receiveDataThread(threading.Thread): if requestedHash in shared.myAddressesByHash: # if this address hash is one of mine if decodeAddress(shared.myAddressesByHash[requestedHash])[1] != requestedAddressVersionNumber: - shared.printLock.acquire() - sys.stderr.write( - '(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. That shouldn\'t have happened. Ignoring.\n') - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + '(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. That shouldn\'t have happened. Ignoring.\n') + return try: lastPubkeySendTime = int(shared.config.get( @@ -1431,9 +1432,9 @@ class receiveDataThread(threading.Thread): except: lastPubkeySendTime = 0 if lastPubkeySendTime < time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was at least 28 days ago... - shared.printLock.acquire() - print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.' - shared.printLock.release() + with shared.printLock: + print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.' + if requestedAddressVersionNumber == 2: shared.workerQueue.put(( 'doPOWForMyV2Pubkey', requestedHash)) @@ -1441,13 +1442,13 @@ class receiveDataThread(threading.Thread): shared.workerQueue.put(( 'doPOWForMyV3Pubkey', requestedHash)) else: - shared.printLock.acquire() - print 'Found getpubkey-requested-hash in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is:', lastPubkeySendTime - shared.printLock.release() + with shared.printLock: + print 'Found getpubkey-requested-hash in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is:', lastPubkeySendTime + else: - shared.printLock.acquire() - print 'This getpubkey request is not for any of my keys.' - shared.printLock.release() + with shared.printLock: + print 'This getpubkey request is not for any of my keys.' + # We have received an inv message def recinv(self, data): @@ -1455,10 +1456,10 @@ class receiveDataThread(threading.Thread): if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0: for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items(): totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value - shared.printLock.acquire() - print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) - print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave - shared.printLock.release() + with shared.printLock: + print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) + print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave + numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10]) if numberOfItemsInInv > 50000: sys.stderr.write('Too many items in inv message!') @@ -1468,16 +1469,16 @@ class receiveDataThread(threading.Thread): return if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object. if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation - shared.printLock.acquire() - print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.' - shared.printLock.release() + with shared.printLock: + print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.' + return self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[ data[lengthOfVarint:32 + lengthOfVarint]] = 0 if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory: - shared.printLock.acquire() - print 'Inventory (in memory) has inventory item already.' - shared.printLock.release() + with shared.printLock: + print 'Inventory (in memory) has inventory item already.' + elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]): print 'Inventory (SQL on disk) has inventory item already.' else: @@ -1487,9 +1488,9 @@ class receiveDataThread(threading.Thread): for i in range(numberOfItemsInInv): # upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers. if len(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) == 32: # The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously. if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation - shared.printLock.acquire() - print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.' - shared.printLock.release() + with shared.printLock: + print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.' + break self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[ lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0 @@ -1501,9 +1502,9 @@ class receiveDataThread(threading.Thread): # Send a getdata message to our peer to request the object with the given # hash def sendgetdata(self, hash): - shared.printLock.acquire() - print 'sending getdata to retrieve object with hash:', hash.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'sending getdata to retrieve object with hash:', hash.encode('hex') + payload = '\x01' + hash headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData += 'getdata\x00\x00\x00\x00\x00' @@ -1514,9 +1515,9 @@ class receiveDataThread(threading.Thread): self.sock.sendall(headerData + payload) except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + # We have received a getdata request from our peer def recgetdata(self, data): @@ -1528,9 +1529,9 @@ class receiveDataThread(threading.Thread): for i in xrange(numberOfRequestedInventoryItems): hash = data[lengthOfVarint + ( i * 32):32 + lengthOfVarint + (i * 32)] - shared.printLock.acquire() - print 'received getdata request for item:', hash.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'received getdata request for item:', hash.encode('hex') + # print 'inventory is', shared.inventory if hash in shared.inventory: objectType, streamNumber, payload, receivedTime = shared.inventory[ @@ -1555,24 +1556,24 @@ class receiveDataThread(threading.Thread): def sendData(self, objectType, payload): headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. if objectType == 'pubkey': - shared.printLock.acquire() - print 'sending pubkey' - shared.printLock.release() + with shared.printLock: + print 'sending pubkey' + headerData += 'pubkey\x00\x00\x00\x00\x00\x00' elif objectType == 'getpubkey' or objectType == 'pubkeyrequest': - shared.printLock.acquire() - print 'sending getpubkey' - shared.printLock.release() + with shared.printLock: + print 'sending getpubkey' + headerData += 'getpubkey\x00\x00\x00' elif objectType == 'msg': - shared.printLock.acquire() - print 'sending msg' - shared.printLock.release() + with shared.printLock: + print 'sending msg' + headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' elif objectType == 'broadcast': - shared.printLock.acquire() - print 'sending broadcast' - shared.printLock.release() + with shared.printLock: + print 'sending broadcast' + headerData += 'broadcast\x00\x00\x00' else: sys.stderr.write( @@ -1584,15 +1585,15 @@ class receiveDataThread(threading.Thread): self.sock.sendall(headerData + payload) except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + # Send an inv message with just one hash to all of our peers def broadcastinv(self, hash): - shared.printLock.acquire() - print 'broadcasting inv with hash:', hash.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'broadcasting inv with hash:', hash.encode('hex') + shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash)) # We have received an addr message. @@ -1603,9 +1604,9 @@ class receiveDataThread(threading.Thread): data[:10]) if shared.verbose >= 1: - shared.printLock.acquire() - print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' - shared.printLock.release() + with shared.printLock: + print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.' + if self.remoteProtocolVersion == 1: if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: @@ -1618,25 +1619,25 @@ class receiveDataThread(threading.Thread): for i in range(0, numberOfAddressesIncluded): try: if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - shared.printLock.acquire() - print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)]) - shared.printLock.release() + with shared.printLock: + print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)]) + continue except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. try: recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + ( 34 * i):8 + lengthOfNumberOfAddresses + (34 * i)]) except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. if recaddrStream == 0: continue @@ -1646,20 +1647,20 @@ class receiveDataThread(threading.Thread): recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + ( 34 * i):16 + lengthOfNumberOfAddresses + (34 * i)]) except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. try: recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + ( 34 * i):34 + lengthOfNumberOfAddresses + (34 * i)]) except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. # print 'Within recaddr(): IP', recaddrIP, ', Port', # recaddrPort, ', i', i @@ -1708,9 +1709,9 @@ class receiveDataThread(threading.Thread): shared.knownNodesLock.release() self.broadcastaddr( listOfAddressDetailsToBroadcastToPeers) # no longer broadcast - shared.printLock.acquire() - print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' - shared.printLock.release() + with shared.printLock: + print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' + elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times. if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0: return @@ -1722,25 +1723,25 @@ class receiveDataThread(threading.Thread): for i in range(0, numberOfAddressesIncluded): try: if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - shared.printLock.acquire() - print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)]) - shared.printLock.release() + with shared.printLock: + print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)]) + continue except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. try: recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + ( 38 * i):12 + lengthOfNumberOfAddresses + (38 * i)]) except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. if recaddrStream == 0: continue @@ -1750,20 +1751,20 @@ class receiveDataThread(threading.Thread): recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + ( 38 * i):20 + lengthOfNumberOfAddresses + (38 * i)]) except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. try: recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + ( 38 * i):38 + lengthOfNumberOfAddresses + (38 * i)]) except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err)) + break # giving up on unpacking any more. We should still be connected however. # print 'Within recaddr(): IP', recaddrIP, ', Port', # recaddrPort, ', i', i @@ -1791,9 +1792,9 @@ class receiveDataThread(threading.Thread): shared.knownNodes[recaddrStream][hostFromAddrMessage] = ( recaddrPort, timeSomeoneElseReceivedMessageFromThisNode) shared.knownNodesLock.release() - shared.printLock.acquire() - print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream - shared.printLock.release() + with shared.printLock: + print 'added new node', hostFromAddrMessage, 'to knownNodes in stream', recaddrStream + needToWriteKnownNodesToDisk = True hostDetails = ( timeSomeoneElseReceivedMessageFromThisNode, @@ -1817,9 +1818,9 @@ class receiveDataThread(threading.Thread): output.close() shared.knownNodesLock.release() self.broadcastaddr(listOfAddressDetailsToBroadcastToPeers) - shared.printLock.acquire() - print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' - shared.printLock.release() + with shared.printLock: + print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.' + # Function runs when we want to broadcast an addr message to all of our # peers. Runs when we learn of nodes that we didn't previously know about @@ -1846,9 +1847,9 @@ class receiveDataThread(threading.Thread): datatosend = datatosend + payload if shared.verbose >= 1: - shared.printLock.acquire() - print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' - shared.printLock.release() + with shared.printLock: + print 'Broadcasting addr with', numberOfAddressesInAddrMessage, 'entries.' + shared.broadcastToSendDataQueues(( self.streamNumber, 'sendaddr', datatosend)) @@ -1938,14 +1939,14 @@ class receiveDataThread(threading.Thread): try: self.sock.sendall(datatosend) if shared.verbose >= 1: - shared.printLock.acquire() - print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.' - shared.printLock.release() + with shared.printLock: + print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.' + except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + # We have received a version message def recversion(self, data): @@ -1956,9 +1957,9 @@ class receiveDataThread(threading.Thread): self.remoteProtocolVersion, = unpack('>L', data[:4]) if self.remoteProtocolVersion <= 1: shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) - shared.printLock.acquire() - print 'Closing connection to old protocol version 1 node: ', self.HOST - shared.printLock.release() + with shared.printLock: + print 'Closing connection to old protocol version 1 node: ', self.HOST + return # print 'remoteProtocolVersion', self.remoteProtocolVersion self.myExternalIP = socket.inet_ntoa(data[40:44]) @@ -1975,14 +1976,14 @@ class receiveDataThread(threading.Thread): readPosition += lengthOfNumberOfStreamsInVersionMessage self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint( data[readPosition:]) - shared.printLock.acquire() - print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber - shared.printLock.release() + with shared.printLock: + print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber + if self.streamNumber != 1: shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) - shared.printLock.acquire() - print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber, '.' - shared.printLock.release() + with shared.printLock: + print 'Closed connection to', self.HOST, 'because they are interested in stream', self.streamNumber, '.' + return shared.connectedHostsList[ self.HOST] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab. @@ -1993,9 +1994,9 @@ class receiveDataThread(threading.Thread): 0, 'setStreamNumber', (self.HOST, self.streamNumber))) if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST)) - shared.printLock.acquire() - print 'Closing connection to myself: ', self.HOST - shared.printLock.release() + with shared.printLock: + print 'Closing connection to myself: ', self.HOST + return shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', ( self.HOST, self.remoteProtocolVersion))) @@ -2014,31 +2015,31 @@ class receiveDataThread(threading.Thread): # Sends a version message def sendversion(self): - shared.printLock.acquire() - print 'Sending version message' - shared.printLock.release() + with shared.printLock: + print 'Sending version message' + try: self.sock.sendall(shared.assembleVersionMessage( self.HOST, self.PORT, self.streamNumber)) except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + # Sends a verack message def sendverack(self): - shared.printLock.acquire() - print 'Sending verack' - shared.printLock.release() + with shared.printLock: + print 'Sending verack' + try: self.sock.sendall( '\xE9\xBE\xB4\xD9\x76\x65\x72\x61\x63\x6B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35') except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - print 'sock.sendall error:', err - shared.printLock.release() + with shared.printLock: + print 'sock.sendall error:', err + # cf # 83 # e1 diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index c1992067..dec436e9 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -18,9 +18,9 @@ class sendDataThread(threading.Thread): threading.Thread.__init__(self) self.mailbox = Queue.Queue() shared.sendDataQueues.append(self.mailbox) - shared.printLock.acquire() - print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues) - shared.printLock.release() + with shared.printLock: + print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues) + self.data = '' def setup( @@ -39,48 +39,48 @@ class sendDataThread(threading.Thread): self.lastTimeISentData = int( time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive. self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware - shared.printLock.acquire() - print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber - shared.printLock.release() + with shared.printLock: + print 'The streamNumber of this sendDataThread (ID:', str(id(self)) + ') at setup() is', self.streamNumber + def sendVersionMessage(self): datatosend = shared.assembleVersionMessage( self.HOST, self.PORT, self.streamNumber) # the IP and port of the remote host, and my streamNumber. - shared.printLock.acquire() - print 'Sending version packet: ', repr(datatosend) - shared.printLock.release() + with shared.printLock: + print 'Sending version packet: ', repr(datatosend) + try: self.sock.sendall(datatosend) except Exception as err: # if not 'Bad file descriptor' in err: - shared.printLock.acquire() - sys.stderr.write('sock.sendall error: %s\n' % err) - shared.printLock.release() + with shared.printLock: + sys.stderr.write('sock.sendall error: %s\n' % err) + self.versionSent = 1 def run(self): while True: deststream, command, data = self.mailbox.get() - # shared.printLock.acquire() - # print 'sendDataThread, destream:', deststream, ', Command:', command, ', ID:',id(self), ', HOST:', self.HOST - # shared.printLock.release() + # with shared.printLock: + # print 'sendDataThread, destream:', deststream, ', Command:', command, ', ID:',id(self), ', HOST:', self.HOST + # if deststream == self.streamNumber or deststream == 0: if command == 'shutdown': if data == self.HOST or data == 'all': - shared.printLock.acquire() - print 'sendDataThread (associated with', self.HOST, ') ID:', id(self), 'shutting down now.' - shared.printLock.release() + with shared.printLock: + print 'sendDataThread (associated with', self.HOST, ') ID:', id(self), 'shutting down now.' + try: self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() except: pass shared.sendDataQueues.remove(self.mailbox) - shared.printLock.acquire() - print 'len of sendDataQueues', len(shared.sendDataQueues) - shared.printLock.release() + with shared.printLock: + print 'len of sendDataQueues', len(shared.sendDataQueues) + break # When you receive an incoming connection, a sendDataThread is # created even though you don't yet know what stream number the @@ -91,16 +91,16 @@ class sendDataThread(threading.Thread): elif command == 'setStreamNumber': hostInMessage, specifiedStreamNumber = data if hostInMessage == self.HOST: - shared.printLock.acquire() - print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber - shared.printLock.release() + with shared.printLock: + print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber + self.streamNumber = specifiedStreamNumber elif command == 'setRemoteProtocolVersion': hostInMessage, specifiedRemoteProtocolVersion = data if hostInMessage == self.HOST: - shared.printLock.acquire() - print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion - shared.printLock.release() + with shared.printLock: + print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion + self.remoteProtocolVersion = specifiedRemoteProtocolVersion elif command == 'sendaddr': try: @@ -150,9 +150,9 @@ class sendDataThread(threading.Thread): self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): # Send out a pong message to keep the connection alive. - shared.printLock.acquire() - print 'Sending pong to', self.HOST, 'to keep connection alive.' - shared.printLock.release() + with shared.printLock: + print 'Sending pong to', self.HOST, 'to keep connection alive.' + try: self.sock.sendall( '\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35') @@ -168,7 +168,7 @@ class sendDataThread(threading.Thread): print 'sendDataThread thread', self, 'ending now. Was connected to', self.HOST break else: - shared.printLock.acquire() - print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream - shared.printLock.release() + with shared.printLock: + print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream + diff --git a/src/class_singleCleaner.py b/src/class_singleCleaner.py index 5b77fdd4..6fed68a5 100644 --- a/src/class_singleCleaner.py +++ b/src/class_singleCleaner.py @@ -78,11 +78,11 @@ class singleCleaner(threading.Thread): queryreturn = shared.sqlReturnQueue.get() for row in queryreturn: if len(row) < 5: - shared.printLock.acquire() - sys.stderr.write( - 'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row)) + with shared.printLock: + sys.stderr.write( + 'Something went wrong in the singleCleaner thread: a query did not return the requested fields. ' + repr(row)) time.sleep(3) - shared.printLock.release() + break toaddress, toripe, fromaddress, subject, message, ackdata, lastactiontime, status, pubkeyretrynumber, msgretrynumber = row if status == 'awaitingpubkey': diff --git a/src/class_singleListener.py b/src/class_singleListener.py index fff351bf..58bddf6f 100644 --- a/src/class_singleListener.py +++ b/src/class_singleListener.py @@ -27,9 +27,9 @@ class singleListener(threading.Thread): while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': time.sleep(300) - shared.printLock.acquire() - print 'Listening for incoming connections.' - shared.printLock.release() + with shared.printLock: + print 'Listening for incoming connections.' + HOST = '' # Symbolic name meaning all available interfaces PORT = shared.config.getint('bitmessagesettings', 'port') sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -46,9 +46,9 @@ class singleListener(threading.Thread): while shared.config.get('bitmessagesettings', 'socksproxytype')[0:5] == 'SOCKS': time.sleep(10) while len(shared.connectedHostsList) > 220: - shared.printLock.acquire() - print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.' - shared.printLock.release() + with shared.printLock: + print 'We are connected to too many people. Not accepting further incoming connections for ten seconds.' + time.sleep(10) a, (HOST, PORT) = sock.accept() @@ -57,9 +57,9 @@ class singleListener(threading.Thread): # because the two computers will share the same external IP. This # is here to prevent connection flooding. while HOST in shared.connectedHostsList: - shared.printLock.acquire() - print 'We are already connected to', HOST + '. Ignoring connection.' - shared.printLock.release() + with shared.printLock: + print 'We are already connected to', HOST + '. Ignoring connection.' + a.close() a, (HOST, PORT) = sock.accept() someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory. @@ -76,6 +76,6 @@ class singleListener(threading.Thread): a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections) rd.start() - shared.printLock.acquire() - print self, 'connected to', HOST, 'during INCOMING request.' - shared.printLock.release() + with shared.printLock: + print self, 'connected to', HOST, 'during INCOMING request.' + diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 13cceb9e..1a0fe149 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -88,14 +88,14 @@ class singleWorker(threading.Thread): shared.sqlLock.release() self.sendMsg() else: - shared.printLock.acquire() - print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') - shared.printLock.release()""" + with shared.printLock: + print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex') + """ else: - shared.printLock.acquire() - sys.stderr.write( - 'Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'Probable programming error: The command sent to the workerThread is weird. It is: %s\n' % command) + shared.workerQueue.task_done() def doPOWForMyV2Pubkey(self, hash): # This function also broadcasts out the pubkey message once it is done with the POW @@ -123,10 +123,10 @@ class singleWorker(threading.Thread): privEncryptionKeyBase58 = shared.config.get( myAddress, 'privencryptionkey') except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'Error within doPOWForMyV2Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err) + return privSigningKeyHex = shared.decodeWalletImportFormat( @@ -162,9 +162,9 @@ class singleWorker(threading.Thread): shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime) - shared.printLock.acquire() - print 'broadcasting inv with hash:', inventoryHash.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'broadcasting inv with hash:', inventoryHash.encode('hex') + shared.broadcastToSendDataQueues(( streamNumber, 'sendinv', inventoryHash)) shared.UISignalQueue.put(('updateStatusBar', '')) @@ -190,10 +190,10 @@ class singleWorker(threading.Thread): privEncryptionKeyBase58 = shared.config.get( myAddress, 'privencryptionkey') except Exception as err: - shared.printLock.acquire() - sys.stderr.write( - 'Error within doPOWForMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'Error within doPOWForMyV3Pubkey. Could not read the keys from the keys.dat file for a requested address. %s\n' % err) + return privSigningKeyHex = shared.decodeWalletImportFormat( @@ -238,9 +238,9 @@ class singleWorker(threading.Thread): shared.inventory[inventoryHash] = ( objectType, streamNumber, payload, embeddedTime) - shared.printLock.acquire() - print 'broadcasting inv with hash:', inventoryHash.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'broadcasting inv with hash:', inventoryHash.encode('hex') + shared.broadcastToSendDataQueues(( streamNumber, 'sendinv', inventoryHash)) shared.UISignalQueue.put(('updateStatusBar', '')) @@ -423,10 +423,10 @@ class singleWorker(threading.Thread): shared.sqlSubmitQueue.put('commit') shared.sqlLock.release() else: - shared.printLock.acquire() - sys.stderr.write( - 'Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n') - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'Error: In the singleWorker thread, the sendBroadcast function doesn\'t understand the address version.\n') + def sendMsg(self): # Check to see if there are any messages queued to be sent @@ -507,10 +507,10 @@ class singleWorker(threading.Thread): if queryreturn == [] and toripe not in shared.neededPubkeys: # We no longer have the needed pubkey and we haven't requested # it. - shared.printLock.acquire() - sys.stderr.write( - 'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex')) - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + 'For some reason, the status of a message in our outbox is \'doingmsgpow\' even though we lack the pubkey. Here is the RIPE hash of the needed pubkey: %s\n' % toripe.encode('hex')) + t = (toaddress,) shared.sqlLock.acquire() shared.sqlSubmitQueue.put( @@ -530,10 +530,10 @@ class singleWorker(threading.Thread): fromaddress) shared.UISignalQueue.put(('updateSentItemStatusByAckdata', ( ackdata, tr.translateText("MainWindow", "Looking up the receiver\'s public key")))) - shared.printLock.acquire() - print 'Found a message in our database that needs to be sent with this pubkey.' - print 'First 150 characters of message:', repr(message[:150]) - shared.printLock.release() + with shared.printLock: + print 'Found a message in our database that needs to be sent with this pubkey.' + print 'First 150 characters of message:', repr(message[:150]) + # mark the pubkey as 'usedpersonally' so that we don't ever delete # it. @@ -553,10 +553,10 @@ class singleWorker(threading.Thread): queryreturn = shared.sqlReturnQueue.get() shared.sqlLock.release() if queryreturn == []: - shared.printLock.acquire() - sys.stderr.write( - '(within sendMsg) The needed pubkey was not found. This should never happen. Aborting send.\n') - shared.printLock.release() + with shared.printLock: + sys.stderr.write( + '(within sendMsg) The needed pubkey was not found. This should never happen. Aborting send.\n') + return for row in queryreturn: pubkeyPayload, = row @@ -746,19 +746,19 @@ class singleWorker(threading.Thread): continue encryptedPayload = embeddedTime + encodeVarint(toStreamNumber) + encrypted target = 2**64 / ((len(encryptedPayload)+requiredPayloadLengthExtraBytes+8) * requiredAverageProofOfWorkNonceTrialsPerByte) - shared.printLock.acquire() - print '(For msg message) Doing proof of work. Total required difficulty:', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, 'Required small message difficulty:', float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes - shared.printLock.release() + with shared.printLock: + print '(For msg message) Doing proof of work. Total required difficulty:', float(requiredAverageProofOfWorkNonceTrialsPerByte) / shared.networkDefaultProofOfWorkNonceTrialsPerByte, 'Required small message difficulty:', float(requiredPayloadLengthExtraBytes) / shared.networkDefaultPayloadLengthExtraBytes + powStartTime = time.time() initialHash = hashlib.sha512(encryptedPayload).digest() trialValue, nonce = proofofwork.run(target, initialHash) - shared.printLock.acquire() - print '(For msg message) Found proof of work', trialValue, 'Nonce:', nonce - try: - print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.' - except: - pass - shared.printLock.release() + with shared.printLock: + print '(For msg message) Found proof of work', trialValue, 'Nonce:', nonce + try: + print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.' + except: + pass + encryptedPayload = pack('>Q', nonce) + encryptedPayload inventoryHash = calculateInventoryHash(encryptedPayload) @@ -785,10 +785,10 @@ class singleWorker(threading.Thread): toStatus, addressVersionNumber, streamNumber, ripe = decodeAddress( toAddress) if toStatus != 'success': - shared.printLock.acquire() - sys.stderr.write('Very abnormal error occurred in requestPubKey. toAddress is: ' + repr( - toAddress) + '. Please report this error to Atheros.') - shared.printLock.release() + with shared.printLock: + sys.stderr.write('Very abnormal error occurred in requestPubKey. toAddress is: ' + repr( + toAddress) + '. Please report this error to Atheros.') + return shared.neededPubkeys[ripe] = 0 payload = pack('>Q', (int(time.time()) + random.randrange( @@ -796,9 +796,9 @@ class singleWorker(threading.Thread): payload += encodeVarint(addressVersionNumber) payload += encodeVarint(streamNumber) payload += ripe - shared.printLock.acquire() - print 'making request for pubkey with ripe:', ripe.encode('hex') - shared.printLock.release() + with shared.printLock: + print 'making request for pubkey with ripe:', ripe.encode('hex') + # print 'trial value', trialValue statusbar = 'Doing the computations necessary to request the recipient\'s public key.' shared.UISignalQueue.put(('updateStatusBar', statusbar)) @@ -808,9 +808,9 @@ class singleWorker(threading.Thread): 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) - shared.printLock.acquire() - print 'Found proof of work', trialValue, 'Nonce:', nonce - shared.printLock.release() + with shared.printLock: + print 'Found proof of work', trialValue, 'Nonce:', nonce + payload = pack('>Q', nonce) + payload inventoryHash = calculateInventoryHash(payload) @@ -839,19 +839,19 @@ class singleWorker(threading.Thread): payload = embeddedTime + encodeVarint(toStreamNumber) + ackdata target = 2 ** 64 / ((len(payload) + shared.networkDefaultPayloadLengthExtraBytes + 8) * shared.networkDefaultProofOfWorkNonceTrialsPerByte) - shared.printLock.acquire() - print '(For ack message) Doing proof of work...' - shared.printLock.release() + with shared.printLock: + print '(For ack message) Doing proof of work...' + powStartTime = time.time() initialHash = hashlib.sha512(payload).digest() trialValue, nonce = proofofwork.run(target, initialHash) - shared.printLock.acquire() - print '(For ack message) Found proof of work', trialValue, 'Nonce:', nonce - try: - print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.' - except: - pass - shared.printLock.release() + with shared.printLock: + print '(For ack message) Found proof of work', trialValue, 'Nonce:', nonce + try: + print 'POW took', int(time.time() - powStartTime), 'seconds.', nonce / (time.time() - powStartTime), 'nonce trials per second.' + except: + pass + payload = pack('>Q', nonce) + payload headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits. headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00' diff --git a/src/class_sqlThread.py b/src/class_sqlThread.py index ebe6a7ff..84014a8c 100644 --- a/src/class_sqlThread.py +++ b/src/class_sqlThread.py @@ -61,9 +61,9 @@ class sqlThread(threading.Thread): print 'Created messages database file' except Exception as err: if str(err) == 'table inbox already exists': - shared.printLock.acquire() - print 'Database file already exists.' - shared.printLock.release() + with shared.printLock: + print 'Database file already exists.' + else: sys.stderr.write( 'ERROR trying to create database file (message.dat). Error message: %s\n' % str(err)) @@ -225,14 +225,14 @@ class sqlThread(threading.Thread): self.conn.commit() elif item == 'exit': self.conn.close() - shared.printLock.acquire() - print 'sqlThread exiting gracefully.' - shared.printLock.release() + with shared.printLock: + print 'sqlThread exiting gracefully.' + return elif item == 'movemessagstoprog': - shared.printLock.acquire() - print 'the sqlThread is moving the messages.dat file to the local program directory.' - shared.printLock.release() + with shared.printLock: + print 'the sqlThread is moving the messages.dat file to the local program directory.' + self.conn.commit() self.conn.close() shutil.move( @@ -241,9 +241,9 @@ class sqlThread(threading.Thread): self.conn.text_factory = str self.cur = self.conn.cursor() elif item == 'movemessagstoappdata': - shared.printLock.acquire() - print 'the sqlThread is moving the messages.dat file to the Appdata folder.' - shared.printLock.release() + with shared.printLock: + print 'the sqlThread is moving the messages.dat file to the Appdata folder.' + self.conn.commit() self.conn.close() shutil.move( @@ -263,11 +263,11 @@ class sqlThread(threading.Thread): try: self.cur.execute(item, parameters) except Exception as err: - shared.printLock.acquire() - sys.stderr.write('\nMajor error occurred when trying to execute a SQL statement within the sqlThread. Please tell Atheros about this error message or post it in the forum! Error occurred while trying to execute statement: "' + str( - item) + '" Here are the parameters; you might want to censor this data with asterisks (***) as it can contain private information: ' + str(repr(parameters)) + '\nHere is the actual error message thrown by the sqlThread: ' + str(err) + '\n') - sys.stderr.write('This program shall now abruptly exit!\n') - shared.printLock.release() + with shared.printLock: + sys.stderr.write('\nMajor error occurred when trying to execute a SQL statement within the sqlThread. Please tell Atheros about this error message or post it in the forum! Error occurred while trying to execute statement: "' + str( + item) + '" Here are the parameters; you might want to censor this data with asterisks (***) as it can contain private information: ' + str(repr(parameters)) + '\nHere is the actual error message thrown by the sqlThread: ' + str(err) + '\n') + sys.stderr.write('This program shall now abruptly exit!\n') + os._exit(0) shared.sqlReturnQueue.put(self.cur.fetchall())