use six.int2byte(), six.byte2int, six.BytesIO

This commit is contained in:
Kashiko Koibumi 2024-05-25 12:27:32 +09:00
parent 46d56c703e
commit 21a7bdba44
No known key found for this signature in database
GPG Key ID: 8F06E069E37C40C4
11 changed files with 64 additions and 57 deletions

View File

@ -17,6 +17,7 @@ import sys
import time
from textwrap import fill
from threading import Timer
import six
from dialog import Dialog
import helper_sent
@ -105,7 +106,7 @@ def ascii(s):
"""ASCII values"""
r = ""
for c in s:
if ord(c) in range(128):
if six.byte2int(c) in range(128):
r += c
return r
@ -326,13 +327,13 @@ def handlech(c, stdscr):
if c != curses.ERR:
global inboxcur, addrcur, sentcur, subcur, abookcur, blackcur
if c in range(256):
if chr(c) in '12345678':
if six.int2byte(c) in '12345678':
global menutab
menutab = int(chr(c))
elif chr(c) == 'q':
menutab = int(six.int2byte(c))
elif six.int2byte(c) == 'q':
global quit_
quit_ = True
elif chr(c) == '\n':
elif six.int2byte(c) == '\n':
curses.curs_set(1)
d = Dialog(dialog="dialog")
if menutab == 1:

View File

@ -3,7 +3,7 @@ Core classes for loading images and converting them to a Texture.
The raw image data can be keep in memory for further access
"""
import hashlib
from io import BytesIO
from six import BytesIO
from PIL import Image
from kivy.core.image import Image as CoreImage

View File

@ -50,7 +50,6 @@ License: MIT
# pylint: disable=unused-argument
import collections
import io
import struct
import sys
import six
@ -126,7 +125,7 @@ class Ext: # pylint: disable=old-style-class
String representation of this Ext object.
"""
s = "Ext Object (Type: 0x%02x, Data: " % self.type
s += " ".join(["0x%02x" % ord(self.data[i:i + 1])
s += " ".join(["0x%02x" % six.byte2int(self.data[i:i + 1])
for i in xrange(min(len(self.data), 8))])
if len(self.data) > 8:
s += " ..."
@ -550,7 +549,7 @@ def _packb2(obj, **options):
'\x82\xa7compact\xc3\xa6schema\x00'
>>>
"""
fp = io.BytesIO()
fp = six.BytesIO()
_pack2(obj, fp, **options)
return fp.getvalue()
@ -583,7 +582,7 @@ def _packb3(obj, **options):
b'\x82\xa7compact\xc3\xa6schema\x00'
>>>
"""
fp = io.BytesIO()
fp = six.BytesIO()
_pack3(obj, fp, **options)
return fp.getvalue()
@ -600,7 +599,7 @@ def _read_except(fp, n):
def _unpack_integer(code, fp, options):
if (ord(code) & 0xe0) == 0xe0:
if (six.byte2int(code) & 0xe0) == 0xe0:
return struct.unpack("b", code)[0]
elif code == b'\xd0':
return struct.unpack("b", _read_except(fp, 1))[0]
@ -610,7 +609,7 @@ def _unpack_integer(code, fp, options):
return struct.unpack(">i", _read_except(fp, 4))[0]
elif code == b'\xd3':
return struct.unpack(">q", _read_except(fp, 8))[0]
elif (ord(code) & 0x80) == 0x00:
elif (six.byte2int(code) & 0x80) == 0x00:
return struct.unpack("B", code)[0]
elif code == b'\xcc':
return struct.unpack("B", _read_except(fp, 1))[0]
@ -620,21 +619,21 @@ def _unpack_integer(code, fp, options):
return struct.unpack(">I", _read_except(fp, 4))[0]
elif code == b'\xcf':
return struct.unpack(">Q", _read_except(fp, 8))[0]
raise Exception("logic error, not int: 0x%02x" % ord(code))
raise Exception("logic error, not int: 0x%02x" % six.byte2int(code))
def _unpack_reserved(code, fp, options):
if code == b'\xc1':
raise ReservedCodeException(
"encountered reserved code: 0x%02x" % ord(code))
"encountered reserved code: 0x%02x" % six.byte2int(code))
raise Exception(
"logic error, not reserved code: 0x%02x" % ord(code))
"logic error, not reserved code: 0x%02x" % six.byte2int(code))
def _unpack_nil(code, fp, options):
if code == b'\xc0':
return None
raise Exception("logic error, not nil: 0x%02x" % ord(code))
raise Exception("logic error, not nil: 0x%02x" % six.byte2int(code))
def _unpack_boolean(code, fp, options):
@ -642,7 +641,7 @@ def _unpack_boolean(code, fp, options):
return False
elif code == b'\xc3':
return True
raise Exception("logic error, not boolean: 0x%02x" % ord(code))
raise Exception("logic error, not boolean: 0x%02x" % six.byte2int(code))
def _unpack_float(code, fp, options):
@ -650,12 +649,12 @@ def _unpack_float(code, fp, options):
return struct.unpack(">f", _read_except(fp, 4))[0]
elif code == b'\xcb':
return struct.unpack(">d", _read_except(fp, 8))[0]
raise Exception("logic error, not float: 0x%02x" % ord(code))
raise Exception("logic error, not float: 0x%02x" % six.byte2int(code))
def _unpack_string(code, fp, options):
if (ord(code) & 0xe0) == 0xa0:
length = ord(code) & ~0xe0
if (six.byte2int(code) & 0xe0) == 0xa0:
length = six.byte2int(code) & ~0xe0
elif code == b'\xd9':
length = struct.unpack("B", _read_except(fp, 1))[0]
elif code == b'\xda':
@ -663,7 +662,7 @@ def _unpack_string(code, fp, options):
elif code == b'\xdb':
length = struct.unpack(">I", _read_except(fp, 4))[0]
else:
raise Exception("logic error, not string: 0x%02x" % ord(code))
raise Exception("logic error, not string: 0x%02x" % six.byte2int(code))
# Always return raw bytes in compatibility mode
global compatibility
@ -687,7 +686,7 @@ def _unpack_binary(code, fp, options):
elif code == b'\xc6':
length = struct.unpack(">I", _read_except(fp, 4))[0]
else:
raise Exception("logic error, not binary: 0x%02x" % ord(code))
raise Exception("logic error, not binary: 0x%02x" % six.byte2int(code))
return _read_except(fp, length)
@ -710,9 +709,9 @@ def _unpack_ext(code, fp, options):
elif code == b'\xc9':
length = struct.unpack(">I", _read_except(fp, 4))[0]
else:
raise Exception("logic error, not ext: 0x%02x" % ord(code))
raise Exception("logic error, not ext: 0x%02x" % six.byte2int(code))
ext = Ext(ord(_read_except(fp, 1)), _read_except(fp, length))
ext = Ext(six.byte2int(_read_except(fp, 1)), _read_except(fp, length))
# Unpack with ext handler, if we have one
ext_handlers = options.get("ext_handlers")
@ -723,14 +722,14 @@ def _unpack_ext(code, fp, options):
def _unpack_array(code, fp, options):
if (ord(code) & 0xf0) == 0x90:
length = (ord(code) & ~0xf0)
if (six.byte2int(code) & 0xf0) == 0x90:
length = (six.byte2int(code) & ~0xf0)
elif code == b'\xdc':
length = struct.unpack(">H", _read_except(fp, 2))[0]
elif code == b'\xdd':
length = struct.unpack(">I", _read_except(fp, 4))[0]
else:
raise Exception("logic error, not array: 0x%02x" % ord(code))
raise Exception("logic error, not array: 0x%02x" % six.byte2int(code))
return [_unpack(fp, options) for _ in xrange(length)]
@ -742,14 +741,14 @@ def _deep_list_to_tuple(obj):
def _unpack_map(code, fp, options):
if (ord(code) & 0xf0) == 0x80:
length = (ord(code) & ~0xf0)
if (six.byte2int(code) & 0xf0) == 0x80:
length = (six.byte2int(code) & ~0xf0)
elif code == b'\xde':
length = struct.unpack(">H", _read_except(fp, 2))[0]
elif code == b'\xdf':
length = struct.unpack(">I", _read_except(fp, 4))[0]
else:
raise Exception("logic error, not map: 0x%02x" % ord(code))
raise Exception("logic error, not map: 0x%02x" % six.byte2int(code))
d = {} if not options.get('use_ordered_dict') \
else collections.OrderedDict()
@ -912,7 +911,7 @@ def _unpackb2(s, **options):
"""
if not isinstance(s, (str, bytearray)):
raise TypeError("packed data must be type 'str' or 'bytearray'")
return _unpack(io.BytesIO(s), options)
return _unpack(six.BytesIO(s), options)
# For Python 3, expects a bytes object
@ -958,7 +957,7 @@ def _unpackb3(s, **options):
"""
if not isinstance(s, (bytes, bytearray)):
raise TypeError("packed data must be type 'bytes' or 'bytearray'")
return _unpack(io.BytesIO(s), options)
return _unpack(six.BytesIO(s), options)
#############################################################################
# Module Initialization

View File

@ -75,7 +75,7 @@ class Dandelion: # pylint: disable=old-style-class
if logger.isEnabledFor(logging.DEBUG):
logger.debug(
'%s entering fluff mode due to %s.',
''.join('%02x' % ord(i) for i in hashId), reason)
''.join('%02x' % six.byte2int(i) for i in hashId), reason)
with self.lock:
try:
del self.hashMap[hashId]

View File

@ -5,6 +5,7 @@ SOCKS4a proxy module
import logging
import socket
import struct
import six
from proxy import GeneralProxyError, Proxy, ProxyError
@ -39,16 +40,16 @@ class Socks4a(Proxy):
def state_pre_connect(self):
"""Handle feedback from SOCKS4a while it is connecting on our behalf"""
# Get the response
if self.read_buf[0:1] != chr(0x00).encode():
if self.read_buf[0:1] != six.int2byte(0x00).encode():
# bad data
self.close()
raise GeneralProxyError(1)
elif self.read_buf[1:2] != chr(0x5A).encode():
elif self.read_buf[1:2] != six.int2byte(0x5A).encode():
# Connection failed
self.close()
if ord(self.read_buf[1:2]) in (91, 92, 93):
if six.byte2int(self.read_buf[1:2]) in (91, 92, 93):
# socks 4 error
raise Socks4aError(ord(self.read_buf[1:2]) - 90)
raise Socks4aError(six.byte2int(self.read_buf[1:2]) - 90)
else:
raise Socks4aError(4)
# Get the bound address/port
@ -102,9 +103,9 @@ class Socks4aConnection(Socks4a):
self.append_write_buf(self.ipaddr)
if self._auth:
self.append_write_buf(self._auth[0])
self.append_write_buf(chr(0x00).encode())
self.append_write_buf(six.int2byte(0x00).encode())
if rmtrslv:
self.append_write_buf(self.destination[0] + chr(0x00).encode())
self.append_write_buf(self.destination[0] + six.int2byte(0x00).encode())
self.set_state("pre_connect", length=0, expectBytes=8)
return True
@ -132,8 +133,8 @@ class Socks4aResolver(Socks4a):
self.append_write_buf(struct.pack("BBBB", 0x00, 0x00, 0x00, 0x01))
if self._auth:
self.append_write_buf(self._auth[0])
self.append_write_buf(chr(0x00).encode())
self.append_write_buf(self.host + chr(0x00).encode())
self.append_write_buf(six.int2byte(0x00).encode())
self.append_write_buf(self.host + six.int2byte(0x00).encode())
self.set_state("pre_connect", length=0, expectBytes=8)
return True

View File

@ -6,6 +6,7 @@ SOCKS5 proxy module
import logging
import socket
import struct
import six
from node import Peer
from proxy import GeneralProxyError, Proxy, ProxyError
@ -97,20 +98,20 @@ class Socks5(Proxy):
def state_pre_connect(self):
"""Handle feedback from socks5 while it is connecting on our behalf."""
# Get the response
if self.read_buf[0:1] != chr(0x05).encode():
if self.read_buf[0:1] != six.int2byte(0x05).encode():
self.close()
raise GeneralProxyError(1)
elif self.read_buf[1:2] != chr(0x00).encode():
elif self.read_buf[1:2] != six.int2byte(0x00).encode():
# Connection failed
self.close()
if ord(self.read_buf[1:2]) <= 8:
raise Socks5Error(ord(self.read_buf[1:2]))
if six.byte2int(self.read_buf[1:2]) <= 8:
raise Socks5Error(six.byte2int(self.read_buf[1:2]))
else:
raise Socks5Error(9)
# Get the bound address/port
elif self.read_buf[3:4] == chr(0x01).encode():
elif self.read_buf[3:4] == six.int2byte(0x01).encode():
self.set_state("proxy_addr_1", length=4, expectBytes=4)
elif self.read_buf[3:4] == chr(0x03).encode():
elif self.read_buf[3:4] == six.int2byte(0x03).encode():
self.set_state("proxy_addr_2_1", length=4, expectBytes=1)
else:
self.close()
@ -129,7 +130,7 @@ class Socks5(Proxy):
(e.g. IPv6, onion, ...). This is part 1 which retrieves the
length of the data.
"""
self.address_length = ord(self.read_buf[0:1])
self.address_length = six.byte2int(self.read_buf[0:1])
self.set_state(
"proxy_addr_2_2", length=1, expectBytes=self.address_length)
return True
@ -171,19 +172,19 @@ class Socks5Connection(Socks5):
# use the IPv4 address request even if remote resolving was specified.
try:
self.ipaddr = socket.inet_aton(self.destination[0])
self.append_write_buf(chr(0x01).encode() + self.ipaddr)
self.append_write_buf(six.int2byte(0x01).encode() + self.ipaddr)
except socket.error: # may be IPv6!
# Well it's not an IP number, so it's probably a DNS name.
if self._remote_dns:
# Resolve remotely
self.ipaddr = None
self.append_write_buf(chr(0x03).encode() + chr(
self.append_write_buf(six.int2byte(0x03).encode() + six.int2byte(
len(self.destination[0])).encode() + self.destination[0])
else:
# Resolve locally
self.ipaddr = socket.inet_aton(
socket.gethostbyname(self.destination[0]))
self.append_write_buf(chr(0x01).encode() + self.ipaddr)
self.append_write_buf(six.int2byte(0x01).encode() + self.ipaddr)
self.append_write_buf(struct.pack(">H", self.destination[1]))
self.set_state("pre_connect", length=0, expectBytes=4)
return True
@ -208,7 +209,7 @@ class Socks5Resolver(Socks5):
"""Perform resolving"""
# Now we can request the actual connection
self.append_write_buf(struct.pack('BBB', 0x05, 0xF0, 0x00))
self.append_write_buf(chr(0x03).encode() + chr(
self.append_write_buf(six.int2byte(0x03).encode() + six.int2byte(
len(self.host)).encode() + str(self.host))
self.append_write_buf(struct.pack(">H", self.port))
self.set_state("pre_connect", length=0, expectBytes=4)

View File

@ -12,6 +12,7 @@ import sys
import time
from binascii import hexlify
from struct import Struct, pack, unpack
import six
import defaults
import highlevelcrypto
@ -227,7 +228,7 @@ def checkIPv6Address(host, hostStandardFormat, private=False):
logger.debug('Ignoring loopback address: %s', hostStandardFormat)
return False
try:
host = [ord(c) for c in host[:2]]
host = [six.byte2int(c) for c in host[:2]]
except TypeError: # python3 has ints already
pass
if host[0] == 0xfe and host[1] & 0xc0 == 0x80:

View File

@ -3,6 +3,7 @@ Arithmetic Expressions
"""
import hashlib
import re
import six
P = 2**256 - 2**32 - 2**9 - 2**8 - 2**7 - 2**6 - 2**4 - 1
A = 0
@ -34,7 +35,7 @@ def get_code_string(base):
return b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'
if base == 256:
try:
return b''.join([chr(x) for x in range(256)])
return b''.join([six.int2byte(x) for x in range(256)])
except TypeError:
return bytes([x for x in range(256)])

View File

@ -4,6 +4,8 @@ Wrappers for hash functions from OpenSSL.
# Copyright (C) 2011 Yann GUIBET <yannguibet@gmail.com>
# See LICENSE for details.
import six
from .openssl import OpenSSL
@ -22,7 +24,7 @@ def _equals_str(a, b):
return False
result = 0
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
result |= six.byte2int(x) ^ six.byte2int(y)
return result == 0

View File

@ -692,7 +692,7 @@ class _OpenSSL(object):
length = self.BN_num_bytes(x)
data = self.malloc(0, length)
OpenSSL.BN_bn2bin(x, data)
return ord(data[length - 1]) & 1
return six.byte2int(data[length - 1]) & 1
def get_cipher(self, name):
"""

View File

@ -3,6 +3,7 @@ Tests for RandomTrackingDict Class
"""
import random
import unittest
import six
from time import time
@ -17,7 +18,7 @@ class TestRandomTrackingDict(unittest.TestCase):
"""helper function for tests, generates a random string"""
retval = ''
for _ in range(32):
retval += chr(random.randint(0, 255))
retval += six.int2byte(random.randint(0, 255))
return retval
def test_check_randomtrackingdict(self):