|
|
|
@ -1,3 +1,4 @@ |
|
|
|
import base64 |
|
|
|
import os |
|
|
|
import email |
|
|
|
import urllib.parse |
|
|
|
@ -197,6 +198,50 @@ class DigestAuthHandler: |
|
|
|
return self._return_auth_challenge(request_handler) |
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
class BasicAuthHandler(http.server.BaseHTTPRequestHandler): |
|
|
|
"""Handler for performing basic authentication.""" |
|
|
|
# Server side values |
|
|
|
USER = 'testUser' |
|
|
|
PASSWD = 'testPass' |
|
|
|
REALM = 'Test' |
|
|
|
USER_PASSWD = "%s:%s" % (USER, PASSWD) |
|
|
|
ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii') |
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs): |
|
|
|
http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) |
|
|
|
|
|
|
|
def log_message(self, format, *args): |
|
|
|
# Suppress console log message |
|
|
|
pass |
|
|
|
|
|
|
|
def do_HEAD(self): |
|
|
|
self.send_response(200) |
|
|
|
self.send_header("Content-type", "text/html") |
|
|
|
self.end_headers() |
|
|
|
|
|
|
|
def do_AUTHHEAD(self): |
|
|
|
self.send_response(401) |
|
|
|
self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM) |
|
|
|
self.send_header("Content-type", "text/html") |
|
|
|
self.end_headers() |
|
|
|
|
|
|
|
def do_GET(self): |
|
|
|
if not self.headers.get("Authorization", ""): |
|
|
|
self.do_AUTHHEAD() |
|
|
|
self.wfile.write(b"No Auth header received") |
|
|
|
elif self.headers.get( |
|
|
|
"Authorization", "") == "Basic " + self.ENCODED_AUTH: |
|
|
|
self.send_response(200) |
|
|
|
self.end_headers() |
|
|
|
self.wfile.write(b"It works") |
|
|
|
else: |
|
|
|
# Request Unauthorized |
|
|
|
self.do_AUTHHEAD() |
|
|
|
self.wfile.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Proxy test infrastructure |
|
|
|
|
|
|
|
class FakeProxyHandler(http.server.BaseHTTPRequestHandler): |
|
|
|
@ -232,6 +277,43 @@ class FakeProxyHandler(http.server.BaseHTTPRequestHandler): |
|
|
|
|
|
|
|
# Test cases |
|
|
|
|
|
|
|
@unittest.skipUnless(threading, "Threading required for this test.") |
|
|
|
class BasicAuthTests(unittest.TestCase): |
|
|
|
USER = "testUser" |
|
|
|
PASSWD = "testPass" |
|
|
|
INCORRECT_PASSWD = "Incorrect" |
|
|
|
REALM = "Test" |
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
super(BasicAuthTests, self).setUp() |
|
|
|
# With Basic Authentication |
|
|
|
def http_server_with_basic_auth_handler(*args, **kwargs): |
|
|
|
return BasicAuthHandler(*args, **kwargs) |
|
|
|
self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler) |
|
|
|
self.server_url = 'http://127.0.0.1:%s' % self.server.port |
|
|
|
self.server.start() |
|
|
|
self.server.ready.wait() |
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
self.server.stop() |
|
|
|
super(BasicAuthTests, self).tearDown() |
|
|
|
|
|
|
|
def test_basic_auth_success(self): |
|
|
|
ah = urllib.request.HTTPBasicAuthHandler() |
|
|
|
ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD) |
|
|
|
urllib.request.install_opener(urllib.request.build_opener(ah)) |
|
|
|
try: |
|
|
|
self.assertTrue(urllib.request.urlopen(self.server_url)) |
|
|
|
except urllib.error.HTTPError: |
|
|
|
self.fail("Basic auth failed for the url: %s", self.server_url) |
|
|
|
|
|
|
|
def test_basic_auth_httperror(self): |
|
|
|
ah = urllib.request.HTTPBasicAuthHandler() |
|
|
|
ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD) |
|
|
|
urllib.request.install_opener(urllib.request.build_opener(ah)) |
|
|
|
self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url) |
|
|
|
|
|
|
|
|
|
|
|
@unittest.skipUnless(threading, "Threading required for this test.") |
|
|
|
class ProxyAuthTests(unittest.TestCase): |
|
|
|
URL = "http://localhost" |
|
|
|
@ -245,6 +327,7 @@ class ProxyAuthTests(unittest.TestCase): |
|
|
|
self.digest_auth_handler = DigestAuthHandler() |
|
|
|
self.digest_auth_handler.set_users({self.USER: self.PASSWD}) |
|
|
|
self.digest_auth_handler.set_realm(self.REALM) |
|
|
|
# With Digest Authentication. |
|
|
|
def create_fake_proxy_handler(*args, **kwargs): |
|
|
|
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs) |
|
|
|
|
|
|
|
|