Fix #662 - 'PyBitmessage does not wait for verack'
This commit is contained in:
parent
12d131f7f2
commit
22934441dc
|
@ -147,9 +147,9 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.peer
|
print 'remoteCommand', repr(remoteCommand.replace('\x00', '')), ' from', self.peer
|
||||||
|
|
||||||
if remoteCommand == 'version\x00\x00\x00\x00\x00':
|
if remoteCommand == 'version\x00\x00\x00\x00\x00' and not self.connectionIsOrWasFullyEstablished:
|
||||||
self.recversion(self.data[24:self.payloadLength + 24])
|
self.recversion(self.data[24:self.payloadLength + 24])
|
||||||
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00':
|
elif remoteCommand == 'verack\x00\x00\x00\x00\x00\x00' and not self.connectionIsOrWasFullyEstablished:
|
||||||
self.recverack()
|
self.recverack()
|
||||||
elif remoteCommand == 'addr\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
elif remoteCommand == 'addr\x00\x00\x00\x00\x00\x00\x00\x00' and self.connectionIsOrWasFullyEstablished:
|
||||||
self.recaddr(self.data[24:self.payloadLength + 24])
|
self.recaddr(self.data[24:self.payloadLength + 24])
|
||||||
|
@ -237,7 +237,12 @@ class receiveDataThread(threading.Thread):
|
||||||
self.connectionFullyEstablished()
|
self.connectionFullyEstablished()
|
||||||
|
|
||||||
def connectionFullyEstablished(self):
|
def connectionFullyEstablished(self):
|
||||||
|
if self.connectionIsOrWasFullyEstablished:
|
||||||
|
# there is no reason to run this function a second time
|
||||||
|
return
|
||||||
self.connectionIsOrWasFullyEstablished = True
|
self.connectionIsOrWasFullyEstablished = True
|
||||||
|
# Command the corresponding sendDataThread to set its own connectionIsOrWasFullyEstablished variable to True also
|
||||||
|
self.sendDataThreadQueue.put((0, 'connectionIsOrWasFullyEstablished', 'no data'))
|
||||||
if not self.initiatedConnection:
|
if not self.initiatedConnection:
|
||||||
shared.clientHasReceivedIncomingConnections = True
|
shared.clientHasReceivedIncomingConnections = True
|
||||||
shared.UISignalQueue.put(('setStatusIcon', 'green'))
|
shared.UISignalQueue.put(('setStatusIcon', 'green'))
|
||||||
|
@ -301,8 +306,9 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sendinvMessageToJustThisOnePeer(
|
self.sendinvMessageToJustThisOnePeer(
|
||||||
numberOfObjectsInInvMessage, payload)
|
numberOfObjectsInInvMessage, payload)
|
||||||
|
|
||||||
# Self explanatory. Notice that there is also a broadcastinv function for
|
# Used to send a big inv message when the connection with a node is
|
||||||
# broadcasting invs to everyone in our stream.
|
# first fully established. Notice that there is also a broadcastinv
|
||||||
|
# function for broadcasting invs to everyone in our stream.
|
||||||
def sendinvMessageToJustThisOnePeer(self, numberOfObjects, payload):
|
def sendinvMessageToJustThisOnePeer(self, numberOfObjects, payload):
|
||||||
payload = encodeVarint(numberOfObjects) + payload
|
payload = encodeVarint(numberOfObjects) + payload
|
||||||
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
headerData = '\xe9\xbe\xb4\xd9' # magic bits, slighly different from Bitcoin's magic bits.
|
||||||
|
@ -665,7 +671,10 @@ class receiveDataThread(threading.Thread):
|
||||||
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
print 'knownNodes currently has', len(shared.knownNodes[self.streamNumber]), 'nodes for this stream.'
|
||||||
|
|
||||||
|
|
||||||
# Send a big addr message to our peer
|
# Send a huge addr message to our peer. This is only used
|
||||||
|
# when we fully establish a connection with a
|
||||||
|
# peer (with the full exchange of version and verack
|
||||||
|
# messages).
|
||||||
def sendaddr(self):
|
def sendaddr(self):
|
||||||
addrsInMyStream = {}
|
addrsInMyStream = {}
|
||||||
addrsInChildStreamLeft = {}
|
addrsInChildStreamLeft = {}
|
||||||
|
@ -750,58 +759,68 @@ class receiveDataThread(threading.Thread):
|
||||||
if len(data) < 83:
|
if len(data) < 83:
|
||||||
# This version message is unreasonably short. Forget it.
|
# This version message is unreasonably short. Forget it.
|
||||||
return
|
return
|
||||||
elif not self.verackSent:
|
if self.verackSent:
|
||||||
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
"""
|
||||||
if self.remoteProtocolVersion <= 1:
|
We must have already processed the remote node's version message.
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
There might be a time in the future when we Do want to process
|
||||||
with shared.printLock:
|
a new version message, like if the remote node wants to update
|
||||||
print 'Closing connection to old protocol version 1 node: ', self.peer
|
the streams in which they are interested. But for now we'll
|
||||||
return
|
ignore this version message
|
||||||
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
"""
|
||||||
self.myExternalIP = socket.inet_ntoa(data[40:44])
|
return
|
||||||
# print 'myExternalIP', self.myExternalIP
|
self.remoteProtocolVersion, = unpack('>L', data[:4])
|
||||||
self.remoteNodeIncomingPort, = unpack('>H', data[70:72])
|
if self.remoteProtocolVersion <= 1:
|
||||||
# print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
useragentLength, lengthOfUseragentVarint = decodeVarint(
|
|
||||||
data[80:84])
|
|
||||||
readPosition = 80 + lengthOfUseragentVarint
|
|
||||||
useragent = data[readPosition:readPosition + useragentLength]
|
|
||||||
readPosition += useragentLength
|
|
||||||
numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(
|
|
||||||
data[readPosition:])
|
|
||||||
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
|
||||||
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
|
||||||
data[readPosition:])
|
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
print 'Closing connection to old protocol version 1 node: ', self.peer
|
||||||
|
return
|
||||||
|
# print 'remoteProtocolVersion', self.remoteProtocolVersion
|
||||||
|
self.myExternalIP = socket.inet_ntoa(data[40:44])
|
||||||
|
# print 'myExternalIP', self.myExternalIP
|
||||||
|
self.remoteNodeIncomingPort, = unpack('>H', data[70:72])
|
||||||
|
# print 'remoteNodeIncomingPort', self.remoteNodeIncomingPort
|
||||||
|
useragentLength, lengthOfUseragentVarint = decodeVarint(
|
||||||
|
data[80:84])
|
||||||
|
readPosition = 80 + lengthOfUseragentVarint
|
||||||
|
useragent = data[readPosition:readPosition + useragentLength]
|
||||||
|
readPosition += useragentLength
|
||||||
|
numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(
|
||||||
|
data[readPosition:])
|
||||||
|
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
||||||
|
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
||||||
|
data[readPosition:])
|
||||||
|
with shared.printLock:
|
||||||
|
print 'Remote node useragent:', useragent, ' stream number:', self.streamNumber
|
||||||
|
|
||||||
if self.streamNumber != 1:
|
if self.streamNumber != 1:
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
print 'Closed connection to', self.peer, 'because they are interested in stream', self.streamNumber, '.'
|
||||||
return
|
return
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
self.peer.host] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
||||||
# If this was an incoming connection, then the sendData thread
|
# If this was an incoming connection, then the sendData thread
|
||||||
# doesn't know the stream. We have to set it.
|
# doesn't know the stream. We have to set it.
|
||||||
if not self.initiatedConnection:
|
if not self.initiatedConnection:
|
||||||
shared.broadcastToSendDataQueues((
|
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
|
||||||
0, 'setStreamNumber', (self.peer, self.streamNumber)))
|
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||||
if data[72:80] == shared.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
||||||
shared.broadcastToSendDataQueues((0, 'shutdown', self.peer))
|
with shared.printLock:
|
||||||
with shared.printLock:
|
print 'Closing connection to myself: ', self.peer
|
||||||
print 'Closing connection to myself: ', self.peer
|
return
|
||||||
return
|
|
||||||
self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion))
|
# The other peer's protocol version is of interest to the sendDataThread but we learn of it
|
||||||
|
# in this version message. Let us inform the sendDataThread.
|
||||||
|
self.sendDataThreadQueue.put((0, 'setRemoteProtocolVersion', self.remoteProtocolVersion))
|
||||||
|
|
||||||
shared.knownNodesLock.acquire()
|
shared.knownNodesLock.acquire()
|
||||||
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
shared.knownNodes[self.streamNumber][shared.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||||
shared.needToWriteKnownNodesToDisk = True
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
shared.knownNodesLock.release()
|
shared.knownNodesLock.release()
|
||||||
|
|
||||||
self.sendverack()
|
self.sendverack()
|
||||||
if self.initiatedConnection == False:
|
if self.initiatedConnection == False:
|
||||||
self.sendversion()
|
self.sendversion()
|
||||||
|
|
||||||
# Sends a version message
|
# Sends a version message
|
||||||
def sendversion(self):
|
def sendversion(self):
|
||||||
|
|
|
@ -25,6 +25,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.data = ''
|
self.data = ''
|
||||||
self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue)
|
self.objectHashHolderInstance = objectHashHolder(self.sendDataThreadQueue)
|
||||||
self.objectHashHolderInstance.start()
|
self.objectHashHolderInstance.start()
|
||||||
|
self.connectionIsOrWasFullyEstablished = False
|
||||||
|
|
||||||
|
|
||||||
def setup(
|
def setup(
|
||||||
|
@ -71,16 +72,6 @@ class sendDataThread(threading.Thread):
|
||||||
if data == self.peer or data == 'all':
|
if data == self.peer or data == 'all':
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'shutting down now.'
|
||||||
|
|
||||||
try:
|
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
|
||||||
self.sock.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
|
||||||
with shared.printLock:
|
|
||||||
print 'len of sendDataQueues', len(shared.sendDataQueues)
|
|
||||||
|
|
||||||
break
|
break
|
||||||
# When you receive an incoming connection, a sendDataThread is
|
# When you receive an incoming connection, a sendDataThread is
|
||||||
# created even though you don't yet know what stream number the
|
# created even though you don't yet know what stream number the
|
||||||
|
@ -89,12 +80,9 @@ class sendDataThread(threading.Thread):
|
||||||
# will continue on with the connection and will set the
|
# will continue on with the connection and will set the
|
||||||
# streamNumber of this send data thread here:
|
# streamNumber of this send data thread here:
|
||||||
elif command == 'setStreamNumber':
|
elif command == 'setStreamNumber':
|
||||||
peerInMessage, specifiedStreamNumber = data
|
self.streamNumber = data
|
||||||
if peerInMessage == self.peer:
|
with shared.printLock:
|
||||||
with shared.printLock:
|
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', self.streamNumber
|
||||||
print 'setting the stream number in the sendData thread (ID:', id(self), ') to', specifiedStreamNumber
|
|
||||||
|
|
||||||
self.streamNumber = specifiedStreamNumber
|
|
||||||
elif command == 'setRemoteProtocolVersion':
|
elif command == 'setRemoteProtocolVersion':
|
||||||
specifiedRemoteProtocolVersion = data
|
specifiedRemoteProtocolVersion = data
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
|
@ -103,6 +91,9 @@ class sendDataThread(threading.Thread):
|
||||||
elif command == 'advertisepeer':
|
elif command == 'advertisepeer':
|
||||||
self.objectHashHolderInstance.holdPeer(data)
|
self.objectHashHolderInstance.holdPeer(data)
|
||||||
elif command == 'sendaddr':
|
elif command == 'sendaddr':
|
||||||
|
if not self.connectionIsOrWasFullyEstablished:
|
||||||
|
# not sending addr because we haven't sent and heard a verack from the remote node yet
|
||||||
|
return
|
||||||
numberOfAddressesInAddrMessage = len(
|
numberOfAddressesInAddrMessage = len(
|
||||||
data)
|
data)
|
||||||
payload = ''
|
payload = ''
|
||||||
|
@ -127,17 +118,13 @@ class sendDataThread(threading.Thread):
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'sendaddr: self.sock.sendall failed'
|
print 'sendaddr: self.sock.sendall failed'
|
||||||
try:
|
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
|
||||||
self.sock.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
|
||||||
break
|
break
|
||||||
elif command == 'advertiseobject':
|
elif command == 'advertiseobject':
|
||||||
self.objectHashHolderInstance.holdHash(data)
|
self.objectHashHolderInstance.holdHash(data)
|
||||||
elif command == 'sendinv':
|
elif command == 'sendinv':
|
||||||
|
if not self.connectionIsOrWasFullyEstablished:
|
||||||
|
# not sending inv because we haven't sent and heard a verack from the remote node yet
|
||||||
|
return
|
||||||
payload = ''
|
payload = ''
|
||||||
for hash in data:
|
for hash in data:
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware:
|
||||||
|
@ -153,13 +140,6 @@ class sendDataThread(threading.Thread):
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'sendinv: self.sock.sendall failed'
|
print 'sendinv: self.sock.sendall failed'
|
||||||
try:
|
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
|
||||||
self.sock.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
|
||||||
print 'sendDataThread thread (ID:', str(id(self)) + ') ending now. Was connected to', self.peer
|
|
||||||
break
|
break
|
||||||
elif command == 'pong':
|
elif command == 'pong':
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware.clear() # To save memory, let us clear this data structure from time to time. As its function is to help us keep from sending inv messages to peers which sent us the same inv message mere seconds earlier, it will be fine to clear this data structure from time to time.
|
||||||
|
@ -174,29 +154,28 @@ class sendDataThread(threading.Thread):
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
print 'send pong failed'
|
print 'send pong failed'
|
||||||
try:
|
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
|
||||||
self.sock.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
|
||||||
print 'sendDataThread thread', self, 'ending now. Was connected to', self.peer
|
|
||||||
break
|
break
|
||||||
elif command == 'sendRawData':
|
elif command == 'sendRawData':
|
||||||
try:
|
try:
|
||||||
self.sock.sendall(data)
|
self.sock.sendall(data)
|
||||||
self.lastTimeISentData = int(time.time())
|
self.lastTimeISentData = int(time.time())
|
||||||
except:
|
except:
|
||||||
try:
|
|
||||||
self.sock.shutdown(socket.SHUT_RDWR)
|
|
||||||
self.sock.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
|
||||||
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
print 'Sending of data to', self.peer, 'failed. sendDataThread thread', self, 'ending now.'
|
||||||
break
|
break
|
||||||
|
elif command == 'connectionIsOrWasFullyEstablished':
|
||||||
|
with shared.printLock:
|
||||||
|
print 'sendDataThread (associated with', self.peer, ') ID:', id(self), 'setting connectionIsOrWasFullyEstablished to True.'
|
||||||
|
self.connectionIsOrWasFullyEstablished = True
|
||||||
else:
|
else:
|
||||||
with shared.printLock:
|
with shared.printLock:
|
||||||
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
print 'sendDataThread ID:', id(self), 'ignoring command', command, 'because the thread is not in stream', deststream
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.sock.shutdown(socket.SHUT_RDWR)
|
||||||
|
self.sock.close()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
shared.sendDataQueues.remove(self.sendDataThreadQueue)
|
||||||
|
with shared.printLock:
|
||||||
|
print 'Number of queues remaining in sendDataQueues:', len(shared.sendDataQueues)
|
||||||
self.objectHashHolderInstance.close()
|
self.objectHashHolderInstance.close()
|
||||||
|
|
|
@ -11,17 +11,17 @@ def createDefaultKnownNodes(appdata):
|
||||||
############## Stream 1 ################
|
############## Stream 1 ################
|
||||||
stream1 = {}
|
stream1 = {}
|
||||||
|
|
||||||
stream1[shared.Peer('176.31.246.114', 8444)] = int(time.time())
|
#stream1[shared.Peer('2604:2000:1380:9f:82e:148b:2746:d0c7', 8080)] = int(time.time())
|
||||||
stream1[shared.Peer('109.229.197.133', 8444)] = int(time.time())
|
stream1[shared.Peer('68.33.0.104', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('174.3.101.111', 8444)] = int(time.time())
|
stream1[shared.Peer('97.77.34.35', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('90.188.238.79', 7829)] = int(time.time())
|
stream1[shared.Peer('71.232.195.131', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('184.75.69.2', 8444)] = int(time.time())
|
stream1[shared.Peer('192.241.231.39', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('60.225.209.243', 8444)] = int(time.time())
|
stream1[shared.Peer('75.66.0.116', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('5.145.140.218', 8444)] = int(time.time())
|
stream1[shared.Peer('182.169.23.102', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('5.19.255.216', 8444)] = int(time.time())
|
stream1[shared.Peer('75.95.134.9', 8444)] = int(time.time())
|
||||||
stream1[shared.Peer('193.159.162.189', 8444)] = int(time.time())
|
stream1[shared.Peer('46.236.100.108', 48444)] = int(time.time())
|
||||||
stream1[shared.Peer('86.26.15.171', 8444)] = int(time.time())
|
stream1[shared.Peer('66.108.53.42', 8080)] = int(time.time())
|
||||||
|
|
||||||
############# Stream 2 #################
|
############# Stream 2 #################
|
||||||
stream2 = {}
|
stream2 = {}
|
||||||
# None yet
|
# None yet
|
||||||
|
|
Loading…
Reference in New Issue
Block a user