Worked on network issue and stopped the udp connection aand required the select's epoll condition and instead used select pollar method
This commit is contained in:
parent
0e593b66ad
commit
ded1defb1f
|
@ -102,8 +102,7 @@ def _strerror(err):
|
|||
return os.strerror(err)
|
||||
except (ValueError, OverflowError, NameError):
|
||||
if err in errorcode:
|
||||
return errorcode[err]
|
||||
return "Unknown error %s" % err
|
||||
ret18 ("Unknown error {}".format(err))
|
||||
|
||||
|
||||
class ExitNow(Exception):
|
||||
|
@ -247,25 +246,24 @@ def select_poller(timeout=0.0, map=None):
|
|||
if map is None:
|
||||
map = socket_map
|
||||
if map:
|
||||
r = []
|
||||
w = []
|
||||
e = []
|
||||
for fd, obj in list(map.items()):
|
||||
rd = []
|
||||
wt = []
|
||||
ex = []
|
||||
for fd, obj in map.items():
|
||||
is_r = obj.readable()
|
||||
is_w = obj.writable()
|
||||
if is_r:
|
||||
r.append(fd)
|
||||
rd.append(fd)
|
||||
# accepting sockets should not be writable
|
||||
if is_w and not obj.accepting:
|
||||
w.append(fd)
|
||||
wt.append(fd)
|
||||
if is_r or is_w:
|
||||
e.append(fd)
|
||||
if [] == r == w == e:
|
||||
ex.append(fd)
|
||||
if [] == rd == wt == ex:
|
||||
time.sleep(timeout)
|
||||
return
|
||||
|
||||
try:
|
||||
r, w, e = select.select(r, w, e, timeout)
|
||||
rd, wt, ex = select.select(rd, wt, ex, timeout)
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
except socket.error as err:
|
||||
|
@ -275,19 +273,19 @@ def select_poller(timeout=0.0, map=None):
|
|||
if err.args[0] in (WSAENOTSOCK, ):
|
||||
return
|
||||
|
||||
for fd in helper_random.randomsample(r, len(r)):
|
||||
for fd in helper_random.randomsample(rd, len(rd)):
|
||||
obj = map.get(fd)
|
||||
if obj is None:
|
||||
continue
|
||||
read(obj)
|
||||
|
||||
for fd in helper_random.randomsample(w, len(w)):
|
||||
for fd in helper_random.randomsample(wt, len(wt)):
|
||||
obj = map.get(fd)
|
||||
if obj is None:
|
||||
continue
|
||||
write(obj)
|
||||
|
||||
for fd in e:
|
||||
for fd in ex:
|
||||
obj = map.get(fd)
|
||||
if obj is None:
|
||||
continue
|
||||
|
@ -491,18 +489,18 @@ def loop(timeout=30.0, use_poll=False, map=None, count=None, poller=None):
|
|||
# argument which should no longer be used in favor of
|
||||
# "poller"
|
||||
|
||||
if poller is None:
|
||||
if use_poll:
|
||||
poller = poll_poller
|
||||
elif hasattr(select, 'epoll'):
|
||||
poller = epoll_poller
|
||||
elif hasattr(select, 'kqueue'):
|
||||
poller = kqueue_poller
|
||||
elif hasattr(select, 'poll'):
|
||||
poller = poll_poller
|
||||
elif hasattr(select, 'select'):
|
||||
# if poller is None:
|
||||
# if use_poll:
|
||||
# poller = poll_poller
|
||||
# elif hasattr(select, 'epoll'):
|
||||
# poller = epoll_poller
|
||||
# elif hasattr(select, 'kqueue'):
|
||||
# poller = kqueue_poller
|
||||
# elif hasattr(select, 'poll'):
|
||||
# poller = poll_poller
|
||||
# elif hasattr(select, 'select'):
|
||||
# poller = select_poller
|
||||
poller = select_poller
|
||||
|
||||
if timeout == 0:
|
||||
deadline = 0
|
||||
else:
|
||||
|
@ -662,6 +660,9 @@ class dispatcher:
|
|||
|
||||
def readable(self):
|
||||
"""Predicate to indicate download throttle status"""
|
||||
print('-------------------------------------')
|
||||
print('---------------asyncore--------------')
|
||||
print('-------------------------------------')
|
||||
if maxDownloadRate > 0:
|
||||
return downloadBucket > dispatcher.minTx
|
||||
return True
|
||||
|
@ -788,7 +789,7 @@ class dispatcher:
|
|||
def log_info(self, message, log_type='info'):
|
||||
"""Conditionally print a message"""
|
||||
if log_type not in self.ignore_log_types:
|
||||
print ('{}: {}'.format((log_type, message)))
|
||||
print ('{}: {}'.format(log_type, message))
|
||||
|
||||
def handle_read_event(self):
|
||||
"""Handle a read event"""
|
||||
|
|
|
@ -30,6 +30,19 @@ from network.objectracker import missingObjects, ObjectTracker
|
|||
from queues import objectProcessorQueue, portCheckerQueue, invQueue, addrQueue
|
||||
from network.randomtrackingdict import RandomTrackingDict
|
||||
|
||||
global addr_count
|
||||
addr_count = 0
|
||||
|
||||
global addr_verack
|
||||
addr_verack = 0
|
||||
|
||||
global addr_version
|
||||
addr_version = 0
|
||||
|
||||
# global addr_count
|
||||
# addr_count = 0
|
||||
|
||||
count = 0
|
||||
|
||||
class BMProtoError(ProxyError):
|
||||
"""A Bitmessage Protocol Base Error"""
|
||||
|
@ -89,6 +102,24 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
protocol.Header.unpack(self.read_buf[:protocol.Header.size])
|
||||
#its shoule be in string
|
||||
self.command = self.command.rstrip('\x00'.encode('utf-8'))
|
||||
global count,addr_version,addr_count,addr_verack
|
||||
count+=1
|
||||
if self.command == 'verack'.encode():
|
||||
addr_verack+=1
|
||||
print('the addr_verack count are -{}'.format(addr_verack))
|
||||
|
||||
if self.command == 'version'.encode():
|
||||
addr_version+=1
|
||||
print('the addr_version count are -{}'.format(addr_version))
|
||||
|
||||
if self.command == 'addr'.encode():
|
||||
addr_count+=1
|
||||
print('the addr_count count are -{}'.format(addr_count))
|
||||
|
||||
# print('The count of the excaution are -{}'.format(count))
|
||||
# print('-----------count---------------{}'.format(count))
|
||||
# print('------self command-----------{}'.format(self.command))
|
||||
# print('----------self---------------{}'.format(self))
|
||||
if self.magic != 0xE9BEB4D9:
|
||||
# skip 1 byte in order to sync
|
||||
#in the advancedispatched and length commend's
|
||||
|
@ -158,7 +189,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
self.set_state("close")
|
||||
return False
|
||||
if retval:
|
||||
print('if retval is true and inside the if ')
|
||||
# print('if retval is true and inside the if ')
|
||||
self.set_state("bm_header", length=self.payloadLength)
|
||||
self.bm_proto_reset()
|
||||
# else assume the command requires a different state to follow
|
||||
|
@ -437,8 +468,8 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
return self.decode_payload_content("LQIQ16sH")
|
||||
|
||||
def bm_command_addr(self):
|
||||
print('+++++++++++++++++++++++++++\
|
||||
bm_command_addr bm_command_addr bm_command_addr ++++++++++++++++')
|
||||
# print('+++++++++++++++++++++++++++\
|
||||
# bm_command_addr bm_command_addr bm_command_addr ++++++++++++++++')
|
||||
"""Incoming addresses, process them"""
|
||||
addresses = self._decode_addr() # pylint: disable=redefined-outer-name
|
||||
for i in addresses:
|
||||
|
@ -506,7 +537,7 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
length=self.payloadLength, expectBytes=0)
|
||||
return False
|
||||
def bm_command_version(self):
|
||||
print('inside the bmproto ')
|
||||
# print('inside the bmproto ')
|
||||
"""
|
||||
Incoming version.
|
||||
Parse and log, remember important things, like streams, bitfields, etc.
|
||||
|
@ -543,12 +574,12 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
self.isSSL = True
|
||||
if not self.verackReceived:
|
||||
return True
|
||||
print('inside the bmproto line')
|
||||
print('before the value of state are :-{}'.format(self.state))
|
||||
# print('inside the bmproto line')
|
||||
# print('before the value of state are :-{}'.format(self.state))
|
||||
self.set_state(
|
||||
"tls_init" if self.isSSL else "connection_fully_established",
|
||||
length=self.payloadLength, expectBytes=0)
|
||||
print('After the value of state are :-{}'.format(self.state))
|
||||
# print('After the value of state are :-{}'.format(self.state))
|
||||
return False
|
||||
|
||||
def peerValidityChecks(self): # pylint: disable=too-many-return-statements
|
||||
|
@ -591,9 +622,9 @@ class BMProto(AdvancedDispatcher, ObjectTracker):
|
|||
return False
|
||||
if self.destination in connectionpool.BMConnectionPool().inboundConnections:
|
||||
try:
|
||||
print('+++++++++++++++++++++++++++')
|
||||
print('self destination host -{}'.format(self.destination.host))
|
||||
print('++++++++++++++++++++++++++++++')
|
||||
# print('+++++++++++++++++++++++++++')
|
||||
# print('self destination host -{}'.format(self.destination.host))
|
||||
# print('++++++++++++++++++++++++++++++')
|
||||
if not protocol.checkSocksIP(self.destination.host):
|
||||
self.append_write_buf(protocol.assembleErrorMessage(
|
||||
errorText="Too many connections from your IP."
|
||||
|
|
|
@ -11,7 +11,7 @@ from queues import Queue, portCheckerQueue
|
|||
|
||||
def getDiscoveredPeer():
|
||||
try:
|
||||
peer = random.choice(state.discoveredPeers.keys())
|
||||
peer = random.choice([key for key in state.discoveredPeers.keys()])
|
||||
except (IndexError, KeyError):
|
||||
raise ValueError
|
||||
try:
|
||||
|
@ -24,6 +24,7 @@ def getDiscoveredPeer():
|
|||
def chooseConnection(stream):
|
||||
haveOnion = BMConfigParser().safeGet(
|
||||
"bitmessagesettings", "socksproxytype")[0:5] == 'SOCKS'
|
||||
|
||||
if state.trustedPeer:
|
||||
return state.trustedPeer
|
||||
try:
|
||||
|
@ -37,14 +38,14 @@ def chooseConnection(stream):
|
|||
# discovered peers are already filtered by allowed streams
|
||||
return getDiscoveredPeer()
|
||||
for _ in range(50):
|
||||
peer = random.choice(list(knownnodes.knownNodes[stream].keys()))
|
||||
peer = random.choice([key for key in knownnodes.knownNodes[stream].keys()])
|
||||
try:
|
||||
peer_info = knownnodes.knownNodes[stream][peer]
|
||||
if peer_info.get('self'):
|
||||
continue
|
||||
rating = peer_info["rating"]
|
||||
except TypeError:
|
||||
logger.warning('Error in %s', peer)
|
||||
logger.warning('Error in {}'.format(peer))
|
||||
rating = 0
|
||||
if haveOnion:
|
||||
# onion addresses have a higher priority when SOCKS
|
||||
|
|
|
@ -147,6 +147,7 @@ class BMConnectionPool(object):
|
|||
port = int(BMConfigParser().safeGet("bitmessagesettings", "port"))
|
||||
# correct port even if it changed
|
||||
ls = TCPServer(host=bind, port=port)
|
||||
print('inside the startListening method')
|
||||
self.listeningSockets[ls.destination] = ls
|
||||
|
||||
def startUDPSocket(self, bind=None):
|
||||
|
@ -219,6 +220,7 @@ class BMConnectionPool(object):
|
|||
self.startBootstrappers()
|
||||
knownnodes.knownNodesActual = True
|
||||
if not self.bootstrapped:
|
||||
|
||||
self.bootstrapped = True
|
||||
Proxy.proxy = (
|
||||
BMConfigParser().safeGet(
|
||||
|
@ -295,6 +297,7 @@ class BMConnectionPool(object):
|
|||
):
|
||||
# FIXME: rating will be increased after next connection
|
||||
i.handle_close()
|
||||
|
||||
if acceptConnections:
|
||||
if not self.listeningSockets:
|
||||
if BMConfigParser().safeGet('network', 'bind') == '':
|
||||
|
@ -306,7 +309,7 @@ class BMConnectionPool(object):
|
|||
).split():
|
||||
self.startListening(bind)
|
||||
logger.info('Listening for incoming connections.')
|
||||
if not self.udpSockets:
|
||||
if False:
|
||||
# self.udpSockets :- {'0.0.0.0': <network.udp.UDPSocket connected at 0x7f95cce7d7b8>}
|
||||
if BMConfigParser().safeGet('network', 'bind') == '':
|
||||
self.startUDPSocket()
|
||||
|
|
|
@ -32,7 +32,7 @@ def connectedHostsList():
|
|||
retval.append(i)
|
||||
except AttributeError:
|
||||
pass
|
||||
print('#################### retval -{}'.format(retval))
|
||||
# print('#################### retval -{}'.format(retval))
|
||||
return retval
|
||||
|
||||
|
||||
|
|
|
@ -367,8 +367,10 @@ class TCPServer(AdvancedDispatcher):
|
|||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.set_reuse_addr()
|
||||
for attempt in range(50):
|
||||
print('inside the attempt of line 371')
|
||||
try:
|
||||
if attempt > 0:
|
||||
print('inside the if condition attempt in 373')
|
||||
port = random.randint(32767, 65535)
|
||||
self.bind((host, port))
|
||||
except socket.error as e:
|
||||
|
@ -376,6 +378,7 @@ class TCPServer(AdvancedDispatcher):
|
|||
continue
|
||||
else:
|
||||
if attempt > 0:
|
||||
print('inside the if condition attempt in 381')
|
||||
BMConfigParser().set(
|
||||
'bitmessagesettings', 'port', str(port))
|
||||
BMConfigParser().save()
|
||||
|
|
|
@ -67,7 +67,7 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=too-many-instanc
|
|||
self.isSSL = False
|
||||
|
||||
def state_tls_init(self):
|
||||
print()
|
||||
# print()
|
||||
"""Prepare sockets for TLS handshake"""
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self.isSSL = True
|
||||
|
@ -95,16 +95,16 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=too-many-instanc
|
|||
ciphers=self.ciphers, do_handshake_on_connect=False)
|
||||
self.sslSocket.setblocking(0)
|
||||
self.want_read = self.want_write = True
|
||||
print('before tls file python 98 state are :- {}'.format(self.state))
|
||||
# print('before tls file python 98 state are :- {}'.format(self.state))
|
||||
self.set_state("tls_handshake")
|
||||
print('after tls file python 100 state are :- {}'.format(self.state))
|
||||
# print('after tls file python 100 state are :- {}'.format(self.state))
|
||||
return False
|
||||
# if hasattr(self.socket, "context"):
|
||||
# self.socket.context.set_ecdh_curve("secp256k1")
|
||||
|
||||
@staticmethod
|
||||
def state_tls_handshake():
|
||||
print("tls's state_tls_handshake method in line 107")
|
||||
# print("tls's state_tls_handshake method in line 107")
|
||||
"""Do nothing while TLS handshake is pending, as during this phase we need to react to callbacks instead"""
|
||||
return False
|
||||
|
||||
|
@ -182,7 +182,7 @@ class TLSDispatcher(AdvancedDispatcher): # pylint: disable=too-many-instanc
|
|||
return
|
||||
|
||||
def tls_handshake(self):
|
||||
print('inside the tls_handshake')
|
||||
# print('inside the tls_handshake')
|
||||
"""Perform TLS handshake and handle its stages"""
|
||||
# wait for flush
|
||||
if self.write_buf:
|
||||
|
|
|
@ -72,11 +72,14 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attribut
|
|||
addresses = self._decode_addr()
|
||||
# only allow peer discovery from private IPs in order to avoid
|
||||
# attacks from random IPs on the internet
|
||||
if not self.local:
|
||||
return True
|
||||
# if not self.local:
|
||||
# return True
|
||||
self.local = True
|
||||
remoteport = False
|
||||
for seenTime, stream, services, ip, port in addresses:
|
||||
decodedIP = protocol.checkIPAddress(str(ip))
|
||||
# decodedIP = bool(protocol.checkIPAddress(ip))
|
||||
decodedIP = False
|
||||
|
||||
if stream not in state.streamsInWhichIAmParticipating:
|
||||
continue
|
||||
if (seenTime < time.time() - self.maxTimeOffset or seenTime > time.time() + self.maxTimeOffset):
|
||||
|
@ -88,7 +91,7 @@ class UDPSocket(BMProto): # pylint: disable=too-many-instance-attribut
|
|||
if remoteport is False:
|
||||
return True
|
||||
logger.debug(
|
||||
"received peer discovery from %s:%i (port %i):",
|
||||
"received peer discovery from {}:{} (port {}):",
|
||||
self.destination.host, self.destination.port, remoteport)
|
||||
if self.local:
|
||||
state.discoveredPeers[
|
||||
|
|
|
@ -178,6 +178,7 @@ def haveSSL(server=False):
|
|||
python < 2.7.9's ssl library does not support ECDSA server due to
|
||||
missing initialisation of available curves, but client works ok
|
||||
"""
|
||||
return False
|
||||
if not server:
|
||||
return True
|
||||
elif sys.version_info >= (2, 7, 9):
|
||||
|
|
Reference in New Issue
Block a user