diff --git a/src/bitmessagemain.py b/src/bitmessagemain.py index 83e729c5..491b82f0 100755 --- a/src/bitmessagemain.py +++ b/src/bitmessagemain.py @@ -19,12 +19,12 @@ import sys #potentially render this version check useless. So logging won't be used here until #there is a more efficient way to configure logging if sys.hexversion >= 0x3000000: - msg = "PyBitmessage does not support Python 3. Python 2.7.5 or later is required. Your version: %s" % sys.version + msg = "PyBitmessage does not support Python 3. Python 2.7.3 or later is required. Your version: %s" % sys.version #logger.critical(msg) sys.stdout.write(msg) sys.exit(0) -if sys.hexversion < 0x20705F0: - msg = "You should use Python 2.7.5 or greater (but not Python 3). Your version: %s" % sys.version +if sys.hexversion < 0x20703F0: + msg = "You should use Python 2.7.3 or greater (but not Python 3). Your version: %s" % sys.version #logger.critical(msg) sys.stdout.write(msg) sys.exit(0) diff --git a/src/class_receiveDataThread.py b/src/class_receiveDataThread.py index a97da9df..879faff6 100644 --- a/src/class_receiveDataThread.py +++ b/src/class_receiveDataThread.py @@ -62,7 +62,7 @@ class receiveDataThread(threading.Thread): def run(self): with shared.printLock: - print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList) + print 'receiveDataThread starting. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList) while True: dataLen = len(self.data) @@ -76,7 +76,7 @@ class receiveDataThread(threading.Thread): break except Exception as err: with shared.printLock: - print 'sock.recv error. Closing receiveData thread (HOST:', self.peer, 'ID:', str(id(self)) + ').', err + print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', err break # print 'Received', repr(self.data) if len(self.data) == dataLen: # If self.sock.recv returned no data: @@ -92,7 +92,7 @@ class receiveDataThread(threading.Thread): print 'removed self (a receiveDataThread) from selfInitiatedConnections' except: pass - shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) # commands the corresponding sendDataThread to shut itself down. + self.sendDataThreadQueue.put((0, 'shutdown','no data')) # commands the corresponding sendDataThread to shut itself down. try: del shared.connectedHostsList[self.peer.host] except Exception as err: @@ -106,47 +106,33 @@ class receiveDataThread(threading.Thread): pass shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data')) with shared.printLock: - print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList) + print 'receiveDataThread ending. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList) def processData(self): - # if shared.verbose >= 3: - # with shared.printLock: - # print 'self.data is currently ', repr(self.data) - # if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data. return - #Use a memoryview so we don't copy data unnecessarily - view = memoryview(self.data) - magic,command,payloadLength,checksum = shared.Header.unpack(view[:shared.Header.size]) - view = view[shared.Header.size:] + + magic,command,payloadLength,checksum = shared.Header.unpack(self.data[:shared.Header.size]) if magic != 0xE9BEB4D9: - #if shared.verbose >= 1: - # with shared.printLock: - # print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40]) - self.data = "" return if payloadLength > 20000000: logger.info('The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s' % payloadLength) - self.data = view[payloadLength:].tobytes() - del view,magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after + self.data = self.data[payloadLength + shared.Header.size:] + del magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after self.processData() return - if len(view) < payloadLength: # check if the whole message has arrived yet. + if len(self.data) < payloadLength + shared.Header.size: # check if the whole message has arrived yet. return - payload = view[:payloadLength] + payload = self.data[shared.Header.size:payloadLength + shared.Header.size] if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message. print 'Checksum incorrect. Clearing this message.' - self.data = view[payloadLength:].tobytes() - del view,magic,command,payloadLength,checksum,payload #again better to clean up before the recursive call + self.data = self.data[payloadLength + shared.Header.size:] + del magic,command,payloadLength,checksum,payload # better to clean up before the recursive call self.processData() return - - #We can now revert back to bytestrings and take this message out - payload = payload.tobytes() - self.data = view[payloadLength:].tobytes() - del view,magic,payloadLength,checksum + # The time we've last seen this node is obviously right now since we # just received valid data from it. So update the knownNodes list so # that other peers can be made aware of its existance. @@ -187,8 +173,11 @@ class receiveDataThread(threading.Thread): # pass #elif command == 'alert': # pass + + del payload + self.data = self.data[payloadLength + shared.Header.size:] # take this message out and then process the next message - if self.data == '': + if self.data == '': # if there are no more messages while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0: shared.numberOfInventoryLookupsPerformed += 1 objectHash, = random.sample( @@ -196,24 +185,22 @@ class receiveDataThread(threading.Thread): if objectHash in shared.inventory: with shared.printLock: print 'Inventory (in memory) already has object listed in inv message.' - del self.objectsThatWeHaveYetToGetFromThisPeer[ objectHash] elif shared.isInSqlInventory(objectHash): if shared.verbose >= 3: with shared.printLock: print 'Inventory (SQL on disk) already has object listed in inv message.' - del self.objectsThatWeHaveYetToGetFromThisPeer[ objectHash] else: + # We don't have the object in our inventory. Let's request it. self.sendgetdata(objectHash) del self.objectsThatWeHaveYetToGetFromThisPeer[ objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway. if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: with shared.printLock: - print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer) - + print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0' try: del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. @@ -221,9 +208,9 @@ class receiveDataThread(threading.Thread): pass break if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0: + # We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore. with shared.printLock: - print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer) - + print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0' try: del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[ self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together. @@ -239,12 +226,14 @@ class receiveDataThread(threading.Thread): def sendpong(self): - print 'Sending pong' + with shared.printLock: + print 'Sending pong' self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong'))) def recverack(self): - print 'verack received' + with shared.printLock: + print 'verack received' self.verackReceived = True if self.verackSent: # We have thus both sent and received a verack. @@ -279,7 +268,7 @@ class receiveDataThread(threading.Thread): with shared.printLock: print 'We are connected to too many people. Closing connection.' - shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) + self.sendDataThreadQueue.put((0, 'shutdown','no data')) return self.sendBigInv() @@ -742,7 +731,7 @@ class receiveDataThread(threading.Thread): return self.remoteProtocolVersion, = unpack('>L', data[:4]) if self.remoteProtocolVersion <= 1: - shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) + self.sendDataThreadQueue.put((0, 'shutdown','no data')) with shared.printLock: print 'Closing connection to old protocol version 1 node: ', self.peer return @@ -765,7 +754,7 @@ class receiveDataThread(threading.Thread): print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber if self.streamNumber != 1: - shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) + self.sendDataThreadQueue.put((0, 'shutdown','no data')) with shared.printLock: print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.' return @@ -776,7 +765,7 @@ class receiveDataThread(threading.Thread): if not self.initiatedConnection: self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber)) if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf: - shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) + self.sendDataThreadQueue.put((0, 'shutdown','no data')) with shared.printLock: print 'Closing connection to myself: ', self.peer return diff --git a/src/class_sendDataThread.py b/src/class_sendDataThread.py index eec23e7b..1d8c87b8 100644 --- a/src/class_sendDataThread.py +++ b/src/class_sendDataThread.py @@ -20,9 +20,6 @@ class sendDataThread(threading.Thread): threading.Thread.__init__(self) self.sendDataThreadQueue = sendDataThreadQueue shared.sendDataQueues.append(self.sendDataThreadQueue) - with shared.printLock: - print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues) - self.data = '' self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue) self.objectHashHolderInstance.start() @@ -67,17 +64,20 @@ class sendDataThread(threading.Thread): def sendBytes(self, data): self.sock.sendall(data) shared.numberOfBytesSent += len(data) + self.lastTimeISentData = int(time.time()) + def run(self): + with shared.printLock: + print 'sendDataThread starting. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues) while True: deststream, command, data = self.sendDataThreadQueue.get() if deststream == self.streamNumber or deststream == 0: if command == 'shutdown': - if data == self.peer or data == 'all': - with shared.printLock: - print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.' - break + with shared.printLock: + print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.' + break # When you receive an incoming connection, a sendDataThread is # created even though you don't yet know what stream number the # remote peer is interested in. They will tell you in a version @@ -96,49 +96,44 @@ class sendDataThread(threading.Thread): elif command == 'advertisepeer': self.objectHashHolderInstance.holdPeer(data) elif command == 'sendaddr': - if not self.connectionIsOrWasFullyEstablished: - # not sending addr because we haven't sent and heard a verack from the remote node yet - return - numberOfAddressesInAddrMessage = len( - data) - payload = '' - for hostDetails in data: - timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails - payload += pack( - '>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time - payload += pack('>I', streamNumber) - payload += pack( - '>q', services) # service bit flags offered by this node - payload += shared.encodeHost(host) - payload += pack('>H', port) - - payload = encodeVarint(numberOfAddressesInAddrMessage) + payload - packet = shared.CreatePacket('addr', payload) - try: - self.sendBytes(packet) - self.lastTimeISentData = int(time.time()) - except: - print 'sendaddr: self.sock.sendall failed' - break + if self.connectionIsOrWasFullyEstablished: # only send addr messages if we have send and heard a verack from the remote node + numberOfAddressesInAddrMessage = len(data) + payload = '' + for hostDetails in data: + timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails + payload += pack( + '>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time + payload += pack('>I', streamNumber) + payload += pack( + '>q', services) # service bit flags offered by this node + payload += shared.encodeHost(host) + payload += pack('>H', port) + + payload = encodeVarint(numberOfAddressesInAddrMessage) + payload + packet = shared.CreatePacket('addr', payload) + try: + self.sendBytes(packet) + except: + with shared.printLock: + print 'sendaddr: self.sock.sendall failed' + break elif command == 'advertiseobject': self.objectHashHolderInstance.holdHash(data) elif command == 'sendinv': - if not self.connectionIsOrWasFullyEstablished: - # not sending inv because we haven't sent and heard a verack from the remote node yet - return - payload = '' - for hash in data: - if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: - payload += hash - if payload != '': - payload = encodeVarint(len(payload)/32) + payload - packet = shared.CreatePacket('inv', payload) - try: - self.sendBytes(packet) - self.lastTimeISentData = int(time.time()) - except: - print 'sendinv: self.sock.sendall failed' - break + if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node + payload = '' + for hash in data: + if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware: + payload += hash + if payload != '': + payload = encodeVarint(len(payload)/32) + payload + packet = shared.CreatePacket('inv', payload) + try: + self.sendBytes(packet) + except: + with shared.printLock: + print 'sendinv: self.sock.sendall failed' + break elif command == 'pong': self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time. if self.lastTimeISentData < (int(time.time()) - 298): @@ -148,16 +143,16 @@ class sendDataThread(threading.Thread): packet = shared.CreatePacket('pong') try: self.sendBytes(packet) - self.lastTimeISentData = int(time.time()) except: - print 'send pong failed' + with shared.printLock: + print 'send pong failed' break elif command == 'sendRawData': try: self.sendBytes(data) - self.lastTimeISentData = int(time.time()) except: - print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.' + with shared.printLock: + print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.' break elif command == 'connectionIsOrWasFullyEstablished': self.connectionIsOrWasFullyEstablished = True @@ -172,5 +167,5 @@ class sendDataThread(threading.Thread): pass shared.sendDataQueues.remove(self.sendDataThreadQueue) with shared.printLock: - print 'Number of queues remaining in sendDataQueues:', len(shared.sendDataQueues) + print 'sendDataThread ending. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues) self.objectHashHolderInstance.close() diff --git a/src/shared.py b/src/shared.py index c82bf73a..cb3aa1c6 100644 --- a/src/shared.py +++ b/src/shared.py @@ -329,7 +329,7 @@ def isProofOfWorkSufficient( def doCleanShutdown(): global shutdown shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit. - broadcastToSendDataQueues((0, 'shutdown', 'all')) + broadcastToSendDataQueues((0, 'shutdown', 'no data')) with shared.objectProcessorQueueSizeLock: data = 'no data' shared.objectProcessorQueueSize += len(data)