removed use of memoryview so that we can support python 2.7.3 #704
|
@ -19,12 +19,12 @@ import sys
|
|||
#potentially render this version check useless. So logging won't be used here until
|
||||
#there is a more efficient way to configure logging
|
||||
if sys.hexversion >= 0x3000000:
|
||||
msg = "PyBitmessage does not support Python 3. Python 2.7.5 or later is required. Your version: %s" % sys.version
|
||||
msg = "PyBitmessage does not support Python 3. Python 2.7.3 or later is required. Your version: %s" % sys.version
|
||||
#logger.critical(msg)
|
||||
sys.stdout.write(msg)
|
||||
sys.exit(0)
|
||||
if sys.hexversion < 0x20705F0:
|
||||
msg = "You should use Python 2.7.5 or greater (but not Python 3). Your version: %s" % sys.version
|
||||
if sys.hexversion < 0x20703F0:
|
||||
msg = "You should use Python 2.7.3 or greater (but not Python 3). Your version: %s" % sys.version
|
||||
#logger.critical(msg)
|
||||
sys.stdout.write(msg)
|
||||
sys.exit(0)
|
||||
|
|
|
@ -62,7 +62,7 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
def run(self):
|
||||
with shared.printLock:
|
||||
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||
print 'receiveDataThread starting. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||
|
||||
while True:
|
||||
dataLen = len(self.data)
|
||||
|
@ -76,7 +76,7 @@ class receiveDataThread(threading.Thread):
|
|||
break
|
||||
except Exception as err:
|
||||
with shared.printLock:
|
||||
print 'sock.recv error. Closing receiveData thread (HOST:', self.peer, 'ID:', str(id(self)) + ').', err
|
||||
print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', err
|
||||
break
|
||||
# print 'Received', repr(self.data)
|
||||
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
||||
|
@ -92,7 +92,7 @@ class receiveDataThread(threading.Thread):
|
|||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||
except:
|
||||
pass
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) # commands the corresponding sendDataThread to shut itself down.
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data')) # commands the corresponding sendDataThread to shut itself down.
|
||||
try:
|
||||
del shared.connectedHostsList[self.peer.host]
|
||||
except Exception as err:
|
||||
|
@ -106,47 +106,33 @@ class receiveDataThread(threading.Thread):
|
|||
pass
|
||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||
with shared.printLock:
|
||||
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
|
||||
print 'receiveDataThread ending. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||
|
||||
|
||||
def processData(self):
|
||||
# if shared.verbose >= 3:
|
||||
# with shared.printLock:
|
||||
# print 'self.data is currently ', repr(self.data)
|
||||
#
|
||||
if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data.
|
||||
return
|
||||
#Use a memoryview so we don't copy data unnecessarily
|
||||
view = memoryview(self.data)
|
||||
magic,command,payloadLength,checksum = shared.Header.unpack(view[:shared.Header.size])
|
||||
view = view[shared.Header.size:]
|
||||
|
||||
magic,command,payloadLength,checksum = shared.Header.unpack(self.data[:shared.Header.size])
|
||||
if magic != 0xE9BEB4D9:
|
||||
#if shared.verbose >= 1:
|
||||
# with shared.printLock:
|
||||
# print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
|
||||
|
||||
self.data = ""
|
||||
return
|
||||
if payloadLength > 20000000:
|
||||
logger.info('The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s' % payloadLength)
|
||||
self.data = view[payloadLength:].tobytes()
|
||||
del view,magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after
|
||||
self.data = self.data[payloadLength + shared.Header.size:]
|
||||
del magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after
|
||||
self.processData()
|
||||
return
|
||||
if len(view) < payloadLength: # check if the whole message has arrived yet.
|
||||
if len(self.data) < payloadLength + shared.Header.size: # check if the whole message has arrived yet.
|
||||
return
|
||||
payload = view[:payloadLength]
|
||||
payload = self.data[shared.Header.size:payloadLength + shared.Header.size]
|
||||
if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message.
|
||||
print 'Checksum incorrect. Clearing this message.'
|
||||
self.data = view[payloadLength:].tobytes()
|
||||
del view,magic,command,payloadLength,checksum,payload #again better to clean up before the recursive call
|
||||
self.data = self.data[payloadLength + shared.Header.size:]
|
||||
del magic,command,payloadLength,checksum,payload # better to clean up before the recursive call
|
||||
self.processData()
|
||||
return
|
||||
|
||||
#We can now revert back to bytestrings and take this message out
|
||||
payload = payload.tobytes()
|
||||
self.data = view[payloadLength:].tobytes()
|
||||
del view,magic,payloadLength,checksum
|
||||
|
||||
# The time we've last seen this node is obviously right now since we
|
||||
# just received valid data from it. So update the knownNodes list so
|
||||
# that other peers can be made aware of its existance.
|
||||
|
@ -187,8 +173,11 @@ class receiveDataThread(threading.Thread):
|
|||
# pass
|
||||
#elif command == 'alert':
|
||||
# pass
|
||||
|
||||
del payload
|
||||
self.data = self.data[payloadLength + shared.Header.size:] # take this message out and then process the next message
|
||||
|
||||
if self.data == '':
|
||||
if self.data == '': # if there are no more messages
|
||||
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||
shared.numberOfInventoryLookupsPerformed += 1
|
||||
objectHash, = random.sample(
|
||||
|
@ -196,24 +185,22 @@ class receiveDataThread(threading.Thread):
|
|||
if objectHash in shared.inventory:
|
||||
with shared.printLock:
|
||||
print 'Inventory (in memory) already has object listed in inv message.'
|
||||
|
||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||
objectHash]
|
||||
elif shared.isInSqlInventory(objectHash):
|
||||
if shared.verbose >= 3:
|
||||
with shared.printLock:
|
||||
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
||||
|
||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||
objectHash]
|
||||
else:
|
||||
# We don't have the object in our inventory. Let's request it.
|
||||
self.sendgetdata(objectHash)
|
||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||
objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
|
||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||
with shared.printLock:
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
|
||||
try:
|
||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||
|
@ -221,9 +208,9 @@ class receiveDataThread(threading.Thread):
|
|||
pass
|
||||
break
|
||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
|
||||
with shared.printLock:
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
||||
|
||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
|
||||
try:
|
||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||
|
@ -239,12 +226,14 @@ class receiveDataThread(threading.Thread):
|
|||
|
||||
|
||||
def sendpong(self):
|
||||
print 'Sending pong'
|
||||
with shared.printLock:
|
||||
print 'Sending pong'
|
||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong')))
|
||||
|
||||
|
||||
def recverack(self):
|
||||
print 'verack received'
|
||||
with shared.printLock:
|
||||
print 'verack received'
|
||||
self.verackReceived = True
|
||||
if self.verackSent:
|
||||
# We have thus both sent and received a verack.
|
||||
|
@ -279,7 +268,7 @@ class receiveDataThread(threading.Thread):
|
|||
with shared.printLock:
|
||||
print 'We are connected to too many people. Closing connection.'
|
||||
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
return
|
||||
self.sendBigInv()
|
||||
|
||||
|
@ -742,7 +731,7 @@ class receiveDataThread(threading.Thread):
|
|||
return
|
||||
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
||||
if self.remoteProtocolVersion <= 1:
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
with shared.printLock:
|
||||
print 'Closing connection to old protocol version 1 node: ', self.peer
|
||||
return
|
||||
|
@ -765,7 +754,7 @@ class receiveDataThread(threading.Thread):
|
|||
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
||||
|
||||
if self.streamNumber != 1:
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
with shared.printLock:
|
||||
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
||||
return
|
||||
|
@ -776,7 +765,7 @@ class receiveDataThread(threading.Thread):
|
|||
if not self.initiatedConnection:
|
||||
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
|
||||
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||
with shared.printLock:
|
||||
print 'Closing connection to myself: ', self.peer
|
||||
return
|
||||
|
|
|
@ -20,9 +20,6 @@ class sendDataThread(threading.Thread):
|
|||
threading.Thread.__init__(self)
|
||||
self.sendDataThreadQueue = sendDataThreadQueue
|
||||
shared.sendDataQueues.append(self.sendDataThreadQueue)
|
||||
with shared.printLock:
|
||||
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
||||
|
||||
self.data = ''
|
||||
self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue)
|
||||
self.objectHashHolderInstance.start()
|
||||
|
@ -67,17 +64,20 @@ class sendDataThread(threading.Thread):
|
|||
def sendBytes(self, data):
|
||||
self.sock.sendall(data)
|
||||
shared.numberOfBytesSent += len(data)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
|
||||
|
||||
def run(self):
|
||||
with shared.printLock:
|
||||
print 'sendDataThread starting. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
|
||||
while True:
|
||||
deststream, command, data = self.sendDataThreadQueue.get()
|
||||
|
||||
if deststream == self.streamNumber or deststream == 0:
|
||||
if command == 'shutdown':
|
||||
if data == self.peer or data == 'all':
|
||||
with shared.printLock:
|
||||
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
||||
break
|
||||
with shared.printLock:
|
||||
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
||||
break
|
||||
# When you receive an incoming connection, a sendDataThread is
|
||||
# created even though you don't yet know what stream number the
|
||||
# remote peer is interested in. They will tell you in a version
|
||||
|
@ -96,49 +96,44 @@ class sendDataThread(threading.Thread):
|
|||
elif command == 'advertisepeer':
|
||||
self.objectHashHolderInstance.holdPeer(data)
|
||||
elif command == 'sendaddr':
|
||||
if not self.connectionIsOrWasFullyEstablished:
|
||||
# not sending addr because we haven't sent and heard a verack from the remote node yet
|
||||
return
|
||||
numberOfAddressesInAddrMessage = len(
|
||||
data)
|
||||
payload = ''
|
||||
for hostDetails in data:
|
||||
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||
payload += pack(
|
||||
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
||||
payload += pack('>I', streamNumber)
|
||||
payload += pack(
|
||||
'>q', services) # service bit flags offered by this node
|
||||
payload += shared.encodeHost(host)
|
||||
payload += pack('>H', port)
|
||||
|
||||
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||
packet = shared.CreatePacket('addr', payload)
|
||||
try:
|
||||
self.sendBytes(packet)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
except:
|
||||
print 'sendaddr: self.sock.sendall failed'
|
||||
break
|
||||
if self.connectionIsOrWasFullyEstablished: # only send addr messages if we have send and heard a verack from the remote node
|
||||
numberOfAddressesInAddrMessage = len(data)
|
||||
payload = ''
|
||||
for hostDetails in data:
|
||||
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||
payload += pack(
|
||||
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
||||
payload += pack('>I', streamNumber)
|
||||
payload += pack(
|
||||
'>q', services) # service bit flags offered by this node
|
||||
payload += shared.encodeHost(host)
|
||||
payload += pack('>H', port)
|
||||
|
||||
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||
packet = shared.CreatePacket('addr', payload)
|
||||
try:
|
||||
self.sendBytes(packet)
|
||||
except:
|
||||
with shared.printLock:
|
||||
print 'sendaddr: self.sock.sendall failed'
|
||||
break
|
||||
elif command == 'advertiseobject':
|
||||
self.objectHashHolderInstance.holdHash(data)
|
||||
elif command == 'sendinv':
|
||||
if not self.connectionIsOrWasFullyEstablished:
|
||||
# not sending inv because we haven't sent and heard a verack from the remote node yet
|
||||
return
|
||||
payload = ''
|
||||
for hash in data:
|
||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||
payload += hash
|
||||
if payload != '':
|
||||
payload = encodeVarint(len(payload)/32) + payload
|
||||
packet = shared.CreatePacket('inv', payload)
|
||||
try:
|
||||
self.sendBytes(packet)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
except:
|
||||
print 'sendinv: self.sock.sendall failed'
|
||||
break
|
||||
if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node
|
||||
payload = ''
|
||||
for hash in data:
|
||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||
payload += hash
|
||||
if payload != '':
|
||||
payload = encodeVarint(len(payload)/32) + payload
|
||||
packet = shared.CreatePacket('inv', payload)
|
||||
try:
|
||||
self.sendBytes(packet)
|
||||
except:
|
||||
with shared.printLock:
|
||||
print 'sendinv: self.sock.sendall failed'
|
||||
break
|
||||
elif command == 'pong':
|
||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
||||
if self.lastTimeISentData < (int(time.time()) - 298):
|
||||
|
@ -148,16 +143,16 @@ class sendDataThread(threading.Thread):
|
|||
packet = shared.CreatePacket('pong')
|
||||
try:
|
||||
self.sendBytes(packet)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
except:
|
||||
print 'send pong failed'
|
||||
with shared.printLock:
|
||||
print 'send pong failed'
|
||||
break
|
||||
elif command == 'sendRawData':
|
||||
try:
|
||||
self.sendBytes(data)
|
||||
self.lastTimeISentData = int(time.time())
|
||||
except:
|
||||
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
||||
with shared.printLock:
|
||||
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
||||
break
|
||||
elif command == 'connectionIsOrWasFullyEstablished':
|
||||
self.connectionIsOrWasFullyEstablished = True
|
||||
|
@ -172,5 +167,5 @@ class sendDataThread(threading.Thread):
|
|||
pass
|
||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||
with shared.printLock:
|
||||
print 'Number of queues remaining in sendDataQueues:', len(shared.sendDataQueues)
|
||||
print 'sendDataThread ending. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
|
||||
self.objectHashHolderInstance.close()
|
||||
|
|
|
@ -329,7 +329,7 @@ def isProofOfWorkSufficient(
|
|||
def doCleanShutdown():
|
||||
global shutdown
|
||||
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
|
||||
broadcastToSendDataQueues((0, 'shutdown', 'all'))
|
||||
broadcastToSendDataQueues((0, 'shutdown', 'no data'))
|
||||
with shared.objectProcessorQueueSizeLock:
|
||||
data = 'no data'
|
||||
shared.objectProcessorQueueSize += len(data)
|
||||
|
|
Reference in New Issue
Block a user