This repository has been archived on 2024-12-25. You can view files and clone it, but cannot push or open issues or pull requests.
PyBitmessage-2024-12-25/src/network/uploadthread.py
Dmitri Bogomolov 7a89109fc9
New logging approach that reduces imports from submodules
and allows logging without the risk of a circular import. The only
subpackage that imports from debug is bitmessageqt, because it also uses
debug.resetLogging().
Instead of `from debug import logger`, it is now recommended to use:

import logging

logger = logging.getLogger('default')

All subclasses of StoppableThread now have a logger attribute.
All threading-related code except for set_thread_name()
was moved from helper_threading to network.threads.

Fixed two of my mistakes from the previous edit of debug in a1a8d3a:

 - logger.handlers is not dict but iterable
 - sys.excepthook should be set unconditionally
2019-10-18 09:35:24 +03:00

70 lines
2.6 KiB
Python

"""
`UploadThread` class definition: a thread that sends peers the objects
they have requested.
"""
import time
import helper_random
import protocol
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
from threads import StoppableThread
class UploadThread(StoppableThread):
    """
    This is a thread that uploads the objects that the peers requested from me.

    Each pass of :meth:`run` visits the fully established connections in
    random order and, for every eligible one, packs a batch of the
    requested objects into a single payload that is appended to that
    connection's write buffer.
    """
    # A connection whose write buffer already holds more than this many
    # bytes is skipped for the current pass.
    maxBufSize = 2097152  # 2MB
    name = "Uploader"

    def run(self):
        """
        Serve pending upload requests until the thread is stopped.

        For every fully established connection that is not being delayed
        (``skipUntil``) and whose write buffer is not over ``maxBufSize``,
        a random batch of pending upload keys is taken, each key is removed
        from ``pendingUpload`` and the matching inventory object is packed
        into one payload.  The batch is cut short as soon as the peer asks
        for a stem object that was not offered to it or for an object that
        is missing from the inventory; anything packed before that point
        is still sent.

        When a whole pass uploads nothing, ``self.stop.wait(1)`` is called
        before the next pass (presumably an interruptible one-second pause
        -- confirm against ``StoppableThread``).
        """
        while not self._stopped:
            uploaded = 0
            pool = BMConnectionPool()
            # Wrap each ``values()`` in ``list`` so that the concatenation
            # also works where ``dict.values()`` returns a view object
            # instead of a list.
            candidates = (
                list(pool.inboundConnections.values())
                + list(pool.outboundConnections.values()))
            connections = [
                conn for conn in candidates if conn.fullyEstablished]
            # Serve the requesting peers in random order
            helper_random.randomshuffle(connections)
            for i in connections:
                now = time.time()
                # avoid unnecessary delay
                if i.skipUntil >= now:
                    continue
                # Read the limit through ``self`` so that a subclass may
                # override it.
                if len(i.write_buf) > self.maxBufSize:
                    continue
                try:
                    request = i.pendingUpload.randomKeys(
                        RandomTrackingDict.maxPending)
                except KeyError:
                    # Nothing could be picked for this connection
                    continue
                payload = bytearray()
                chunk_count = 0
                for chunk in request:
                    # The request is consumed whether or not it can be
                    # answered.
                    del i.pendingUpload[chunk]
                    if Dandelion().hasHash(chunk) and \
                            i != Dandelion().objectChildStem(chunk):
                        i.antiIntersectionDelay()
                        self.logger.info(
                            '%s asked for a stem object we didn\'t offer'
                            ' to it.',
                            i.destination)
                        break
                    try:
                        payload.extend(protocol.CreatePacket(
                            'object', Inventory()[chunk].payload))
                        chunk_count += 1
                    except KeyError:
                        i.antiIntersectionDelay()
                        self.logger.info(
                            '%s asked for an object we don\'t have.',
                            i.destination)
                        break
                if not chunk_count:
                    continue
                i.append_write_buf(payload)
                self.logger.debug(
                    '%s:%i Uploading %i objects',
                    i.destination.host, i.destination.port, chunk_count)
                uploaded += chunk_count
            if not uploaded:
                self.stop.wait(1)