From 21a7bdba441c34a104186d573c90d772e7c7cdcd Mon Sep 17 00:00:00 2001 From: Kashiko Koibumi Date: Sat, 25 May 2024 12:27:32 +0900 Subject: [PATCH] use six.int2byte(), six.byte2int, six.BytesIO --- src/bitmessagecurses/__init__.py | 11 ++--- src/bitmessagekivy/identiconGeneration.py | 2 +- src/fallback/umsgpack/umsgpack.py | 51 +++++++++++------------ src/network/dandelion.py | 2 +- src/network/socks4a.py | 17 ++++---- src/network/socks5.py | 23 +++++----- src/protocol.py | 3 +- src/pyelliptic/arithmetic.py | 3 +- src/pyelliptic/hash.py | 4 +- src/pyelliptic/openssl.py | 2 +- src/tests/test_randomtrackingdict.py | 3 +- 11 files changed, 64 insertions(+), 57 deletions(-) diff --git a/src/bitmessagecurses/__init__.py b/src/bitmessagecurses/__init__.py index 64fd735b..c2764d1c 100644 --- a/src/bitmessagecurses/__init__.py +++ b/src/bitmessagecurses/__init__.py @@ -17,6 +17,7 @@ import sys import time from textwrap import fill from threading import Timer +import six from dialog import Dialog import helper_sent @@ -105,7 +106,7 @@ def ascii(s): """ASCII values""" r = "" for c in s: - if ord(c) in range(128): + if six.byte2int(c) in range(128): r += c return r @@ -326,13 +327,13 @@ def handlech(c, stdscr): if c != curses.ERR: global inboxcur, addrcur, sentcur, subcur, abookcur, blackcur if c in range(256): - if chr(c) in '12345678': + if six.int2byte(c) in '12345678': global menutab - menutab = int(chr(c)) - elif chr(c) == 'q': + menutab = int(six.int2byte(c)) + elif six.int2byte(c) == 'q': global quit_ quit_ = True - elif chr(c) == '\n': + elif six.int2byte(c) == '\n': curses.curs_set(1) d = Dialog(dialog="dialog") if menutab == 1: diff --git a/src/bitmessagekivy/identiconGeneration.py b/src/bitmessagekivy/identiconGeneration.py index 2e2f2e93..63c58c0d 100644 --- a/src/bitmessagekivy/identiconGeneration.py +++ b/src/bitmessagekivy/identiconGeneration.py @@ -3,7 +3,7 @@ Core classes for loading images and converting them to a Texture. The raw image data can be keep in memory for further access """ import hashlib -from io import BytesIO +from six import BytesIO from PIL import Image from kivy.core.image import Image as CoreImage diff --git a/src/fallback/umsgpack/umsgpack.py b/src/fallback/umsgpack/umsgpack.py index d4c347ca..cac4986f 100644 --- a/src/fallback/umsgpack/umsgpack.py +++ b/src/fallback/umsgpack/umsgpack.py @@ -50,7 +50,6 @@ License: MIT # pylint: disable=unused-argument import collections -import io import struct import sys import six @@ -126,7 +125,7 @@ class Ext: # pylint: disable=old-style-class String representation of this Ext object. """ s = "Ext Object (Type: 0x%02x, Data: " % self.type - s += " ".join(["0x%02x" % ord(self.data[i:i + 1]) + s += " ".join(["0x%02x" % six.byte2int(self.data[i:i + 1]) for i in xrange(min(len(self.data), 8))]) if len(self.data) > 8: s += " ..." @@ -550,7 +549,7 @@ def _packb2(obj, **options): '\x82\xa7compact\xc3\xa6schema\x00' >>> """ - fp = io.BytesIO() + fp = six.BytesIO() _pack2(obj, fp, **options) return fp.getvalue() @@ -583,7 +582,7 @@ def _packb3(obj, **options): b'\x82\xa7compact\xc3\xa6schema\x00' >>> """ - fp = io.BytesIO() + fp = six.BytesIO() _pack3(obj, fp, **options) return fp.getvalue() @@ -600,7 +599,7 @@ def _read_except(fp, n): def _unpack_integer(code, fp, options): - if (ord(code) & 0xe0) == 0xe0: + if (six.byte2int(code) & 0xe0) == 0xe0: return struct.unpack("b", code)[0] elif code == b'\xd0': return struct.unpack("b", _read_except(fp, 1))[0] @@ -610,7 +609,7 @@ def _unpack_integer(code, fp, options): return struct.unpack(">i", _read_except(fp, 4))[0] elif code == b'\xd3': return struct.unpack(">q", _read_except(fp, 8))[0] - elif (ord(code) & 0x80) == 0x00: + elif (six.byte2int(code) & 0x80) == 0x00: return struct.unpack("B", code)[0] elif code == b'\xcc': return struct.unpack("B", _read_except(fp, 1))[0] @@ -620,21 +619,21 @@ def _unpack_integer(code, fp, options): return struct.unpack(">I", _read_except(fp, 4))[0] elif code == b'\xcf': return struct.unpack(">Q", _read_except(fp, 8))[0] - raise Exception("logic error, not int: 0x%02x" % ord(code)) + raise Exception("logic error, not int: 0x%02x" % six.byte2int(code)) def _unpack_reserved(code, fp, options): if code == b'\xc1': raise ReservedCodeException( - "encountered reserved code: 0x%02x" % ord(code)) + "encountered reserved code: 0x%02x" % six.byte2int(code)) raise Exception( - "logic error, not reserved code: 0x%02x" % ord(code)) + "logic error, not reserved code: 0x%02x" % six.byte2int(code)) def _unpack_nil(code, fp, options): if code == b'\xc0': return None - raise Exception("logic error, not nil: 0x%02x" % ord(code)) + raise Exception("logic error, not nil: 0x%02x" % six.byte2int(code)) def _unpack_boolean(code, fp, options): @@ -642,7 +641,7 @@ def _unpack_boolean(code, fp, options): return False elif code == b'\xc3': return True - raise Exception("logic error, not boolean: 0x%02x" % ord(code)) + raise Exception("logic error, not boolean: 0x%02x" % six.byte2int(code)) def _unpack_float(code, fp, options): @@ -650,12 +649,12 @@ def _unpack_float(code, fp, options): return struct.unpack(">f", _read_except(fp, 4))[0] elif code == b'\xcb': return struct.unpack(">d", _read_except(fp, 8))[0] - raise Exception("logic error, not float: 0x%02x" % ord(code)) + raise Exception("logic error, not float: 0x%02x" % six.byte2int(code)) def _unpack_string(code, fp, options): - if (ord(code) & 0xe0) == 0xa0: - length = ord(code) & ~0xe0 + if (six.byte2int(code) & 0xe0) == 0xa0: + length = six.byte2int(code) & ~0xe0 elif code == b'\xd9': length = struct.unpack("B", _read_except(fp, 1))[0] elif code == b'\xda': @@ -663,7 +662,7 @@ def _unpack_string(code, fp, options): elif code == b'\xdb': length = struct.unpack(">I", _read_except(fp, 4))[0] else: - raise Exception("logic error, not string: 0x%02x" % ord(code)) + raise Exception("logic error, not string: 0x%02x" % six.byte2int(code)) # Always return raw bytes in compatibility mode global compatibility @@ -687,7 +686,7 @@ def _unpack_binary(code, fp, options): elif code == b'\xc6': length = struct.unpack(">I", _read_except(fp, 4))[0] else: - raise Exception("logic error, not binary: 0x%02x" % ord(code)) + raise Exception("logic error, not binary: 0x%02x" % six.byte2int(code)) return _read_except(fp, length) @@ -710,9 +709,9 @@ def _unpack_ext(code, fp, options): elif code == b'\xc9': length = struct.unpack(">I", _read_except(fp, 4))[0] else: - raise Exception("logic error, not ext: 0x%02x" % ord(code)) + raise Exception("logic error, not ext: 0x%02x" % six.byte2int(code)) - ext = Ext(ord(_read_except(fp, 1)), _read_except(fp, length)) + ext = Ext(six.byte2int(_read_except(fp, 1)), _read_except(fp, length)) # Unpack with ext handler, if we have one ext_handlers = options.get("ext_handlers") @@ -723,14 +722,14 @@ def _unpack_ext(code, fp, options): def _unpack_array(code, fp, options): - if (ord(code) & 0xf0) == 0x90: - length = (ord(code) & ~0xf0) + if (six.byte2int(code) & 0xf0) == 0x90: + length = (six.byte2int(code) & ~0xf0) elif code == b'\xdc': length = struct.unpack(">H", _read_except(fp, 2))[0] elif code == b'\xdd': length = struct.unpack(">I", _read_except(fp, 4))[0] else: - raise Exception("logic error, not array: 0x%02x" % ord(code)) + raise Exception("logic error, not array: 0x%02x" % six.byte2int(code)) return [_unpack(fp, options) for _ in xrange(length)] @@ -742,14 +741,14 @@ def _deep_list_to_tuple(obj): def _unpack_map(code, fp, options): - if (ord(code) & 0xf0) == 0x80: - length = (ord(code) & ~0xf0) + if (six.byte2int(code) & 0xf0) == 0x80: + length = (six.byte2int(code) & ~0xf0) elif code == b'\xde': length = struct.unpack(">H", _read_except(fp, 2))[0] elif code == b'\xdf': length = struct.unpack(">I", _read_except(fp, 4))[0] else: - raise Exception("logic error, not map: 0x%02x" % ord(code)) + raise Exception("logic error, not map: 0x%02x" % six.byte2int(code)) d = {} if not options.get('use_ordered_dict') \ else collections.OrderedDict() @@ -912,7 +911,7 @@ def _unpackb2(s, **options): """ if not isinstance(s, (str, bytearray)): raise TypeError("packed data must be type 'str' or 'bytearray'") - return _unpack(io.BytesIO(s), options) + return _unpack(six.BytesIO(s), options) # For Python 3, expects a bytes object @@ -958,7 +957,7 @@ def _unpackb3(s, **options): """ if not isinstance(s, (bytes, bytearray)): raise TypeError("packed data must be type 'bytes' or 'bytearray'") - return _unpack(io.BytesIO(s), options) + return _unpack(six.BytesIO(s), options) ############################################################################# # Module Initialization diff --git a/src/network/dandelion.py b/src/network/dandelion.py index e2232082..8bed5bee 100644 --- a/src/network/dandelion.py +++ b/src/network/dandelion.py @@ -75,7 +75,7 @@ class Dandelion: # pylint: disable=old-style-class if logger.isEnabledFor(logging.DEBUG): logger.debug( '%s entering fluff mode due to %s.', - ''.join('%02x' % ord(i) for i in hashId), reason) + ''.join('%02x' % six.byte2int(i) for i in hashId), reason) with self.lock: try: del self.hashMap[hashId] diff --git a/src/network/socks4a.py b/src/network/socks4a.py index e9786168..2df46f06 100644 --- a/src/network/socks4a.py +++ b/src/network/socks4a.py @@ -5,6 +5,7 @@ SOCKS4a proxy module import logging import socket import struct +import six from proxy import GeneralProxyError, Proxy, ProxyError @@ -39,16 +40,16 @@ class Socks4a(Proxy): def state_pre_connect(self): """Handle feedback from SOCKS4a while it is connecting on our behalf""" # Get the response - if self.read_buf[0:1] != chr(0x00).encode(): + if self.read_buf[0:1] != six.int2byte(0x00).encode(): # bad data self.close() raise GeneralProxyError(1) - elif self.read_buf[1:2] != chr(0x5A).encode(): + elif self.read_buf[1:2] != six.int2byte(0x5A).encode(): # Connection failed self.close() - if ord(self.read_buf[1:2]) in (91, 92, 93): + if six.byte2int(self.read_buf[1:2]) in (91, 92, 93): # socks 4 error - raise Socks4aError(ord(self.read_buf[1:2]) - 90) + raise Socks4aError(six.byte2int(self.read_buf[1:2]) - 90) else: raise Socks4aError(4) # Get the bound address/port @@ -102,9 +103,9 @@ class Socks4aConnection(Socks4a): self.append_write_buf(self.ipaddr) if self._auth: self.append_write_buf(self._auth[0]) - self.append_write_buf(chr(0x00).encode()) + self.append_write_buf(six.int2byte(0x00).encode()) if rmtrslv: - self.append_write_buf(self.destination[0] + chr(0x00).encode()) + self.append_write_buf(self.destination[0] + six.int2byte(0x00).encode()) self.set_state("pre_connect", length=0, expectBytes=8) return True @@ -132,8 +133,8 @@ class Socks4aResolver(Socks4a): self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01)) if self._auth: self.append_write_buf(self._auth[0]) - self.append_write_buf(chr(0x00).encode()) - self.append_write_buf(self.host + chr(0x00).encode()) + self.append_write_buf(six.int2byte(0x00).encode()) + self.append_write_buf(self.host + six.int2byte(0x00).encode()) self.set_state("pre_connect", length=0, expectBytes=8) return True diff --git a/src/network/socks5.py b/src/network/socks5.py index d1daae42..9aaa57cc 100644 --- a/src/network/socks5.py +++ b/src/network/socks5.py @@ -6,6 +6,7 @@ SOCKS5 proxy module import logging import socket import struct +import six from node import Peer from proxy import GeneralProxyError, Proxy, ProxyError @@ -97,20 +98,20 @@ class Socks5(Proxy): def state_pre_connect(self): """Handle feedback from socks5 while it is connecting on our behalf.""" # Get the response - if self.read_buf[0:1] != chr(0x05).encode(): + if self.read_buf[0:1] != six.int2byte(0x05).encode(): self.close() raise GeneralProxyError(1) - elif self.read_buf[1:2] != chr(0x00).encode(): + elif self.read_buf[1:2] != six.int2byte(0x00).encode(): # Connection failed self.close() - if ord(self.read_buf[1:2]) <= 8: - raise Socks5Error(ord(self.read_buf[1:2])) + if six.byte2int(self.read_buf[1:2]) <= 8: + raise Socks5Error(six.byte2int(self.read_buf[1:2])) else: raise Socks5Error(9) # Get the bound address/port - elif self.read_buf[3:4] == chr(0x01).encode(): + elif self.read_buf[3:4] == six.int2byte(0x01).encode(): self.set_state("proxy_addr_1", length=4, expectBytes=4) - elif self.read_buf[3:4] == chr(0x03).encode(): + elif self.read_buf[3:4] == six.int2byte(0x03).encode(): self.set_state("proxy_addr_2_1", length=4, expectBytes=1) else: self.close() @@ -129,7 +130,7 @@ class Socks5(Proxy): (e.g. IPv6, onion, ...). This is part 1 which retrieves the length of the data. """ - self.address_length = ord(self.read_buf[0:1]) + self.address_length = six.byte2int(self.read_buf[0:1]) self.set_state( "proxy_addr_2_2", length=1, expectBytes=self.address_length) return True @@ -171,19 +172,19 @@ class Socks5Connection(Socks5): # use the IPv4 address request even if remote resolving was specified. try: self.ipaddr = socket.inet_aton(self.destination[0]) - self.append_write_buf(chr(0x01).encode() + self.ipaddr) + self.append_write_buf(six.int2byte(0x01).encode() + self.ipaddr) except socket.error: # may be IPv6! # Well it's not an IP number, so it's probably a DNS name. if self._remote_dns: # Resolve remotely self.ipaddr = None - self.append_write_buf(chr(0x03).encode() + chr( + self.append_write_buf(six.int2byte(0x03).encode() + six.int2byte( len(self.destination[0])).encode() + self.destination[0]) else: # Resolve locally self.ipaddr = socket.inet_aton( socket.gethostbyname(self.destination[0])) - self.append_write_buf(chr(0x01).encode() + self.ipaddr) + self.append_write_buf(six.int2byte(0x01).encode() + self.ipaddr) self.append_write_buf(struct.pack(">H", self.destination[1])) self.set_state("pre_connect", length=0, expectBytes=4) return True @@ -208,7 +209,7 @@ class Socks5Resolver(Socks5): """Perform resolving""" # Now we can request the actual connection self.append_write_buf(struct.pack('BBB', 0x05, 0xF0, 0x00)) - self.append_write_buf(chr(0x03).encode() + chr( + self.append_write_buf(six.int2byte(0x03).encode() + six.int2byte( len(self.host)).encode() + str(self.host)) self.append_write_buf(struct.pack(">H", self.port)) self.set_state("pre_connect", length=0, expectBytes=4) diff --git a/src/protocol.py b/src/protocol.py index 7f9830e5..e300b97a 100644 --- a/src/protocol.py +++ b/src/protocol.py @@ -12,6 +12,7 @@ import sys import time from binascii import hexlify from struct import Struct, pack, unpack +import six import defaults import highlevelcrypto @@ -227,7 +228,7 @@ def checkIPv6Address(host, hostStandardFormat, private=False): logger.debug('Ignoring loopback address: %s', hostStandardFormat) return False try: - host = [ord(c) for c in host[:2]] + host = [six.byte2int(c) for c in host[:2]] except TypeError: # python3 has ints already pass if host[0] == 0xfe and host[1] & 0xc0 == 0x80: diff --git a/src/pyelliptic/arithmetic.py b/src/pyelliptic/arithmetic.py index 23c24b5e..ce66db98 100644 --- a/src/pyelliptic/arithmetic.py +++ b/src/pyelliptic/arithmetic.py @@ -3,6 +3,7 @@ Arithmetic Expressions """ import hashlib import re +import six P = 2**256 - 2**32 - 2**9 - 2**8 - 2**7 - 2**6 - 2**4 - 1 A = 0 @@ -34,7 +35,7 @@ def get_code_string(base): return b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz' if base == 256: try: - return b''.join([chr(x) for x in range(256)]) + return b''.join([six.int2byte(x) for x in range(256)]) except TypeError: return bytes([x for x in range(256)]) diff --git a/src/pyelliptic/hash.py b/src/pyelliptic/hash.py index 70c9a6ce..51ffd623 100644 --- a/src/pyelliptic/hash.py +++ b/src/pyelliptic/hash.py @@ -4,6 +4,8 @@ Wrappers for hash functions from OpenSSL. # Copyright (C) 2011 Yann GUIBET # See LICENSE for details. +import six + from .openssl import OpenSSL @@ -22,7 +24,7 @@ def _equals_str(a, b): return False result = 0 for x, y in zip(a, b): - result |= ord(x) ^ ord(y) + result |= six.byte2int(x) ^ six.byte2int(y) return result == 0 diff --git a/src/pyelliptic/openssl.py b/src/pyelliptic/openssl.py index 42c2946d..f1def5d3 100644 --- a/src/pyelliptic/openssl.py +++ b/src/pyelliptic/openssl.py @@ -692,7 +692,7 @@ class _OpenSSL(object): length = self.BN_num_bytes(x) data = self.malloc(0, length) OpenSSL.BN_bn2bin(x, data) - return ord(data[length - 1]) & 1 + return six.byte2int(data[length - 1]) & 1 def get_cipher(self, name): """ diff --git a/src/tests/test_randomtrackingdict.py b/src/tests/test_randomtrackingdict.py index 2db3c423..528780a9 100644 --- a/src/tests/test_randomtrackingdict.py +++ b/src/tests/test_randomtrackingdict.py @@ -3,6 +3,7 @@ Tests for RandomTrackingDict Class """ import random import unittest +import six from time import time @@ -17,7 +18,7 @@ class TestRandomTrackingDict(unittest.TestCase): """helper function for tests, generates a random string""" retval = '' for _ in range(32): - retval += chr(random.randint(0, 255)) + retval += six.int2byte(random.randint(0, 255)) return retval def test_check_randomtrackingdict(self):