Merge branch 'v0.6' of https://github.com/surbhicis/PyBitmessage into UiChanges
This commit is contained in:
commit
4b609e46b3
21
LICENSE
21
LICENSE
|
@ -47,3 +47,24 @@ Redistribution and use in source and binary forms, with or without modification,
|
|||
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
===== based on asyncore_pollchoose.py asyncore_pollchoose python implementation. by Sam Rushing <rushing@nightmare.com>
|
||||
|
||||
Copyright 1996 by Sam Rushing. All Rights Reserved
|
||||
|
||||
Permission to use, copy, modify, and distribute this software and
|
||||
its documentation for any purpose and without fee is hereby
|
||||
granted, provided that the above copyright notice appear in all
|
||||
copies and that both that copyright notice and this permission
|
||||
notice appear in supporting documentation, and that the name of Sam
|
||||
Rushing not be used in advertising or publicity pertaining to
|
||||
distribution of the software without specific, written prior
|
||||
permission.
|
||||
|
||||
SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
||||
INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
||||
NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
||||
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
||||
OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
||||
NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
"""
|
||||
Network subsystem packages
|
||||
"""
|
||||
from addrthread import AddrThread
|
||||
from announcethread import AnnounceThread
|
||||
from connectionpool import BMConnectionPool
|
||||
|
|
|
@ -12,6 +12,7 @@ from threads import StoppableThread
|
|||
|
||||
|
||||
class AddrThread(StoppableThread):
|
||||
"""(Node) address broadcasting thread"""
|
||||
name = "AddrBroadcaster"
|
||||
|
||||
def run(self):
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
"""
|
||||
src/network/advanceddispatcher.py
|
||||
=================================
|
||||
Improved version of asyncore dispatcher
|
||||
"""
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
@ -14,7 +12,8 @@ from threads import BusyError, nonBlocking
|
|||
|
||||
|
||||
class ProcessingError(Exception):
|
||||
"""General class for protocol parser exception, use as a base for others."""
|
||||
"""General class for protocol parser exception,
|
||||
use as a base for others."""
|
||||
pass
|
||||
|
||||
|
||||
|
@ -24,7 +23,8 @@ class UnknownStateError(ProcessingError):
|
|||
|
||||
|
||||
class AdvancedDispatcher(asyncore.dispatcher):
|
||||
"""Improved version of asyncore dispatcher, with buffers and protocol state."""
|
||||
"""Improved version of asyncore dispatcher,
|
||||
with buffers and protocol state."""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
_buf_len = 131072 # 128kB
|
||||
|
||||
|
@ -72,7 +72,8 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
|||
del self.read_buf[0:length]
|
||||
|
||||
def process(self):
|
||||
"""Process (parse) data that's in the buffer, as long as there is enough data and the connection is open."""
|
||||
"""Process (parse) data that's in the buffer,
|
||||
as long as there is enough data and the connection is open."""
|
||||
while self.connected and not state.shutdown:
|
||||
try:
|
||||
with nonBlocking(self.processingLock):
|
||||
|
@ -104,8 +105,9 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
|||
if asyncore.maxUploadRate > 0:
|
||||
self.uploadChunk = int(asyncore.uploadBucket)
|
||||
self.uploadChunk = min(self.uploadChunk, len(self.write_buf))
|
||||
return asyncore.dispatcher.writable(self) and \
|
||||
(self.connecting or (self.connected and self.uploadChunk > 0))
|
||||
return asyncore.dispatcher.writable(self) and (
|
||||
self.connecting or (
|
||||
self.connected and self.uploadChunk > 0))
|
||||
|
||||
def readable(self):
|
||||
"""Is the read buffer ready to accept data from the network?"""
|
||||
|
@ -114,13 +116,15 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
|||
self.downloadChunk = int(asyncore.downloadBucket)
|
||||
try:
|
||||
if self.expectBytes > 0 and not self.fullyEstablished:
|
||||
self.downloadChunk = min(self.downloadChunk, self.expectBytes - len(self.read_buf))
|
||||
self.downloadChunk = min(
|
||||
self.downloadChunk, self.expectBytes - len(self.read_buf))
|
||||
if self.downloadChunk < 0:
|
||||
self.downloadChunk = 0
|
||||
except AttributeError:
|
||||
pass
|
||||
return asyncore.dispatcher.readable(self) and \
|
||||
(self.connecting or self.accepting or (self.connected and self.downloadChunk > 0))
|
||||
return asyncore.dispatcher.readable(self) and (
|
||||
self.connecting or self.accepting or (
|
||||
self.connected and self.downloadChunk > 0))
|
||||
|
||||
def handle_read(self):
|
||||
"""Append incoming data to the read buffer."""
|
||||
|
@ -144,20 +148,21 @@ class AdvancedDispatcher(asyncore.dispatcher):
|
|||
try:
|
||||
asyncore.dispatcher.handle_connect_event(self)
|
||||
except socket.error as e:
|
||||
if e.args[0] not in asyncore._DISCONNECTED: # pylint: disable=protected-access
|
||||
# pylint: disable=protected-access
|
||||
if e.args[0] not in asyncore._DISCONNECTED:
|
||||
raise
|
||||
|
||||
def handle_connect(self):
|
||||
"""Method for handling connection established implementations."""
|
||||
self.lastTx = time.time()
|
||||
|
||||
def state_close(self):
|
||||
def state_close(self): # pylint: disable=no-self-use
|
||||
"""Signal to the processing loop to end."""
|
||||
# pylint: disable=no-self-use
|
||||
return False
|
||||
|
||||
def handle_close(self):
|
||||
"""Callback for connection being closed, but can also be called directly when you want connection to close."""
|
||||
"""Callback for connection being closed,
|
||||
but can also be called directly when you want connection to close."""
|
||||
with self.readLock:
|
||||
self.read_buf = bytearray()
|
||||
with self.writeLock:
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
"""
|
||||
src/network/announcethread.py
|
||||
=================================
|
||||
Announce myself (node address)
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import state
|
||||
|
@ -40,6 +38,7 @@ class AnnounceThread(StoppableThread):
|
|||
stream,
|
||||
Peer(
|
||||
'127.0.0.1',
|
||||
BMConfigParser().safeGetInt('bitmessagesettings', 'port')),
|
||||
BMConfigParser().safeGetInt(
|
||||
'bitmessagesettings', 'port')),
|
||||
time.time())
|
||||
connection.append_write_buf(assemble_addr([addr]))
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
"""
|
||||
Create bitmessage protocol command packets
|
||||
"""
|
||||
|
||||
import struct
|
||||
|
||||
import addresses
|
||||
|
@ -13,20 +12,20 @@ from protocol import CreatePacket, encodeHost
|
|||
def assemble_addr(peerList):
|
||||
"""Create address command"""
|
||||
if isinstance(peerList, Peer):
|
||||
peerList = (peerList)
|
||||
peerList = [peerList]
|
||||
if not peerList:
|
||||
return b''
|
||||
retval = b''
|
||||
for i in range(0, len(peerList), MAX_ADDR_COUNT):
|
||||
payload = addresses.encodeVarint(
|
||||
len(peerList[i:i + MAX_ADDR_COUNT]))
|
||||
payload = addresses.encodeVarint(len(peerList[i:i + MAX_ADDR_COUNT]))
|
||||
for stream, peer, timestamp in peerList[i:i + MAX_ADDR_COUNT]:
|
||||
payload += struct.pack(
|
||||
'>Q', timestamp) # 64-bit time
|
||||
# 64-bit time
|
||||
payload += struct.pack('>Q', timestamp)
|
||||
payload += struct.pack('>I', stream)
|
||||
payload += struct.pack(
|
||||
'>q', 1) # service bit flags offered by this node
|
||||
# service bit flags offered by this node
|
||||
payload += struct.pack('>q', 1)
|
||||
payload += encodeHost(peer.host)
|
||||
payload += struct.pack('>H', peer.port) # remote port
|
||||
# remote port
|
||||
payload += struct.pack('>H', peer.port)
|
||||
retval += CreatePacket('addr', payload)
|
||||
return retval
|
||||
|
|
|
@ -1,56 +1,11 @@
|
|||
"""
|
||||
Basic infrastructure for asynchronous socket service clients and servers.
|
||||
"""
|
||||
# -*- Mode: Python -*-
|
||||
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
|
||||
# Author: Sam Rushing <rushing@nightmare.com>
|
||||
# pylint: disable=too-many-statements,too-many-branches,no-self-use,too-many-lines,attribute-defined-outside-init
|
||||
# pylint: disable=global-statement
|
||||
"""
|
||||
src/network/asyncore_pollchoose.py
|
||||
==================================
|
||||
|
||||
# ======================================================================
|
||||
# Copyright 1996 by Sam Rushing
|
||||
#
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Permission to use, copy, modify, and distribute this software and
|
||||
# its documentation for any purpose and without fee is hereby
|
||||
# granted, provided that the above copyright notice appear in all
|
||||
# copies and that both that copyright notice and this permission
|
||||
# notice appear in supporting documentation, and that the name of Sam
|
||||
# Rushing not be used in advertising or publicity pertaining to
|
||||
# distribution of the software without specific, written prior
|
||||
# permission.
|
||||
#
|
||||
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
|
||||
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
|
||||
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
|
||||
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
|
||||
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
|
||||
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
# ======================================================================
|
||||
|
||||
Basic infrastructure for asynchronous socket service clients and servers.
|
||||
|
||||
There are only two ways to have a program on a single processor do "more
|
||||
than one thing at a time". Multi-threaded programming is the simplest and
|
||||
most popular way to do it, but there is another very different technique,
|
||||
that lets you have nearly all the advantages of multi-threading, without
|
||||
actually using multiple threads. it's really only practical if your program
|
||||
is largely I/O bound. If your program is CPU bound, then pre-emptive
|
||||
scheduled threads are probably what you really need. Network servers are
|
||||
rarely CPU-bound, however.
|
||||
|
||||
If your operating system supports the select() system call in its I/O
|
||||
library (and nearly all do), then you can use it to juggle multiple
|
||||
communication channels at once; doing other work while your I/O is taking
|
||||
place in the "background." Although this strategy can seem strange and
|
||||
complex, especially at first, it is in many ways easier to understand and
|
||||
control than multi-threaded programming. The module documented here solves
|
||||
many of the difficult problems for you, making the task of building
|
||||
sophisticated high-performance network servers and clients a snap.
|
||||
"""
|
||||
|
||||
# pylint: disable=too-many-branches,too-many-lines,global-statement
|
||||
# pylint: disable=redefined-builtin,no-self-use
|
||||
import os
|
||||
import select
|
||||
import socket
|
||||
|
@ -58,8 +13,9 @@ import sys
|
|||
import time
|
||||
import warnings
|
||||
from errno import (
|
||||
EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED, ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR,
|
||||
EINVAL, EISCONN, ENETUNREACH, ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode
|
||||
EADDRINUSE, EAGAIN, EALREADY, EBADF, ECONNABORTED, ECONNREFUSED,
|
||||
ECONNRESET, EHOSTUNREACH, EINPROGRESS, EINTR, EINVAL, EISCONN, ENETUNREACH,
|
||||
ENOTCONN, ENOTSOCK, EPIPE, ESHUTDOWN, ETIMEDOUT, EWOULDBLOCK, errorcode
|
||||
)
|
||||
from threading import current_thread
|
||||
|
||||
|
@ -107,7 +63,8 @@ def _strerror(err):
|
|||
|
||||
|
||||
class ExitNow(Exception):
|
||||
"""We don't use directly but may be necessary as we replace asyncore due to some library raising or expecting it"""
|
||||
"""We don't use directly but may be necessary as we replace
|
||||
asyncore due to some library raising or expecting it"""
|
||||
pass
|
||||
|
||||
|
||||
|
@ -152,7 +109,8 @@ def write(obj):
|
|||
def set_rates(download, upload):
|
||||
"""Set throttling rates"""
|
||||
|
||||
global maxDownloadRate, maxUploadRate, downloadBucket, uploadBucket, downloadTimestamp, uploadTimestamp
|
||||
global maxDownloadRate, maxUploadRate, downloadBucket
|
||||
global uploadBucket, downloadTimestamp, uploadTimestamp
|
||||
|
||||
maxDownloadRate = float(download) * 1024
|
||||
maxUploadRate = float(upload) * 1024
|
||||
|
@ -182,7 +140,8 @@ def update_received(download=0):
|
|||
currentTimestamp = time.time()
|
||||
receivedBytes += download
|
||||
if maxDownloadRate > 0:
|
||||
bucketIncrease = maxDownloadRate * (currentTimestamp - downloadTimestamp)
|
||||
bucketIncrease = \
|
||||
maxDownloadRate * (currentTimestamp - downloadTimestamp)
|
||||
downloadBucket += bucketIncrease
|
||||
if downloadBucket > maxDownloadRate:
|
||||
downloadBucket = int(maxDownloadRate)
|
||||
|
@ -242,7 +201,6 @@ def readwrite(obj, flags):
|
|||
|
||||
def select_poller(timeout=0.0, map=None):
|
||||
"""A poller which uses select(), available on most platforms."""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
if map is None:
|
||||
map = socket_map
|
||||
|
@ -298,7 +256,6 @@ def select_poller(timeout=0.0, map=None):
|
|||
|
||||
def poll_poller(timeout=0.0, map=None):
|
||||
"""A poller which uses poll(), available on most UNIXen."""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
if map is None:
|
||||
map = socket_map
|
||||
|
@ -356,7 +313,6 @@ poll2 = poll3 = poll_poller
|
|||
|
||||
def epoll_poller(timeout=0.0, map=None):
|
||||
"""A poller which uses epoll(), supported on Linux 2.5.44 and newer."""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
if map is None:
|
||||
map = socket_map
|
||||
|
@ -412,7 +368,7 @@ def epoll_poller(timeout=0.0, map=None):
|
|||
|
||||
def kqueue_poller(timeout=0.0, map=None):
|
||||
"""A poller which uses kqueue(), BSD specific."""
|
||||
# pylint: disable=redefined-builtin,no-member
|
||||
# pylint: disable=no-member,too-many-statements
|
||||
|
||||
if map is None:
|
||||
map = socket_map
|
||||
|
@ -440,14 +396,20 @@ def kqueue_poller(timeout=0.0, map=None):
|
|||
poller_flags |= select.KQ_EV_ENABLE
|
||||
else:
|
||||
poller_flags |= select.KQ_EV_DISABLE
|
||||
updates.append(select.kevent(fd, filter=select.KQ_FILTER_READ, flags=poller_flags))
|
||||
updates.append(
|
||||
select.kevent(
|
||||
fd, filter=select.KQ_FILTER_READ,
|
||||
flags=poller_flags))
|
||||
if kq_filter & 2 != obj.poller_filter & 2:
|
||||
poller_flags = select.KQ_EV_ADD
|
||||
if kq_filter & 2:
|
||||
poller_flags |= select.KQ_EV_ENABLE
|
||||
else:
|
||||
poller_flags |= select.KQ_EV_DISABLE
|
||||
updates.append(select.kevent(fd, filter=select.KQ_FILTER_WRITE, flags=poller_flags))
|
||||
updates.append(
|
||||
select.kevent(
|
||||
fd, filter=select.KQ_FILTER_WRITE,
|
||||
flags=poller_flags))
|
||||
obj.poller_filter = kq_filter
|
||||
|
||||
if not selectables:
|
||||
|
@ -481,7 +443,6 @@ def kqueue_poller(timeout=0.0, map=None):
|
|||
|
||||
def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
|
||||
"""Poll in a loop, until count or timeout is reached"""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
if map is None:
|
||||
map = socket_map
|
||||
|
@ -520,9 +481,9 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
|
|||
count = count - 1
|
||||
|
||||
|
||||
class dispatcher:
|
||||
class dispatcher(object):
|
||||
"""Dispatcher for socket objects"""
|
||||
# pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class
|
||||
# pylint: disable=too-many-public-methods,too-many-instance-attributes
|
||||
|
||||
debug = False
|
||||
connected = False
|
||||
|
@ -537,7 +498,6 @@ class dispatcher:
|
|||
minTx = 1500
|
||||
|
||||
def __init__(self, sock=None, map=None):
|
||||
# pylint: disable=redefined-builtin
|
||||
if map is None:
|
||||
self._map = socket_map
|
||||
else:
|
||||
|
@ -586,8 +546,7 @@ class dispatcher:
|
|||
|
||||
def add_channel(self, map=None):
|
||||
"""Add a channel"""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
if map is None:
|
||||
map = self._map
|
||||
map[self._fileno] = self
|
||||
|
@ -596,8 +555,6 @@ class dispatcher:
|
|||
|
||||
def del_channel(self, map=None):
|
||||
"""Delete a channel"""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
fd = self._fileno
|
||||
if map is None:
|
||||
map = self._map
|
||||
|
@ -605,12 +562,14 @@ class dispatcher:
|
|||
del map[fd]
|
||||
if self._fileno:
|
||||
try:
|
||||
kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
|
||||
except (AttributeError, KeyError, TypeError, IOError, OSError):
|
||||
kqueue_poller.pollster.control([select.kevent(
|
||||
fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)], 0)
|
||||
except(AttributeError, KeyError, TypeError, IOError, OSError):
|
||||
pass
|
||||
try:
|
||||
kqueue_poller.pollster.control([select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
|
||||
except (AttributeError, KeyError, TypeError, IOError, OSError):
|
||||
kqueue_poller.pollster.control([select.kevent(
|
||||
fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)], 0)
|
||||
except(AttributeError, KeyError, TypeError, IOError, OSError):
|
||||
pass
|
||||
try:
|
||||
epoll_poller.pollster.unregister(fd)
|
||||
|
@ -627,8 +586,10 @@ class dispatcher:
|
|||
self.poller_filter = 0
|
||||
self.poller_registered = False
|
||||
|
||||
def create_socket(self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
|
||||
def create_socket(
|
||||
self, family=socket.AF_INET, socket_type=socket.SOCK_STREAM):
|
||||
"""Create a socket"""
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self.family_and_type = family, socket_type
|
||||
sock = socket.socket(family, socket_type)
|
||||
sock.setblocking(0)
|
||||
|
@ -636,20 +597,16 @@ class dispatcher:
|
|||
|
||||
def set_socket(self, sock, map=None):
|
||||
"""Set socket"""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
self.socket = sock
|
||||
self._fileno = sock.fileno()
|
||||
self.add_channel(map)
|
||||
|
||||
def set_reuse_addr(self):
|
||||
"""try to re-use a server port if possible"""
|
||||
|
||||
try:
|
||||
self.socket.setsockopt(
|
||||
socket.SOL_SOCKET, socket.SO_REUSEADDR,
|
||||
self.socket.getsockopt(socket.SOL_SOCKET,
|
||||
socket.SO_REUSEADDR) | 1
|
||||
socket.SOL_SOCKET, socket.SO_REUSEADDR, self.socket.getsockopt(
|
||||
socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
|
||||
)
|
||||
except socket.error:
|
||||
pass
|
||||
|
@ -704,13 +661,16 @@ class dispatcher:
|
|||
raise socket.error(err, errorcode[err])
|
||||
|
||||
def accept(self):
|
||||
"""Accept incoming connections. Returns either an address pair or None."""
|
||||
"""Accept incoming connections.
|
||||
Returns either an address pair or None."""
|
||||
try:
|
||||
conn, addr = self.socket.accept()
|
||||
except TypeError:
|
||||
return None
|
||||
except socket.error as why:
|
||||
if why.args[0] in (EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED, EAGAIN, ENOTCONN):
|
||||
if why.args[0] in (
|
||||
EWOULDBLOCK, WSAEWOULDBLOCK, ECONNABORTED,
|
||||
EAGAIN, ENOTCONN):
|
||||
return None
|
||||
else:
|
||||
raise
|
||||
|
@ -769,11 +729,12 @@ class dispatcher:
|
|||
try:
|
||||
retattr = getattr(self.socket, attr)
|
||||
except AttributeError:
|
||||
raise AttributeError("%s instance has no attribute '%s'"
|
||||
% (self.__class__.__name__, attr))
|
||||
raise AttributeError(
|
||||
"%s instance has no attribute '%s'"
|
||||
% (self.__class__.__name__, attr))
|
||||
else:
|
||||
msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s " \
|
||||
"instead" % {'me': self.__class__.__name__, 'attr': attr}
|
||||
msg = "%(me)s.%(attr)s is deprecated; use %(me)s.socket.%(attr)s"\
|
||||
" instead" % {'me': self.__class__.__name__, 'attr': attr}
|
||||
warnings.warn(msg, DeprecationWarning, stacklevel=2)
|
||||
return retattr
|
||||
|
||||
|
@ -855,13 +816,8 @@ class dispatcher:
|
|||
|
||||
self.log_info(
|
||||
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
|
||||
self_repr,
|
||||
t,
|
||||
v,
|
||||
tbinfo
|
||||
),
|
||||
'error'
|
||||
)
|
||||
self_repr, t, v, tbinfo),
|
||||
'error')
|
||||
self.handle_close()
|
||||
|
||||
def handle_accept(self):
|
||||
|
@ -902,11 +858,8 @@ class dispatcher_with_send(dispatcher):
|
|||
adds simple buffered output capability, useful for simple clients.
|
||||
[for more sophisticated usage use asynchat.async_chat]
|
||||
"""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
def __init__(self, sock=None, map=None):
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
dispatcher.__init__(self, sock, map)
|
||||
self.out_buffer = b''
|
||||
|
||||
|
@ -941,7 +894,8 @@ def compact_traceback():
|
|||
"""Return a compact traceback"""
|
||||
t, v, tb = sys.exc_info()
|
||||
tbinfo = []
|
||||
if not tb: # Must have a traceback
|
||||
# Must have a traceback
|
||||
if not tb:
|
||||
raise AssertionError("traceback does not exist")
|
||||
while tb:
|
||||
tbinfo.append((
|
||||
|
@ -961,7 +915,6 @@ def compact_traceback():
|
|||
|
||||
def close_all(map=None, ignore_all=False):
|
||||
"""Close all connections"""
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
if map is None:
|
||||
map = socket_map
|
||||
|
@ -998,13 +951,13 @@ def close_all(map=None, ignore_all=False):
|
|||
if os.name == 'posix':
|
||||
import fcntl
|
||||
|
||||
class file_wrapper:
|
||||
class file_wrapper: # pylint: disable=old-style-class
|
||||
"""
|
||||
Here we override just enough to make a file look like a socket for the purposes of asyncore.
|
||||
Here we override just enough to make a file look
|
||||
like a socket for the purposes of asyncore.
|
||||
|
||||
The passed fd is automatically os.dup()'d
|
||||
"""
|
||||
# pylint: disable=old-style-class
|
||||
|
||||
def __init__(self, fd):
|
||||
self.fd = os.dup(fd)
|
||||
|
@ -1019,12 +972,11 @@ if os.name == 'posix':
|
|||
|
||||
def getsockopt(self, level, optname, buflen=None):
|
||||
"""Fake getsockopt()"""
|
||||
if (level == socket.SOL_SOCKET and
|
||||
optname == socket.SO_ERROR and
|
||||
if (level == socket.SOL_SOCKET and optname == socket.SO_ERROR and
|
||||
not buflen):
|
||||
return 0
|
||||
raise NotImplementedError("Only asyncore specific behaviour "
|
||||
"implemented.")
|
||||
raise NotImplementedError(
|
||||
"Only asyncore specific behaviour implemented.")
|
||||
|
||||
read = recv
|
||||
write = send
|
||||
|
@ -1041,8 +993,6 @@ if os.name == 'posix':
|
|||
"""A dispatcher for file_wrapper objects"""
|
||||
|
||||
def __init__(self, fd, map=None):
|
||||
# pylint: disable=redefined-builtin
|
||||
|
||||
dispatcher.__init__(self, None, map)
|
||||
self.connected = True
|
||||
try:
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
"""
|
||||
BMObject and it's exceptions.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
|
@ -15,12 +14,14 @@ logger = logging.getLogger('default')
|
|||
|
||||
|
||||
class BMObjectInsufficientPOWError(Exception):
|
||||
"""Exception indicating the object doesn't have sufficient proof of work."""
|
||||
"""Exception indicating the object
|
||||
doesn't have sufficient proof of work."""
|
||||
errorCodes = ("Insufficient proof of work")
|
||||
|
||||
|
||||
class BMObjectInvalidDataError(Exception):
|
||||
"""Exception indicating the data being parsed does not match the specification."""
|
||||
"""Exception indicating the data being parsed
|
||||
does not match the specification."""
|
||||
errorCodes = ("Data invalid")
|
||||
|
||||
|
||||
|
@ -30,7 +31,8 @@ class BMObjectExpiredError(Exception):
|
|||
|
||||
|
||||
class BMObjectUnwantedStreamError(Exception):
|
||||
"""Exception indicating the object is in a stream we didn't advertise as being interested in."""
|
||||
"""Exception indicating the object is in a stream
|
||||
we didn't advertise as being interested in."""
|
||||
errorCodes = ("Object in unwanted stream")
|
||||
|
||||
|
||||
|
@ -44,9 +46,8 @@ class BMObjectAlreadyHaveError(Exception):
|
|||
errorCodes = ("Already have this object")
|
||||
|
||||
|
||||
class BMObject(object):
|
||||
class BMObject(object): # pylint: disable=too-many-instance-attributes
|
||||
"""Bitmessage Object as a class."""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
# max TTL, 28 days and 3 hours
|
||||
maxTTL = 28 * 24 * 60 * 60 + 10800
|
||||
|
@ -81,31 +82,36 @@ class BMObject(object):
|
|||
raise BMObjectInsufficientPOWError()
|
||||
|
||||
def checkEOLSanity(self):
|
||||
"""Check if object's lifetime isn't ridiculously far in the past or future."""
|
||||
"""Check if object's lifetime
|
||||
isn't ridiculously far in the past or future."""
|
||||
# EOL sanity check
|
||||
if self.expiresTime - int(time.time()) > BMObject.maxTTL:
|
||||
logger.info(
|
||||
'This object\'s End of Life time is too far in the future. Ignoring it. Time is %i',
|
||||
self.expiresTime)
|
||||
'This object\'s End of Life time is too far in the future.'
|
||||
' Ignoring it. Time is %i', self.expiresTime)
|
||||
# .. todo:: remove from download queue
|
||||
raise BMObjectExpiredError()
|
||||
|
||||
if self.expiresTime - int(time.time()) < BMObject.minTTL:
|
||||
logger.info(
|
||||
'This object\'s End of Life time was too long ago. Ignoring the object. Time is %i',
|
||||
self.expiresTime)
|
||||
'This object\'s End of Life time was too long ago.'
|
||||
' Ignoring the object. Time is %i', self.expiresTime)
|
||||
# .. todo:: remove from download queue
|
||||
raise BMObjectExpiredError()
|
||||
|
||||
def checkStream(self):
|
||||
"""Check if object's stream matches streams we are interested in"""
|
||||
if self.streamNumber not in state.streamsInWhichIAmParticipating:
|
||||
logger.debug('The streamNumber %i isn\'t one we are interested in.', self.streamNumber)
|
||||
logger.debug(
|
||||
'The streamNumber %i isn\'t one we are interested in.',
|
||||
self.streamNumber)
|
||||
raise BMObjectUnwantedStreamError()
|
||||
|
||||
def checkAlreadyHave(self):
|
||||
"""
|
||||
Check if we already have the object (so that we don't duplicate it in inventory or advertise it unnecessarily)
|
||||
Check if we already have the object
|
||||
(so that we don't duplicate it in inventory
|
||||
or advertise it unnecessarily)
|
||||
"""
|
||||
# if it's a stem duplicate, pretend we don't have it
|
||||
if Dandelion().hasHash(self.inventoryHash):
|
||||
|
@ -114,7 +120,8 @@ class BMObject(object):
|
|||
raise BMObjectAlreadyHaveError()
|
||||
|
||||
def checkObjectByType(self):
|
||||
"""Call a object type specific check (objects can have additional checks based on their types)"""
|
||||
"""Call a object type specific check
|
||||
(objects can have additional checks based on their types)"""
|
||||
if self.objectType == protocol.OBJECT_GETPUBKEY:
|
||||
self.checkGetpubkey()
|
||||
elif self.objectType == protocol.OBJECT_PUBKEY:
|
||||
|
@ -125,20 +132,21 @@ class BMObject(object):
|
|||
self.checkBroadcast()
|
||||
# other objects don't require other types of tests
|
||||
|
||||
def checkMessage(self):
|
||||
def checkMessage(self): # pylint: disable=no-self-use
|
||||
""""Message" object type checks."""
|
||||
# pylint: disable=no-self-use
|
||||
return
|
||||
|
||||
def checkGetpubkey(self):
|
||||
""""Getpubkey" object type checks."""
|
||||
if len(self.data) < 42:
|
||||
logger.info('getpubkey message doesn\'t contain enough data. Ignoring.')
|
||||
logger.info(
|
||||
'getpubkey message doesn\'t contain enough data. Ignoring.')
|
||||
raise BMObjectInvalidError()
|
||||
|
||||
def checkPubkey(self):
|
||||
""""Pubkey" object type checks."""
|
||||
if len(self.data) < 146 or len(self.data) > 440: # sanity check
|
||||
# sanity check
|
||||
if len(self.data) < 146 or len(self.data) > 440:
|
||||
logger.info('pubkey object too short or too long. Ignoring.')
|
||||
raise BMObjectInvalidError()
|
||||
|
||||
|
@ -146,8 +154,9 @@ class BMObject(object):
|
|||
""""Broadcast" object type checks."""
|
||||
if len(self.data) < 180:
|
||||
logger.debug(
|
||||
'The payload length of this broadcast packet is unreasonably low.'
|
||||
' Someone is probably trying funny business. Ignoring message.')
|
||||
'The payload length of this broadcast'
|
||||
' packet is unreasonably low. Someone is probably'
|
||||
' trying funny business. Ignoring message.')
|
||||
raise BMObjectInvalidError()
|
||||
|
||||
# this isn't supported anymore
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
"""
|
||||
src/network/bmproto.py
|
||||
==================================
|
||||
Bitmessage Protocol
|
||||
"""
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
# pylint: disable=attribute-defined-outside-init, too-few-public-methods
|
||||
import base64
|
||||
import hashlib
|
||||
import logging
|
||||
|
@ -19,17 +18,16 @@ import state
|
|||
from bmconfigparser import BMConfigParser
|
||||
from inventory import Inventory
|
||||
from network.advanceddispatcher import AdvancedDispatcher
|
||||
from network.constants import (
|
||||
ADDRESS_ALIVE,
|
||||
MAX_MESSAGE_SIZE,
|
||||
MAX_OBJECT_COUNT,
|
||||
MAX_OBJECT_PAYLOAD_SIZE,
|
||||
MAX_TIME_OFFSET)
|
||||
from network.dandelion import Dandelion
|
||||
from network.bmobject import (
|
||||
BMObject, BMObjectInsufficientPOWError, BMObjectInvalidDataError,
|
||||
BMObjectExpiredError, BMObjectUnwantedStreamError,
|
||||
BMObjectInvalidError, BMObjectAlreadyHaveError)
|
||||
BMObjectInvalidError, BMObjectAlreadyHaveError
|
||||
)
|
||||
from network.constants import (
|
||||
ADDRESS_ALIVE, MAX_MESSAGE_SIZE, MAX_OBJECT_COUNT,
|
||||
MAX_OBJECT_PAYLOAD_SIZE, MAX_TIME_OFFSET
|
||||
)
|
||||
from network.dandelion import Dandelion
|
||||
from network.proxy import ProxyError
|
||||
from node import Node, Peer
|
||||
from objectracker import missingObjects, ObjectTracker
|
||||
|
@ -59,7 +57,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
# pylint: disable=too-many-instance-attributes, too-many-public-methods
|
||||
timeOffsetWrongCount = 0
|
||||
|
||||
def __init__(self, address=None, sock=None): # pylint: disable=unused-argument, super-init-not-called
|
||||
def __init__(self, address=None, sock=None):
|
||||
# pylint: disable=unused-argument, super-init-not-called
|
||||
AdvancedDispatcher.__init__(self, sock)
|
||||
self.isOutbound = False
|
||||
# packet/connection from a local IP
|
||||
|
@ -163,7 +162,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
|
||||
def decode_payload_varint(self):
|
||||
"""Decode a varint from the payload"""
|
||||
value, offset = addresses.decodeVarint(self.payload[self.payloadOffset:])
|
||||
value, offset = addresses.decodeVarint(
|
||||
self.payload[self.payloadOffset:])
|
||||
self.payloadOffset += offset
|
||||
return value
|
||||
|
||||
|
@ -185,8 +185,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
|
||||
return Node(services, host, port)
|
||||
|
||||
def decode_payload_content(self, pattern="v"): # pylint: disable=too-many-branches, too-many-statements
|
||||
|
||||
# pylint: disable=too-many-branches, too-many-statements
|
||||
def decode_payload_content(self, pattern="v"):
|
||||
"""
|
||||
Decode the payload depending on pattern:
|
||||
|
||||
|
@ -202,7 +202,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
, = end of array
|
||||
"""
|
||||
|
||||
def decode_simple(self, char="v"): # pylint: disable=inconsistent-return-statements
|
||||
# pylint: disable=inconsistent-return-statements
|
||||
def decode_simple(self, char="v"):
|
||||
"""Decode the payload using one char pattern"""
|
||||
if char == "v":
|
||||
return self.decode_payload_varint()
|
||||
|
@ -312,8 +313,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
|
||||
def bm_command_error(self):
|
||||
"""Decode an error message and log it"""
|
||||
fatalStatus, banTime, inventoryVector, errorText = \
|
||||
self.decode_payload_content("vvlsls")
|
||||
err_values = self.decode_payload_content("vvlsls")
|
||||
fatalStatus = err_values[0]
|
||||
# banTime = err_values[1]
|
||||
# inventoryVector = err_values[2]
|
||||
errorText = err_values[3]
|
||||
logger.error(
|
||||
'%s:%i error: %i, %s', self.destination.host,
|
||||
self.destination.port, fatalStatus, errorText)
|
||||
|
@ -408,8 +412,10 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
except KeyError:
|
||||
pass
|
||||
|
||||
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(self.object.inventoryHash):
|
||||
Dandelion().removeHash(self.object.inventoryHash, "cycle detection")
|
||||
if self.object.inventoryHash in Inventory() and Dandelion().hasHash(
|
||||
self.object.inventoryHash):
|
||||
Dandelion().removeHash(
|
||||
self.object.inventoryHash, "cycle detection")
|
||||
|
||||
Inventory()[self.object.inventoryHash] = (
|
||||
self.object.objectType, self.object.streamNumber,
|
||||
|
@ -428,27 +434,30 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
|
||||
def bm_command_addr(self):
|
||||
"""Incoming addresses, process them"""
|
||||
addresses = self._decode_addr() # pylint: disable=redefined-outer-name
|
||||
for i in addresses:
|
||||
seenTime, stream, services, ip, port = i
|
||||
# pylint: disable=redefined-outer-name
|
||||
addresses = self._decode_addr()
|
||||
for seenTime, stream, _, ip, port in addresses:
|
||||
decodedIP = protocol.checkIPAddress(str(ip))
|
||||
if stream not in state.streamsInWhichIAmParticipating:
|
||||
continue
|
||||
if (
|
||||
decodedIP and time.time() - seenTime > 0 and
|
||||
seenTime > time.time() - ADDRESS_ALIVE and
|
||||
port > 0
|
||||
decodedIP and time.time() - seenTime > 0 and
|
||||
seenTime > time.time() - ADDRESS_ALIVE and
|
||||
port > 0
|
||||
):
|
||||
peer = Peer(decodedIP, port)
|
||||
try:
|
||||
if knownnodes.knownNodes[stream][peer]["lastseen"] > seenTime:
|
||||
if knownnodes.knownNodes[stream][peer]["lastseen"] > \
|
||||
seenTime:
|
||||
continue
|
||||
except KeyError:
|
||||
pass
|
||||
if len(knownnodes.knownNodes[stream]) < BMConfigParser().safeGetInt("knownnodes", "maxnodes"):
|
||||
if len(knownnodes.knownNodes[stream]) < \
|
||||
BMConfigParser().safeGetInt("knownnodes", "maxnodes"):
|
||||
with knownnodes.knownNodesLock:
|
||||
try:
|
||||
knownnodes.knownNodes[stream][peer]["lastseen"] = seenTime
|
||||
knownnodes.knownNodes[stream][peer]["lastseen"] = \
|
||||
seenTime
|
||||
except (TypeError, KeyError):
|
||||
knownnodes.knownNodes[stream][peer] = {
|
||||
"lastseen": seenTime,
|
||||
|
@ -539,7 +548,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
length=self.payloadLength, expectBytes=0)
|
||||
return False
|
||||
|
||||
def peerValidityChecks(self): # pylint: disable=too-many-return-statements
|
||||
# pylint: disable=too-many-return-statements
|
||||
def peerValidityChecks(self):
|
||||
"""Check the validity of the peer"""
|
||||
if self.remoteProtocolVersion < 3:
|
||||
self.append_write_buf(protocol.assembleErrorMessage(
|
||||
|
@ -551,8 +561,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
return False
|
||||
if self.timeOffset > MAX_TIME_OFFSET:
|
||||
self.append_write_buf(protocol.assembleErrorMessage(
|
||||
errorText="Your time is too far in the future compared to mine."
|
||||
" Closing connection.", fatal=2))
|
||||
errorText="Your time is too far in the future"
|
||||
" compared to mine. Closing connection.", fatal=2))
|
||||
logger.info(
|
||||
"%s's time is too far in the future (%s seconds)."
|
||||
" Closing connection to it.", self.destination, self.timeOffset)
|
||||
|
@ -574,8 +584,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
errorText="We don't have shared stream interests."
|
||||
" Closing connection.", fatal=2))
|
||||
logger.debug(
|
||||
'Closed connection to %s because there is no overlapping interest'
|
||||
' in streams.', self.destination)
|
||||
'Closed connection to %s because there is no overlapping'
|
||||
' interest in streams.', self.destination)
|
||||
return False
|
||||
if self.destination in connectionpool.BMConnectionPool().inboundConnections:
|
||||
try:
|
||||
|
@ -584,8 +594,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
errorText="Too many connections from your IP."
|
||||
" Closing connection.", fatal=2))
|
||||
logger.debug(
|
||||
'Closed connection to %s because we are already connected'
|
||||
' to that IP.', self.destination)
|
||||
'Closed connection to %s because we are already'
|
||||
' connected to that IP.', self.destination)
|
||||
return False
|
||||
except:
|
||||
pass
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
"""
|
||||
Select which node to connect to
|
||||
"""
|
||||
# pylint: disable=too-many-branches
|
||||
import logging
|
||||
import random # nosec
|
||||
|
@ -12,6 +15,7 @@ logger = logging.getLogger('default')
|
|||
|
||||
|
||||
def getDiscoveredPeer():
|
||||
"""Get a peer from the local peer discovery list"""
|
||||
try:
|
||||
peer = random.choice(state.discoveredPeers.keys())
|
||||
except (IndexError, KeyError):
|
||||
|
@ -24,6 +28,7 @@ def getDiscoveredPeer():
|
|||
|
||||
|
||||
def chooseConnection(stream):
|
||||
"""Returns an appropriate connection"""
|
||||
haveOnion = BMConfigParser().safeGet(
|
||||
"bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
|
||||
onionOnly = BMConfigParser().safeGetBoolean(
|
||||
|
@ -49,7 +54,8 @@ def chooseConnection(stream):
|
|||
logger.warning('Error in %s', peer)
|
||||
rating = 0
|
||||
if haveOnion:
|
||||
# do not connect to raw IP addresses--keep all traffic within Tor overlay
|
||||
# do not connect to raw IP addresses
|
||||
# --keep all traffic within Tor overlay
|
||||
if onionOnly and not peer.host.endswith('.onion'):
|
||||
continue
|
||||
# onion addresses have a higher priority when SOCKS
|
||||
|
|
|
@ -3,9 +3,15 @@ Network protocol constants
|
|||
"""
|
||||
|
||||
|
||||
ADDRESS_ALIVE = 10800 #: address is online if online less than this many seconds ago
|
||||
MAX_ADDR_COUNT = 1000 #: protocol specification says max 1000 addresses in one addr command
|
||||
MAX_MESSAGE_SIZE = 1600100 #: ~1.6 MB which is the maximum possible size of an inv message.
|
||||
MAX_OBJECT_PAYLOAD_SIZE = 2**18 #: 2**18 = 256kB is the maximum size of an object payload
|
||||
MAX_OBJECT_COUNT = 50000 #: protocol specification says max 50000 objects in one inv command
|
||||
MAX_TIME_OFFSET = 3600 #: maximum time offset
|
||||
#: address is online if online less than this many seconds ago
|
||||
ADDRESS_ALIVE = 10800
|
||||
#: protocol specification says max 1000 addresses in one addr command
|
||||
MAX_ADDR_COUNT = 1000
|
||||
#: ~1.6 MB which is the maximum possible size of an inv message.
|
||||
MAX_MESSAGE_SIZE = 1600100
|
||||
#: 2**18 = 256kB is the maximum size of an object payload
|
||||
MAX_OBJECT_PAYLOAD_SIZE = 2**18
|
||||
#: protocol specification says max 50000 objects in one inv command
|
||||
MAX_OBJECT_COUNT = 50000
|
||||
#: maximum time offset
|
||||
MAX_TIME_OFFSET = 3600
|
||||
|
|
|
@ -1,10 +1,9 @@
|
|||
"""
|
||||
src/network/dandelion.py
|
||||
========================
|
||||
Dandelion class definition, tracks stages
|
||||
"""
|
||||
import logging
|
||||
from collections import namedtuple
|
||||
from random import choice, sample, expovariate
|
||||
from random import choice, expovariate, sample
|
||||
from threading import RLock
|
||||
from time import time
|
||||
|
||||
|
@ -28,7 +27,7 @@ logger = logging.getLogger('default')
|
|||
|
||||
|
||||
@Singleton
|
||||
class Dandelion(): # pylint: disable=old-style-class
|
||||
class Dandelion: # pylint: disable=old-style-class
|
||||
"""Dandelion class for tracking stem/fluff stages."""
|
||||
def __init__(self):
|
||||
# currently assignable child stems
|
||||
|
@ -123,7 +122,8 @@ class Dandelion(): # pylint: disable=old-style-class
|
|||
self.stem.remove(connection)
|
||||
# active mappings to pointing to the removed node
|
||||
for k in (
|
||||
k for k, v in self.nodeMap.iteritems() if v == connection
|
||||
k for k, v in self.nodeMap.iteritems()
|
||||
if v == connection
|
||||
):
|
||||
self.nodeMap[k] = None
|
||||
for k, v in {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
"""
|
||||
`DownloadThread` class definition
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
import addresses
|
||||
|
@ -30,7 +29,9 @@ class DownloadThread(StoppableThread):
|
|||
"""Expire pending downloads eventually"""
|
||||
deadline = time.time() - self.requestExpires
|
||||
try:
|
||||
toDelete = [k for k, v in missingObjects.iteritems() if v < deadline]
|
||||
toDelete = [
|
||||
k for k, v in missingObjects.iteritems()
|
||||
if v < deadline]
|
||||
except RuntimeError:
|
||||
pass
|
||||
else:
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
"""
|
||||
src/network/invthread.py
|
||||
========================
|
||||
Thread to send inv annoucements
|
||||
"""
|
||||
import Queue
|
||||
import random
|
||||
|
@ -34,7 +33,7 @@ def handleExpiredDandelion(expired):
|
|||
|
||||
|
||||
class InvThread(StoppableThread):
|
||||
"""A thread to send inv annoucements."""
|
||||
"""Main thread that sends inv annoucements"""
|
||||
|
||||
name = "InvBroadcaster"
|
||||
|
||||
|
@ -43,12 +42,13 @@ class InvThread(StoppableThread):
|
|||
"""Locally generated inventory items require special handling"""
|
||||
Dandelion().addHash(hashId, stream=stream)
|
||||
for connection in BMConnectionPool().connections():
|
||||
if state.dandelion and connection != Dandelion().objectChildStem(hashId):
|
||||
if state.dandelion and connection != \
|
||||
Dandelion().objectChildStem(hashId):
|
||||
continue
|
||||
connection.objectsNewToThem[hashId] = time()
|
||||
|
||||
def run(self): # pylint: disable=too-many-branches
|
||||
while not state.shutdown: # pylint: disable=too-many-nested-blocks
|
||||
def run(self): # pylint: disable=too-many-branches
|
||||
while not state.shutdown: # pylint: disable=too-many-nested-blocks
|
||||
chunk = []
|
||||
while True:
|
||||
# Dandelion fluff trigger by expiration
|
||||
|
@ -92,15 +92,17 @@ class InvThread(StoppableThread):
|
|||
random.shuffle(fluffs)
|
||||
connection.append_write_buf(protocol.CreatePacket(
|
||||
'inv',
|
||||
addresses.encodeVarint(len(fluffs)) + ''.join(fluffs)))
|
||||
addresses.encodeVarint(
|
||||
len(fluffs)) + ''.join(fluffs)))
|
||||
if stems:
|
||||
random.shuffle(stems)
|
||||
connection.append_write_buf(protocol.CreatePacket(
|
||||
'dinv',
|
||||
addresses.encodeVarint(len(stems)) + ''.join(stems)))
|
||||
addresses.encodeVarint(
|
||||
len(stems)) + ''.join(stems)))
|
||||
|
||||
invQueue.iterate()
|
||||
for i in range(len(chunk)):
|
||||
for _ in range(len(chunk)):
|
||||
invQueue.task_done()
|
||||
|
||||
if Dandelion().refresh < time():
|
||||
|
|
Reference in New Issue
Block a user