From 078c183ae94c4696dde6f9d60793ef920aa536ce Mon Sep 17 00:00:00 2001 From: cis-kuldeep Date: Wed, 28 Jul 2021 18:32:27 +0530 Subject: [PATCH] formatted, refactored & cleaned code --- src/protocol.py | 214 +++++++++++++++++++----------------------------- 1 file changed, 85 insertions(+), 129 deletions(-) diff --git a/src/protocol.py b/src/protocol.py index c28fccfc..1cdf9948 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -9,6 +9,7 @@ import hashlib import random import socket import sys +import six import time from binascii import hexlify from struct import Struct, pack, unpack @@ -36,8 +37,6 @@ except ImportError: from .helper_sql import sqlExecute from .version import softwareVersion -PY3 = sys.version_info[0] >= 3 - # Service flags #: This is a normal network node NODE_NETWORK = 1 @@ -107,27 +106,21 @@ def isBitSetWithinBitfield(fourByteString, n): # ip addresses -if PY3: - def encodeHost(host): - """Encode a given host to be used in low-level socket operations""" - if host.find('.onion') > -1: - return b'\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode( - host.split(".")[0], True) - elif host.find(':') == -1: - return b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ - socket.inet_aton(host) - return socket.inet_pton(socket.AF_INET6, host) +def encodeHost(host): + """Encode a given host to be used in low-level socket operations""" + if six.PY2: + ONION_PREFIX = '\xfd\x87\xd8\x7e\xeb\x43' + IP_PREFIX = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + else: + ONION_PREFIX = b'\xfd\x87\xd8\x7e\xeb\x43' + IP_PREFIX = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' -else: - def encodeHost(host): - """Encode a given host to be used in low-level socket operations""" - if host.find('.onion') > -1: - return '\xfd\x87\xd8\x7e\xeb\x43' + base64.b32decode( - host.split(".")[0], True) - elif host.find(':') == -1: - return '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + \ - socket.inet_aton(host) - return socket.inet_pton(socket.AF_INET6, host) + if host.find('.onion') > -1: + return ONION_PREFIX + base64.b32decode( + host.split(".")[0], True) + elif host.find(':') == -1: + return IP_PREFIX + socket.inet_aton(host) + return socket.inet_pton(socket.AF_INET6, host) def networkType(host): @@ -167,116 +160,79 @@ def network_group(host): return network_type -if PY3: - def checkIPAddress(host, private=False): - """ - Returns hostStandardFormat if it is a valid IP address, - otherwise returns False - """ - if host[0:12] == b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) - return checkIPv4Address(host[12:], hostStandardFormat, private) - elif host[0:6] == b'\xfd\x87\xd8\x7e\xeb\x43': - # Onion, based on BMD/bitcoind - hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" - if private: - return False - return hostStandardFormat - else: - try: - hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) - except ValueError: - return False - if hostStandardFormat == "": - # This can happen on Windows systems which are - # not 64-bit compatible so let us drop the IPv6 address. - return False - return checkIPv6Address(host, hostStandardFormat, private) +def checkIPAddress(host, private=False): + """ + Returns hostStandardFormat if it is a valid IP address, + otherwise returns False + """ + if six.PY3: + IP_PREFIX1 = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + IP_PREFIX2 = b'\xfd\x87\xd8\x7e\xeb\x43' + else: + IP_PREFIX1 = '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF' + IP_PREFIX2 = '\xfd\x87\xd8\x7e\xeb\x43' -else: - def checkIPAddress(host, private=False): - """ - Returns hostStandardFormat if it is a valid IP address, - otherwise returns False - """ - if host[0:12] == '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF\xFF': - hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) - return checkIPv4Address(host[12:], hostStandardFormat, private) - elif host[0:6] == '\xfd\x87\xd8\x7e\xeb\x43': - # Onion, based on BMD/bitcoind - hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" - if private: - return False - return hostStandardFormat - else: - try: - hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) - except ValueError: - return False - if hostStandardFormat == "": - # This can happen on Windows systems which are - # not 64-bit compatible so let us drop the IPv6 address. - return False - return checkIPv6Address(host, hostStandardFormat, private) + if host[0:12] == IP_PREFIX1: + hostStandardFormat = socket.inet_ntop(socket.AF_INET, host[12:]) + return checkIPv4Address(host[12:], hostStandardFormat, private) + elif host[0:6] == IP_PREFIX2: + # Onion, based on BMD/bitcoind + hostStandardFormat = base64.b32encode(host[6:]).lower() + ".onion" + if private: + return False + return hostStandardFormat + else: + try: + hostStandardFormat = socket.inet_ntop(socket.AF_INET6, host) + except ValueError: + return False + if hostStandardFormat == "": + # This can happen on Windows systems which are + # not 64-bit compatible so let us drop the IPv6 address. + return False + return checkIPv6Address(host, hostStandardFormat, private) -if PY3: - def checkIPv4Address(host, hostStandardFormat, private=False): - """ - Returns hostStandardFormat if it is an IPv4 address, - otherwise returns False - """ - if host[0] == 127: # 127/8 - if not private: - logger.debug( - 'Ignoring IP address in loopback range: %s', - hostStandardFormat) - return hostStandardFormat if private else False - if host[0] == 10: # 10/8 - if not private: - logger.debug( - 'Ignoring IP address in private range: %s', hostStandardFormat) - return hostStandardFormat if private else False - if host[0:2] == b'\xC0\xA8': # 192.168/16 - if not private: - logger.debug( - 'Ignoring IP address in private range: %s', hostStandardFormat) - return hostStandardFormat if private else False - if host[0:2] >= b'\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 - if not private: - logger.debug( - 'Ignoring IP address in private range: %s', hostStandardFormat) - return hostStandardFormat if private else False - return False if private else hostStandardFormat - -else: - def checkIPv4Address(host, hostStandardFormat, private=False): - """ - Returns hostStandardFormat if it is an IPv4 address, - otherwise returns False - """ - if host[0] == '\x7F': # 127/8 - if not private: - logger.debug( - 'Ignoring IP address in loopback range: %s', - hostStandardFormat) - return hostStandardFormat if private else False - if host[0] == '\x0A': # 10/8 - if not private: - logger.debug( - 'Ignoring IP address in private range: %s', hostStandardFormat) - return hostStandardFormat if private else False - if host[0:2] == '\xC0\xA8': # 192.168/16 - if not private: - logger.debug( - 'Ignoring IP address in private range: %s', hostStandardFormat) - return hostStandardFormat if private else False - if host[0:2] >= '\xAC\x10' and host[0:2] < '\xAC\x20': # 172.16/12 - if not private: - logger.debug( - 'Ignoring IP address in private range: %s', hostStandardFormat) - return hostStandardFormat if private else False - return False if private else hostStandardFormat +def checkIPv4Address(host, hostStandardFormat, private=False): + """ + Returns hostStandardFormat if it is an IPv4 address, + otherwise returns False + """ + if six.PY2: + IP_OCT1 = '\x7F' # 127/8 + IP_OCT2 = '\x0A' # 10/8 + IP_OCT3 = '\xC0\xA8' # 192.168/16 + IP_OCT4 = '\xAC\x10' # 172.16 + IP_OCT5 = '\xAC\x20' # 12 + else: + IP_OCT1 = 127 # 127/8 + IP_OCT2 = 10 # 10/8 + IP_OCT3 = b'\xC0\xA8' # 192.168/16 + IP_OCT4 = b'\xAC\x10' # 172.16 + IP_OCT5 = b'\xAC\x20' # 12 + + if host[0] == IP_OCT1: + if not private: + logger.debug( + 'Ignoring IP address in loopback range: %s', + hostStandardFormat) + return hostStandardFormat if private else False + if host[0] == IP_OCT2: + if not private: + logger.debug( + 'Ignoring IP address in private range: %s', hostStandardFormat) + return hostStandardFormat if private else False + if host[0:2] == IP_OCT3: + if not private: + logger.debug( + 'Ignoring IP address in private range: %s', hostStandardFormat) + return hostStandardFormat if private else False + if host[0:2] >= IP_OCT4 and host[0:2] < IP_OCT5: + if not private: + logger.debug( + 'Ignoring IP address in private range: %s', hostStandardFormat) + return hostStandardFormat if private else False + return False if private else hostStandardFormat def checkIPv6Address(host, hostStandardFormat, private=False):