Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions msal/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ def _main():
logging.error("Invalid input: %s", e)
except KeyboardInterrupt: # Useful for bailing out a stuck interactive flow
print("Aborted")
except Exception as e:
logging.error("Error: %s", e)

if __name__ == "__main__":
_main()
Expand Down
34 changes: 22 additions & 12 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,9 @@ def initiate_auth_code_flow(
):
"""Initiate an auth code flow.

.. deprecated::
The response_mode parameter is deprecated and will be removed in a future version.

Later when the response reaches your redirect_uri,
you can use :func:`~acquire_token_by_auth_code_flow()`
to complete the authentication/authorization.
Expand Down Expand Up @@ -960,18 +963,16 @@ def initiate_auth_code_flow(
New in version 1.15.

:param str response_mode:
OPTIONAL. Specifies the method with which response parameters should be returned.
The default value is equivalent to ``query``, which is still secure enough in MSAL Python
(because MSAL Python does not transfer tokens via query parameter in the first place).
For even better security, we recommend using the value ``form_post``.
In "form_post" mode, response parameters
will be encoded as HTML form values that are transmitted via the HTTP POST method and
encoded in the body using the application/x-www-form-urlencoded format.
Valid values can be either "form_post" for HTTP POST to callback URI or
"query" (the default) for HTTP GET with parameters encoded in query string.
More information on possible values
`here <https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#ResponseModes>`
and `here <https://openid.net/specs/oauth-v2-form-post-response-mode-1_0.html#FormPostResponseMode>`
.. deprecated::
This parameter is deprecated and will be removed in a future version.

**For PublicClientApplication**: response_mode is automatically set to
``form_post`` for security reasons. This parameter is ignored.

**For ConfidentialClientApplication**: You should configure your web
framework to accept form_post responses instead of query responses.
While this parameter still works, it will be removed in a future version.
Using query-based response modes is less secure and should be avoided.

:return:
The auth code flow. It is a dict in this form::
Expand All @@ -991,6 +992,15 @@ def initiate_auth_code_flow(
3. and then relay this dict and subsequent auth response to
:func:`~acquire_token_by_auth_code_flow()`.
"""
if response_mode is not None:
import warnings
warnings.warn(
"The 'response_mode' parameter is deprecated and will be removed in a future version. "
"For public clients, response_mode is automatically set to 'form_post'. "
"For confidential clients, configure your web framework to use 'form_post'. "
"Query-based response modes are less secure.",
DeprecationWarning,
stacklevel=2)
client = _ClientWithCcsRoutingInfo(
{"authorization_endpoint": self.authority.authorization_endpoint},
self.client_id,
Expand Down
93 changes: 74 additions & 19 deletions msal/oauth2cli/authcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,27 +111,70 @@ class _AuthCodeHandler(BaseHTTPRequestHandler):
def do_GET(self):
# For flexibility, we choose to not check self.path matching redirect_uri
#assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')

qs = parse_qs(urlparse(self.path).query)
if qs.get('code') or qs.get("error"): # So, it is an auth response
auth_response = _qs2kv(qs)
logger.debug("Got auth response: %s", auth_response)
if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
# OAuth2 successful and error responses contain state when it was used
# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
self._send_full_response("State mismatch") # Possibly an attack
else:
template = (self.server.success_template
if "code" in qs else self.server.error_template)
if _is_html(template.template):
safe_data = _escape(auth_response) # Foiling an XSS attack
else:
safe_data = auth_response
self._send_full_response(template.safe_substitute(**safe_data))
self.server.auth_response = auth_response # Set it now, after the response is likely sent
if qs.get('code') or qs.get('error'):
# GET request with auth code or error - reject for security (form_post only)
self._send_full_response(
"GET method is not supported for authentication responses. "
"This application requires form_post response mode.",
is_ok=False)
elif not qs:
# Blank redirect from eSTS error - show generic error and mark done
self._send_full_response(
"Authentication could not be completed. "
"You can close this window and return to the application.")
self.server.done = True
else:
# Other GET requests - show welcome page
self._send_full_response(self.server.welcome_page)
# NOTE: Don't do self.server.shutdown() here. It'll halt the server.

def do_POST(self):
# Handle form_post response mode where auth code is sent via POST body
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')

qs = parse_qs(post_data)
if qs.get('code') or qs.get('error'): # So, it is an auth response
auth_response = _qs2kv(qs)
logger.debug("Got auth response via POST: %s", auth_response)
self._process_auth_response(auth_response)
else:
self._send_full_response("Invalid POST request", is_ok=False)
# NOTE: Don't do self.server.shutdown() here. It'll halt the server.

def _process_auth_response(self, auth_response):
"""Process the auth response from either GET or POST request."""
if self.server.auth_state and self.server.auth_state != auth_response.get("state"):
# OAuth2 successful and error responses contain state when it was used
# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2.1
self._send_full_response("State mismatch") # Possibly an attack
# Don't set auth_response for security, but mark as done to avoid hanging
self.server.done = True
else:
template = (self.server.success_template
if "code" in auth_response else self.server.error_template)
if _is_html(template.template):
safe_data = _escape(auth_response) # Foiling an XSS attack
else:
safe_data = dict(auth_response) # Make a copy to avoid mutating original
# Provide default values for common OAuth2 response fields
# to avoid showing literal placeholder text like "$error_description"
safe_data.setdefault("error", "")
safe_data.setdefault("error_description", "")
# Format error message nicely: include ": description." only if description exists
if "code" not in auth_response: # This is an error response
error_desc = auth_response.get("error_description", "").strip()
if error_desc:
safe_data["error_message"] = f"{safe_data['error']}: {error_desc}."
else:
safe_data["error_message"] = safe_data["error"]
else:
safe_data["error_message"] = ""
self._send_full_response(template.safe_substitute(**safe_data))
self.server.auth_response = auth_response # Set it now, after the response is likely sent

def _send_full_response(self, body, is_ok=True):
self.send_response(200 if is_ok else 400)
content_type = 'text/html' if _is_html(body) else 'text/plain'
Expand Down Expand Up @@ -287,6 +330,15 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
welcome_uri = "http://localhost:{p}".format(p=self.get_port())
abort_uri = "{loc}?error=abort".format(loc=welcome_uri)
logger.debug("Abort by visit %s", abort_uri)

# Enforce response_mode=form_post for security
if auth_uri:
parsed = urlparse(auth_uri)
params = parse_qs(parsed.query)
params['response_mode'] = ['form_post'] # Enforce form_post
new_query = urlencode(params, doseq=True)
auth_uri = parsed._replace(query=new_query).geturl()

self._server.welcome_page = Template(welcome_template or "").safe_substitute(
auth_uri=auth_uri, abort_uri=abort_uri)
if auth_uri: # Now attempt to open a local browser to visit it
Expand Down Expand Up @@ -317,19 +369,22 @@ def _get_auth_response(self, result, auth_uri=None, timeout=None, state=None,
auth_uri_callback(_uri)

self._server.success_template = Template(success_template or
"Authentication completed. You can close this window now.")
"Authentication complete. You can return to the application. Please close this browser tab.\n\n"
"For your security: Do not share the contents of this page, the address bar, or take screenshots.")
self._server.error_template = Template(error_template or
"Authentication failed. $error: $error_description. ($error_uri)")
"Authentication failed. $error_message\n\n"
"For your security: Do not share the contents of this page, the address bar, or take screenshots.")

self._server.timeout = timeout # Otherwise its handle_timeout() won't work
self._server.auth_response = {} # Shared with _AuthCodeHandler
self._server.auth_state = state # So handler will check it before sending response
self._server.done = False # Flag to indicate completion without setting auth_response
while not self._closing: # Otherwise, the handle_request() attempt
# would yield noisy ValueError trace
# Derived from
# https://docs.python.org/2/library/basehttpserver.html#more-examples
self._server.handle_request()
if self._server.auth_response:
if self._server.auth_response or self._server.done:
break
result.update(self._server.auth_response) # Return via writable result param

Expand Down
17 changes: 16 additions & 1 deletion msal/oauth2cli/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,22 @@ def _build_auth_request_params(self, response_type, **kwargs):
response_type = self._stringify(response_type)

params = {'client_id': self.client_id, 'response_type': response_type}
params.update(kwargs) # Note: None values will override params
params.update(kwargs)
if 'response_mode' in params:
import warnings
warnings.warn(
"The 'response_mode' parameter is deprecated and will be removed in a future version. "
"For public clients, response_mode is automatically set to 'form_post'. "
"For confidential clients, configure your web framework to use 'form_post'. "
"Query-based response modes are less secure.",
DeprecationWarning,
stacklevel=3)
if params['response_mode'] != 'form_post':
warnings.warn(
"response_mode='form_post' is recommended for better security. "
"See https://tools.ietf.org/html/rfc6749#section-4.1.2",
UserWarning,
stacklevel=3)
params = {k: v for k, v in params.items() if v is not None} # clean up
if params.get('scope'):
params['scope'] = self._stringify(params['scope'])
Expand Down
148 changes: 145 additions & 3 deletions tests/test_authcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,159 @@ def test_no_two_concurrent_receivers_can_listen_on_same_port(self):
pass

def test_template_should_escape_input(self):
"""Test that POST request with HTML in error is properly escaped"""
with AuthCodeReceiver() as receiver:
receiver._scheduled_actions = [( # Injection happens here when the port is known
1, # Delay it until the receiver is activated by get_auth_response()
lambda: self.assertEqual(
"<html>&lt;tag&gt;foo&lt;/tag&gt;</html>",
requests.get("http://localhost:{}?error=<tag>foo</tag>".format(
receiver.get_port())).text,
"Unsafe data in HTML should be escaped",
requests.post(
"http://localhost:{}".format(receiver.get_port()),
data={"error": "<tag>foo</tag>"},
headers={'Content-Type': 'application/x-www-form-urlencoded'}
).text,
))]
receiver.get_auth_response( # Starts server and hang until timeout
timeout=3,
error_template="<html>$error</html>",
)

def test_get_request_with_auth_code_is_rejected(self):
"""Test that GET request with auth code is rejected for security"""
with AuthCodeReceiver() as receiver:
test_code = "test_auth_code_12345"
test_state = "test_state_67890"
receiver._scheduled_actions = [(
1,
lambda: self._verify_get_rejection(
receiver.get_port(),
code=test_code,
state=test_state
)
)]
result = receiver.get_auth_response(
timeout=3,
state=test_state,
)
# Should not receive auth response via GET
self.assertIsNone(result)

def _verify_get_rejection(self, port, **params):
"""Helper to verify GET requests with auth codes are rejected"""
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode

response = requests.get(
"http://localhost:{}?{}".format(port, urlencode(params))
)
# Verify error message about unsupported method
self.assertIn("not supported", response.text.lower())
self.assertEqual(response.status_code, 400)

def test_post_request_with_auth_code(self):
"""Test that POST request with auth code is handled correctly (form_post response mode)"""
with AuthCodeReceiver() as receiver:
test_code = "test_auth_code_12345"
test_state = "test_state_67890"
receiver._scheduled_actions = [(
1,
lambda: self._send_post_auth_response(
receiver.get_port(),
code=test_code,
state=test_state
)
)]
result = receiver.get_auth_response(
timeout=3,
state=test_state,
success_template="Success: Got code $code",
)
self.assertIsNotNone(result)
self.assertEqual(result.get("code"), test_code)
self.assertEqual(result.get("state"), test_state)

def test_post_request_with_error(self):
"""Test that POST request with error is handled correctly"""
with AuthCodeReceiver() as receiver:
test_error = "access_denied"
test_error_description = "User denied access"
receiver._scheduled_actions = [(
1,
lambda: self._send_post_auth_response(
receiver.get_port(),
error=test_error,
error_description=test_error_description
)
)]
result = receiver.get_auth_response(
timeout=3,
error_template="Error: $error - $error_description",
)
self.assertIsNotNone(result)
self.assertEqual(result.get("error"), test_error)
self.assertEqual(result.get("error_description"), test_error_description)

def test_post_request_state_mismatch(self):
"""Test that POST request with mismatched state is rejected"""
with AuthCodeReceiver() as receiver:
test_code = "test_auth_code_12345"
wrong_state = "wrong_state"
expected_state = "expected_state"
receiver._scheduled_actions = [(
1,
lambda: self._send_post_auth_response(
receiver.get_port(),
code=test_code,
state=wrong_state
)
)]
result = receiver.get_auth_response(
timeout=3,
state=expected_state,
)
# When state mismatches, the response should not be set
self.assertIsNone(result)

def test_post_request_escapes_html(self):
"""Test that POST request with HTML in error is properly escaped"""
with AuthCodeReceiver() as receiver:
malicious_error = "<script>alert('xss')</script>"
receiver._scheduled_actions = [(
1,
lambda: self._verify_post_escaping(receiver.get_port(), malicious_error)
)]
receiver.get_auth_response(
timeout=3,
error_template="<html>$error</html>",
)

def _send_post_auth_response(self, port, **params):
"""Helper to send POST request with auth response"""
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode

response = requests.post(
"http://localhost:{}".format(port),
data=params,
headers={'Content-Type': 'application/x-www-form-urlencoded'}
)
return response

def _verify_post_escaping(self, port, malicious_error):
"""Helper to verify HTML escaping in POST requests"""
response = self._send_post_auth_response(port, error=malicious_error)
# Verify that the malicious script is escaped
self.assertIn("&lt;script&gt;", response.text)
self.assertNotIn("<script>", response.text)
# Note: The escape function also escapes single quotes to &#x27;
expected = "<html>&lt;script&gt;alert(&#x27;xss&#x27;)&lt;/script&gt;</html>"
self.assertEqual(
expected,
response.text,
"HTML in POST data should be escaped to prevent XSS"
)

Loading
Loading