Merge pull request #18 from jaicis/fix#3

Network files fixes
This commit is contained in:
lakshyacis 2020-01-02 16:33:17 +05:30 committed by GitHub
commit 102ea32d28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 93 additions and 88 deletions

View File

@ -1,13 +1,13 @@
from .addrthread import AddrThread
from .announcethread import AnnounceThread
from .connectionpool import BMConnectionPool
from .dandelion import Dandelion
from .downloadthread import DownloadThread
from .invthread import InvThread
from .networkthread import BMNetworkThread
from .receivequeuethread import ReceiveQueueThread
from .threads import StoppableThread
from .uploadthread import UploadThread
from network.addrthread import AddrThread
from network.announcethread import AnnounceThread
from network.connectionpool import BMConnectionPool
from network.dandelion import Dandelion
from network.downloadthread import DownloadThread
from network.invthread import InvThread
from network.networkthread import BMNetworkThread
from network.receivequeuethread import ReceiveQueueThread
from network.threads import StoppableThread
from network.uploadthread import UploadThread
__all__ = [

View File

@ -1,4 +1,3 @@
# import queue as Queue
"""
Announce addresses as they are received from other hosts
"""

View File

@ -3,7 +3,6 @@ src/network/advanceddispatcher.py
=================================
"""
# pylint: disable=attribute-defined-outside-init
# import pdb;pdb.set_trace()
import socket
import threading
import time
@ -32,7 +31,7 @@ class AdvancedDispatcher(asyncore.dispatcher):
# python 2 below condition is used
# if not hasattr(self, '_map'):
# python 3 below condition is used
if not '_map' in dir(self):
if '_map' not in dir(self):
asyncore.dispatcher.__init__(self, sock)
self.read_buf = bytearray()
self.write_buf = bytearray()

View File

@ -11,7 +11,7 @@ from bmconfigparser import BMConfigParser
from network.assemble import assemble_addr
from network.connectionpool import BMConnectionPool
from network.udp import UDPSocket
from .node import Peer
from network.node import Peer
from network.threads import StoppableThread
@ -32,17 +32,15 @@ class AnnounceThread(StoppableThread):
@staticmethod
def announceSelf():
"""Announce our presence"""
for connection in [ udpSockets for udpSockets in BMConnectionPool().udpSockets.values()]:
for connection in [udpSockets for udpSockets in BMConnectionPool().udpSockets.values()]:
if not connection.announcing:
continue
for stream in state.streamsInWhichIAmParticipating:
addr = (
stream,
# state.Peer('127.0.0.1',int( BMConfigParser().safeGet("bitmessagesettings", "port"))),
# int(time.time()))
# connection.append_write_buf(BMProto.assembleAddr([addr]))
Peer(
'127.0.0.1',
BMConfigParser().safeGetInt('bitmessagesettings', 'port')),

View File

@ -13,7 +13,7 @@ from protocol import CreatePacket, encodeHost
def assemble_addr(peerList):
"""Create address command"""
if isinstance(peerList, Peer):
peerList = (peerList)
peerList = [peerList]
if not peerList:
return b''
retval = b''

View File

@ -102,7 +102,9 @@ def _strerror(err):
return os.strerror(err)
except (ValueError, OverflowError, NameError):
if err in errorcode:
ret18 ("Unknown error {}".format(err))
return errorcode[err]
return "Unknown error %s" % err
# ret18 ("Unknown error {}".format(err))
class ExitNow(Exception):
@ -477,7 +479,7 @@ def kqueue_poller(timeout=0.0, map=None):
current_thread().stop.wait(timeout)
def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
def loop(timeout=30.0, _=False, map=None, count=None, poller=None):
"""Poll in a loop, until count or timeout is reached"""
# pylint: disable=redefined-builtin
@ -518,7 +520,7 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
count = count - 1
class dispatcher:
class dispatcher(object):
"""Dispatcher for socket objects"""
# pylint: disable=too-many-public-methods,too-many-instance-attributes,old-style-class
@ -786,7 +788,7 @@ class dispatcher:
def log_info(self, message, log_type='info'):
"""Conditionally print a message"""
if log_type not in self.ignore_log_types:
print ('{}: {}'.format(log_type, message))
print('{}: {}'.format(log_type, message))
def handle_read_event(self):
"""Handle a read event"""

View File

@ -32,7 +32,7 @@ from network.bmobject import (
BMObjectInvalidError, BMObjectAlreadyHaveError)
from network.proxy import ProxyError
from network.objectracker import missingObjects, ObjectTracker
from .node import Node, Peer
from network.node import Node, Peer
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
from network.randomtrackingdict import RandomTrackingDict
@ -52,6 +52,7 @@ count = 0
logger = logging.getLogger('default')
class BMProtoError(ProxyError):
"""A Bitmessage Protocol Base Error"""
errorCodes = ("Protocol error")
@ -94,30 +95,30 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
self.object = None
def state_bm_header(self):
"""Process incoming header"""
self.magic, self.command, self.payloadLength, self.checksum = \
protocol.Header.unpack(self.read_buf[:protocol.Header.size])
#its shoule be in string
# its shoule be in string
self.command = self.command.rstrip('\x00'.encode('utf-8'))
global count,addr_version,addr_count,addr_verack
count+=1
# pylint: disable=global-statement
global count, addr_version, addr_count, addr_verack
count += 1
if self.command == 'verack'.encode():
addr_verack+=1
addr_verack += 1
# print('the addr_verack count are -{}'.format(addr_verack))
if self.command == 'version'.encode():
addr_version+=1
addr_version += 1
# print('the addr_version count are -{}'.format(addr_version))
if self.command == 'addr'.encode():
addr_count+=1
addr_count += 1
# print('the addr_count count are -{}'.format(addr_count))
if self.magic != 0xE9BEB4D9:
# skip 1 byte in order to sync
#in the advancedispatched and length commend's
#escape the 1 length
# in the advancedispatched and length commend's
# escape the 1 length
self.set_state("bm_header", length=1)
self.bm_proto_reset()
logger.debug('Bad magic')
@ -132,7 +133,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
length=protocol.Header.size, expectBytes=self.payloadLength)
return True
def state_bm_command(self): # pylint: disable=too-many-branches
def state_bm_command(self): # pylint: disable=too-many-branches, too-many-statements
"""Process incoming command"""
self.payload = self.read_buf[:self.payloadLength]
if self.checksum != hashlib.sha512(self.payload).digest()[0:4]:
@ -143,14 +144,14 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
"error".encode(), "version".encode(), "verack".encode()):
logger.error(
'Received command {} before connection was fully'
' established, ignoring'.format (self.command))
' established, ignoring'.format(self.command))
self.invalid = True
if not self.invalid:
try:
command = self.command.decode() if self.command else self.command
retval = getattr(
self, "bm_command_" +command)()
self, "bm_command_" + command)()
except AttributeError:
# unimplemented command
logger.debug('unimplemented command %s', self.command)
@ -346,8 +347,11 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
def bm_command_error(self):
"""Decode an error message and log it"""
fatalStatus, banTime, inventoryVector, errorText = \
self.decode_payload_content("vvlsls")
err_values = self.decode_payload_content("vvlsls")
fatalStatus = err_values[0]
# banTime = err_values[1]
# inventoryVector = err_values[2]
errorText = err_values[3]
logger.error(
'%s:%i error: %i, %s', self.destination.host,
self.destination.port, fatalStatus, errorText)
@ -359,7 +363,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
If we have them and some other conditions are fulfilled,
append them to the write queue.
"""
#32 an array bit long strings
# 32 an array bit long strings
items = self.decode_payload_content("l32s")
# skip?
now = time.time()
@ -467,7 +471,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
"""Incoming addresses, process them"""
addresses = self._decode_addr() # pylint: disable=redefined-outer-name
for i in addresses:
seenTime, stream, services, ip, port = i
seenTime, stream, _, ip, port = i
decodedIP = protocol.checkIPAddress(ip)
if stream not in state.streamsInWhichIAmParticipating:
continue
@ -533,6 +537,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
"tls_init" if self.isSSL else "connection_fully_established",
length=self.payloadLength, expectBytes=0)
return False
def bm_command_version(self):
# print('inside the bmproto ')
"""
@ -566,7 +571,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
logger.debug(
'%(host)s:%(port)i sending version',
self.destination._asdict())
if ((self.services & protocol.NODE_SSL == protocol.NODE_SSL)):
if self.services & protocol.NODE_SSL == protocol.NODE_SSL:
# self.isSSL = True
pass
if not self.verackReceived:
@ -660,10 +665,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
"Closed connection to %s because I'm connected to myself.",
self.destination)
return False
return True
@staticmethod
def stopDownloadingObject(hashId, forwardAnyway=False):
"""Stop downloading an object"""

View File

@ -17,11 +17,11 @@ from bmconfigparser import BMConfigParser
from network.connectionchooser import chooseConnection
from network.proxy import Proxy
from .node import Peer
from singleton import Singleton
from network.tcp import (
TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection,bootstrap)
TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection, bootstrap)
from network.udp import UDPSocket
from singleton import Singleton
from .node import Peer
logger = logging.getLogger('default')
@ -79,7 +79,7 @@ class BMConnectionPool(object):
"""
inboundConnections = [inboundConnections for inboundConnections in self.inboundConnections.values()]
outboundConnections = [outboundConnections for outboundConnections in self.outboundConnections.values()]
return [ connections for connections in inboundConnections +outboundConnections]
return [connections for connections in inboundConnections + outboundConnections]
def establishedConnections(self):
"""Shortcut for list of connections having fullyEstablished == True"""
@ -430,14 +430,13 @@ class BMConnectionPool(object):
# list(self.udpSockets.values())
# ):
for i in (
# [inboundConnections for inboundConnections in self.inboundConnections.values()] +
# [outboundConnections for outboundConnections in self.outboundConnections.values()] +
# [listeningSockets for listeningSockets in self.listeningSockets.values()] +
# [udpSockets for udpSockets in self.udpSockets.values()]
self.connections()
+ [listeningSockets for listeningSockets in self.listeningSockets.values()] + [udpSockets for udpSockets in self.udpSockets.values()]
self.connections() +
[listeningSockets for listeningSockets in self.listeningSockets.values()] +
[udpSockets for udpSockets in self.udpSockets.values()]
):
if not (i.accepting or i.connecting or i.connected):
reaper.append(i)

View File

@ -28,7 +28,7 @@ logger = logging.getLogger('default')
@Singleton
class Dandelion(): # pylint: disable=old-style-class
class Dandelion(object):
"""Dandelion class for tracking stem/fluff stages."""
def __init__(self):
# currently assignable child stems
@ -104,8 +104,8 @@ class Dandelion(): # pylint: disable=old-style-class
self.stem.append(connection)
for k in (k for k, v in iter(self.nodeMap.items()) if v is None):
self.nodeMap[k] = connection
#The Purpose of adding this condition that if self
#hashMap is has any value
# The Purpose of adding this condition that if self
# hashMap is has any value
# if not [hasmap for hasmap in self.hashMap.items()] ==[]:
try:
for k, v in {

View File

@ -44,7 +44,8 @@ class DownloadThread(StoppableThread):
# Choose downloading peers randomly
# connections = [
# x for x in
# list(BMConnectionPool().inboundConnections.values()) + list(BMConnectionPool().outboundConnections.values())
# list(BMConnectionPool().inboundConnections.values()) +
# list(BMConnectionPool().outboundConnections.values())
# if x.fullyEstablished]
connections = BMConnectionPool().establishedConnections()

View File

@ -1,3 +1,4 @@
# pylint: disable=redefined-outer-name, too-many-ancestors, missing-docstring
import socket
from advanceddispatcher import AdvancedDispatcher
@ -12,7 +13,7 @@ class HttpError(ProxyError):
class HttpConnection(AdvancedDispatcher):
def __init__(self, host, path="/"): # pylint: disable=redefined-outer-name
def __init__(self, host, path="/"):
AdvancedDispatcher.__init__(self)
self.path = path
self.destination = (host, 80)
@ -38,7 +39,7 @@ class HttpConnection(AdvancedDispatcher):
class Socks5HttpConnection(Socks5Connection, HttpConnection):
def __init__(self, host, path="/"): # pylint: disable=super-init-not-called, redefined-outer-name
def __init__(self, host, path="/"): # pylint: disable=super-init-not-called
self.path = path
Socks5Connection.__init__(self, address=(host, 80))
@ -48,7 +49,7 @@ class Socks5HttpConnection(Socks5Connection, HttpConnection):
class Socks4aHttpConnection(Socks4aConnection, HttpConnection):
def __init__(self, host, path="/"): # pylint: disable=super-init-not-called, redefined-outer-name
def __init__(self, host, path="/"): # pylint: disable=super-init-not-called
Socks4aConnection.__init__(self, address=(host, 80))
self.path = path

View File

@ -1,3 +1,6 @@
"""
src/network/http_old.py
"""
import asyncore
import socket
import time

View File

@ -5,7 +5,7 @@ src/network/httpd.py
import asyncore
import socket
from tls import TLSHandshake
from .tls import TLSHandshake
class HTTPRequestHandler(asyncore.dispatcher):
@ -129,7 +129,7 @@ class HTTPServer(asyncore.dispatcher):
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
sock, _ = pair
# print 'Incoming connection from %s' % repr(addr)
self.connections += 1
# if self.connections % 1000 == 0:
@ -148,7 +148,7 @@ class HTTPSServer(HTTPServer):
def handle_accept(self):
pair = self.accept()
if pair is not None:
sock, addr = pair
sock, _ = pair
# print 'Incoming connection from %s' % repr(addr)
self.connections += 1
# if self.connections % 1000 == 0:

View File

@ -1,8 +1,8 @@
# pylint: disable=missing-docstring
import asyncore
from http import HTTPClient
from tls import TLSHandshake
from .http import HTTPClient
from .tls import TLSHandshake
"""
self.sslSock = ssl.wrap_socket(
self.sock,
@ -17,6 +17,7 @@ self.sslSock = ssl.wrap_socket(
class HTTPSClient(HTTPClient, TLSHandshake):
def __init__(self, host, path):
# pylint: disable=non-parent-init-called
if not hasattr(self, '_map'):
asyncore.dispatcher.__init__(self)
self.tlsDone = False

View File

@ -24,7 +24,7 @@ class BMNetworkThread(StoppableThread):
i.close()
except:
pass
for i in [ outboundConnections for outboundConnections in BMConnectionPool().outboundConnections.values()]:
for i in [outboundConnections for outboundConnections in BMConnectionPool().outboundConnections.values()]:
try:
i.close()
except:

View File

@ -140,20 +140,20 @@ if __name__ == '__main__':
k = RandomTrackingDict()
d = {}
print ("populating random tracking dict")
print("populating random tracking dict")
a.append(time())
for i in range(50000):
k[randString()] = True
a.append(time())
print ("done")
print("done")
while k:
retval = k.randomKeys(1000)
if not retval:
print ("error getting random keys")
print("error getting random keys")
try:
k.randomKeys(100)
print( "bad")
print("bad")
except KeyError:
pass
for i in retval:

View File

@ -90,7 +90,7 @@ class TCPConnection(BMProto, TLSDispatcher):
ObjectTracker.__init__(self) # pylint: disable=non-parent-init-called
self.bm_proto_reset()
# print('--------------tcp------------------')
from network import stats
# from network import stats
self.set_state("bm_header", expectBytes=protocol.Header.size)
def antiIntersectionDelay(self, initial=False):

View File

@ -1,7 +1,6 @@
"""
SSL/TLS negotiation.
"""
import logging
import os
import socket