From ded1defb1f86835c27e4e7e10d828bec2f715909 Mon Sep 17 00:00:00 2001 From: "jai.s" Date: Sat, 30 Nov 2019 13:18:15 +0530 Subject: [PATCH] Worked on network issue and stopped the udp connection aand required the select's epoll condition and instead used select pollar method --- src/network/asyncore_pollchoose.py | 57 +++++++++++++++--------------- src/network/bmproto.py | 51 ++++++++++++++++++++------ src/network/connectionchooser.py | 7 ++-- src/network/connectionpool.py | 7 ++-- src/network/stats.py | 2 +- src/network/tcp.py | 3 ++ src/network/tls.py | 10 +++--- src/network/udp.py | 11 +++--- src/protocol.py | 1 + 9 files changed, 96 insertions(+), 53 deletions(-) diff --git a/src/network/asyncore_pollchoose.py b/src/network/asyncore_pollchoose.py index 1c93283f..0c7198db 100644 --- a/src/network/asyncore_pollchoose.py +++ b/src/network/asyncore_pollchoose.py @@ -102,8 +102,7 @@ def _strerror(err): return os.strerror(err) except (ValueError, OverflowError, NameError): if err in errorcode: - return errorcode[err] - return "Unknown error %s" % err + ret18 ("Unknown error {}".format(err)) class ExitNow(Exception): @@ -247,25 +246,24 @@ def select_poller(timeout=0.0, map=None): if map is None: map = socket_map if map: - r = [] - w = [] - e = [] - for fd, obj in list(map.items()): + rd = [] + wt = [] + ex = [] + for fd, obj in map.items(): is_r = obj.readable() is_w = obj.writable() if is_r: - r.append(fd) + rd.append(fd) # accepting sockets should not be writable if is_w and not obj.accepting: - w.append(fd) + wt.append(fd) if is_r or is_w: - e.append(fd) - if [] == r == w == e: + ex.append(fd) + if [] == rd == wt == ex: time.sleep(timeout) return - try: - r, w, e = select.select(r, w, e, timeout) + rd, wt, ex = select.select(rd, wt, ex, timeout) except KeyboardInterrupt: return except socket.error as err: @@ -275,19 +273,19 @@ def select_poller(timeout=0.0, map=None): if err.args[0] in (WSAENOTSOCK, ): return - for fd in helper_random.randomsample(r, len(r)): + for fd in helper_random.randomsample(rd, len(rd)): obj = map.get(fd) if obj is None: continue read(obj) - for fd in helper_random.randomsample(w, len(w)): + for fd in helper_random.randomsample(wt, len(wt)): obj = map.get(fd) if obj is None: continue write(obj) - for fd in e: + for fd in ex: obj = map.get(fd) if obj is None: continue @@ -491,18 +489,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None): # argument which should no longer be used in favor of # "poller" - if poller is None: - if use_poll: - poller = poll_poller - elif hasattr(select, 'epoll'): - poller = epoll_poller - elif hasattr(select, 'kqueue'): - poller = kqueue_poller - elif hasattr(select, 'poll'): - poller = poll_poller - elif hasattr(select, 'select'): - poller = select_poller - + # if poller is None: + # if use_poll: + # poller = poll_poller + # elif hasattr(select, 'epoll'): + # poller = epoll_poller + # elif hasattr(select, 'kqueue'): + # poller = kqueue_poller + # elif hasattr(select, 'poll'): + # poller = poll_poller + # elif hasattr(select, 'select'): + # poller = select_poller + poller = select_poller if timeout == 0: deadline = 0 else: @@ -662,6 +660,9 @@ class dispatcher: def readable(self): """Predicate to indicate download throttle status""" + print('-------------------------------------') + print('---------------asyncore--------------') + print('-------------------------------------') if maxDownloadRate > 0: return downloadBucket > dispatcher.minTx return True @@ -788,7 +789,7 @@ class dispatcher: def log_info(self, message, log_type='info'): """Conditionally print a message""" if log_type not in self.ignore_log_types: - print ('{}: {}'.format((log_type, message))) + print ('{}: {}'.format(log_type, message)) def handle_read_event(self): """Handle a read event""" diff --git a/src/network/bmproto.py b/src/network/bmproto.py index a175fc43..f6ded3ca 100644 --- a/src/network/bmproto.py +++ b/src/network/bmproto.py @@ -30,6 +30,19 @@ from network.objectracker import missingObjects, ObjectTracker from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue from network.randomtrackingdict import RandomTrackingDict +global addr_count +addr_count = 0 + +global addr_verack +addr_verack = 0 + +global addr_version +addr_version = 0 + +# global addr_count +# addr_count = 0 + +count = 0 class BMProtoError(ProxyError): """A Bitmessage Protocol Base Error""" @@ -89,6 +102,24 @@ class BMProto(AdvancedDispatcher, ObjectTracker): protocol.Header.unpack(self.read_buf[:protocol.Header.size]) #its shoule be in string self.command = self.command.rstrip('\x00'.encode('utf-8')) + global count,addr_version,addr_count,addr_verack + count+=1 + if self.command == 'verack'.encode(): + addr_verack+=1 + print('the addr_verack count are -{}'.format(addr_verack)) + + if self.command == 'version'.encode(): + addr_version+=1 + print('the addr_version count are -{}'.format(addr_version)) + + if self.command == 'addr'.encode(): + addr_count+=1 + print('the addr_count count are -{}'.format(addr_count)) + + # print('The count of the excaution are -{}'.format(count)) + # print('-----------count---------------{}'.format(count)) + # print('------self command-----------{}'.format(self.command)) + # print('----------self---------------{}'.format(self)) if self.magic != 0xE9BEB4D9: # skip 1 byte in order to sync #in the advancedispatched and length commend's @@ -158,7 +189,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.set_state("close") return False if retval: - print('if retval is true and inside the if ') + # print('if retval is true and inside the if ') self.set_state("bm_header", length=self.payloadLength) self.bm_proto_reset() # else assume the command requires a different state to follow @@ -437,8 +468,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return self.decode_payload_content("LQIQ16sH") def bm_command_addr(self): - print('+++++++++++++++++++++++++++\ - bm_command_addr bm_command_addr bm_command_addr ++++++++++++++++') + # print('+++++++++++++++++++++++++++\ + # bm_command_addr bm_command_addr bm_command_addr ++++++++++++++++') """Incoming addresses, process them""" addresses = self._decode_addr() # pylint: disable=redefined-outer-name for i in addresses: @@ -506,7 +537,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker): length=self.payloadLength, expectBytes=0) return False def bm_command_version(self): - print('inside the bmproto ') + # print('inside the bmproto ') """ Incoming version. Parse and log, remember important things, like streams, bitfields, etc. @@ -543,12 +574,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker): self.isSSL = True if not self.verackReceived: return True - print('inside the bmproto line') - print('before the value of state are :-{}'.format(self.state)) + # print('inside the bmproto line') + # print('before the value of state are :-{}'.format(self.state)) self.set_state( "tls_init" if self.isSSL else "connection_fully_established", length=self.payloadLength, expectBytes=0) - print('After the value of state are :-{}'.format(self.state)) + # print('After the value of state are :-{}'.format(self.state)) return False def peerValidityChecks(self): # pylint: disable=too-many-return-statements @@ -591,9 +622,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker): return False if self.destination in connectionpool.BMConnectionPool().inboundConnections: try: - print('+++++++++++++++++++++++++++') - print('self destination host -{}'.format(self.destination.host)) - print('++++++++++++++++++++++++++++++') + # print('+++++++++++++++++++++++++++') + # print('self destination host -{}'.format(self.destination.host)) + # print('++++++++++++++++++++++++++++++') if not protocol.checkSocksIP(self.destination.host): self.append_write_buf(protocol.assembleErrorMessage( errorText="Too many connections from your IP." diff --git a/src/network/connectionchooser.py b/src/network/connectionchooser.py index eb7c0f6e..99876135 100644 --- a/src/network/connectionchooser.py +++ b/src/network/connectionchooser.py @@ -11,7 +11,7 @@ from queues import Queue, portCheckerQueue def getDiscoveredPeer(): try: - peer = random.choice(state.discoveredPeers.keys()) + peer = random.choice([key for key in state.discoveredPeers.keys()]) except (IndexError, KeyError): raise ValueError try: @@ -24,6 +24,7 @@ def getDiscoveredPeer(): def chooseConnection(stream): haveOnion = BMConfigParser().safeGet( "bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS' + if state.trustedPeer: return state.trustedPeer try: @@ -37,14 +38,14 @@ def chooseConnection(stream): # discovered peers are already filtered by allowed streams return getDiscoveredPeer() for _ in range(50): - peer = random.choice(list(knownnodes.knownNodes[stream].keys())) + peer = random.choice([key for key in knownnodes.knownNodes[stream].keys()]) try: peer_info = knownnodes.knownNodes[stream][peer] if peer_info.get('self'): continue rating = peer_info["rating"] except TypeError: - logger.warning('Error in %s', peer) + logger.warning('Error in {}'.format(peer)) rating = 0 if haveOnion: # onion addresses have a higher priority when SOCKS diff --git a/src/network/connectionpool.py b/src/network/connectionpool.py index 374ebc62..509c0d1f 100644 --- a/src/network/connectionpool.py +++ b/src/network/connectionpool.py @@ -18,7 +18,7 @@ from debug import logger from network.proxy import Proxy from singleton import Singleton from network.tcp import ( - TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection,bootstrap) + TCPServer, Socks5BMConnection, Socks4aBMConnection, TCPConnection,bootstrap) from network.udp import UDPSocket @@ -147,6 +147,7 @@ class BMConnectionPool(object): port = int(BMConfigParser().safeGet("bitmessagesettings", "port")) # correct port even if it changed ls = TCPServer(host=bind, port=port) + print('inside the startListening method') self.listeningSockets[ls.destination] = ls def startUDPSocket(self, bind=None): @@ -219,6 +220,7 @@ class BMConnectionPool(object): self.startBootstrappers() knownnodes.knownNodesActual = True if not self.bootstrapped: + self.bootstrapped = True Proxy.proxy = ( BMConfigParser().safeGet( @@ -295,6 +297,7 @@ class BMConnectionPool(object): ): # FIXME: rating will be increased after next connection i.handle_close() + if acceptConnections: if not self.listeningSockets: if BMConfigParser().safeGet('network', 'bind') == '': @@ -306,7 +309,7 @@ class BMConnectionPool(object): ).split(): self.startListening(bind) logger.info('Listening for incoming connections.') - if not self.udpSockets: + if False: # self.udpSockets :- {'0.0.0.0': } if BMConfigParser().safeGet('network', 'bind') == '': self.startUDPSocket() diff --git a/src/network/stats.py b/src/network/stats.py index 7d6a5550..b6ac4f4b 100644 --- a/src/network/stats.py +++ b/src/network/stats.py @@ -32,7 +32,7 @@ def connectedHostsList(): retval.append(i) except AttributeError: pass - print('#################### retval -{}'.format(retval)) + # print('#################### retval -{}'.format(retval)) return retval diff --git a/src/network/tcp.py b/src/network/tcp.py index f07d9b1c..945740a0 100644 --- a/src/network/tcp.py +++ b/src/network/tcp.py @@ -367,8 +367,10 @@ class TCPServer(AdvancedDispatcher): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() for attempt in range(50): + print('inside the attempt of line 371') try: if attempt > 0: + print('inside the if condition attempt in 373') port = random.randint(32767, 65535) self.bind((host, port)) except socket.error as e: @@ -376,6 +378,7 @@ class TCPServer(AdvancedDispatcher): continue else: if attempt > 0: + print('inside the if condition attempt in 381') BMConfigParser().set( 'bitmessagesettings', 'port', str(port)) BMConfigParser().save() diff --git a/src/network/tls.py b/src/network/tls.py index 33e04f7f..73f64e4e 100644 --- a/src/network/tls.py +++ b/src/network/tls.py @@ -67,7 +67,7 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=too-many-instanc self.isSSL = False def state_tls_init(self): - print() + # print() """Prepare sockets for TLS handshake""" # pylint: disable=attribute-defined-outside-init self.isSSL = True @@ -95,16 +95,16 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=too-many-instanc ciphers=self.ciphers, do_handshake_on_connect=False) self.sslSocket.setblocking(0) self.want_read = self.want_write = True - print('before tls file python 98 state are :- {}'.format(self.state)) + # print('before tls file python 98 state are :- {}'.format(self.state)) self.set_state("tls_handshake") - print('after tls file python 100 state are :- {}'.format(self.state)) + # print('after tls file python 100 state are :- {}'.format(self.state)) return False # if hasattr(self.socket, "context"): # self.socket.context.set_ecdh_curve("secp256k1") @staticmethod def state_tls_handshake(): - print("tls's state_tls_handshake method in line 107") + # print("tls's state_tls_handshake method in line 107") """Do nothing while TLS handshake is pending, as during this phase we need to react to callbacks instead""" return False @@ -182,7 +182,7 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=too-many-instanc return def tls_handshake(self): - print('inside the tls_handshake') + # print('inside the tls_handshake') """Perform TLS handshake and handle its stages""" # wait for flush if self.write_buf: diff --git a/src/network/udp.py b/src/network/udp.py index c3e4ce44..2280b77f 100644 --- a/src/network/udp.py +++ b/src/network/udp.py @@ -72,11 +72,14 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attribut addresses = self._decode_addr() # only allow peer discovery from private IPs in order to avoid # attacks from random IPs on the internet - if not self.local: - return True + # if not self.local: + # return True + self.local = True remoteport = False for seenTime, stream, services, ip, port in addresses: - decodedIP = protocol.checkIPAddress(str(ip)) + # decodedIP = bool(protocol.checkIPAddress(ip)) + decodedIP = False + if stream not in state.streamsInWhichIAmParticipating: continue if (seenTime < time.time() - self.maxTimeOffset or seenTime > time.time() + self.maxTimeOffset): @@ -88,7 +91,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attribut if remoteport is False: return True logger.debug( - "received peer discovery from %s:%i (port %i):", + "received peer discovery from {}:{} (port {}):", self.destination.host, self.destination.port, remoteport) if self.local: state.discoveredPeers[ diff --git a/src/protocol.py b/src/protocol.py index a8d5a9a3..cf030954 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -178,6 +178,7 @@ def haveSSL(server=False): python < 2.7.9's ssl library does not support ECDSA server due to missing initialisation of available curves, but client works ok """ + return False if not server: return True elif sys.version_info >= (2, 7, 9):