PyBitmessage-2021-04-27/src/network/uploadthread.py

70 lines
2.5 KiB
Python
Raw Normal View History

"""
`UploadThread` class definition
"""
import time
import helper_random
import protocol
from inventory import Inventory
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from randomtrackingdict import RandomTrackingDict
from threads import StoppableThread
class UploadThread(StoppableThread):
    """
    Thread that sends peers the objects they have requested from this node.

    Each pass of `run` walks the established connections in random order,
    takes a batch of requested hashes from every eligible connection's
    `pendingUpload`, wraps the matching inventory payloads in 'object'
    packets and appends them to that connection's write buffer.
    """
    # A connection whose write buffer already holds more than this many
    # bytes is skipped for the current pass.
    maxBufSize = 2097152  # 2MB
    name = "Uploader"

    def run(self):
        """
        Serve pending upload requests until `self._stopped` becomes true.

        When a whole pass over the connections uploads nothing, the loop
        waits on `self.stop` with a timeout of 1 before trying again.
        """
        while not self._stopped:
            total_sent = 0
            # Visit the uploading peers in random order
            peers = BMConnectionPool().establishedConnections()
            helper_random.randomshuffle(peers)
            for peer in peers:
                current_time = time.time()
                # Skip a peer whose skipUntil has not passed yet, or whose
                # write buffer is already over the limit.
                if (peer.skipUntil >= current_time
                        or len(peer.write_buf) > self.maxBufSize):
                    continue
                try:
                    wanted = peer.pendingUpload.randomKeys(
                        RandomTrackingDict.maxPending)
                except KeyError:
                    # randomKeys raised KeyError (presumably nothing is
                    # pending for this peer -- confirm); try the next one.
                    continue
                outgoing = bytearray()
                packed = 0
                for obj_hash in wanted:
                    # The request is dropped from the pending set before it
                    # is validated, so a rejected hash is not retried.
                    del peer.pendingUpload[obj_hash]
                    unoffered_stem = (
                        Dandelion().hasHash(obj_hash)
                        and peer != Dandelion().objectChildStem(obj_hash))
                    if unoffered_stem:
                        peer.antiIntersectionDelay()
                        self.logger.info(
                            '%s asked for a stem object we didn\'t offer to it.',
                            peer.destination)
                        break
                    try:
                        stored = Inventory()[obj_hash]
                        outgoing.extend(
                            protocol.CreatePacket('object', stored.payload))
                        packed += 1
                    except KeyError:
                        peer.antiIntersectionDelay()
                        self.logger.info(
                            '%s asked for an object we don\'t have.',
                            peer.destination)
                        break
                # Packets gathered before a break are still sent.
                if packed:
                    peer.append_write_buf(outgoing)
                    self.logger.debug(
                        '%s:%i Uploading %i objects',
                        peer.destination.host, peer.destination.port, packed)
                    total_sent += packed
            if not total_sent:
                # Nothing was uploaded in this pass: wait before polling
                # the connections again.
                self.stop.wait(1)