fix #590
This commit is contained in:
parent
9fdff73ee1
commit
61389b64aa
|
@ -2617,7 +2617,6 @@ class MyForm(QtGui.QMainWindow):
|
||||||
currentInboxRow = self.ui.tableWidgetInbox.currentRow()
|
currentInboxRow = self.ui.tableWidgetInbox.currentRow()
|
||||||
toAddressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item(
|
toAddressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item(
|
||||||
currentInboxRow, 0).data(Qt.UserRole).toPyObject())
|
currentInboxRow, 0).data(Qt.UserRole).toPyObject())
|
||||||
|
|
||||||
fromAddressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item(
|
fromAddressAtCurrentInboxRow = str(self.ui.tableWidgetInbox.item(
|
||||||
currentInboxRow, 1).data(Qt.UserRole).toPyObject())
|
currentInboxRow, 1).data(Qt.UserRole).toPyObject())
|
||||||
msgid = str(self.ui.tableWidgetInbox.item(
|
msgid = str(self.ui.tableWidgetInbox.item(
|
||||||
|
|
|
@ -113,14 +113,20 @@ class outgoingSynSender(threading.Thread):
|
||||||
rd = receiveDataThread()
|
rd = receiveDataThread()
|
||||||
rd.daemon = True # close the main program even if there are threads left
|
rd.daemon = True # close the main program even if there are threads left
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
rd.setup(sock, peer.host, peer.port, self.streamNumber,
|
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
rd.setup(sock,
|
||||||
|
peer.host,
|
||||||
|
peer.port,
|
||||||
|
self.streamNumber,
|
||||||
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
|
||||||
|
self.selfInitiatedConnections,
|
||||||
|
sendDataThreadQueue)
|
||||||
rd.start()
|
rd.start()
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print self, 'connected to', peer, 'during an outgoing attempt.'
|
print self, 'connected to', peer, 'during an outgoing attempt.'
|
||||||
|
|
||||||
|
|
||||||
sd = sendDataThread()
|
sd = sendDataThread(sendDataThreadQueue)
|
||||||
sd.setup(sock, peer.host, peer.port, self.streamNumber,
|
sd.setup(sock, peer.host, peer.port, self.streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
sd.start()
|
sd.start()
|
||||||
|
|
|
@ -40,14 +40,17 @@ class receiveDataThread(threading.Thread):
|
||||||
HOST,
|
HOST,
|
||||||
port,
|
port,
|
||||||
streamNumber,
|
streamNumber,
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware,
|
||||||
selfInitiatedConnections):
|
selfInitiatedConnections,
|
||||||
|
sendDataThreadQueue):
|
||||||
|
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.peer = shared.Peer(HOST, port)
|
self.peer = shared.Peer(HOST, port)
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
self.payloadLength = 0 # This is the protocol payload length thus it doesn't include the 24 byte message header
|
||||||
self.objectsThatWeHaveYetToGetFromThisPeer = {}
|
self.objectsThatWeHaveYetToGetFromThisPeer = {}
|
||||||
self.selfInitiatedConnections = selfInitiatedConnections
|
self.selfInitiatedConnections = selfInitiatedConnections
|
||||||
|
self.sendDataThreadQueue = sendDataThreadQueue # used to send commands and data to the sendDataThread
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
self.peer.host] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||||
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
||||||
|
@ -218,13 +221,7 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
def sendpong(self):
|
def sendpong(self):
|
||||||
print 'Sending pong'
|
print 'Sending pong'
|
||||||
try:
|
self.sendDataThreadQueue.put((0, 'sendRawData', '\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35'))
|
||||||
self.sock.sendall(
|
|
||||||
'\xE9\xBE\xB4\xD9\x70\x6F\x6E\x67\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
|
|
||||||
|
|
||||||
def recverack(self):
|
def recverack(self):
|
||||||
|
@ -309,13 +306,7 @@ class receiveDataThread(threading.Thread):
|
||||||
headerData += hashlib.sha512(payload).digest()[:4]
|
headerData += hashlib.sha512(payload).digest()[:4]
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
|
print 'Sending huge inv message with', numberOfObjects, 'objects to just this one peer'
|
||||||
|
self.sendDataThreadQueue.put((0, 'sendRawData', headerData + payload))
|
||||||
try:
|
|
||||||
self.sock.sendall(headerData + payload)
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
|
|
||||||
|
|
||||||
# We have received a broadcast message
|
# We have received a broadcast message
|
||||||
|
@ -462,12 +453,7 @@ class receiveDataThread(threading.Thread):
|
||||||
headerData += pack('>L', len(
|
headerData += pack('>L', len(
|
||||||
payload)) # payload length. Note that we add an extra 8 for the nonce.
|
payload)) # payload length. Note that we add an extra 8 for the nonce.
|
||||||
headerData += hashlib.sha512(payload).digest()[:4]
|
headerData += hashlib.sha512(payload).digest()[:4]
|
||||||
try:
|
self.sendDataThreadQueue.put((0, 'sendRawData', headerData + payload))
|
||||||
self.sock.sendall(headerData + payload)
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
|
|
||||||
|
|
||||||
# We have received a getdata request from our peer
|
# We have received a getdata request from our peer
|
||||||
|
@ -531,12 +517,7 @@ class receiveDataThread(threading.Thread):
|
||||||
return
|
return
|
||||||
headerData += pack('>L', len(payload)) # payload length.
|
headerData += pack('>L', len(payload)) # payload length.
|
||||||
headerData += hashlib.sha512(payload).digest()[:4]
|
headerData += hashlib.sha512(payload).digest()[:4]
|
||||||
try:
|
self.sendDataThreadQueue.put((0, 'sendRawData', headerData + payload))
|
||||||
self.sock.sendall(headerData + payload)
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
|
|
||||||
|
|
||||||
# Advertise this object to all of our peers
|
# Advertise this object to all of our peers
|
||||||
|
@ -735,16 +716,7 @@ class receiveDataThread(threading.Thread):
|
||||||
datatosend = datatosend + pack('>L', len(payload)) # payload length
|
datatosend = datatosend + pack('>L', len(payload)) # payload length
|
||||||
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
datatosend = datatosend + hashlib.sha512(payload).digest()[0:4]
|
||||||
datatosend = datatosend + payload
|
datatosend = datatosend + payload
|
||||||
try:
|
self.sendDataThreadQueue.put((0, 'sendRawData', datatosend))
|
||||||
self.sock.sendall(datatosend)
|
|
||||||
if shared.verbose >= 1:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'Sending addr with', numberOfAddressesInAddrMessage, 'entries.'
|
|
||||||
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
|
|
||||||
|
|
||||||
# We have received a version message
|
# We have received a version message
|
||||||
|
@ -794,8 +766,7 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closing connection to myself: ', self.peer
|
print 'Closing connection to myself: ', self.peer
|
||||||
return
|
return
|
||||||
shared.broadcastToSendDataQueues((0, 'setRemoteProtocolVersion', (
|
self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion))
|
||||||
self.peer, self.remoteProtocolVersion)))
|
|
||||||
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||||
|
@ -810,27 +781,15 @@ class receiveDataThread(threading.Thread):
|
||||||
def sendversion(self):
|
def sendversion(self):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Sending version message'
|
print 'Sending version message'
|
||||||
|
self.sendDataThreadQueue.put((0, 'sendRawData', shared.assembleVersionMessage(
|
||||||
try:
|
self.peer.host, self.peer.port, self.streamNumber)))
|
||||||
self.sock.sendall(shared.assembleVersionMessage(
|
|
||||||
self.peer.host, self.peer.port, self.streamNumber))
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
|
|
||||||
|
|
||||||
# Sends a verack message
|
# Sends a verack message
|
||||||
def sendverack(self):
|
def sendverack(self):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Sending verack'
|
print 'Sending verack'
|
||||||
try:
|
self.sendDataThreadQueue.put((0, 'sendRawData', '\xE9\xBE\xB4\xD9\x76\x65\x72\x61\x63\x6B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35'))
|
||||||
self.sock.sendall(
|
|
||||||
'\xE9\xBE\xB4\xD9\x76\x65\x72\x61\x63\x6B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xcf\x83\xe1\x35')
|
|
||||||
except Exception as err:
|
|
||||||
# if not 'Bad file descriptor' in err:
|
|
||||||
with shared.printLock:
|
|
||||||
print 'sock.sendall error:', err
|
|
||||||
self.verackSent = True
|
self.verackSent = True
|
||||||
if self.verackReceived:
|
if self.verackReceived:
|
||||||
self.connectionFullyEstablished()
|
self.connectionFullyEstablished()
|
||||||
|
|
|
@ -15,15 +15,15 @@ from addresses import *
|
||||||
# receiveDataThread).
|
# receiveDataThread).
|
||||||
class sendDataThread(threading.Thread):
|
class sendDataThread(threading.Thread):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, sendDataThreadQueue):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.mailbox = Queue.Queue()
|
self.sendDataThreadQueue = sendDataThreadQueue
|
||||||
shared.sendDataQueues.append(self.mailbox)
|
shared.sendDataQueues.append(self.sendDataThreadQueue)
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
print 'The length of sendDataQueues at sendDataThread init is:', len(shared.sendDataQueues)
|
||||||
|
|
||||||
self.data = ''
|
self.data = ''
|
||||||
self.objectHashHolderInstance = objectHashHolder(self.mailbox)
|
self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue)
|
||||||
self.objectHashHolderInstance.start()
|
self.objectHashHolderInstance.start()
|
||||||
|
|
||||||
|
|
||||||
|
@ -38,7 +38,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.peer = shared.Peer(HOST, PORT)
|
self.peer = shared.Peer(HOST, PORT)
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = streamNumber
|
||||||
self.remoteProtocolVersion = - \
|
self.remoteProtocolVersion = - \
|
||||||
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.mailbox queue.
|
1 # This must be set using setRemoteProtocolVersion command which is sent through the self.sendDataThreadQueue queue.
|
||||||
self.lastTimeISentData = int(
|
self.lastTimeISentData = int(
|
||||||
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||||
|
@ -64,7 +64,7 @@ class sendDataThread(threading.Thread):
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
deststream, command, data = self.mailbox.get()
|
deststream, command, data = self.sendDataThreadQueue.get()
|
||||||
|
|
||||||
if deststream == self.streamNumber or deststream == 0:
|
if deststream == self.streamNumber or deststream == 0:
|
||||||
if command == 'shutdown':
|
if command == 'shutdown':
|
||||||
|
@ -77,7 +77,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'len of sendDataQueues', len(shared.sendDataQueues)
|
print 'len of sendDataQueues', len(shared.sendDataQueues)
|
||||||
|
|
||||||
|
@ -96,12 +96,10 @@ class sendDataThread(threading.Thread):
|
||||||
|
|
||||||
self.streamNumber = specifiedStreamNumber
|
self.streamNumber = specifiedStreamNumber
|
||||||
elif command == 'setRemoteProtocolVersion':
|
elif command == 'setRemoteProtocolVersion':
|
||||||
peerInMessage, specifiedRemoteProtocolVersion = data
|
specifiedRemoteProtocolVersion = data
|
||||||
if peerInMessage == self.peer:
|
with shared.printLock:
|
||||||
with shared.printLock:
|
print 'setting the remote node\'s protocol version in the sendDataThread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
||||||
print 'setting the remote node\'s protocol version in the sendData thread (ID:', id(self), ') to', specifiedRemoteProtocolVersion
|
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
||||||
|
|
||||||
self.remoteProtocolVersion = specifiedRemoteProtocolVersion
|
|
||||||
elif command == 'advertisepeer':
|
elif command == 'advertisepeer':
|
||||||
self.objectHashHolderInstance.holdPeer(data)
|
self.objectHashHolderInstance.holdPeer(data)
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
|
@ -135,7 +133,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
elif command == 'advertiseobject':
|
elif command == 'advertiseobject':
|
||||||
|
@ -161,7 +159,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
elif command == 'pong':
|
elif command == 'pong':
|
||||||
|
@ -182,9 +180,22 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock.close()
|
self.sock.close()
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
shared.sendDataQueues.remove(self.mailbox)
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
print 'sendDataThread thread', self, 'ending now. Was connected to', self.peer
|
print 'sendDataThread thread', self, 'ending now. Was connected to', self.peer
|
||||||
break
|
break
|
||||||
|
elif command == 'sendRawData':
|
||||||
|
try:
|
||||||
|
self.sock.sendall(data)
|
||||||
|
self.lastTimeISentData = int(time.time())
|
||||||
|
except:
|
||||||
|
try:
|
||||||
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
|
self.sock.close()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
|
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
||||||
|
break
|
||||||
else:
|
else:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||||
|
|
|
@ -69,9 +69,10 @@ class singleListener(threading.Thread):
|
||||||
a.close()
|
a.close()
|
||||||
a, (HOST, PORT) = sock.accept()
|
a, (HOST, PORT) = sock.accept()
|
||||||
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
someObjectsOfWhichThisRemoteNodeIsAlreadyAware = {} # This is not necessairly a complete list; we clear it from time to time to save memory.
|
||||||
|
sendDataThreadQueue = Queue.Queue() # Used to submit information to the send data thread for this connection.
|
||||||
a.settimeout(20)
|
a.settimeout(20)
|
||||||
|
|
||||||
sd = sendDataThread()
|
sd = sendDataThread(sendDataThreadQueue)
|
||||||
sd.setup(
|
sd.setup(
|
||||||
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware)
|
||||||
sd.start()
|
sd.start()
|
||||||
|
@ -79,7 +80,7 @@ class singleListener(threading.Thread):
|
||||||
rd = receiveDataThread()
|
rd = receiveDataThread()
|
||||||
rd.daemon = True # close the main program even if there are threads left
|
rd.daemon = True # close the main program even if there are threads left
|
||||||
rd.setup(
|
rd.setup(
|
||||||
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections)
|
a, HOST, PORT, -1, someObjectsOfWhichThisRemoteNodeIsAlreadyAware, self.selfInitiatedConnections, sendDataThreadQueue)
|
||||||
rd.start()
|
rd.start()
|
||||||
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
|
Reference in New Issue
Block a user