PyBitmessage-2021-04-27/src/network/invthread.py
Dmitri Bogomolov 7a89109fc9
New logging approach in order to reduce imports from submodules
and use logging without risk of circular import. Only subpackage
that imports from debug is bitmessageqt - because it also uses
debug.resetLogging().
Instead of from debug import logger is now recommended to use:

import logging

logger = logging.getLogger('default')

All subclasses of StoppableThread now have a logger attribute.
All threading related stuff except for set_thread_name()
was moved from helper_threading to network.threads.

Fixed two my mistakes from previous edit of debug in a1a8d3a:

 - logger.handlers is not dict but iterable
 - sys.excepthook should be set unconditionally
2019-10-18 09:35:24 +03:00

115 lines
4.4 KiB
Python

"""
src/network/invthread.py
========================
"""
import Queue
import random
from time import time
import addresses
import protocol
import state
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from queues import invQueue
from threads import StoppableThread
def handleExpiredDandelion(expired):
"""For expired dandelion objects, mark all remotes as not having
the object"""
if not expired:
return
for i in \
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if not i.fullyEstablished:
continue
for x in expired:
streamNumber, hashid, _ = x
try:
del i.objectsNewToMe[hashid]
except KeyError:
if streamNumber in i.streams:
with i.objectsNewToThemLock:
i.objectsNewToThem[hashid] = time()
class InvThread(StoppableThread):
"""A thread to send inv annoucements."""
name = "InvBroadcaster"
@staticmethod
def handleLocallyGenerated(stream, hashId):
"""Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream)
for connection in \
BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
if state.dandelion and connection != Dandelion().objectChildStem(hashId):
continue
connection.objectsNewToThem[hashId] = time()
def run(self): # pylint: disable=too-many-branches
while not state.shutdown: # pylint: disable=too-many-nested-blocks
chunk = []
while True:
# Dandelion fluff trigger by expiration
handleExpiredDandelion(Dandelion().expire())
try:
data = invQueue.get(False)
chunk.append((data[0], data[1]))
# locally generated
if len(data) == 2 or data[2] is None:
self.handleLocallyGenerated(data[0], data[1])
except Queue.Empty:
break
if chunk:
for connection in BMConnectionPool().inboundConnections.values() + \
BMConnectionPool().outboundConnections.values():
fluffs = []
stems = []
for inv in chunk:
if inv[0] not in connection.streams:
continue
try:
with connection.objectsNewToThemLock:
del connection.objectsNewToThem[inv[1]]
except KeyError:
continue
try:
if connection == Dandelion().objectChildStem(inv[1]):
# Fluff trigger by RNG
# auto-ignore if config set to 0, i.e. dandelion is off
if random.randint(1, 100) >= state.dandelion:
fluffs.append(inv[1])
# send a dinv only if the stem node supports dandelion
elif connection.services & protocol.NODE_DANDELION > 0:
stems.append(inv[1])
else:
fluffs.append(inv[1])
except KeyError:
fluffs.append(inv[1])
if fluffs:
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
'inv', addresses.encodeVarint(len(fluffs)) +
"".join(fluffs)))
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
'dinv', addresses.encodeVarint(len(stems)) +
"".join(stems)))
invQueue.iterate()
for i in range(len(chunk)):
invQueue.task_done()
if Dandelion().refresh < time():
Dandelion().reRandomiseStems()
self.stop.wait(1)