Improved stream handling
- version command sends list of all participating streams - biginv sends lists of hosts for all streams the peer wants (plus immediate children) - objects will spread to all peers that advertise the associated stream - please note these are just network subsystem adjustments, streams aren't actually usable yet
This commit is contained in:
parent
79b566a907
commit
f6bdad18a3
|
@ -51,7 +51,7 @@ import helper_generic
|
||||||
from helper_threading import *
|
from helper_threading import *
|
||||||
|
|
||||||
def connectToStream(streamNumber):
|
def connectToStream(streamNumber):
|
||||||
state.streamsInWhichIAmParticipating[streamNumber] = 'no data'
|
state.streamsInWhichIAmParticipating.append(streamNumber)
|
||||||
selfInitiatedConnections[streamNumber] = {}
|
selfInitiatedConnections[streamNumber] = {}
|
||||||
|
|
||||||
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
|
if isOurOperatingSystemLimitedToHavingVeryFewHalfOpenConnections():
|
||||||
|
|
|
@ -62,7 +62,8 @@ class receiveDataThread(threading.Thread):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.peer = state.Peer(HOST, port)
|
self.peer = state.Peer(HOST, port)
|
||||||
self.name = "receiveData-" + self.peer.host.replace(":", ".") # ":" log parser field separator
|
self.name = "receiveData-" + self.peer.host.replace(":", ".") # ":" log parser field separator
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = state.streamsInWhichIAmParticipating
|
||||||
|
self.remoteStreams = []
|
||||||
self.selfInitiatedConnections = selfInitiatedConnections
|
self.selfInitiatedConnections = selfInitiatedConnections
|
||||||
self.sendDataThreadQueue = sendDataThreadQueue # used to send commands and data to the sendDataThread
|
self.sendDataThreadQueue = sendDataThreadQueue # used to send commands and data to the sendDataThread
|
||||||
self.hostIdent = self.peer.port if ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname') and protocol.checkSocksIP(self.peer.host) else self.peer.host
|
self.hostIdent = self.peer.port if ".onion" in BMConfigParser().get('bitmessagesettings', 'onionhostname') and protocol.checkSocksIP(self.peer.host) else self.peer.host
|
||||||
|
@ -70,7 +71,7 @@ class receiveDataThread(threading.Thread):
|
||||||
self.hostIdent] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
self.hostIdent] = 0 # The very fact that this receiveData thread exists shows that we are connected to the remote host. Let's add it to this list so that an outgoingSynSender thread doesn't try to connect to it.
|
||||||
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
self.connectionIsOrWasFullyEstablished = False # set to true after the remote node and I accept each other's version messages. This is needed to allow the user interface to accurately reflect the current number of connections.
|
||||||
self.services = 0
|
self.services = 0
|
||||||
if self.streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
if streamNumber == -1: # This was an incoming connection. Send out a version message if we accept the other node's version message.
|
||||||
self.initiatedConnection = False
|
self.initiatedConnection = False
|
||||||
else:
|
else:
|
||||||
self.initiatedConnection = True
|
self.initiatedConnection = True
|
||||||
|
@ -120,7 +121,11 @@ class receiveDataThread(threading.Thread):
|
||||||
self.processData()
|
self.processData()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
del self.selfInitiatedConnections[self.streamNumber][self]
|
for stream in self.remoteStreams:
|
||||||
|
try:
|
||||||
|
del self.selfInitiatedConnections[stream][self]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
logger.debug('removed self (a receiveDataThread) from selfInitiatedConnections')
|
logger.debug('removed self (a receiveDataThread) from selfInitiatedConnections')
|
||||||
except:
|
except:
|
||||||
pass
|
pass
|
||||||
|
@ -137,7 +142,8 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
def antiIntersectionDelay(self, initial = False):
|
def antiIntersectionDelay(self, initial = False):
|
||||||
# estimated time for a small object to propagate across the whole network
|
# estimated time for a small object to propagate across the whole network
|
||||||
delay = math.ceil(math.log(len(shared.knownNodes[self.streamNumber]) + 2, 20)) * (0.2 + objectHashHolder.size/2)
|
delay = math.ceil(math.log(max(len(shared.knownNodes[x]) for x in shared.knownNodes) + 2, 20)) * (0.2 + objectHashHolder.size/2)
|
||||||
|
# take the stream with maximum amount of nodes
|
||||||
# +2 is to avoid problems with log(0) and log(1)
|
# +2 is to avoid problems with log(0) and log(1)
|
||||||
# 20 is avg connected nodes count
|
# 20 is avg connected nodes count
|
||||||
# 0.2 is avg message transmission time
|
# 0.2 is avg message transmission time
|
||||||
|
@ -182,7 +188,8 @@ class receiveDataThread(threading.Thread):
|
||||||
# that other peers can be made aware of its existance.
|
# that other peers can be made aware of its existance.
|
||||||
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
if self.initiatedConnection and self.connectionIsOrWasFullyEstablished: # The remote port is only something we should share with others if it is the remote node's incoming port (rather than some random operating-system-assigned outgoing port).
|
||||||
with shared.knownNodesLock:
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[self.streamNumber][self.peer] = int(time.time())
|
for stream in self.streamNumber:
|
||||||
|
shared.knownNodes[stream][self.peer] = int(time.time())
|
||||||
|
|
||||||
#Strip the nulls
|
#Strip the nulls
|
||||||
command = command.rstrip('\x00')
|
command = command.rstrip('\x00')
|
||||||
|
@ -313,9 +320,10 @@ class receiveDataThread(threading.Thread):
|
||||||
PendingUpload().add()
|
PendingUpload().add()
|
||||||
|
|
||||||
# Let all of our peers know about this new node.
|
# Let all of our peers know about this new node.
|
||||||
dataToSend = (int(time.time()), self.streamNumber, 1, self.peer.host, self.remoteNodeIncomingPort)
|
for stream in self.remoteStreams:
|
||||||
protocol.broadcastToSendDataQueues((
|
dataToSend = (int(time.time()), stream, self.services, self.peer.host, self.remoteNodeIncomingPort)
|
||||||
self.streamNumber, 'advertisepeer', dataToSend))
|
protocol.broadcastToSendDataQueues((
|
||||||
|
stream, 'advertisepeer', dataToSend))
|
||||||
|
|
||||||
self.sendaddr() # This is one large addr message to this one peer.
|
self.sendaddr() # This is one large addr message to this one peer.
|
||||||
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
if not self.initiatedConnection and len(shared.connectedHostsList) > 200:
|
||||||
|
@ -328,9 +336,10 @@ class receiveDataThread(threading.Thread):
|
||||||
def sendBigInv(self):
|
def sendBigInv(self):
|
||||||
# Select all hashes for objects in this stream.
|
# Select all hashes for objects in this stream.
|
||||||
bigInvList = {}
|
bigInvList = {}
|
||||||
for hash in Inventory().unexpired_hashes_by_stream(self.streamNumber):
|
for stream in self.streamNumber:
|
||||||
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
|
for hash in Inventory().unexpired_hashes_by_stream(stream):
|
||||||
bigInvList[hash] = 0
|
if hash not in self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware and not self.objectHashHolderInstance.hasHash(hash):
|
||||||
|
bigInvList[hash] = 0
|
||||||
numberOfObjectsInInvMessage = 0
|
numberOfObjectsInInvMessage = 0
|
||||||
payload = ''
|
payload = ''
|
||||||
# Now let us start appending all of these hashes together. They will be
|
# Now let us start appending all of these hashes together. They will be
|
||||||
|
@ -426,7 +435,9 @@ class receiveDataThread(threading.Thread):
|
||||||
advertisedSet = set()
|
advertisedSet = set()
|
||||||
for i in range(numberOfItemsInInv):
|
for i in range(numberOfItemsInInv):
|
||||||
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
advertisedSet.add(data[lengthOfVarint + (32 * i):32 + lengthOfVarint + (32 * i)])
|
||||||
objectsNewToMe = advertisedSet - Inventory().hashes_by_stream(self.streamNumber)
|
objectsNewToMe = advertisedSet
|
||||||
|
for stream in self.streamNumber:
|
||||||
|
objectsNewToMe -= Inventory().hashes_by_stream(stream)
|
||||||
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
logger.info('inv message lists %s objects. Of those %s are new to me. It took %s seconds to figure that out.', numberOfItemsInInv, len(objectsNewToMe), time.time()-startTime)
|
||||||
for item in objectsNewToMe:
|
for item in objectsNewToMe:
|
||||||
PendingDownload().add(item)
|
PendingDownload().add(item)
|
||||||
|
@ -532,7 +543,7 @@ class receiveDataThread(threading.Thread):
|
||||||
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
38 * i):12 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
if recaddrStream == 0:
|
if recaddrStream == 0:
|
||||||
continue
|
continue
|
||||||
if recaddrStream != self.streamNumber and recaddrStream != (self.streamNumber * 2) and recaddrStream != ((self.streamNumber * 2) + 1): # if the embedded stream number is not in my stream or either of my child streams then ignore it. Someone might be trying funny business.
|
if recaddrStream not in self.streamNumber and (recaddrStream / 2) not in self.streamNumber: # if the embedded stream number and its parent are not in my streams then ignore it. Someone might be trying funny business.
|
||||||
continue
|
continue
|
||||||
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
recaddrServices, = unpack('>Q', data[12 + lengthOfNumberOfAddresses + (
|
||||||
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
38 * i):20 + lengthOfNumberOfAddresses + (38 * i)])
|
||||||
|
@ -560,7 +571,7 @@ class receiveDataThread(threading.Thread):
|
||||||
timeSomeoneElseReceivedMessageFromThisNode,
|
timeSomeoneElseReceivedMessageFromThisNode,
|
||||||
recaddrStream, recaddrServices, hostStandardFormat, recaddrPort)
|
recaddrStream, recaddrServices, hostStandardFormat, recaddrPort)
|
||||||
protocol.broadcastToSendDataQueues((
|
protocol.broadcastToSendDataQueues((
|
||||||
self.streamNumber, 'advertisepeer', hostDetails))
|
self.recaddrStream, 'advertisepeer', hostDetails))
|
||||||
else:
|
else:
|
||||||
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
timeLastReceivedMessageFromThisNode = shared.knownNodes[recaddrStream][
|
||||||
peerFromAddrMessage]
|
peerFromAddrMessage]
|
||||||
|
@ -568,7 +579,8 @@ class receiveDataThread(threading.Thread):
|
||||||
with shared.knownNodesLock:
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
shared.knownNodes[recaddrStream][peerFromAddrMessage] = timeSomeoneElseReceivedMessageFromThisNode
|
||||||
|
|
||||||
logger.debug('knownNodes currently has ' + str(len(shared.knownNodes[self.streamNumber])) + ' nodes for this stream.')
|
for stream in self.streamNumber:
|
||||||
|
logger.debug('knownNodes currently has %i nodes for stream %i', len(shared.knownNodes[stream]), stream)
|
||||||
|
|
||||||
|
|
||||||
# Send a huge addr message to our peer. This is only used
|
# Send a huge addr message to our peer. This is only used
|
||||||
|
@ -576,87 +588,87 @@ class receiveDataThread(threading.Thread):
|
||||||
# peer (with the full exchange of version and verack
|
# peer (with the full exchange of version and verack
|
||||||
# messages).
|
# messages).
|
||||||
def sendaddr(self):
|
def sendaddr(self):
|
||||||
addrsInMyStream = {}
|
# We are going to share a maximum number of 1000 addrs (per overlapping
|
||||||
addrsInChildStreamLeft = {}
|
# stream) with our peer. 500 from overlapping streams, 250 from the
|
||||||
addrsInChildStreamRight = {}
|
# left child stream, and 250 from the right child stream.
|
||||||
# print 'knownNodes', shared.knownNodes
|
for stream in self.streamNumber:
|
||||||
|
addrsInMyStream = {}
|
||||||
|
addrsInChildStreamLeft = {}
|
||||||
|
addrsInChildStreamRight = {}
|
||||||
|
|
||||||
# We are going to share a maximum number of 1000 addrs with our peer.
|
with shared.knownNodesLock:
|
||||||
# 500 from this stream, 250 from the left child stream, and 250 from
|
if len(shared.knownNodes[stream]) > 0:
|
||||||
# the right child stream.
|
ownPosition = random.randint(0, 499)
|
||||||
with shared.knownNodesLock:
|
sentOwn = False
|
||||||
if len(shared.knownNodes[self.streamNumber]) > 0:
|
for i in range(500):
|
||||||
ownPosition = random.randint(0, 499)
|
# if current connection is over a proxy, sent our own onion address at a random position
|
||||||
sentOwn = False
|
if ownPosition == i and ".onion" in BMConfigParser().get("bitmessagesettings", "onionhostname") and \
|
||||||
for i in range(500):
|
hasattr(self.sock, "getproxytype") and self.sock.getproxytype() != "none" and not sentOwn:
|
||||||
# if current connection is over a proxy, sent our own onion address at a random position
|
peer = state.Peer(BMConfigParser().get("bitmessagesettings", "onionhostname"), BMConfigParser().getint("bitmessagesettings", "onionport"))
|
||||||
if ownPosition == i and ".onion" in BMConfigParser().get("bitmessagesettings", "onionhostname") and \
|
else:
|
||||||
hasattr(self.sock, "getproxytype") and self.sock.getproxytype() != "none" and not sentOwn:
|
# still may contain own onion address, but we don't change it
|
||||||
peer = state.Peer(BMConfigParser().get("bitmessagesettings", "onionhostname"), BMConfigParser().getint("bitmessagesettings", "onionport"))
|
peer, = random.sample(shared.knownNodes[stream], 1)
|
||||||
else:
|
if isHostInPrivateIPRange(peer.host):
|
||||||
# still may contain own onion address, but we don't change it
|
continue
|
||||||
peer, = random.sample(shared.knownNodes[self.streamNumber], 1)
|
if peer.host == BMConfigParser().get("bitmessagesettings", "onionhostname") and peer.port == BMConfigParser().getint("bitmessagesettings", "onionport") :
|
||||||
if isHostInPrivateIPRange(peer.host):
|
sentOwn = True
|
||||||
continue
|
addrsInMyStream[peer] = shared.knownNodes[
|
||||||
if peer.host == BMConfigParser().get("bitmessagesettings", "onionhostname") and peer.port == BMConfigParser().getint("bitmessagesettings", "onionport") :
|
stream][peer]
|
||||||
sentOwn = True
|
# sent 250 only if the remote isn't interested in it
|
||||||
addrsInMyStream[peer] = shared.knownNodes[
|
if len(shared.knownNodes[stream * 2]) > 0 and stream not in self.streamNumber:
|
||||||
self.streamNumber][peer]
|
for i in range(250):
|
||||||
if len(shared.knownNodes[self.streamNumber * 2]) > 0:
|
peer, = random.sample(shared.knownNodes[
|
||||||
for i in range(250):
|
stream * 2], 1)
|
||||||
peer, = random.sample(shared.knownNodes[
|
if isHostInPrivateIPRange(peer.host):
|
||||||
self.streamNumber * 2], 1)
|
continue
|
||||||
if isHostInPrivateIPRange(peer.host):
|
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
||||||
continue
|
stream * 2][peer]
|
||||||
addrsInChildStreamLeft[peer] = shared.knownNodes[
|
if len(shared.knownNodes[(stream * 2) + 1]) > 0 and stream not in self.streamNumber:
|
||||||
self.streamNumber * 2][peer]
|
for i in range(250):
|
||||||
if len(shared.knownNodes[(self.streamNumber * 2) + 1]) > 0:
|
peer, = random.sample(shared.knownNodes[
|
||||||
for i in range(250):
|
(stream * 2) + 1], 1)
|
||||||
peer, = random.sample(shared.knownNodes[
|
if isHostInPrivateIPRange(peer.host):
|
||||||
(self.streamNumber * 2) + 1], 1)
|
continue
|
||||||
if isHostInPrivateIPRange(peer.host):
|
addrsInChildStreamRight[peer] = shared.knownNodes[
|
||||||
continue
|
(stream * 2) + 1][peer]
|
||||||
addrsInChildStreamRight[peer] = shared.knownNodes[
|
numberOfAddressesInAddrMessage = 0
|
||||||
(self.streamNumber * 2) + 1][peer]
|
payload = ''
|
||||||
numberOfAddressesInAddrMessage = 0
|
for (HOST, PORT), value in addrsInMyStream.items():
|
||||||
payload = ''
|
timeLastReceivedMessageFromThisNode = value
|
||||||
# print 'addrsInMyStream.items()', addrsInMyStream.items()
|
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
||||||
for (HOST, PORT), value in addrsInMyStream.items():
|
numberOfAddressesInAddrMessage += 1
|
||||||
timeLastReceivedMessageFromThisNode = value
|
payload += pack(
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
|
||||||
numberOfAddressesInAddrMessage += 1
|
payload += pack('>I', stream)
|
||||||
payload += pack(
|
payload += pack(
|
||||||
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
|
'>q', 1) # service bit flags offered by this node
|
||||||
payload += pack('>I', self.streamNumber)
|
payload += protocol.encodeHost(HOST)
|
||||||
payload += pack(
|
payload += pack('>H', PORT) # remote port
|
||||||
'>q', 1) # service bit flags offered by this node
|
for (HOST, PORT), value in addrsInChildStreamLeft.items():
|
||||||
payload += protocol.encodeHost(HOST)
|
timeLastReceivedMessageFromThisNode = value
|
||||||
payload += pack('>H', PORT) # remote port
|
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
||||||
for (HOST, PORT), value in addrsInChildStreamLeft.items():
|
numberOfAddressesInAddrMessage += 1
|
||||||
timeLastReceivedMessageFromThisNode = value
|
payload += pack(
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
|
||||||
numberOfAddressesInAddrMessage += 1
|
payload += pack('>I', stream * 2)
|
||||||
payload += pack(
|
payload += pack(
|
||||||
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
|
'>q', 1) # service bit flags offered by this node
|
||||||
payload += pack('>I', self.streamNumber * 2)
|
payload += protocol.encodeHost(HOST)
|
||||||
payload += pack(
|
payload += pack('>H', PORT) # remote port
|
||||||
'>q', 1) # service bit flags offered by this node
|
for (HOST, PORT), value in addrsInChildStreamRight.items():
|
||||||
payload += protocol.encodeHost(HOST)
|
timeLastReceivedMessageFromThisNode = value
|
||||||
payload += pack('>H', PORT) # remote port
|
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
||||||
for (HOST, PORT), value in addrsInChildStreamRight.items():
|
numberOfAddressesInAddrMessage += 1
|
||||||
timeLastReceivedMessageFromThisNode = value
|
payload += pack(
|
||||||
if timeLastReceivedMessageFromThisNode > (int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers): # If it is younger than 3 hours old..
|
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
|
||||||
numberOfAddressesInAddrMessage += 1
|
payload += pack('>I', (stream * 2) + 1)
|
||||||
payload += pack(
|
payload += pack(
|
||||||
'>Q', timeLastReceivedMessageFromThisNode) # 64-bit time
|
'>q', 1) # service bit flags offered by this node
|
||||||
payload += pack('>I', (self.streamNumber * 2) + 1)
|
payload += protocol.encodeHost(HOST)
|
||||||
payload += pack(
|
payload += pack('>H', PORT) # remote port
|
||||||
'>q', 1) # service bit flags offered by this node
|
|
||||||
payload += protocol.encodeHost(HOST)
|
|
||||||
payload += pack('>H', PORT) # remote port
|
|
||||||
|
|
||||||
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
payload = encodeVarint(numberOfAddressesInAddrMessage) + payload
|
||||||
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('addr', payload)))
|
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.CreatePacket('addr', payload)))
|
||||||
|
|
||||||
|
|
||||||
# We have received a version message
|
# We have received a version message
|
||||||
|
@ -732,20 +744,24 @@ class receiveDataThread(threading.Thread):
|
||||||
numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(
|
numberOfStreamsInVersionMessage, lengthOfNumberOfStreamsInVersionMessage = decodeVarint(
|
||||||
data[readPosition:])
|
data[readPosition:])
|
||||||
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
readPosition += lengthOfNumberOfStreamsInVersionMessage
|
||||||
self.streamNumber, lengthOfRemoteStreamNumber = decodeVarint(
|
self.remoteStreams = []
|
||||||
data[readPosition:])
|
for i in range(numberOfStreamsInVersionMessage):
|
||||||
logger.debug('Remote node useragent: ' + useragent + ' stream number:' + str(self.streamNumber) + ' time offset: ' + str(timeOffset) + ' seconds.')
|
newStreamNumber, lengthOfRemoteStreamNumber = decodeVarint(data[readPosition:])
|
||||||
|
readPosition += lengthOfRemoteStreamNumber
|
||||||
|
self.remoteStreams.append(newStreamNumber)
|
||||||
|
logger.debug('Remote node useragent: %s, streams: (%s), time offset: %is.', useragent, ', '.join(str(x) for x in self.remoteStreams), timeOffset)
|
||||||
|
|
||||||
if self.streamNumber != 1:
|
# find shared streams
|
||||||
|
self.streamNumber = sorted(set(state.streamsInWhichIAmParticipating).intersection(self.remoteStreams))
|
||||||
|
|
||||||
|
if len(self.streamNumber) == 0:
|
||||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
logger.debug ('Closed connection to ' + str(self.peer) + ' because they are interested in stream ' + str(self.streamNumber) + '.')
|
logger.debug ('Closed connection to ' + str(self.peer) + ' because there is no overlapping interest in streams.')
|
||||||
return
|
return
|
||||||
|
|
||||||
shared.connectedHostsList[
|
shared.connectedHostsList[
|
||||||
self.hostIdent] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
self.hostIdent] = 1 # We use this data structure to not only keep track of what hosts we are connected to so that we don't try to connect to them again, but also to list the connections count on the Network Status tab.
|
||||||
# If this was an incoming connection, then the sendDataThread
|
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.remoteStreams))
|
||||||
# doesn't know the stream. We have to set it.
|
|
||||||
if not self.initiatedConnection:
|
|
||||||
self.sendDataThreadQueue.put((0, 'setStreamNumber', self.streamNumber))
|
|
||||||
if data[72:80] == protocol.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
if data[72:80] == protocol.eightBytesOfRandomDataUsedToDetectConnectionsToSelf:
|
||||||
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
self.sendDataThreadQueue.put((0, 'shutdown','no data'))
|
||||||
logger.debug('Closing connection to myself: ' + str(self.peer))
|
logger.debug('Closing connection to myself: ' + str(self.peer))
|
||||||
|
@ -757,10 +773,11 @@ class receiveDataThread(threading.Thread):
|
||||||
|
|
||||||
if not isHostInPrivateIPRange(self.peer.host):
|
if not isHostInPrivateIPRange(self.peer.host):
|
||||||
with shared.knownNodesLock:
|
with shared.knownNodesLock:
|
||||||
shared.knownNodes[self.streamNumber][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
for stream in self.remoteStreams:
|
||||||
if not self.initiatedConnection:
|
shared.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] = int(time.time())
|
||||||
shared.knownNodes[self.streamNumber][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 162000 # penalise inbound, 2 days minus 3 hours
|
if not self.initiatedConnection:
|
||||||
shared.needToWriteKnownNodesToDisk = True
|
shared.knownNodes[stream][state.Peer(self.peer.host, self.remoteNodeIncomingPort)] -= 162000 # penalise inbound, 2 days minus 3 hours
|
||||||
|
shared.needToWriteKnownNodesToDisk = True
|
||||||
|
|
||||||
self.sendverack()
|
self.sendverack()
|
||||||
if self.initiatedConnection == False:
|
if self.initiatedConnection == False:
|
||||||
|
@ -770,7 +787,7 @@ class receiveDataThread(threading.Thread):
|
||||||
def sendversion(self):
|
def sendversion(self):
|
||||||
logger.debug('Sending version message')
|
logger.debug('Sending version message')
|
||||||
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleVersionMessage(
|
self.sendDataThreadQueue.put((0, 'sendRawData', protocol.assembleVersionMessage(
|
||||||
self.peer.host, self.peer.port, self.streamNumber, not self.initiatedConnection)))
|
self.peer.host, self.peer.port, state.streamsInWhichIAmParticipating, not self.initiatedConnection)))
|
||||||
|
|
||||||
|
|
||||||
# Sends a verack message
|
# Sends a verack message
|
||||||
|
|
|
@ -42,7 +42,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
self.peer = state.Peer(HOST, PORT)
|
self.peer = state.Peer(HOST, PORT)
|
||||||
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
|
self.name = "sendData-" + self.peer.host.replace(":", ".") # log parser field separator
|
||||||
self.streamNumber = streamNumber
|
self.streamNumber = []
|
||||||
self.services = 0
|
self.services = 0
|
||||||
self.buffer = ""
|
self.buffer = ""
|
||||||
self.initiatedConnection = False
|
self.initiatedConnection = False
|
||||||
|
@ -51,16 +51,16 @@ class sendDataThread(threading.Thread):
|
||||||
self.lastTimeISentData = int(
|
self.lastTimeISentData = int(
|
||||||
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
time.time()) # If this value increases beyond five minutes ago, we'll send a pong message to keep the connection alive.
|
||||||
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
self.someObjectsOfWhichThisRemoteNodeIsAlreadyAware = someObjectsOfWhichThisRemoteNodeIsAlreadyAware
|
||||||
if self.streamNumber == -1: # This was an incoming connection.
|
if streamNumber == -1: # This was an incoming connection.
|
||||||
self.initiatedConnection = False
|
self.initiatedConnection = False
|
||||||
else:
|
else:
|
||||||
self.initiatedConnection = True
|
self.initiatedConnection = True
|
||||||
logger.debug('The streamNumber of this sendDataThread (ID: ' + str(id(self)) + ') at setup() is' + str(self.streamNumber))
|
#logger.debug('The streamNumber of this sendDataThread (ID: ' + str(id(self)) + ') at setup() is' + str(self.streamNumber))
|
||||||
|
|
||||||
|
|
||||||
def sendVersionMessage(self):
|
def sendVersionMessage(self):
|
||||||
datatosend = protocol.assembleVersionMessage(
|
datatosend = protocol.assembleVersionMessage(
|
||||||
self.peer.host, self.peer.port, self.streamNumber, not self.initiatedConnection) # the IP and port of the remote host, and my streamNumber.
|
self.peer.host, self.peer.port, state.streamsInWhichIAmParticipating, not self.initiatedConnection) # the IP and port of the remote host, and my streamNumber.
|
||||||
|
|
||||||
logger.debug('Sending version packet: ' + repr(datatosend))
|
logger.debug('Sending version packet: ' + repr(datatosend))
|
||||||
|
|
||||||
|
@ -102,7 +102,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.sendBytes()
|
self.sendBytes()
|
||||||
deststream, command, data = self.sendDataThreadQueue.get()
|
deststream, command, data = self.sendDataThreadQueue.get()
|
||||||
|
|
||||||
if deststream == self.streamNumber or deststream == 0:
|
if deststream == 0 or deststream in self.streamNumber:
|
||||||
if command == 'shutdown':
|
if command == 'shutdown':
|
||||||
logger.debug('sendDataThread (associated with ' + str(self.peer) + ') ID: ' + str(id(self)) + ' shutting down now.')
|
logger.debug('sendDataThread (associated with ' + str(self.peer) + ') ID: ' + str(id(self)) + ' shutting down now.')
|
||||||
break
|
break
|
||||||
|
@ -114,7 +114,7 @@ class sendDataThread(threading.Thread):
|
||||||
# streamNumber of this send data thread here:
|
# streamNumber of this send data thread here:
|
||||||
elif command == 'setStreamNumber':
|
elif command == 'setStreamNumber':
|
||||||
self.streamNumber = data
|
self.streamNumber = data
|
||||||
logger.debug('setting the stream number in the sendData thread (ID: ' + str(id(self)) + ') to ' + str(self.streamNumber))
|
logger.debug('setting the stream number to %s', ', '.join(str(x) for x in self.streamNumber))
|
||||||
elif command == 'setRemoteProtocolVersion':
|
elif command == 'setRemoteProtocolVersion':
|
||||||
specifiedRemoteProtocolVersion = data
|
specifiedRemoteProtocolVersion = data
|
||||||
logger.debug('setting the remote node\'s protocol version in the sendDataThread (ID: ' + str(id(self)) + ') to ' + str(specifiedRemoteProtocolVersion))
|
logger.debug('setting the remote node\'s protocol version in the sendDataThread (ID: ' + str(id(self)) + ') to ' + str(specifiedRemoteProtocolVersion))
|
||||||
|
@ -183,7 +183,7 @@ class sendDataThread(threading.Thread):
|
||||||
self.connectionIsOrWasFullyEstablished = True
|
self.connectionIsOrWasFullyEstablished = True
|
||||||
self.services, self.sslSock = data
|
self.services, self.sslSock = data
|
||||||
elif self.connectionIsOrWasFullyEstablished:
|
elif self.connectionIsOrWasFullyEstablished:
|
||||||
logger.error('sendDataThread ID: ' + str(id(self)) + ' ignoring command ' + command + ' because the thread is not in stream ' + str(deststream))
|
logger.error('sendDataThread ID: ' + str(id(self)) + ' ignoring command ' + command + ' because the thread is not in stream ' + str(deststream) + ' but in streams ' + ', '.join(str(x) for x in self.streamNumber))
|
||||||
self.sendDataThreadQueue.task_done()
|
self.sendDataThreadQueue.task_done()
|
||||||
self.sendDataThreadQueue.task_done()
|
self.sendDataThreadQueue.task_done()
|
||||||
|
|
||||||
|
|
|
@ -119,7 +119,7 @@ def CreatePacket(command, payload=''):
|
||||||
b[Header.size:] = payload
|
b[Header.size:] = payload
|
||||||
return bytes(b)
|
return bytes(b)
|
||||||
|
|
||||||
def assembleVersionMessage(remoteHost, remotePort, myStreamNumber, server = False):
|
def assembleVersionMessage(remoteHost, remotePort, participatingStreams, server = False):
|
||||||
payload = ''
|
payload = ''
|
||||||
payload += pack('>L', 3) # protocol version.
|
payload += pack('>L', 3) # protocol version.
|
||||||
payload += pack('>q', NODE_NETWORK|(NODE_SSL if haveSSL(server) else 0)) # bitflags of the services I offer.
|
payload += pack('>q', NODE_NETWORK|(NODE_SSL if haveSSL(server) else 0)) # bitflags of the services I offer.
|
||||||
|
@ -154,9 +154,16 @@ def assembleVersionMessage(remoteHost, remotePort, myStreamNumber, server = Fals
|
||||||
userAgent = '/PyBitmessage:' + softwareVersion + '/'
|
userAgent = '/PyBitmessage:' + softwareVersion + '/'
|
||||||
payload += encodeVarint(len(userAgent))
|
payload += encodeVarint(len(userAgent))
|
||||||
payload += userAgent
|
payload += userAgent
|
||||||
payload += encodeVarint(
|
|
||||||
1) # The number of streams about which I care. PyBitmessage currently only supports 1 per connection.
|
# Streams
|
||||||
payload += encodeVarint(myStreamNumber)
|
payload += encodeVarint(len(participatingStreams))
|
||||||
|
count = 0
|
||||||
|
for stream in sorted(participatingStreams):
|
||||||
|
payload += encodeVarint(stream)
|
||||||
|
count += 1
|
||||||
|
# protocol limit, see specification
|
||||||
|
if count >= 160000:
|
||||||
|
break
|
||||||
|
|
||||||
return CreatePacket('version', payload)
|
return CreatePacket('version', payload)
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import collections
|
import collections
|
||||||
|
|
||||||
neededPubkeys = {}
|
neededPubkeys = {}
|
||||||
streamsInWhichIAmParticipating = {}
|
streamsInWhichIAmParticipating = []
|
||||||
sendDataQueues = [] #each sendData thread puts its queue in this list.
|
sendDataQueues = [] #each sendData thread puts its queue in this list.
|
||||||
|
|
||||||
# For UPnP
|
# For UPnP
|
||||||
|
|
Reference in New Issue
Block a user