Merge pull request #704 from Atheros1/master
removed use of memoryview so that we can support python 2.7.3
This commit is contained in:
commit
c78b3a1331
|
@ -19,12 +19,12 @@ import sys
|
||||||
#potentially render this version check useless. So logging won't be used here until
|
#potentially render this version check useless. So logging won't be used here until
|
||||||
#there is a more efficient way to configure logging
|
#there is a more efficient way to configure logging
|
||||||
if sys.hexversion >= 0x3000000:
|
if sys.hexversion >= 0x3000000:
|
||||||
msg = "PyBitmessage does not support Python 3. Python 2.7.5 or later is required. Your version: %s" % sys.version
|
msg = "PyBitmessage does not support Python 3. Python 2.7.3 or later is required. Your version: %s" % sys.version
|
||||||
#logger.critical(msg)
|
#logger.critical(msg)
|
||||||
sys.stdout.write(msg)
|
sys.stdout.write(msg)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
if sys.hexversion < 0x20705F0:
|
if sys.hexversion < 0x20703F0:
|
||||||
msg = "You should use Python 2.7.5 or greater (but not Python 3). Your version: %s" % sys.version
|
msg = "You should use Python 2.7.3 or greater (but not Python 3). Your version: %s" % sys.version
|
||||||
#logger.critical(msg)
|
#logger.critical(msg)
|
||||||
sys.stdout.write(msg)
|
sys.stdout.write(msg)
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
|
@ -62,7 +62,7 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'ID of the receiveDataThread is', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
print 'receiveDataThread starting. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
dataLen = len(self.data)
|
dataLen = len(self.data)
|
||||||
|
@ -76,7 +76,7 @@ class receiveDataThread(threading.Thread):
|
||||||
break
|
break
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sock.recv error. Closing receiveData thread (HOST:', self.peer, 'ID:', str(id(self)) + ').', err
|
print 'sock.recv error. Closing receiveData thread (' + str(self.peer) + ', Thread ID:', str(id(self)) + ').', err
|
||||||
break
|
break
|
||||||
# print 'Received', repr(self.data)
|
# print 'Received', repr(self.data)
|
||||||
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
if len(self.data) == dataLen: # If self.sock.recv returned no data:
|
||||||
|
@ -92,7 +92,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
print 'removed self (a receiveDataThread) from selfInitiatedConnections'
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer)) # commands the corresponding sendDataThread to shut itself down.
|
self.sendDataThreadQueue.put((0, 'shutdown','no data')) # commands the corresponding sendDataThread to shut itself down.
|
||||||
try:
|
try:
|
||||||
del shared.connectedHostsList[self.peer.host]
|
del shared.connectedHostsList[self.peer.host]
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
|
@ -106,47 +106,33 @@ class receiveDataThread(threading.Thread):
|
||||||
pass
|
pass
|
||||||
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
shared.UISignalQueue.put(('updateNetworkStatusTab', 'no data'))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'The size of the connectedHostsList is now:', len(shared.connectedHostsList)
|
print 'receiveDataThread ending. ID', str(id(self)) + '. The size of the shared.connectedHostsList is now', len(shared.connectedHostsList)
|
||||||
|
|
||||||
|
|
||||||
def processData(self):
|
def processData(self):
|
||||||
# if shared.verbose >= 3:
|
|
||||||
# with shared.printLock:
|
|
||||||
# print 'self.data is currently ', repr(self.data)
|
|
||||||
#
|
|
||||||
if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data.
|
if len(self.data) < shared.Header.size: # if so little of the data has arrived that we can't even read the checksum then wait for more data.
|
||||||
return
|
return
|
||||||
#Use a memoryview so we don't copy data unnecessarily
|
|
||||||
view = memoryview(self.data)
|
|
||||||
magic,command,payloadLength,checksum = shared.Header.unpack(view[:shared.Header.size])
|
|
||||||
view = view[shared.Header.size:]
|
|
||||||
if magic != 0xE9BEB4D9:
|
|
||||||
#if shared.verbose >= 1:
|
|
||||||
# with shared.printLock:
|
|
||||||
# print 'The magic bytes were not correct. First 40 bytes of data: ' + repr(self.data[0:40])
|
|
||||||
|
|
||||||
|
magic,command,payloadLength,checksum = shared.Header.unpack(self.data[:shared.Header.size])
|
||||||
|
if magic != 0xE9BEB4D9:
|
||||||
self.data = ""
|
self.data = ""
|
||||||
return
|
return
|
||||||
if payloadLength > 20000000:
|
if payloadLength > 20000000:
|
||||||
logger.info('The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s' % payloadLength)
|
logger.info('The incoming message, which we have not yet download, is too large. Ignoring it. (unfortunately there is no way to tell the other node to stop sending it except to disconnect.) Message size: %s' % payloadLength)
|
||||||
self.data = view[payloadLength:].tobytes()
|
self.data = self.data[payloadLength + shared.Header.size:]
|
||||||
del view,magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after
|
del magic,command,payloadLength,checksum # we don't need these anymore and better to clean them now before the recursive call rather than after
|
||||||
self.processData()
|
self.processData()
|
||||||
return
|
return
|
||||||
if len(view) < payloadLength: # check if the whole message has arrived yet.
|
if len(self.data) < payloadLength + shared.Header.size: # check if the whole message has arrived yet.
|
||||||
return
|
return
|
||||||
payload = view[:payloadLength]
|
payload = self.data[shared.Header.size:payloadLength + shared.Header.size]
|
||||||
if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message.
|
if checksum != hashlib.sha512(payload).digest()[0:4]: # test the checksum in the message.
|
||||||
print 'Checksum incorrect. Clearing this message.'
|
print 'Checksum incorrect. Clearing this message.'
|
||||||
self.data = view[payloadLength:].tobytes()
|
self.data = self.data[payloadLength + shared.Header.size:]
|
||||||
del view,magic,command,payloadLength,checksum,payload #again better to clean up before the recursive call
|
del magic,command,payloadLength,checksum,payload # better to clean up before the recursive call
|
||||||
self.processData()
|
self.processData()
|
||||||
return
|
return
|
||||||
|
|
||||||
#We can now revert back to bytestrings and take this message out
|
|
||||||
payload = payload.tobytes()
|
|
||||||
self.data = view[payloadLength:].tobytes()
|
|
||||||
del view,magic,payloadLength,checksum
|
|
||||||
# The time we've last seen this node is obviously right now since we
|
# The time we've last seen this node is obviously right now since we
|
||||||
# just received valid data from it. So update the knownNodes list so
|
# just received valid data from it. So update the knownNodes list so
|
||||||
# that other peers can be made aware of its existance.
|
# that other peers can be made aware of its existance.
|
||||||
|
@ -188,7 +174,10 @@ class receiveDataThread(threading.Thread):
|
||||||
#elif command == 'alert':
|
#elif command == 'alert':
|
||||||
# pass
|
# pass
|
||||||
|
|
||||||
if self.data == '':
|
del payload
|
||||||
|
self.data = self.data[payloadLength + shared.Header.size:] # take this message out and then process the next message
|
||||||
|
|
||||||
|
if self.data == '': # if there are no more messages
|
||||||
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
while len(self.objectsThatWeHaveYetToGetFromThisPeer) > 0:
|
||||||
shared.numberOfInventoryLookupsPerformed += 1
|
shared.numberOfInventoryLookupsPerformed += 1
|
||||||
objectHash, = random.sample(
|
objectHash, = random.sample(
|
||||||
|
@ -196,24 +185,22 @@ class receiveDataThread(threading.Thread):
|
||||||
if objectHash in shared.inventory:
|
if objectHash in shared.inventory:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Inventory (in memory) already has object listed in inv message.'
|
print 'Inventory (in memory) already has object listed in inv message.'
|
||||||
|
|
||||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||||
objectHash]
|
objectHash]
|
||||||
elif shared.isInSqlInventory(objectHash):
|
elif shared.isInSqlInventory(objectHash):
|
||||||
if shared.verbose >= 3:
|
if shared.verbose >= 3:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
print 'Inventory (SQL on disk) already has object listed in inv message.'
|
||||||
|
|
||||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||||
objectHash]
|
objectHash]
|
||||||
else:
|
else:
|
||||||
|
# We don't have the object in our inventory. Let's request it.
|
||||||
self.sendgetdata(objectHash)
|
self.sendgetdata(objectHash)
|
||||||
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
del self.objectsThatWeHaveYetToGetFromThisPeer[
|
||||||
objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
|
objectHash] # It is possible that the remote node might not respond with the object. In that case, we'll very likely get it from someone else anyway.
|
||||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
|
@ -221,9 +208,9 @@ class receiveDataThread(threading.Thread):
|
||||||
pass
|
pass
|
||||||
break
|
break
|
||||||
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
if len(self.objectsThatWeHaveYetToGetFromThisPeer) == 0:
|
||||||
|
# We had objectsThatWeHaveYetToGetFromThisPeer but the loop ran, they were all in our inventory, and now we don't have any to get anymore.
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now', len(self.objectsThatWeHaveYetToGetFromThisPeer)
|
print '(concerning', str(self.peer) + ')', 'number of objectsThatWeHaveYetToGetFromThisPeer is now 0'
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
del shared.numberOfObjectsThatWeHaveYetToGetPerPeer[
|
||||||
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
self.peer] # this data structure is maintained so that we can keep track of how many total objects, across all connections, are currently outstanding. If it goes too high it can indicate that we are under attack by multiple nodes working together.
|
||||||
|
@ -239,12 +226,14 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
|
|
||||||
def sendpong(self):
|
def sendpong(self):
|
||||||
print 'Sending pong'
|
with shared.printLock:
|
||||||
|
print 'Sending pong'
|
||||||
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong')))
|
self.sendDataThreadQueue.put((0, 'sendRawData', shared.CreatePacket('pong')))
|
||||||
|
|
||||||
|
|
||||||
def recverack(self):
|
def recverack(self):
|
||||||
print 'verack received'
|
with shared.printLock:
|
||||||
|
print 'verack received'
|
||||||
self.verackReceived = True
|
self.verackReceived = True
|
||||||
if self.verackSent:
|
if self.verackSent:
|
||||||
# We have thus both sent and received a verack.
|
# We have thus both sent and received a verack.
|
||||||
|
@ -279,7 +268,7 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'We are connected to too many people. Closing connection.'
|
print 'We are connected to too many people. Closing connection.'
|
||||||
|
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
return
|
return
|
||||||
self.sendBigInv()
|
self.sendBigInv()
|
||||||
|
|
||||||
|
@ -742,7 +731,7 @@ class receiveDataThread(threading.Thread):
|
||||||
return
|
return
|
||||||
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
||||||
if self.remoteProtocolVersion <= 1:
|
if self.remoteProtocolVersion <= 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closing connection to old protocol version 1 node: ', self.peer
|
print 'Closing connection to old protocol version 1 node: ', self.peer
|
||||||
return
|
return
|
||||||
|
@ -765,7 +754,7 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
||||||
|
|
||||||
if self.streamNumber != 1:
|
if self.streamNumber != 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
||||||
return
|
return
|
||||||
|
@ -776,7 +765,7 @@ class receiveDataThread(threading.Thread):
|
||||||
if not self.initiatedConnection:
|
if not self.initiatedConnection:
|
||||||
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
|
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
|
||||||
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closing connection to myself: ', self.peer
|
print 'Closing connection to myself: ', self.peer
|
||||||
return
|
return
|
||||||
|
|
|
@ -20,9 +20,6 @@ class sendDataThread(threading.Thread):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.sendDataThreadQueue = sendDataThreadQueue
|
self.sendDataThreadQueue = sendDataThreadQueue
|
||||||
shared.sendDataQueues.append(self.sendDataThreadQueue)
|
shared.sendDataQueues.append(self.sendDataThreadQueue)
|
||||||
with shared.printLock:
|
|
||||||
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
|
||||||
|
|
||||||
self.data = ''
|
self.data = ''
|
||||||
self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue)
|
self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue)
|
||||||
self.objectHashHolderInstance.start()
|
self.objectHashHolderInstance.start()
|
||||||
|
@ -67,17 +64,20 @@ class sendDataThread(threading.Thread):
|
||||||
def sendBytes(self, data):
|
def sendBytes(self, data):
|
||||||
self.sock.sendall(data)
|
self.sock.sendall(data)
|
||||||
shared.numberOfBytesSent += len(data)
|
shared.numberOfBytesSent += len(data)
|
||||||
|
self.lastTimeISentData = int(time.time())
|
||||||
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
with shared.printLock:
|
||||||
|
print 'sendDataThread starting. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
|
||||||
while True:
|
while True:
|
||||||
deststream, command, data = self.sendDataThreadQueue.get()
|
deststream, command, data = self.sendDataThreadQueue.get()
|
||||||
|
|
||||||
if deststream == self.streamNumber or deststream == 0:
|
if deststream == self.streamNumber or deststream == 0:
|
||||||
if command == 'shutdown':
|
if command == 'shutdown':
|
||||||
if data == self.peer or data == 'all':
|
with shared.printLock:
|
||||||
with shared.printLock:
|
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
||||||
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
break
|
||||||
break
|
|
||||||
# When you receive an incoming connection, a sendDataThread is
|
# When you receive an incoming connection, a sendDataThread is
|
||||||
# created even though you don't yet know what stream number the
|
# created even though you don't yet know what stream number the
|
||||||
# remote peer is interested in. They will tell you in a version
|
# remote peer is interested in. They will tell you in a version
|
||||||
|
@ -96,49 +96,44 @@ class sendDataThread(threading.Thread):
|
||||||
elif command == 'advertisepeer':
|
elif command == 'advertisepeer':
|
||||||
self.objectHashHolderInstance.holdPeer(data)
|
self.objectHashHolderInstance.holdPeer(data)
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
if not self.connectionIsOrWasFullyEstablished:
|
if self.connectionIsOrWasFullyEstablished: # only send addr messages if we have send and heard a verack from the remote node
|
||||||
# not sending addr because we haven't sent and heard a verack from the remote node yet
|
numberOfAddressesInAddrMessage = len(data)
|
||||||
return
|
payload = ''
|
||||||
numberOfAddressesInAddrMessage = len(
|
for hostDetails in data:
|
||||||
data)
|
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
||||||
payload = ''
|
payload += pack(
|
||||||
for hostDetails in data:
|
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
||||||
timeLastReceivedMessageFromThisNode, streamNumber, services, host, port = hostDetails
|
payload += pack('>I', streamNumber)
|
||||||
payload += pack(
|
payload += pack(
|
||||||
'>Q', timeLastReceivedMessageFromThisNode) # now uses 64-bit time
|
'>q', services) # service bit flags offered by this node
|
||||||
payload += pack('>I', streamNumber)
|
payload += shared.encodeHost(host)
|
||||||
payload += pack(
|
payload += pack('>H', port)
|
||||||
'>q', services) # service bit flags offered by this node
|
|
||||||
payload += shared.encodeHost(host)
|
|
||||||
payload += pack('>H', port)
|
|
||||||
|
|
||||||
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||||
packet = shared.CreatePacket('addr', payload)
|
packet = shared.CreatePacket('addr', payload)
|
||||||
try:
|
try:
|
||||||
self.sendBytes(packet)
|
self.sendBytes(packet)
|
||||||
self.lastTimeISentData = int(time.time())
|
except:
|
||||||
except:
|
with shared.printLock:
|
||||||
print 'sendaddr: self.sock.sendall failed'
|
print 'sendaddr: self.sock.sendall failed'
|
||||||
break
|
break
|
||||||
elif command == 'advertiseobject':
|
elif command == 'advertiseobject':
|
||||||
self.objectHashHolderInstance.holdHash(data)
|
self.objectHashHolderInstance.holdHash(data)
|
||||||
elif command == 'sendinv':
|
elif command == 'sendinv':
|
||||||
if not self.connectionIsOrWasFullyEstablished:
|
if self.connectionIsOrWasFullyEstablished: # only send inv messages if we have send and heard a verack from the remote node
|
||||||
# not sending inv because we haven't sent and heard a verack from the remote node yet
|
payload = ''
|
||||||
return
|
for hash in data:
|
||||||
payload = ''
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
for hash in data:
|
payload += hash
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
if payload != '':
|
||||||
payload += hash
|
payload = encodeVarint(len(payload)/32) + payload
|
||||||
if payload != '':
|
packet = shared.CreatePacket('inv', payload)
|
||||||
payload = encodeVarint(len(payload)/32) + payload
|
try:
|
||||||
packet = shared.CreatePacket('inv', payload)
|
self.sendBytes(packet)
|
||||||
try:
|
except:
|
||||||
self.sendBytes(packet)
|
with shared.printLock:
|
||||||
self.lastTimeISentData = int(time.time())
|
print 'sendinv: self.sock.sendall failed'
|
||||||
except:
|
break
|
||||||
print 'sendinv: self.sock.sendall failed'
|
|
||||||
break
|
|
||||||
elif command == 'pong':
|
elif command == 'pong':
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
||||||
if self.lastTimeISentData < (int(time.time()) - 298):
|
if self.lastTimeISentData < (int(time.time()) - 298):
|
||||||
|
@ -148,16 +143,16 @@ class sendDataThread(threading.Thread):
|
||||||
packet = shared.CreatePacket('pong')
|
packet = shared.CreatePacket('pong')
|
||||||
try:
|
try:
|
||||||
self.sendBytes(packet)
|
self.sendBytes(packet)
|
||||||
self.lastTimeISentData = int(time.time())
|
|
||||||
except:
|
except:
|
||||||
print 'send pong failed'
|
with shared.printLock:
|
||||||
|
print 'send pong failed'
|
||||||
break
|
break
|
||||||
elif command == 'sendRawData':
|
elif command == 'sendRawData':
|
||||||
try:
|
try:
|
||||||
self.sendBytes(data)
|
self.sendBytes(data)
|
||||||
self.lastTimeISentData = int(time.time())
|
|
||||||
except:
|
except:
|
||||||
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
with shared.printLock:
|
||||||
|
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
||||||
break
|
break
|
||||||
elif command == 'connectionIsOrWasFullyEstablished':
|
elif command == 'connectionIsOrWasFullyEstablished':
|
||||||
self.connectionIsOrWasFullyEstablished = True
|
self.connectionIsOrWasFullyEstablished = True
|
||||||
|
@ -172,5 +167,5 @@ class sendDataThread(threading.Thread):
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Number of queues remaining in sendDataQueues:', len(shared.sendDataQueues)
|
print 'sendDataThread ending. ID:', str(id(self))+'. Number of queues in sendDataQueues:', len(shared.sendDataQueues)
|
||||||
self.objectHashHolderInstance.close()
|
self.objectHashHolderInstance.close()
|
||||||
|
|
|
@ -329,7 +329,7 @@ def isProofOfWorkSufficient(
|
||||||
def doCleanShutdown():
|
def doCleanShutdown():
|
||||||
global shutdown
|
global shutdown
|
||||||
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
|
shutdown = 1 #Used to tell proof of work worker threads and the objectProcessorThread to exit.
|
||||||
broadcastToSendDataQueues((0, 'shutdown', 'all'))
|
broadcastToSendDataQueues((0, 'shutdown', 'no data'))
|
||||||
with shared.objectProcessorQueueSizeLock:
|
with shared.objectProcessorQueueSizeLock:
|
||||||
data = 'no data'
|
data = 'no data'
|
||||||
shared.objectProcessorQueueSize += len(data)
|
shared.objectProcessorQueueSize += len(data)
|
||||||
|
|
Reference in New Issue
Block a user