|
|
|
@ -208,20 +208,21 @@ class BasicSocketTests(unittest.TestCase): |
|
|
|
s = socket.socket(socket.AF_INET) |
|
|
|
ss = ssl.wrap_socket(s) |
|
|
|
wr = weakref.ref(ss) |
|
|
|
del ss |
|
|
|
self.assertEqual(wr(), None) |
|
|
|
with support.check_warnings(("", ResourceWarning)): |
|
|
|
del ss |
|
|
|
self.assertEqual(wr(), None) |
|
|
|
|
|
|
|
def test_wrapped_unconnected(self): |
|
|
|
# Methods on an unconnected SSLSocket propagate the original |
|
|
|
# socket.error raise by the underlying socket object. |
|
|
|
s = socket.socket(socket.AF_INET) |
|
|
|
ss = ssl.wrap_socket(s) |
|
|
|
self.assertRaises(socket.error, ss.recv, 1) |
|
|
|
self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) |
|
|
|
self.assertRaises(socket.error, ss.recvfrom, 1) |
|
|
|
self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) |
|
|
|
self.assertRaises(socket.error, ss.send, b'x') |
|
|
|
self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) |
|
|
|
with ssl.wrap_socket(s) as ss: |
|
|
|
self.assertRaises(socket.error, ss.recv, 1) |
|
|
|
self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) |
|
|
|
self.assertRaises(socket.error, ss.recvfrom, 1) |
|
|
|
self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) |
|
|
|
self.assertRaises(socket.error, ss.send, b'x') |
|
|
|
self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) |
|
|
|
|
|
|
|
def test_timeout(self): |
|
|
|
# Issue #8524: when creating an SSL socket, the timeout of the |
|
|
|
@ -229,8 +230,8 @@ class BasicSocketTests(unittest.TestCase): |
|
|
|
for timeout in (None, 0.0, 5.0): |
|
|
|
s = socket.socket(socket.AF_INET) |
|
|
|
s.settimeout(timeout) |
|
|
|
ss = ssl.wrap_socket(s) |
|
|
|
self.assertEqual(timeout, ss.gettimeout()) |
|
|
|
with ssl.wrap_socket(s) as ss: |
|
|
|
self.assertEqual(timeout, ss.gettimeout()) |
|
|
|
|
|
|
|
def test_errors(self): |
|
|
|
sock = socket.socket() |
|
|
|
@ -243,9 +244,9 @@ class BasicSocketTests(unittest.TestCase): |
|
|
|
self.assertRaisesRegex(ValueError, |
|
|
|
"certfile must be specified for server-side operations", |
|
|
|
ssl.wrap_socket, sock, server_side=True, certfile="") |
|
|
|
s = ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) |
|
|
|
self.assertRaisesRegex(ValueError, "can't connect in server-side mode", |
|
|
|
s.connect, (HOST, 8080)) |
|
|
|
with ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE) as s: |
|
|
|
self.assertRaisesRegex(ValueError, "can't connect in server-side mode", |
|
|
|
s.connect, (HOST, 8080)) |
|
|
|
with self.assertRaises(IOError) as cm: |
|
|
|
with socket.socket() as sock: |
|
|
|
ssl.wrap_socket(sock, certfile=WRONGCERT) |
|
|
|
@ -358,21 +359,21 @@ class BasicSocketTests(unittest.TestCase): |
|
|
|
def test_unknown_channel_binding(self): |
|
|
|
# should raise ValueError for unknown type |
|
|
|
s = socket.socket(socket.AF_INET) |
|
|
|
ss = ssl.wrap_socket(s) |
|
|
|
with self.assertRaises(ValueError): |
|
|
|
ss.get_channel_binding("unknown-type") |
|
|
|
with ssl.wrap_socket(s) as ss: |
|
|
|
with self.assertRaises(ValueError): |
|
|
|
ss.get_channel_binding("unknown-type") |
|
|
|
|
|
|
|
@unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, |
|
|
|
"'tls-unique' channel binding not available") |
|
|
|
def test_tls_unique_channel_binding(self): |
|
|
|
# unconnected should return None for known type |
|
|
|
s = socket.socket(socket.AF_INET) |
|
|
|
ss = ssl.wrap_socket(s) |
|
|
|
self.assertIsNone(ss.get_channel_binding("tls-unique")) |
|
|
|
with ssl.wrap_socket(s) as ss: |
|
|
|
self.assertIsNone(ss.get_channel_binding("tls-unique")) |
|
|
|
# the same for server-side |
|
|
|
s = socket.socket(socket.AF_INET) |
|
|
|
ss = ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) |
|
|
|
self.assertIsNone(ss.get_channel_binding("tls-unique")) |
|
|
|
with ssl.wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: |
|
|
|
self.assertIsNone(ss.get_channel_binding("tls-unique")) |
|
|
|
|
|
|
|
def test_dealloc_warn(self): |
|
|
|
ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) |
|
|
|
@ -623,10 +624,10 @@ class SSLErrorTests(unittest.TestCase): |
|
|
|
with socket.socket() as s: |
|
|
|
s.bind(("127.0.0.1", 0)) |
|
|
|
s.listen(5) |
|
|
|
with socket.socket() as c: |
|
|
|
c.connect(s.getsockname()) |
|
|
|
c.setblocking(False) |
|
|
|
c = ctx.wrap_socket(c, False, do_handshake_on_connect=False) |
|
|
|
c = socket.socket() |
|
|
|
c.connect(s.getsockname()) |
|
|
|
c.setblocking(False) |
|
|
|
with ctx.wrap_socket(c, False, do_handshake_on_connect=False) as c: |
|
|
|
with self.assertRaises(ssl.SSLWantReadError) as cm: |
|
|
|
c.do_handshake() |
|
|
|
s = str(cm.exception) |
|
|
|
@ -867,12 +868,12 @@ class NetworkedTests(unittest.TestCase): |
|
|
|
def test_ciphers(self): |
|
|
|
remote = ("svn.python.org", 443) |
|
|
|
with support.transient_internet(remote[0]): |
|
|
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET), |
|
|
|
cert_reqs=ssl.CERT_NONE, ciphers="ALL") |
|
|
|
s.connect(remote) |
|
|
|
s = ssl.wrap_socket(socket.socket(socket.AF_INET), |
|
|
|
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") |
|
|
|
s.connect(remote) |
|
|
|
with ssl.wrap_socket(socket.socket(socket.AF_INET), |
|
|
|
cert_reqs=ssl.CERT_NONE, ciphers="ALL") as s: |
|
|
|
s.connect(remote) |
|
|
|
with ssl.wrap_socket(socket.socket(socket.AF_INET), |
|
|
|
cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT") as s: |
|
|
|
s.connect(remote) |
|
|
|
# Error checking can happen at instantiation or when connecting |
|
|
|
with self.assertRaisesRegex(ssl.SSLError, "No cipher can be selected"): |
|
|
|
with socket.socket(socket.AF_INET) as sock: |
|
|
|
@ -1847,6 +1848,8 @@ else: |
|
|
|
client_addr = client.getsockname() |
|
|
|
client.close() |
|
|
|
t.join() |
|
|
|
remote.close() |
|
|
|
server.close() |
|
|
|
# Sanity checks. |
|
|
|
self.assertIsInstance(remote, ssl.SSLSocket) |
|
|
|
self.assertEqual(peer, client_addr) |
|
|
|
@ -1861,8 +1864,7 @@ else: |
|
|
|
with ThreadedEchoServer(CERTFILE, |
|
|
|
ssl_version=ssl.PROTOCOL_SSLv23, |
|
|
|
chatty=False) as server: |
|
|
|
with socket.socket() as sock: |
|
|
|
s = context.wrap_socket(sock) |
|
|
|
with context.wrap_socket(socket.socket()) as s: |
|
|
|
with self.assertRaises((OSError, ssl.SSLError)): |
|
|
|
s.connect((HOST, server.port)) |
|
|
|
self.assertIn("no shared cipher", str(server.conn_errors[0])) |
|
|
|
|