Minimal implementation of onionpeer object

This commit is contained in:
Dmitri Bogomolov 2019-04-26 16:59:56 +03:00
parent e3344ade59
commit 453e045ae5
Signed by untrusted user: g1itch
GPG Key ID: 720A756F18DEED13
4 changed files with 73 additions and 8 deletions

View File

@ -6,6 +6,7 @@ import time
from binascii import hexlify from binascii import hexlify
from subprocess import call # nosec from subprocess import call # nosec
import knownnodes
import highlevelcrypto import highlevelcrypto
from addresses import ( from addresses import (
calculateInventoryHash, decodeAddress, decodeVarint, encodeAddress, calculateInventoryHash, decodeAddress, decodeVarint, encodeAddress,
@ -67,6 +68,8 @@ class objectProcessor(threading.Thread):
self.processmsg(data) self.processmsg(data)
elif objectType == protocol.OBJECT_BROADCAST: elif objectType == protocol.OBJECT_BROADCAST:
self.processbroadcast(data) self.processbroadcast(data)
elif objectType == protocol.OBJECT_ONIONPEER:
self.processonion(data)
# is more of a command, not an object type. Is used to get # is more of a command, not an object type. Is used to get
# this thread past the queue.get() so that it will check # this thread past the queue.get() so that it will check
# the shutdown variable. # the shutdown variable.
@ -140,6 +143,23 @@ class objectProcessor(threading.Thread):
else: else:
logger.debug('This object is not an acknowledgement bound for me.') logger.debug('This object is not an acknowledgement bound for me.')
def processonion(self, data):
readPosition = 20 # bypass the nonce, time, and object type
length = decodeVarint(data[readPosition:readPosition + 10])[1]
readPosition += length
stream, length = decodeVarint(data[readPosition:readPosition + 10])
readPosition += length
# it seems that stream is checked in network.bmproto
port, length = decodeVarint(data[readPosition:readPosition + 10])
host = protocol.checkIPAddress(data[readPosition + length:])
if not host:
return
peer = state.Peer(host, port)
with knownnodes.knownNodesLock:
knownnodes.addKnownNode(
stream, peer, is_self=state.ownAddresses.get(peer))
def processgetpubkey(self, data): def processgetpubkey(self, data):
if len(data) > 200: if len(data) > 200:
logger.info( logger.info(

View File

@ -119,13 +119,21 @@ class singleWorker(threading.Thread, StoppableThread):
# before we start on existing POW tasks. # before we start on existing POW tasks.
self.stop.wait(10) self.stop.wait(10)
if state.shutdown == 0: if state.shutdown > 0:
# just in case there are any pending tasks for msg return
# messages that have yet to be sent.
queues.workerQueue.put(('sendmessage', '')) # just in case there are any pending tasks for msg
# just in case there are any tasks for Broadcasts # messages that have yet to be sent.
# that have yet to be sent. queues.workerQueue.put(('sendmessage', ''))
queues.workerQueue.put(('sendbroadcast', '')) # just in case there are any tasks for Broadcasts
# that have yet to be sent.
queues.workerQueue.put(('sendbroadcast', ''))
# send onionpeer object
for peer in state.ownAddresses.keys():
if peer.host.endswith('.onion'):
queues.workerQueue.put(('sendOnionPeerObj', peer))
break
while state.shutdown == 0: while state.shutdown == 0:
self.busy = 0 self.busy = 0
@ -156,6 +164,11 @@ class singleWorker(threading.Thread, StoppableThread):
self.sendOutOrStoreMyV4Pubkey(data) self.sendOutOrStoreMyV4Pubkey(data)
except: except:
pass pass
elif command == 'sendOnionPeerObj':
try:
self.sendOnionPeerObj(data)
except:
pass
elif command == 'resetPoW': elif command == 'resetPoW':
try: try:
proofofwork.resetPoW() proofofwork.resetPoW()
@ -456,6 +469,37 @@ class singleWorker(threading.Thread, StoppableThread):
' to the keys.dat file. Error message: %s', err ' to the keys.dat file. Error message: %s', err
) )
def sendOnionPeerObj(self, peer):
TTL = int(7 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300))
embeddedTime = int(time.time() + TTL)
streamNumber = 1 # Don't know yet what should be here
objectType = protocol.OBJECT_ONIONPEER
# FIXME: ideally the objectPayload should be signed
objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host)
tag = calculateInventoryHash(objectPayload)
if Inventory().by_type_and_tag(objectType, tag):
return # not expired
payload = pack('>Q', (embeddedTime))
payload += str(bytearray([0, 0, 0, objectType]))
payload += encodeVarint(2 if len(peer.host) == 22 else 3)
payload += encodeVarint(streamNumber)
payload += objectPayload
payload = self._doPOWDefaults(
payload, TTL, log_prefix='(For onionpeer object)')
inventoryHash = calculateInventoryHash(payload)
Inventory()[inventoryHash] = (
objectType, streamNumber, buffer(payload),
embeddedTime, buffer(tag)
)
logger.info(
'sending inv (within sendOnionPeerObj function) for object: %s',
hexlify(inventoryHash))
queues.invQueue.put((streamNumber, inventoryHash))
def sendBroadcast(self): def sendBroadcast(self):
"""Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)"""
# Reset just in case # Reset just in case

View File

@ -147,7 +147,7 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc
(k, v) for k, v in nodes.iteritems() (k, v) for k, v in nodes.iteritems()
if v["lastseen"] > int(time.time()) - if v["lastseen"] > int(time.time()) -
shared.maximumAgeOfNodesThatIAdvertiseToOthers and shared.maximumAgeOfNodesThatIAdvertiseToOthers and
v["rating"] >= 0 v["rating"] >= 0 and len(k.host) <= 22
] ]
# sent 250 only if the remote isn't interested in it # sent 250 only if the remote isn't interested in it
elemCount = min( elemCount = min(

View File

@ -48,6 +48,7 @@ OBJECT_GETPUBKEY = 0
OBJECT_PUBKEY = 1 OBJECT_PUBKEY = 1
OBJECT_MSG = 2 OBJECT_MSG = 2
OBJECT_BROADCAST = 3 OBJECT_BROADCAST = 3
OBJECT_ONIONPEER = 0x746f72
OBJECT_I2P = 0x493250 OBJECT_I2P = 0x493250
OBJECT_ADDR = 0x61646472 OBJECT_ADDR = 0x61646472