Network fixes

This commit is contained in:
lakshyacis 2020-01-06 17:14:47 +05:30
parent e37d52d950
commit f0bc74e658
No known key found for this signature in database
GPG Key ID: D2C539C8EC63E9EB
13 changed files with 205 additions and 214 deletions

View File

@ -1,3 +1,6 @@
"""
Network subsystem packages
"""
from addrthread import AddrThread
from announcethread import AnnounceThread
from connectionpool import BMConnectionPool

View File

@ -12,6 +12,7 @@ from threads import StoppableThread
class AddrThread(StoppableThread):
"""(Node) address broadcasting thread"""
name = "AddrBroadcaster"
def run(self):

View File

@ -1,9 +1,7 @@
"""
src/network/advanceddispatcher.py
=================================
Improved version of asyncore dispatcher
"""
# pylint: disable=attribute-defined-outside-init
import socket
import threading
import time
@ -14,7 +12,8 @@ from threads import BusyError, nonBlocking
class ProcessingError(Exception):
"""General class for protocol parser exception, use as a base for others."""
"""General class for protocol parser exception,
use as a base for others."""
pass
@ -24,7 +23,8 @@ class UnknownStateError(ProcessingError):
class AdvancedDispatcher(asyncore.dispatcher):
"""Improved version of asyncore dispatcher, with buffers and protocol state."""
"""Improved version of asyncore dispatcher,
with buffers and protocol state."""
# pylint: disable=too-many-instance-attributes
_buf_len = 131072 # 128kB
@ -72,7 +72,8 @@ class AdvancedDispatcher(asyncore.dispatcher):
del self.read_buf[0:length]
def process(self):
"""Process (parse) data that's in the buffer, as long as there is enough data and the connection is open."""
"""Process (parse) data that's in the buffer,
as long as there is enough data and the connection is open."""
while self.connected and not state.shutdown:
try:
with nonBlocking(self.processingLock):
@ -104,8 +105,9 @@ class AdvancedDispatcher(asyncore.dispatcher):
if asyncore.maxUploadRate > 0:
self.uploadChunk = int(asyncore.uploadBucket)
self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
return asyncore.dispatcher.writable(self) and \
(self.connecting or (self.connected and self.uploadChunk > 0))
return asyncore.dispatcher.writable(self) and (
self.connecting or (
self.connected and self.uploadChunk > 0))
def readable(self):
"""Is the read buffer ready to accept data from the network?"""
@ -114,13 +116,15 @@ class AdvancedDispatcher(asyncore.dispatcher):
self.downloadChunk = int(asyncore.downloadBucket)
try:
if self.expectBytes > 0 and not self.fullyEstablished:
self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf))
self.downloadChunk = min(
self.downloadChunk, self.expectBytes - len(self.read_buf))
if self.downloadChunk < 0:
self.downloadChunk = 0
except AttributeError:
pass
return asyncore.dispatcher.readable(self) and \
(self.connecting or self.accepting or (self.connected and self.downloadChunk > 0))
return asyncore.dispatcher.readable(self) and (
self.connecting or self.accepting or (
self.connected and self.downloadChunk > 0))
def handle_read(self):
"""Append incoming data to the read buffer."""
@ -144,20 +148,21 @@ class AdvancedDispatcher(asyncore.dispatcher):
try:
asyncore.dispatcher.handle_connect_event(self)
except socket.error as e:
if e.args[0] not in asyncore._DISCONNECTED: # pylint: disable=protected-access
# pylint: disable=protected-access
if e.args[0] not in asyncore._DISCONNECTED:
raise
def handle_connect(self):
"""Method for handling connection established implementations."""
self.lastTx = time.time()
def state_close(self):
def state_close(self): # pylint: disable=no-self-use
"""Signal to the processing loop to end."""
# pylint: disable=no-self-use
return False
def handle_close(self):
"""Callback for connection being closed, but can also be called directly when you want connection to close."""
"""Callback for connection being closed,
but can also be called directly when you want connection to close."""
with self.readLock:
self.read_buf = bytearray()
with self.writeLock:

View File

@ -1,8 +1,6 @@
"""
src/network/announcethread.py
=================================
Announce myself (node address)
"""
import time
import state
@ -40,6 +38,7 @@ class AnnounceThread(StoppableThread):
stream,
Peer(
'127.0.0.1',
BMConfigParser().safeGetInt('bitmessagesettings', 'port')),
BMConfigParser().safeGetInt(
'bitmessagesettings', 'port')),
time.time())
connection.append_write_buf(assemble_addr([addr]))

View File

@ -1,7 +1,6 @@
"""
Create bitmessage protocol command packets
"""
import struct
import addresses
@ -13,20 +12,20 @@ from protocol import CreatePacket, encodeHost
def assemble_addr(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = (peerList)
peerList = [peerList]
if not peerList:
return b''
retval = b''
for i in range(0, len(peerList), MAX_ADDR_COUNT):
payload = addresses.encodeVarint(
len(peerList[i:i + MAX_ADDR_COUNT]))
payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT]))
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
payload += struct.pack(
'>Q', timestamp) # 64-bit time
# 64-bit time
payload += struct.pack('>Q', timestamp)
payload += struct.pack('>I', stream)
payload += struct.pack(
'>q', 1) # service bit flags offered by this node
# service bit flags offered by this node
payload += struct.pack('>q', 1)
payload += encodeHost(peer.host)
payload += struct.pack('>H', peer.port) # remote port
# remote port
payload += struct.pack('>H', peer.port)
retval += CreatePacket('addr', payload)
return retval

View File

@ -1,56 +1,11 @@
"""
Basic infrastructure for asynchronous socket service clients and servers.
"""
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init
# pylint: disable=global-statement
"""
src/network/asyncore_pollchoose.py
==================================
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
# pylint: disable=too-many-branches,too-many-lines,global-statement
# pylint: disable=redefined-builtin,no-self-use
import os
import select
import socket
@ -58,8 +13,9 @@ import sys
import time
import warnings
from errno import (
EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR,
EINVAL, EISCONN, ENETUNREACH, ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode
EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED,
ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, EINVAL, EISCONN, ENETUNREACH,
ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode
)
from threading import current_thread
@ -107,7 +63,8 @@ def _strerror(err):
class ExitNow(Exception):
"""We don't use directly but may be necessary as we replace asyncore due to some library raising or expecting it"""
"""We don't use directly but may be necessary as we replace
asyncore due to some library raising or expecting it"""
pass
@ -152,7 +109,8 @@ def write(obj):
def set_rates(download, upload):
"""Set throttling rates"""
global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
global maxDownloadRate, maxUploadRate, downloadBucket
global uploadBucket, downloadTimestamp, uploadTimestamp
maxDownloadRate = float(download) * 1024
maxUploadRate = float(upload) * 1024
@ -182,7 +140,8 @@ def update_received(download=0):
currentTimestamp = time.time()
receivedBytes += download
if maxDownloadRate > 0:
bucketIncrease = maxDownloadRate * (currentTimestamp - downloadTimestamp)
bucketIncrease = \
maxDownloadRate * (currentTimestamp - downloadTimestamp)
downloadBucket += bucketIncrease
if downloadBucket > maxDownloadRate:
downloadBucket = int(maxDownloadRate)
@ -242,7 +201,6 @@ def readwrite(obj, flags):
def select_poller(timeout=0.0, map=None):
"""A poller which uses select(), available on most platforms."""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
@ -298,7 +256,6 @@ def select_poller(timeout=0.0, map=None):
def poll_poller(timeout=0.0, map=None):
"""A poller which uses poll(), available on most UNIXen."""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
@ -356,7 +313,6 @@ poll2 = poll3 = poll_poller
def epoll_poller(timeout=0.0, map=None):
"""A poller which uses epoll(), supported on Linux 2.5.44 and newer."""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
@ -412,7 +368,7 @@ def epoll_poller(timeout=0.0, map=None):
def kqueue_poller(timeout=0.0, map=None):
"""A poller which uses kqueue(), BSD specific."""
# pylint: disable=redefined-builtin,no-member
# pylint: disable=no-member,too-many-statements
if map is None:
map = socket_map
@ -440,14 +396,20 @@ def kqueue_poller(timeout=0.0, map=None):
poller_flags |= select.KQ_EV_ENABLE
else:
poller_flags |= select.KQ_EV_DISABLE
updates.append(select.kevent(fd, filter=select.KQ_FILTER_READ, flags=poller_flags))
updates.append(
select.kevent(
fd, filter=select.KQ_FILTER_READ,
flags=poller_flags))
if kq_filter & 2 != obj.poller_filter & 2:
poller_flags = select.KQ_EV_ADD
if kq_filter & 2:
poller_flags |= select.KQ_EV_ENABLE
else:
poller_flags |= select.KQ_EV_DISABLE
updates.append(select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=poller_flags))
updates.append(
select.kevent(
fd, filter=select.KQ_FILTER_WRITE,
flags=poller_flags))
obj.poller_filter = kq_filter
if not selectables:
@ -481,7 +443,6 @@ def kqueue_poller(timeout=0.0, map=None):
def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
"""Poll in a loop, until count or timeout is reached"""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
@ -520,9 +481,9 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
count = count - 1
class dispatcher:
class dispatcher(object):
"""Dispatcher for socket objects"""
# pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class
# pylint: disable=too-many-public-methods,too-many-instance-attributes
debug = False
connected = False
@ -537,7 +498,6 @@ class dispatcher:
minTx = 1500
def __init__(self, sock=None, map=None):
# pylint: disable=redefined-builtin
if map is None:
self._map = socket_map
else:
@ -586,8 +546,7 @@ class dispatcher:
def add_channel(self, map=None):
"""Add a channel"""
# pylint: disable=redefined-builtin
# pylint: disable=attribute-defined-outside-init
if map is None:
map = self._map
map[self._fileno] = self
@ -596,8 +555,6 @@ class dispatcher:
def del_channel(self, map=None):
"""Delete a channel"""
# pylint: disable=redefined-builtin
fd = self._fileno
if map is None:
map = self._map
@ -605,11 +562,13 @@ class dispatcher:
del map[fd]
if self._fileno:
try:
kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
kqueue_poller.pollster.control([select.kevent(
fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
except(AttributeError, KeyError, TypeError, IOError, OSError):
pass
try:
kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
kqueue_poller.pollster.control([select.kevent(
fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
except(AttributeError, KeyError, TypeError, IOError, OSError):
pass
try:
@ -627,8 +586,10 @@ class dispatcher:
self.poller_filter = 0
self.poller_registered = False
def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
def create_socket(
self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
"""Create a socket"""
# pylint: disable=attribute-defined-outside-init
self.family_and_type = family, socket_type
sock = socket.socket(family, socket_type)
sock.setblocking(0)
@ -636,20 +597,16 @@ class dispatcher:
def set_socket(self, sock, map=None):
"""Set socket"""
# pylint: disable=redefined-builtin
self.socket = sock
self._fileno = sock.fileno()
self.add_channel(map)
def set_reuse_addr(self):
"""try to re-use a server port if possible"""
try:
self.socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR,
self.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR) | 1
socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
)
except socket.error:
pass
@ -704,13 +661,16 @@ class dispatcher:
raise socket.error(err, errorcode[err])
def accept(self):
"""Accept incoming connections. Returns either an address pair or None."""
"""Accept incoming connections.
Returns either an address pair or None."""
try:
conn, addr = self.socket.accept()
except TypeError:
return None
except socket.error as why:
if why.args[0] in (EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED, EAGAIN, ENOTCONN):
if why.args[0] in (
EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED,
EAGAIN, ENOTCONN):
return None
else:
raise
@ -769,7 +729,8 @@ class dispatcher:
try:
retattr = getattr(self.socket, attr)
except AttributeError:
raise AttributeError("%s instance has no attribute '%s'"
raise AttributeError(
"%s instance has no attribute '%s'"
% (self.__class__.__name__, attr))
else:
msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s"\
@ -855,13 +816,8 @@ class dispatcher:
self.log_info(
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
t,
v,
tbinfo
),
'error'
)
self_repr, t, v, tbinfo),
'error')
self.handle_close()
def handle_accept(self):
@ -902,11 +858,8 @@ class dispatcher_with_send(dispatcher):
adds simple buffered output capability, useful for simple clients.
[for more sophisticated usage use asynchat.async_chat]
"""
# pylint: disable=redefined-builtin
def __init__(self, sock=None, map=None):
# pylint: disable=redefined-builtin
dispatcher.__init__(self, sock, map)
self.out_buffer = b''
@ -941,7 +894,8 @@ def compact_traceback():
"""Return a compact traceback"""
t, v, tb = sys.exc_info()
tbinfo = []
if not tb: # Must have a traceback
# Must have a traceback
if not tb:
raise AssertionError("traceback does not exist")
while tb:
tbinfo.append((
@ -961,7 +915,6 @@ def compact_traceback():
def close_all(map=None, ignore_all=False):
"""Close all connections"""
# pylint: disable=redefined-builtin
if map is None:
map = socket_map
@ -998,13 +951,13 @@ def close_all(map=None, ignore_all=False):
if os.name == 'posix':
import fcntl
class file_wrapper:
class file_wrapper: # pylint: disable=old-style-class
"""
Here we override just enough to make a file look like a socket for the purposes of asyncore.
Here we override just enough to make a file look
like a socket for the purposes of asyncore.
The passed fd is automatically os.dup()'d
"""
# pylint: disable=old-style-class
def __init__(self, fd):
self.fd = os.dup(fd)
@ -1019,12 +972,11 @@ if os.name == 'posix':
def getsockopt(self, level, optname, buflen=None):
"""Fake getsockopt()"""
if (level == socket.SOL_SOCKET and
optname == socket.SO_ERROR and
if (level == socket.SOL_SOCKET and optname == socket.SO_ERROR and
not buflen):
return 0
raise NotImplementedError("Only asyncore specific behaviour "
"implemented.")
raise NotImplementedError(
"Only asyncore specific behaviour implemented.")
read = recv
write = send
@ -1041,8 +993,6 @@ if os.name == 'posix':
"""A dispatcher for file_wrapper objects"""
def __init__(self, fd, map=None):
# pylint: disable=redefined-builtin
dispatcher.__init__(self, None, map)
self.connected = True
try:

View File

@ -1,7 +1,6 @@
"""
BMObject and it's exceptions.
"""
import logging
import time
@ -15,12 +14,14 @@ logger = logging.getLogger('default')
class BMObjectInsufficientPOWError(Exception):
"""Exception indicating the object doesn't have sufficient proof of work."""
"""Exception indicating the object
doesn't have sufficient proof of work."""
errorCodes = ("Insufficient proof of work")
class BMObjectInvalidDataError(Exception):
"""Exception indicating the data being parsed does not match the specification."""
"""Exception indicating the data being parsed
does not match the specification."""
errorCodes = ("Data invalid")
@ -30,7 +31,8 @@ class BMObjectExpiredError(Exception):
class BMObjectUnwantedStreamError(Exception):
"""Exception indicating the object is in a stream we didn't advertise as being interested in."""
"""Exception indicating the object is in a stream
we didn't advertise as being interested in."""
errorCodes = ("Object in unwanted stream")
@ -44,9 +46,8 @@ class BMObjectAlreadyHaveError(Exception):
errorCodes = ("Already have this object")
class BMObject(object):
class BMObject(object): # pylint: disable=too-many-instance-attributes
"""Bitmessage Object as a class."""
# pylint: disable=too-many-instance-attributes
# max TTL, 28 days and 3 hours
maxTTL = 28 * 24 * 60 * 60 + 10800
@ -81,31 +82,36 @@ class BMObject(object):
raise BMObjectInsufficientPOWError()
def checkEOLSanity(self):
"""Check if object's lifetime isn't ridiculously far in the past or future."""
"""Check if object's lifetime
isn't ridiculously far in the past or future."""
# EOL sanity check
if self.expiresTime - int(time.time()) > BMObject.maxTTL:
logger.info(
'This object\'s End of Life time is too far in the future. Ignoring it. Time is %i',
self.expiresTime)
'This object\'s End of Life time is too far in the future.'
' Ignoring it. Time is %i', self.expiresTime)
# .. todo:: remove from download queue
raise BMObjectExpiredError()
if self.expiresTime - int(time.time()) < BMObject.minTTL:
logger.info(
'This object\'s End of Life time was too long ago. Ignoring the object. Time is %i',
self.expiresTime)
'This object\'s End of Life time was too long ago.'
' Ignoring the object. Time is %i', self.expiresTime)
# .. todo:: remove from download queue
raise BMObjectExpiredError()
def checkStream(self):
"""Check if object's stream matches streams we are interested in"""
if self.streamNumber not in state.streamsInWhichIAmParticipating:
logger.debug('The streamNumber %i isn\'t one we are interested in.', self.streamNumber)
logger.debug(
'The streamNumber %i isn\'t one we are interested in.',
self.streamNumber)
raise BMObjectUnwantedStreamError()
def checkAlreadyHave(self):
"""
Check if we already have the object (so that we don't duplicate it in inventory or advertise it unnecessarily)
Check if we already have the object
(so that we don't duplicate it in inventory
or advertise it unnecessarily)
"""
# if it's a stem duplicate, pretend we don't have it
if Dandelion().hasHash(self.inventoryHash):
@ -114,7 +120,8 @@ class BMObject(object):
raise BMObjectAlreadyHaveError()
def checkObjectByType(self):
"""Call a object type specific check (objects can have additional checks based on their types)"""
"""Call a object type specific check
(objects can have additional checks based on their types)"""
if self.objectType == protocol.OBJECT_GETPUBKEY:
self.checkGetpubkey()
elif self.objectType == protocol.OBJECT_PUBKEY:
@ -125,20 +132,21 @@ class BMObject(object):
self.checkBroadcast()
# other objects don't require other types of tests
def checkMessage(self):
def checkMessage(self): # pylint: disable=no-self-use
""""Message" object type checks."""
# pylint: disable=no-self-use
return
def checkGetpubkey(self):
""""Getpubkey" object type checks."""
if len(self.data) < 42:
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')
logger.info(
'getpubkey message doesn\'t contain enough data. Ignoring.')
raise BMObjectInvalidError()
def checkPubkey(self):
""""Pubkey" object type checks."""
if len(self.data) < 146 or len(self.data) > 440: # sanity check
# sanity check
if len(self.data) < 146 or len(self.data) > 440:
logger.info('pubkey object too short or too long. Ignoring.')
raise BMObjectInvalidError()
@ -146,8 +154,9 @@ class BMObject(object):
""""Broadcast" object type checks."""
if len(self.data) < 180:
logger.debug(
'The payload length of this broadcast packet is unreasonably low.'
' Someone is probably trying funny business. Ignoring message.')
'The payload length of this broadcast'
' packet is unreasonably low. Someone is probably'
' trying funny business. Ignoring message.')
raise BMObjectInvalidError()
# this isn't supported anymore

View File

@ -1,8 +1,7 @@
"""
src/network/bmproto.py
==================================
Bitmessage Protocol
"""
# pylint: disable=attribute-defined-outside-init
# pylint: disable=attribute-defined-outside-init, too-few-public-methods
import base64
import hashlib
import logging
@ -19,17 +18,16 @@ import state
from bmconfigparser import BMConfigParser
from inventory import Inventory
from network.advanceddispatcher import AdvancedDispatcher
from network.constants import (
ADDRESS_ALIVE,
MAX_MESSAGE_SIZE,
MAX_OBJECT_COUNT,
MAX_OBJECT_PAYLOAD_SIZE,
MAX_TIME_OFFSET)
from network.dandelion import Dandelion
from network.bmobject import (
BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError,
BMObjectExpiredError, BMObjectUnwantedStreamError,
BMObjectInvalidError, BMObjectAlreadyHaveError)
BMObjectInvalidError, BMObjectAlreadyHaveError
)
from network.constants import (
ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT,
MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET
)
from network.dandelion import Dandelion
from network.proxy import ProxyError
from node import Node, Peer
from objectracker import missingObjects, ObjectTracker
@ -59,7 +57,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
# pylint: disable=too-many-instance-attributes, too-many-public-methods
timeOffsetWrongCount = 0
def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called
def __init__(self, address=None, sock=None):
# pylint: disable=unused-argument, super-init-not-called
AdvancedDispatcher.__init__(self, sock)
self.isOutbound = False
# packet/connection from a local IP
@ -163,7 +162,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def decode_payload_varint(self):
"""Decode a varint from the payload"""
value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:])
value, offset = addresses.decodeVarint(
self.payload[self.payloadOffset:])
self.payloadOffset += offset
return value
@ -185,8 +185,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return Node(services, host, port)
def decode_payload_content(self, pattern="v"): # pylint: disable=too-many-branches, too-many-statements
# pylint: disable=too-many-branches, too-many-statements
def decode_payload_content(self, pattern="v"):
"""
Decode the payload depending on pattern:
@ -202,7 +202,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
, = end of array
"""
def decode_simple(self, char="v"): # pylint: disable=inconsistent-return-statements
# pylint: disable=inconsistent-return-statements
def decode_simple(self, char="v"):
"""Decode the payload using one char pattern"""
if char == "v":
return self.decode_payload_varint()
@ -312,8 +313,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_error(self):
"""Decode an error message and log it"""
fatalStatus, banTime, inventoryVector, errorText = \
self.decode_payload_content("vvlsls")
err_values = self.decode_payload_content("vvlsls")
fatalStatus = err_values[0]
# banTime = err_values[1]
# inventoryVector = err_values[2]
errorText = err_values[3]
logger.error(
'%s:%i error: %i, %s', self.destination.host,
self.destination.port, fatalStatus, errorText)
@ -408,8 +412,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
except KeyError:
pass
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash):
Dandelion().removeHash(self.object.inventoryHash, "cycle detection")
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(
self.object.inventoryHash):
Dandelion().removeHash(
self.object.inventoryHash, "cycle detection")
Inventory()[self.object.inventoryHash] = (
self.object.objectType, self.object.streamNumber,
@ -428,9 +434,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_addr(self):
"""Incoming addresses, process them"""
addresses = self._decode_addr() # pylint: disable=redefined-outer-name
for i in addresses:
seenTime, stream, services, ip, port = i
# pylint: disable=redefined-outer-name
addresses = self._decode_addr()
for seenTime, stream, _, ip, port in addresses:
decodedIP = protocol.checkIPAddress(str(ip))
if stream not in state.streamsInWhichIAmParticipating:
continue
@ -441,14 +447,17 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
):
peer = Peer(decodedIP, port)
try:
if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime:
if knownnodes.knownNodes[stream][peer]["lastseen"] > \
seenTime:
continue
except KeyError:
pass
if len(knownnodes.knownNodes[stream]) < BMConfigParser().safeGetInt("knownnodes", "maxnodes"):
if len(knownnodes.knownNodes[stream]) < \
BMConfigParser().safeGetInt("knownnodes", "maxnodes"):
with knownnodes.knownNodesLock:
try:
knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime
knownnodes.knownNodes[stream][peer]["lastseen"] = \
seenTime
except (TypeError, KeyError):
knownnodes.knownNodes[stream][peer] = {
"lastseen": seenTime,
@ -539,7 +548,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
length=self.payloadLength, expectBytes=0)
return False
def peerValidityChecks(self): # pylint: disable=too-many-return-statements
# pylint: disable=too-many-return-statements
def peerValidityChecks(self):
"""Check the validity of the peer"""
if self.remoteProtocolVersion < 3:
self.append_write_buf(protocol.assembleErrorMessage(
@ -551,8 +561,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
return False
if self.timeOffset > MAX_TIME_OFFSET:
self.append_write_buf(protocol.assembleErrorMessage(
errorText="Your time is too far in the future compared to mine."
" Closing connection.", fatal=2))
errorText="Your time is too far in the future"
" compared to mine. Closing connection.", fatal=2))
logger.info(
"%s's time is too far in the future (%s seconds)."
" Closing connection to it.", self.destination, self.timeOffset)
@ -574,8 +584,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
errorText="We don't have shared stream interests."
" Closing connection.", fatal=2))
logger.debug(
'Closed connection to %s because there is no overlapping interest'
' in streams.', self.destination)
'Closed connection to %s because there is no overlapping'
' interest in streams.', self.destination)
return False
if self.destination in connectionpool.BMConnectionPool().inboundConnections:
try:
@ -584,8 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
errorText="Too many connections from your IP."
" Closing connection.", fatal=2))
logger.debug(
'Closed connection to %s because we are already connected'
' to that IP.', self.destination)
'Closed connection to %s because we are already'
' connected to that IP.', self.destination)
return False
except:
pass

View File

@ -1,3 +1,6 @@
"""
Select which node to connect to
"""
# pylint: disable=too-many-branches
import logging
import random # nosec
@ -12,6 +15,7 @@ logger = logging.getLogger('default')
def getDiscoveredPeer():
"""Get a peer from the local peer discovery list"""
try:
peer = random.choice(state.discoveredPeers.keys())
except (IndexError, KeyError):
@ -24,6 +28,7 @@ def getDiscoveredPeer():
def chooseConnection(stream):
"""Returns an appropriate connection"""
haveOnion = BMConfigParser().safeGet(
"bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
onionOnly = BMConfigParser().safeGetBoolean(
@ -49,7 +54,8 @@ def chooseConnection(stream):
logger.warning('Error in %s', peer)
rating = 0
if haveOnion:
# do not connect to raw IP addresses--keep all traffic within Tor overlay
# do not connect to raw IP addresses
# --keep all traffic within Tor overlay
if onionOnly and not peer.host.endswith('.onion'):
continue
# onion addresses have a higher priority when SOCKS

View File

@ -3,9 +3,15 @@ Network protocol constants
"""
ADDRESS_ALIVE = 10800 #: address is online if online less than this many seconds ago
MAX_ADDR_COUNT = 1000 #: protocol specification says max 1000 addresses in one addr command
MAX_MESSAGE_SIZE = 1600100 #: ~1.6 MB which is the maximum possible size of an inv message.
MAX_OBJECT_PAYLOAD_SIZE = 2**18 #: 2**18 = 256kB is the maximum size of an object payload
MAX_OBJECT_COUNT = 50000 #: protocol specification says max 50000 objects in one inv command
MAX_TIME_OFFSET = 3600 #: maximum time offset
#: address is online if online less than this many seconds ago
ADDRESS_ALIVE = 10800
#: protocol specification says max 1000 addresses in one addr command
MAX_ADDR_COUNT = 1000
#: ~1.6 MB which is the maximum possible size of an inv message.
MAX_MESSAGE_SIZE = 1600100
#: 2**18 = 256kB is the maximum size of an object payload
MAX_OBJECT_PAYLOAD_SIZE = 2**18
#: protocol specification says max 50000 objects in one inv command
MAX_OBJECT_COUNT = 50000
#: maximum time offset
MAX_TIME_OFFSET = 3600

View File

@ -1,10 +1,9 @@
"""
src/network/dandelion.py
========================
Dandelion class definition, tracks stages
"""
import logging
from collections import namedtuple
from random import choice, sample, expovariate
from random import choice, expovariate, sample
from threading import RLock
from time import time
@ -28,7 +27,7 @@ logger = logging.getLogger('default')
@Singleton
class Dandelion(): # pylint: disable=old-style-class
class Dandelion: # pylint: disable=old-style-class
"""Dandelion class for tracking stem/fluff stages."""
def __init__(self):
# currently assignable child stems
@ -123,7 +122,8 @@ class Dandelion(): # pylint: disable=old-style-class
self.stem.remove(connection)
# active mappings to pointing to the removed node
for k in (
k for k, v in self.nodeMap.iteritems() if v == connection
k for k, v in self.nodeMap.iteritems()
if v == connection
):
self.nodeMap[k] = None
for k, v in {

View File

@ -1,7 +1,6 @@
"""
`DownloadThread` class definition
"""
import time
import addresses
@ -30,7 +29,9 @@ class DownloadThread(StoppableThread):
"""Expire pending downloads eventually"""
deadline = time.time() - self.requestExpires
try:
toDelete = [k for k, v in missingObjects.iteritems() if v < deadline]
toDelete = [
k for k, v in missingObjects.iteritems()
if v < deadline]
except RuntimeError:
pass
else:

View File

@ -1,6 +1,5 @@
"""
src/network/invthread.py
========================
Thread to send inv annoucements
"""
import Queue
import random
@ -34,7 +33,7 @@ def handleExpiredDandelion(expired):
class InvThread(StoppableThread):
"""A thread to send inv annoucements."""
"""Main thread that sends inv annoucements"""
name = "InvBroadcaster"
@ -43,7 +42,8 @@ class InvThread(StoppableThread):
"""Locally generated inventory items require special handling"""
Dandelion().addHash(hashId, stream=stream)
for connection in BMConnectionPool().connections():
if state.dandelion and connection != Dandelion().objectChildStem(hashId):
if state.dandelion and connection != \
Dandelion().objectChildStem(hashId):
continue
connection.objectsNewToThem[hashId] = time()
@ -92,15 +92,17 @@ class InvThread(StoppableThread):
random.shuffle(fluffs)
connection.append_write_buf(protocol.CreatePacket(
'inv',
addresses.encodeVarint(len(fluffs)) + ''.join(fluffs)))
addresses.encodeVarint(
len(fluffs)) + ''.join(fluffs)))
if stems:
random.shuffle(stems)
connection.append_write_buf(protocol.CreatePacket(
'dinv',
addresses.encodeVarint(len(stems)) + ''.join(stems)))
addresses.encodeVarint(
len(stems)) + ''.join(stems)))
invQueue.iterate()
for i in range(len(chunk)):
for _ in range(len(chunk)):
invQueue.task_done()
if Dandelion().refresh < time():