replace acquire lock by 'with' statement

master
Linker Lin 9 years ago
parent 80e5adad8c
commit 4a84a30fc6
  1. 36
      src/bitmessagemain.py
  2. 14
      src/bitmessageqt/__init__.py
  3. 48
      src/class_outgoingSynSender.py
  4. 571
      src/class_receiveDataThread.py
  5. 66
      src/class_sendDataThread.py
  6. 8
      src/class_singleCleaner.py
  7. 24
      src/class_singleListener.py
  8. 134
      src/class_singleWorker.py
  9. 34
      src/class_sqlThread.py

@ -465,9 +465,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
status, addressVersionNumber, streamNumber, toRipe = decodeAddress(
toAddress)
if status != 'success':
shared.printLock.acquire()
print 'API Error 0007: Could not decode address:', toAddress, ':', status
shared.printLock.release()
with shared.printLock:
print 'API Error 0007: Could not decode address:', toAddress, ':', status
if status == 'checksumfailed':
return 'API Error 0008: Checksum failed for address: ' + toAddress
if status == 'invalidcharacters':
@ -482,9 +482,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
status, addressVersionNumber, streamNumber, fromRipe = decodeAddress(
fromAddress)
if status != 'success':
shared.printLock.acquire()
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
shared.printLock.release()
with shared.printLock:
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
if status == 'checksumfailed':
return 'API Error 0008: Checksum failed for address: ' + fromAddress
if status == 'invalidcharacters':
@ -547,9 +547,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
status, addressVersionNumber, streamNumber, fromRipe = decodeAddress(
fromAddress)
if status != 'success':
shared.printLock.acquire()
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
shared.printLock.release()
with shared.printLock:
print 'API Error 0007: Could not decode address:', fromAddress, ':', status
if status == 'checksumfailed':
return 'API Error 0008: Checksum failed for address: ' + fromAddress
if status == 'invalidcharacters':
@ -618,9 +618,9 @@ class MySimpleXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
status, addressVersionNumber, streamNumber, toRipe = decodeAddress(
address)
if status != 'success':
shared.printLock.acquire()
print 'API Error 0007: Could not decode address:', address, ':', status
shared.printLock.release()
with shared.printLock:
print 'API Error 0007: Could not decode address:', address, ':', status
if status == 'checksumfailed':
return 'API Error 0008: Checksum failed for address: ' + address
if status == 'invalidcharacters':
@ -747,9 +747,9 @@ if __name__ == "__main__":
except:
apiNotifyPath = ''
if apiNotifyPath != '':
shared.printLock.acquire()
print 'Trying to call', apiNotifyPath
shared.printLock.release()
with shared.printLock:
print 'Trying to call', apiNotifyPath
call([apiNotifyPath, "startingUp"])
singleAPIThread = singleAPI()
singleAPIThread.daemon = True # close the main program even if there are threads left
@ -780,9 +780,9 @@ if __name__ == "__main__":
import bitmessageqt
bitmessageqt.run()
else:
shared.printLock.acquire()
print 'Running as a daemon. You can use Ctrl+C to exit.'
shared.printLock.release()
with shared.printLock:
print 'Running as a daemon. You can use Ctrl+C to exit.'
while True:
time.sleep(20)

@ -1288,9 +1288,9 @@ class MyForm(QtGui.QMainWindow):
status, addressVersionNumber, streamNumber, ripe = decodeAddress(
toAddress)
if status != 'success':
shared.printLock.acquire()
print 'Error: Could not decode', toAddress, ':', status
shared.printLock.release()
with shared.printLock:
print 'Error: Could not decode', toAddress, ':', status
if status == 'missingbm':
self.statusBar().showMessage(_translate(
"MainWindow", "Error: Bitmessage addresses start with BM- Please check %1").arg(toAddress))
@ -2621,9 +2621,9 @@ class MyForm(QtGui.QMainWindow):
def updateStatusBar(self, data):
if data != "":
shared.printLock.acquire()
print 'Status bar:', data
shared.printLock.release()
with shared.printLock:
print 'Status bar:', data
self.statusBar().showMessage(data)
@ -2973,5 +2973,5 @@ def run():
if gevent is None:
sys.exit(app.exec_())
else:
gevent.joinall([gevent.spawn(testprint), gevent.spawn(mainloop, app)])
gevent.joinall([gevent.spawn(testprint), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app), gevent.spawn(mainloop, app)])
print 'done'

@ -61,15 +61,15 @@ class outgoingSynSender(threading.Thread):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(20)
if shared.config.get('bitmessagesettings', 'socksproxytype') == 'none' and shared.verbose >= 2:
shared.printLock.acquire()
print 'Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
with shared.printLock:
print 'Trying an outgoing connection to', HOST, ':', PORT
# sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS4a':
if shared.verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
with shared.printLock:
print '(Using SOCKS4a) Trying an outgoing connection to', HOST, ':', PORT
proxytype = socks.PROXY_TYPE_SOCKS4
sockshostname = shared.config.get(
'bitmessagesettings', 'sockshostname')
@ -88,9 +88,9 @@ class outgoingSynSender(threading.Thread):
proxytype, sockshostname, socksport, rdns)
elif shared.config.get('bitmessagesettings', 'socksproxytype') == 'SOCKS5':
if shared.verbose >= 2:
shared.printLock.acquire()
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
shared.printLock.release()
with shared.printLock:
print '(Using SOCKS5) Trying an outgoing connection to', HOST, ':', PORT
proxytype = socks.PROXY_TYPE_SOCKS5
sockshostname = shared.config.get(
'bitmessagesettings', 'sockshostname')
@ -116,9 +116,9 @@ class outgoingSynSender(threading.Thread):
rd.setup(sock, HOST, PORT, self.streamNumber,
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
rd.start()
shared.printLock.acquire()
print self, 'connected to', HOST, 'during an outgoing attempt.'
shared.printLock.release()
with shared.printLock:
print self, 'connected to', HOST, 'during an outgoing attempt.'
sd = sendDataThread()
sd.setup(sock, HOST, PORT, self.streamNumber,
@ -128,18 +128,18 @@ class outgoingSynSender(threading.Thread):
except socks.GeneralProxyError as err:
if shared.verbose >= 2:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
with shared.printLock:
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
PORT, timeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the shared.knownNodes data-structure.
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][HOST]
shared.knownNodesLock.release()
shared.printLock.acquire()
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
shared.printLock.release()
with shared.printLock:
print 'deleting ', HOST, 'from shared.knownNodes because it is more than 48 hours old and we could not connect to it.'
except socks.Socks5AuthError as err:
shared.UISignalQueue.put((
'updateStatusBar', tr.translateText(
@ -154,18 +154,18 @@ class outgoingSynSender(threading.Thread):
print 'Bitmessage MIGHT be having trouble connecting to the SOCKS server. ' + str(err)
else:
if shared.verbose >= 1:
shared.printLock.acquire()
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
shared.printLock.release()
with shared.printLock:
print 'Could NOT connect to', HOST, 'during outgoing attempt.', err
PORT, timeLastSeen = shared.knownNodes[
self.streamNumber][HOST]
if (int(time.time()) - timeLastSeen) > 172800 and len(shared.knownNodes[self.streamNumber]) > 1000: # for nodes older than 48 hours old if we have more than 1000 hosts in our list, delete from the knownNodes data-structure.
shared.knownNodesLock.acquire()
del shared.knownNodes[self.streamNumber][HOST]
shared.knownNodesLock.release()
shared.printLock.acquire()
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
shared.printLock.release()
with shared.printLock:
print 'deleting ', HOST, 'from knownNodes because it is more than 48 hours old and we could not connect to it.'
except Exception as err:
sys.stderr.write(
'An exception has occurred in the outgoingSynSender thread that was not caught by other exception types: ')

@ -61,67 +61,67 @@ class receiveDataThread(threading.Thread):
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
def run(self):
shared.printLock.acquire()
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
shared.printLock.release()
with shared.printLock:
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
while True:
try:
self.data += self.sock.recv(4096)
except socket.timeout:
shared.printLock.acquire()
print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')'
shared.printLock.release()
with shared.printLock:
print 'Timeout occurred waiting for data from', self.HOST + '. Closing receiveData thread. (ID:', str(id(self)) + ')'
break
except Exception as err:
shared.printLock.acquire()
print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err
shared.printLock.release()
with shared.printLock:
print 'sock.recv error. Closing receiveData thread (HOST:', self.HOST, 'ID:', str(id(self)) + ').', err
break
# print 'Received', repr(self.data)
if self.data == "":
shared.printLock.acquire()
print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
shared.printLock.release()
with shared.printLock:
print 'Connection to', self.HOST, 'closed. Closing receiveData thread. (ID:', str(id(self)) + ')'
break
else:
self.processData()
try:
del self.selfInitiatedConnections[self.streamNumber][self]
shared.printLock.acquire()
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
shared.printLock.release()
with shared.printLock:
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
except:
pass
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
try:
del shared.connectedHostsList[self.HOST]
except Exception as err:
shared.printLock.acquire()
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
shared.printLock.release()
with shared.printLock:
print 'Could not delete', self.HOST, 'from shared.connectedHostsList.', err
try:
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST]
except:
pass
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
shared.printLock.acquire()
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
shared.printLock.release()
with shared.printLock:
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
def processData(self):
# if shared.verbose >= 3:
# shared.printLock.acquire()
# print 'self.data is currently ', repr(self.data)
# shared.printLock.release()
# with shared.printLock:
# print 'self.data is currently ', repr(self.data)
#
if len(self.data) < 20: # if so little of the data has arrived that we can't even unpack the payload length
return
if self.data[0:4] != '\xe9\xbe\xb4\xd9':
if shared.verbose >= 1:
shared.printLock.acquire()
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
shared.printLock.release()
with shared.printLock:
print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
self.data = ""
return
self.payloadLength, = unpack('>L', self.data[16:20])
@ -142,9 +142,9 @@ class receiveDataThread(threading.Thread):
shared.knownNodesLock.release()
if self.payloadLength <= 180000000: # If the size of the message is greater than 180MB, ignore it. (I get memory errors when processing messages much larger than this though it is concievable that this value will have to be lowered if some systems are less tolarant of large messages.)
remoteCommand = self.data[4:16]
shared.printLock.acquire()
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST
shared.printLock.release()
with shared.printLock:
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.HOST
if remoteCommand == 'version\x00\x00\x00\x00\x00':
self.recversion(self.data[24:self.payloadLength + 24])
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00':
@ -178,16 +178,16 @@ class receiveDataThread(threading.Thread):
objectHash, = random.sample(
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 1)
if objectHash in shared.inventory:
shared.printLock.acquire()
print 'Inventory (in memory) already has object listed in inv message.'
shared.printLock.release()
with shared.printLock:
print 'Inventory (in memory) already has object listed in inv message.'
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
objectHash]
elif shared.isInSqlInventory(objectHash):
if shared.verbose >= 3:
shared.printLock.acquire()
print 'Inventory (SQL on disk) already has object listed in inv message.'
shared.printLock.release()
with shared.printLock:
print 'Inventory (SQL on disk) already has object listed in inv message.'
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
objectHash]
else:
@ -195,9 +195,9 @@ class receiveDataThread(threading.Thread):
del self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave[
objectHash] # It is possible that the remote node doesn't respond with the object. In that case, we'll very likely get it from someone else anyway.
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
shared.printLock.acquire()
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
with shared.printLock:
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
try:
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
@ -205,18 +205,18 @@ class receiveDataThread(threading.Thread):
pass
break
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) == 0:
shared.printLock.acquire()
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
with shared.printLock:
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
try:
del shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[
self.HOST] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
except:
pass
if len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 0:
shared.printLock.acquire()
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.printLock.release()
with shared.printLock:
print '(concerning', self.HOST + ')', 'number of objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave is now', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave)
shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer[self.HOST] = len(
self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
if len(self.ackDataThatWeHaveYetToSend) > 0:
@ -244,9 +244,9 @@ class receiveDataThread(threading.Thread):
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
except Exception as err:
# if not 'Bad file descriptor' in err:
shared.printLock.acquire()
print 'sock.sendall error:', err
shared.printLock.release()
with shared.printLock:
print 'sock.sendall error:', err
def recverack(self):
print 'verack received'
@ -264,19 +264,19 @@ class receiveDataThread(threading.Thread):
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
remoteNodeIncomingPort, remoteNodeSeenTime = shared.knownNodes[
self.streamNumber][self.HOST]
shared.printLock.acquire()
print 'Connection fully established with', self.HOST, remoteNodeIncomingPort
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
print 'broadcasting addr from within connectionFullyEstablished function.'
shared.printLock.release()
with shared.printLock:
print 'Connection fully established with', self.HOST, remoteNodeIncomingPort
print 'The size of the connectedHostsList is now', len(shared.connectedHostsList)
print 'The length of sendDataQueues is now:', len(shared.sendDataQueues)
print 'broadcasting addr from within connectionFullyEstablished function.'
self.broadcastaddr([(int(time.time()), self.streamNumber, 1, self.HOST,
remoteNodeIncomingPort)]) # This lets all of our peers know about this new node.
self.sendaddr() # This is one large addr message to this one peer.
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
shared.printLock.acquire()
print 'We are connected to too many people. Closing connection.'
shared.printLock.release()
with shared.printLock:
print 'We are connected to too many people. Closing connection.'
shared.broadcastToSendDataQueues((0, 'shutdown', self.HOST))
return
self.sendBigInv()
@ -328,16 +328,16 @@ class receiveDataThread(threading.Thread):
headerData += 'inv\x00\x00\x00\x00\x00\x00\x00\x00\x00'
headerData += pack('>L', len(payload))
headerData += hashlib.sha512(payload).digest()[:4]
shared.printLock.acquire()
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
shared.printLock.release()
with shared.printLock:
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
try:
self.sock.sendall(headerData + payload)
except Exception as err:
# if not 'Bad file descriptor' in err:
shared.printLock.acquire()
print 'sock.sendall error:', err
shared.printLock.release()
with shared.printLock:
print 'sock.sendall error:', err
# We have received a broadcast message
def recbroadcast(self, data):
@ -416,13 +416,13 @@ class receiveDataThread(threading.Thread):
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
(time.time() - self.messageProcessingStartTime)
if sleepTime > 0 and doTimingAttackMitigation:
shared.printLock.acquire()
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
time.sleep(sleepTime)
shared.printLock.acquire()
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
# A broadcast message has a valid time and POW and requires processing.
# The recbroadcast function calls this one.
@ -459,9 +459,9 @@ class receiveDataThread(threading.Thread):
sendersHash = data[readPosition:readPosition + 20]
if sendersHash not in shared.broadcastSendersForWhichImWatching:
# Display timing data
shared.printLock.acquire()
print 'Time spent deciding that we are not interested in this v1 broadcast:', time.time() - self.messageProcessingStartTime
shared.printLock.release()
with shared.printLock:
print 'Time spent deciding that we are not interested in this v1 broadcast:', time.time() - self.messageProcessingStartTime
return
# At this point, this message claims to be from sendersHash and
# we are interested in it. We still have to hash the public key
@ -524,9 +524,9 @@ class receiveDataThread(threading.Thread):
fromAddress = encodeAddress(
sendersAddressVersion, sendersStream, ripe.digest())
shared.printLock.acquire()
print 'fromAddress:', fromAddress
shared.printLock.release()
with shared.printLock:
print 'fromAddress:', fromAddress
if messageEncodingType == 2:
bodyPositionIndex = string.find(message, '\nBody:')
if bodyPositionIndex > 1:
@ -567,9 +567,9 @@ class receiveDataThread(threading.Thread):
call([apiNotifyPath, "newBroadcast"])
# Display timing data
shared.printLock.acquire()
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
shared.printLock.release()
with shared.printLock:
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
if broadcastVersion == 2:
cleartextStreamNumber, cleartextStreamNumberLength = decodeVarint(
data[readPosition:readPosition + 10])
@ -587,9 +587,9 @@ class receiveDataThread(threading.Thread):
# print 'cryptorObject.decrypt Exception:', err
if not initialDecryptionSuccessful:
# This is not a broadcast I am interested in.
shared.printLock.acquire()
print 'Length of time program spent failing to decrypt this v2 broadcast:', time.time() - self.messageProcessingStartTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Length of time program spent failing to decrypt this v2 broadcast:', time.time() - self.messageProcessingStartTime, 'seconds.'
return
# At this point this is a broadcast I have decrypted and thus am
# interested in.
@ -680,9 +680,9 @@ class receiveDataThread(threading.Thread):
fromAddress = encodeAddress(
sendersAddressVersion, sendersStream, ripe.digest())
shared.printLock.acquire()
print 'fromAddress:', fromAddress
shared.printLock.release()
with shared.printLock:
print 'fromAddress:', fromAddress
if messageEncodingType == 2:
bodyPositionIndex = string.find(message, '\nBody:')
if bodyPositionIndex > 1:
@ -723,9 +723,9 @@ class receiveDataThread(threading.Thread):
call([apiNotifyPath, "newBroadcast"])
# Display timing data
shared.printLock.acquire()
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
shared.printLock.release()
with shared.printLock:
print 'Time spent processing this interesting broadcast:', time.time() - self.messageProcessingStartTime
# We have received a msg message.
def recmsg(self, data):
@ -795,13 +795,13 @@ class receiveDataThread(threading.Thread):
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
(time.time() - self.messageProcessingStartTime)
if sleepTime > 0 and doTimingAttackMitigation:
shared.printLock.acquire()
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
time.sleep(sleepTime)
shared.printLock.acquire()
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Total message processing time:', time.time() - self.messageProcessingStartTime, 'seconds.'
# A msg message has a valid time and POW and requires processing. The
# recmsg function calls this one.
@ -809,9 +809,9 @@ class receiveDataThread(threading.Thread):
initialDecryptionSuccessful = False
# Let's check whether this is a message acknowledgement bound for us.
if encryptedData[readPosition:] in shared.ackdataForWhichImWatching:
shared.printLock.acquire()
print 'This msg IS an acknowledgement bound for me.'
shared.printLock.release()
with shared.printLock:
print 'This msg IS an acknowledgement bound for me.'
del shared.ackdataForWhichImWatching[encryptedData[readPosition:]]
t = ('ackreceived', encryptedData[readPosition:])
shared.sqlLock.acquire()
@ -825,10 +825,10 @@ class receiveDataThread(threading.Thread):
time.strftime(shared.config.get('bitmessagesettings', 'timeformat'), time.localtime(int(time.time()))), 'utf-8')))))
return
else:
shared.printLock.acquire()
print 'This was NOT an acknowledgement bound for me.'
with shared.printLock:
print 'This was NOT an acknowledgement bound for me.'
# print 'shared.ackdataForWhichImWatching', shared.ackdataForWhichImWatching
shared.printLock.release()
# This is not an acknowledgement bound for me. See if it is a message
# bound for me by trying to decrypt it with my private keys.
@ -838,16 +838,17 @@ class receiveDataThread(threading.Thread):
encryptedData[readPosition:])
toRipe = key # This is the RIPE hash of my pubkeys. We need this below to compare to the destination_ripe included in the encrypted data.
initialDecryptionSuccessful = True
print 'EC decryption successful using key associated with ripe hash:', key.encode('hex')
with shared.printLock:
print 'EC decryption successful using key associated with ripe hash:', key.encode('hex')
break
except Exception as err:
pass
# print 'cryptorObject.decrypt Exception:', err
if not initialDecryptionSuccessful:
# This is not a message bound for me.
shared.printLock.acquire()
print 'Length of time program spent failing to decrypt this message:', time.time() - self.messageProcessingStartTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Length of time program spent failing to decrypt this message:', time.time() - self.messageProcessingStartTime, 'seconds.'
else:
# This is a message bound for me.
toAddress = shared.myAddressesByHash[
@ -896,12 +897,12 @@ class receiveDataThread(threading.Thread):
print 'sender\'s requiredPayloadLengthExtraBytes is', requiredPayloadLengthExtraBytes
endOfThePublicKeyPosition = readPosition # needed for when we store the pubkey in our database of pubkeys for later use.
if toRipe != decryptedData[readPosition:readPosition + 20]:
shared.printLock.acquire()
print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.'
print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html'
print 'your toRipe:', toRipe.encode('hex')
print 'embedded destination toRipe:', decryptedData[readPosition:readPosition + 20].encode('hex')
shared.printLock.release()
with shared.printLock:
print 'The original sender of this message did not send it to you. Someone is attempting a Surreptitious Forwarding Attack.'
print 'See: http://world.std.com/~dtd/sign_encrypt/sign_encrypt7.html'
print 'your toRipe:', toRipe.encode('hex')
print 'embedded destination toRipe:', decryptedData[readPosition:readPosition + 20].encode('hex')
return
readPosition += 20
messageEncodingType, messageEncodingTypeLength = decodeVarint(
@ -932,9 +933,9 @@ class receiveDataThread(threading.Thread):
except Exception as err:
print 'ECDSA verify failed', err
return
shared.printLock.acquire()
print 'As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person:', helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), ' ..and here is the testnet address:', helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey), '. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.'
shared.printLock.release()
with shared.printLock:
print 'As a matter of intellectual curiosity, here is the Bitcoin address associated with the keys owned by the other person:', helper_bitcoin.calculateBitcoinAddressFromPubkey(pubSigningKey), ' ..and here is the testnet address:', helper_bitcoin.calculateTestnetAddressFromPubkey(pubSigningKey), '. The other person must take their private signing key from Bitmessage and import it into Bitcoin (or a service like Blockchain.info) for it to be of any use. Do not use this unless you know what you are doing.'
# calculate the fromRipe.
sha = hashlib.new('sha512')
sha.update(pubSigningKey + pubEncryptionKey)
@ -980,9 +981,9 @@ class receiveDataThread(threading.Thread):
queryreturn = shared.sqlReturnQueue.get()
shared.sqlLock.release()
if queryreturn != []:
shared.printLock.acquire()
print 'Message ignored because address is in blacklist.'
shared.printLock.release()
with shared.printLock:
print 'Message ignored because address is in blacklist.'
blockMessage = True
else: # We're using a whitelist
t = (fromAddress,)
@ -1081,10 +1082,10 @@ class receiveDataThread(threading.Thread):
sum = 0
for item in shared.successfullyDecryptMessageTimings:
sum += item
shared.printLock.acquire()
print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage
print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings)
shared.printLock.release()
with shared.printLock:
print 'Time to decrypt this message successfully:', timeRequiredToAttemptToDecryptMessage
print 'Average time for all message decryption successes since startup:', sum / len(shared.successfullyDecryptMessageTimings)
def isAckDataValid(self, ackData):
if len(ackData) < 24:
@ -1124,9 +1125,9 @@ class receiveDataThread(threading.Thread):
shared.sqlLock.release()
shared.workerQueue.put(('sendmessage', ''))
else:
shared.printLock.acquire()
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
shared.printLock.release()
with shared.printLock:
print 'We don\'t need this pub key. We didn\'t ask for it. Pubkey hash:', toRipe.encode('hex')
# We have received a pubkey
def recpubkey(self, data):
@ -1150,14 +1151,14 @@ class receiveDataThread(threading.Thread):
readPosition += 4
if embeddedTime < int(time.time()) - shared.lengthOfTimeToHoldOnToAllPubkeys:
shared.printLock.acquire()
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
shared.printLock.release()
with shared.printLock:
print 'The embedded time in this pubkey message is too old. Ignoring. Embedded time is:', embeddedTime
return
if embeddedTime > int(time.time()) + 10800:
shared.printLock.acquire()
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
shared.printLock.release()
with shared.printLock:
print 'The embedded time in this pubkey message more than several hours in the future. This is irrational. Ignoring message.'
return
addressVersion, varintLength = decodeVarint(
data[readPosition:readPosition + 10])
@ -1193,13 +1194,13 @@ class receiveDataThread(threading.Thread):
sleepTime = lengthOfTimeWeShouldUseToProcessThisMessage - \
(time.time() - self.pubkeyProcessingStartTime)
if sleepTime > 0 and doTimingAttackMitigation:
shared.printLock.acquire()
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Timing attack mitigation: Sleeping for', sleepTime, 'seconds.'
time.sleep(sleepTime)
shared.printLock.acquire()
print 'Total pubkey processing time:', time.time() - self.pubkeyProcessingStartTime, 'seconds.'
shared.printLock.release()
with shared.printLock:
print 'Total pubkey processing time:', time.time() - self.pubkeyProcessingStartTime, 'seconds.'
def processpubkey(self, data):
readPosition = 8 # for the nonce
@ -1223,9 +1224,9 @@ class receiveDataThread(threading.Thread):
print '(Within processpubkey) addressVersion of 0 doesn\'t make sense.'
return
if addressVersion >= 4 or addressVersion == 1:
shared.printLock.acquire()
print 'This version of Bitmessage cannot handle version', addressVersion, 'addresses.'
shared.printLock.release()
with shared.printLock:
print 'This version of Bitmessage cannot handle version', addressVersion, 'addresses.'
return
if addressVersion == 2:
if len(data) < 146: # sanity check. This is the minimum possible length.
@ -1249,12 +1250,12 @@ class receiveDataThread(threading.Thread):
ripeHasher.update(sha.digest())
ripe = ripeHasher.digest()
shared.printLock.acquire()
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
print 'ripe', ripe.encode('hex')
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
shared.printLock.release()
with shared.printLock:
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
print 'ripe', ripe.encode('hex')
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
t = (ripe,)
shared.sqlLock.acquire()
@ -1318,12 +1319,12 @@ class receiveDataThread(threading.Thread):
ripeHasher.update(sha.digest())
ripe = ripeHasher.digest()
shared.printLock.acquire()
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
print 'ripe', ripe.encode('hex')
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
shared.printLock.release()
with shared.printLock:
print 'within recpubkey, addressVersion:', addressVersion, ', streamNumber:', streamNumber
print 'ripe', ripe.encode('hex')
print 'publicSigningKey in hex:', publicSigningKey.encode('hex')
print 'publicEncryptionKey in hex:', publicEncryptionKey.encode('hex')
t = (ripe,)
shared.sqlLock.acquire()
@ -1420,10 +1421,10 @@ class receiveDataThread(threading.Thread):
if requestedHash in shared.myAddressesByHash: # if this address hash is one of mine
if decodeAddress(shared.myAddressesByHash[requestedHash])[1] != requestedAddressVersionNumber:
shared.printLock.acquire()
sys.stderr.write(
'(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. That shouldn\'t have happened. Ignoring.\n')
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'(Within the recgetpubkey function) Someone requested one of my pubkeys but the requestedAddressVersionNumber doesn\'t match my actual address version number. That shouldn\'t have happened. Ignoring.\n')
return
try:
lastPubkeySendTime = int(shared.config.get(
@ -1431,9 +1432,9 @@ class receiveDataThread(threading.Thread):
except:
lastPubkeySendTime = 0
if lastPubkeySendTime < time.time() - shared.lengthOfTimeToHoldOnToAllPubkeys: # If the last time we sent our pubkey was at least 28 days ago...
shared.printLock.acquire()
print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.'
shared.printLock.release()
with shared.printLock:
print 'Found getpubkey-requested-hash in my list of EC hashes. Telling Worker thread to do the POW for a pubkey message and send it out.'
if requestedAddressVersionNumber == 2:
shared.workerQueue.put((
'doPOWForMyV2Pubkey', requestedHash))
@ -1441,13 +1442,13 @@ class receiveDataThread(threading.Thread):
shared.workerQueue.put((
'doPOWForMyV3Pubkey', requestedHash))
else:
shared.printLock.acquire()
print 'Found getpubkey-requested-hash in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is:', lastPubkeySendTime
shared.printLock.release()
with shared.printLock:
print 'Found getpubkey-requested-hash in my list of EC hashes BUT we already sent it recently. Ignoring request. The lastPubkeySendTime is:', lastPubkeySendTime
else:
shared.printLock.acquire()
print 'This getpubkey request is not for any of my keys.'
shared.printLock.release()
with shared.printLock:
print 'This getpubkey request is not for any of my keys.'
# We have received an inv message
def recinv(self, data):
@ -1455,10 +1456,10 @@ class receiveDataThread(threading.Thread):
if len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer) > 0:
for key, value in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer.items():
totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave += value
shared.printLock.acquire()
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
shared.printLock.release()
with shared.printLock:
print 'number of keys(hosts) in shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer:', len(shared.numberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHavePerPeer)
print 'totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave = ', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave
numberOfItemsInInv, lengthOfVarint = decodeVarint(data[:10])
if numberOfItemsInInv > 50000:
sys.stderr.write('Too many items in inv message!')
@ -1468,16 +1469,16 @@ class receiveDataThread(threading.Thread):
return
if numberOfItemsInInv == 1: # we'll just request this data from the person who advertised the object.
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
shared.printLock.acquire()
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
shared.printLock.release()
with shared.printLock:
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over 1000 from this node in particular. Ignoring this inv message.'
return
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[
data[lengthOfVarint:32 + lengthOfVarint]] = 0
if data[lengthOfVarint:32 + lengthOfVarint] in shared.inventory:
shared.printLock.acquire()
print 'Inventory (in memory) has inventory item already.'
shared.printLock.release()
with shared.printLock:
print 'Inventory (in memory) has inventory item already.'
elif shared.isInSqlInventory(data[lengthOfVarint:32 + lengthOfVarint]):
print 'Inventory (SQL on disk) has inventory item already.'
else:
@ -1487,9 +1488,9 @@ class receiveDataThread(threading.Thread):
for i in range(numberOfItemsInInv): # upon finishing dealing with an incoming message, the receiveDataThread will request a random object from the peer. This way if we get multiple inv messages from multiple peers which list mostly the same objects, we will make getdata requests for different random objects from the various peers.
if len(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]) == 32: # The length of an inventory hash should be 32. If it isn't 32 then the remote node is either badly programmed or behaving nefariously.
if totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave > 200000 and len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave) > 1000: # inv flooding attack mitigation
shared.printLock.acquire()
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.'
shared.printLock.release()
with shared.printLock:
print 'We already have', totalNumberOfObjectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave, 'items yet to retrieve from peers and over', len(self.objectsThatWeHaveYetToCheckAndSeeWhetherWeAlreadyHave), 'from this node in particular. Ignoring the rest of this inv message.'
break
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware[data[
lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)]] = 0
@ -1501,9 +1502,9 @@ class receiveDataThread(threading.Thread):
# Send a getdata message to our peer to request the object with the given
# hash
def sendgetdata(self, hash):
shared.printLock.acquire()
print 'sending getdata to retrieve object with hash:', hash.encode('hex')
shared.printLock.release()
with shared.printLock:
print 'sending getdata to retrieve object with hash:', hash.encode('hex')
payload = '\x01' + hash
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
headerData += 'getdata\x00\x00\x00\x00\x00'
@ -1514,9 +1515,9 @@ class receiveDataThread(threading.Thread):
self.sock.sendall(headerData + payload)
except Exception as err:
# if not 'Bad file descriptor' in err:
shared.printLock.acquire()
print 'sock.sendall error:', err
shared.printLock.release()
with shared.printLock:
print 'sock.sendall error:', err
# We have received a getdata request from our peer
def recgetdata(self, data):
@ -1528,9 +1529,9 @@ class receiveDataThread(threading.Thread):
for i in xrange(numberOfRequestedInventoryItems):
hash = data[lengthOfVarint + (
i * 32):32 + lengthOfVarint + (i * 32)]
shared.printLock.acquire()
print 'received getdata request for item:', hash.encode('hex')
shared.printLock.release()
with shared.printLock:
print 'received getdata request for item:', hash.encode('hex')
# print 'inventory is', shared.inventory
if hash in shared.inventory:
objectType, streamNumber, payload, receivedTime = shared.inventory[
@ -1555,24 +1556,24 @@ class receiveDataThread(threading.Thread):
def sendData(self, objectType, payload):
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
if objectType == 'pubkey':
shared.printLock.acquire()
print 'sending pubkey'
shared.printLock.release()
with shared.printLock:
print 'sending pubkey'
headerData += 'pubkey\x00\x00\x00\x00\x00\x00'
elif objectType == 'getpubkey' or objectType == 'pubkeyrequest':
shared.printLock.acquire()
print 'sending getpubkey'
shared.printLock.release()
with shared.printLock:
print 'sending getpubkey'
headerData += 'getpubkey\x00\x00\x00'
elif objectType == 'msg':
shared.printLock.acquire()
print 'sending msg'
shared.printLock.release()
with shared.printLock:
print 'sending msg'
headerData += 'msg\x00\x00\x00\x00\x00\x00\x00\x00\x00'
elif objectType == 'broadcast':
shared.printLock.acquire()
print 'sending broadcast'
shared.printLock.release()
with shared.printLock:
print 'sending broadcast'
headerData += 'broadcast\x00\x00\x00'
else:
sys.stderr.write(
@ -1584,15 +1585,15 @@ class receiveDataThread(threading.Thread):
self.sock.sendall(headerData + payload)
except Exception as err:
# if not 'Bad file descriptor' in err:
shared.printLock.acquire()
print 'sock.sendall error:', err
shared.printLock.release()
with shared.printLock:
print 'sock.sendall error:', err
# Send an inv message with just one hash to all of our peers
def broadcastinv(self, hash):
shared.printLock.acquire()
print 'broadcasting inv with hash:', hash.encode('hex')
shared.printLock.release()
with shared.printLock:
print 'broadcasting inv with hash:', hash.encode('hex')
shared.broadcastToSendDataQueues((self.streamNumber, 'sendinv', hash))
# We have received an addr message.
@ -1603,9 +1604,9 @@ class receiveDataThread(threading.Thread):
data[:10])
if shared.verbose >= 1:
shared.printLock.acquire()
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
shared.printLock.release()
with shared.printLock:
print 'addr message contains', numberOfAddressesIncluded, 'IP addresses.'
if self.remoteProtocolVersion == 1:
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
@ -1618,25 +1619,25 @@ class receiveDataThread(threading.Thread):
for i in range(0, numberOfAddressesIncluded):
try:
if data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
shared.printLock.acquire()
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
shared.printLock.release()
with shared.printLock:
print 'Skipping IPv6 address.', repr(data[16 + lengthOfNumberOfAddresses + (34 * i):28 + lengthOfNumberOfAddresses + (34 * i)])
continue
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
recaddrStream, = unpack('>I', data[4 + lengthOfNumberOfAddresses + (
34 * i):8 + lengthOfNumberOfAddresses + (34 * i)])
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0:
continue
@ -1646,20 +1647,20 @@ class receiveDataThread(threading.Thread):
recaddrServices, = unpack('>Q', data[8 + lengthOfNumberOfAddresses + (
34 * i):16 + lengthOfNumberOfAddresses + (34 * i)])
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
recaddrPort, = unpack('>H', data[32 + lengthOfNumberOfAddresses + (
34 * i):34 + lengthOfNumberOfAddresses + (34 * i)])
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
# print 'Within recaddr(): IP', recaddrIP, ', Port',
# recaddrPort, ', i', i
@ -1708,9 +1709,9 @@ class receiveDataThread(threading.Thread):
shared.knownNodesLock.release()
self.broadcastaddr(
listOfAddressDetailsToBroadcastToPeers) # no longer broadcast
shared.printLock.acquire()
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
shared.printLock.release()
with shared.printLock:
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
elif self.remoteProtocolVersion >= 2: # The difference is that in protocol version 2, network addresses use 64 bit times rather than 32 bit times.
if numberOfAddressesIncluded > 1000 or numberOfAddressesIncluded == 0:
return
@ -1722,25 +1723,25 @@ class receiveDataThread(threading.Thread):
for i in range(0, numberOfAddressesIncluded):
try:
if data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)] != '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF':
shared.printLock.acquire()
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
shared.printLock.release()
with shared.printLock:
print 'Skipping IPv6 address.', repr(data[20 + lengthOfNumberOfAddresses + (38 * i):32 + lengthOfNumberOfAddresses + (38 * i)])
continue
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (to test for an IPv6 address). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
recaddrStream, = unpack('>I', data[8 + lengthOfNumberOfAddresses + (
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrStream). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
if recaddrStream == 0:
continue
@ -1750,20 +1751,20 @@ class receiveDataThread(threading.Thread):
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrServices). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
try:
recaddrPort, = unpack('>H', data[36 + lengthOfNumberOfAddresses + (
38 * i):38 + lengthOfNumberOfAddresses + (38 * i)])
except Exception as err:
shared.printLock.acquire()
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
shared.printLock.release()
with shared.printLock:
sys.stderr.write(
'ERROR TRYING TO UNPACK recaddr (recaddrPort). Message: %s\n' % str(err))
break # giving up on unpacking any more. We should still be connected however.
# print 'Within recaddr(): IP', recaddrIP, ', Port',
# recaddrPort, ', i', i
@ -1791,9 +1792,9 @@ class receiveDataThread(threading.Thread):
shared.knownNodes[recaddrStream][hostFromAddrMessage] = (