From 453e045ae5c846b00be10fb397ec33907782cb0d Mon Sep 17 00:00:00 2001 From: Dmitri Bogomolov <4glitch@gmail.com> Date: Fri, 26 Apr 2019 16:59:56 +0300 Subject: [PATCH] Minimal implementation of onionpeer object --- src/class_objectProcessor.py | 20 +++++++++++++ src/class_singleWorker.py | 58 +++++++++++++++++++++++++++++++----- src/network/tcp.py | 2 +- src/protocol.py | 1 + 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/class_objectProcessor.py b/src/class_objectProcessor.py index 62713b7b..9a7188b7 100644 --- a/src/class_objectProcessor.py +++ b/src/class_objectProcessor.py @@ -6,6 +6,7 @@ import time from binascii import hexlify from subprocess import call # nosec +import knownnodes import highlevelcrypto from addresses import ( calculateInventoryHash, decodeAddress, decodeVarint, encodeAddress, @@ -67,6 +68,8 @@ class objectProcessor(threading.Thread): self.processmsg(data) elif objectType == protocol.OBJECT_BROADCAST: self.processbroadcast(data) + elif objectType == protocol.OBJECT_ONIONPEER: + self.processonion(data) # is more of a command, not an object type. Is used to get # this thread past the queue.get() so that it will check # the shutdown variable. @@ -140,6 +143,23 @@ class objectProcessor(threading.Thread): else: logger.debug('This object is not an acknowledgement bound for me.') + def processonion(self, data): + readPosition = 20 # bypass the nonce, time, and object type + length = decodeVarint(data[readPosition:readPosition + 10])[1] + readPosition += length + stream, length = decodeVarint(data[readPosition:readPosition + 10]) + readPosition += length + # it seems that stream is checked in network.bmproto + port, length = decodeVarint(data[readPosition:readPosition + 10]) + host = protocol.checkIPAddress(data[readPosition + length:]) + + if not host: + return + peer = state.Peer(host, port) + with knownnodes.knownNodesLock: + knownnodes.addKnownNode( + stream, peer, is_self=state.ownAddresses.get(peer)) + def processgetpubkey(self, data): if len(data) > 200: logger.info( diff --git a/src/class_singleWorker.py b/src/class_singleWorker.py index 4e3be929..d8f70ada 100644 --- a/src/class_singleWorker.py +++ b/src/class_singleWorker.py @@ -119,13 +119,21 @@ class singleWorker(threading.Thread, StoppableThread): # before we start on existing POW tasks. self.stop.wait(10) - if state.shutdown == 0: - # just in case there are any pending tasks for msg - # messages that have yet to be sent. - queues.workerQueue.put(('sendmessage', '')) - # just in case there are any tasks for Broadcasts - # that have yet to be sent. - queues.workerQueue.put(('sendbroadcast', '')) + if state.shutdown > 0: + return + + # just in case there are any pending tasks for msg + # messages that have yet to be sent. + queues.workerQueue.put(('sendmessage', '')) + # just in case there are any tasks for Broadcasts + # that have yet to be sent. + queues.workerQueue.put(('sendbroadcast', '')) + + # send onionpeer object + for peer in state.ownAddresses.keys(): + if peer.host.endswith('.onion'): + queues.workerQueue.put(('sendOnionPeerObj', peer)) + break while state.shutdown == 0: self.busy = 0 @@ -156,6 +164,11 @@ class singleWorker(threading.Thread, StoppableThread): self.sendOutOrStoreMyV4Pubkey(data) except: pass + elif command == 'sendOnionPeerObj': + try: + self.sendOnionPeerObj(data) + except: + pass elif command == 'resetPoW': try: proofofwork.resetPoW() @@ -456,6 +469,37 @@ class singleWorker(threading.Thread, StoppableThread): ' to the keys.dat file. Error message: %s', err ) + def sendOnionPeerObj(self, peer): + TTL = int(7 * 24 * 60 * 60 + helper_random.randomrandrange(-300, 300)) + embeddedTime = int(time.time() + TTL) + streamNumber = 1 # Don't know yet what should be here + objectType = protocol.OBJECT_ONIONPEER + # FIXME: ideally the objectPayload should be signed + objectPayload = encodeVarint(peer.port) + protocol.encodeHost(peer.host) + tag = calculateInventoryHash(objectPayload) + + if Inventory().by_type_and_tag(objectType, tag): + return # not expired + + payload = pack('>Q', (embeddedTime)) + payload += str(bytearray([0, 0, 0, objectType])) + payload += encodeVarint(2 if len(peer.host) == 22 else 3) + payload += encodeVarint(streamNumber) + payload += objectPayload + + payload = self._doPOWDefaults( + payload, TTL, log_prefix='(For onionpeer object)') + + inventoryHash = calculateInventoryHash(payload) + Inventory()[inventoryHash] = ( + objectType, streamNumber, buffer(payload), + embeddedTime, buffer(tag) + ) + logger.info( + 'sending inv (within sendOnionPeerObj function) for object: %s', + hexlify(inventoryHash)) + queues.invQueue.put((streamNumber, inventoryHash)) + def sendBroadcast(self): """Send a broadcast-type object (assemble the object, perform PoW and put it to the inv announcement queue)""" # Reset just in case diff --git a/src/network/tcp.py b/src/network/tcp.py index 0463d322..1790d59b 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -147,7 +147,7 @@ class TCPConnection(BMProto, TLSDispatcher): # pylint: disable=too-many-instanc (k, v) for k, v in nodes.iteritems() if v["lastseen"] > int(time.time()) - shared.maximumAgeOfNodesThatIAdvertiseToOthers and - v["rating"] >= 0 + v["rating"] >= 0 and len(k.host) <= 22 ] # sent 250 only if the remote isn't interested in it elemCount = min( diff --git a/src/protocol.py b/src/protocol.py index 6f89c186..9a0c6a11 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -48,6 +48,7 @@ OBJECT_GETPUBKEY = 0 OBJECT_PUBKEY = 1 OBJECT_MSG = 2 OBJECT_BROADCAST = 3 +OBJECT_ONIONPEER = 0x746f72 OBJECT_I2P = 0x493250 OBJECT_ADDR = 0x61646472