Add ssltest.py for developers/debugging
This commit is contained in:
parent
fa2f87743e
commit
ff593273bf
83
dev/ssltest.py
Normal file
83
dev/ssltest.py
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
import select
|
||||||
|
import socket
|
||||||
|
import ssl
|
||||||
|
import sys
|
||||||
|
|
||||||
|
HOST = "127.0.0.1"
|
||||||
|
PORT = 8912
|
||||||
|
|
||||||
|
def sslProtocolVersion():
|
||||||
|
# sslProtocolVersion
|
||||||
|
if sys.version_info >= (2,7,13):
|
||||||
|
# this means TLSv1 or higher
|
||||||
|
# in the future change to
|
||||||
|
# ssl.PROTOCOL_TLS1.2
|
||||||
|
return ssl.PROTOCOL_TLS
|
||||||
|
elif sys.version_info >= (2,7,9):
|
||||||
|
# this means any SSL/TLS. SSLv2 and 3 are excluded with an option after context is created
|
||||||
|
return ssl.PROTOCOL_SSLv23
|
||||||
|
else:
|
||||||
|
# this means TLSv1, there is no way to set "TLSv1 or higher" or
|
||||||
|
# "TLSv1.2" in < 2.7.9
|
||||||
|
return ssl.PROTOCOL_TLSv1
|
||||||
|
|
||||||
|
def connect():
|
||||||
|
sock = socket.create_connection((HOST, PORT))
|
||||||
|
return sock
|
||||||
|
|
||||||
|
def listen():
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||||
|
sock.bind((HOST, PORT))
|
||||||
|
sock.listen(0)
|
||||||
|
return sock
|
||||||
|
|
||||||
|
def sslHandshake(sock, server=False):
|
||||||
|
if sys.version_info >= (2,7,9):
|
||||||
|
context = ssl.SSLContext(sslProtocolVersion())
|
||||||
|
context.set_ciphers("AECDH-AES256-SHA")
|
||||||
|
context.set_ecdh_curve("secp256k1")
|
||||||
|
context.check_hostname = False
|
||||||
|
context.verify_mode = ssl.CERT_NONE
|
||||||
|
context.options = ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_SINGLE_ECDH_USE | ssl.OP_CIPHER_SERVER_PREFERENCE
|
||||||
|
sslSock = context.wrap_socket(sock, server_side = server, do_handshake_on_connect=False)
|
||||||
|
else:
|
||||||
|
sslSock = ssl.wrap_socket(self.sock, keyfile = os.path.join('src', 'sslkeys', 'key.pem'), certfile = os.path.join('src', 'sslkeys', 'cert.pem'), server_side = server, ssl_version=sslProtocolVersion(), do_handshake_on_connect=False, ciphers='AECDH-AES256-SHA')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
sslSock.do_handshake()
|
||||||
|
break
|
||||||
|
except ssl.SSLWantReadError:
|
||||||
|
print "Waiting for SSL socket handhake read"
|
||||||
|
select.select([self.sslSock], [], [], 10)
|
||||||
|
except ssl.SSLWantWriteError:
|
||||||
|
print "Waiting for SSL socket handhake write"
|
||||||
|
select.select([], [self.sslSock], [], 10)
|
||||||
|
except Exception as e:
|
||||||
|
print "SSL socket handhake failed, shutting down connection: %s" % (e)
|
||||||
|
return
|
||||||
|
print "Success!"
|
||||||
|
return sslSock
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
if len(sys.argv) != 2:
|
||||||
|
print "Usage: ssltest.py client|server"
|
||||||
|
sys.exit(0)
|
||||||
|
elif sys.argv[1] == "server":
|
||||||
|
serversock = listen()
|
||||||
|
while True:
|
||||||
|
print "Waiting for connection"
|
||||||
|
sock, addr = serversock.accept()
|
||||||
|
print "Got connection from %s:%i" % (addr[0], addr[1])
|
||||||
|
sslSock = sslHandshake(sock, True)
|
||||||
|
sslSock.shutdown(socket.SHUT_RDWR)
|
||||||
|
sslSock.close()
|
||||||
|
elif sys.argv[1] == "client":
|
||||||
|
sock = connect()
|
||||||
|
sslSock = sslHandshake(sock, False)
|
||||||
|
sslSock.shutdown(socket.SHUT_RDWR)
|
||||||
|
sslSock.close()
|
||||||
|
else:
|
||||||
|
print "Usage: ssltest.py client|server"
|
||||||
|
sys.exit(0)
|
Reference in New Issue
Block a user