Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/source/differences-to-vws.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ Model Target datasets

The Model Target Web API mock supports OAuth2 token requests, standard and advanced dataset creation, status polling, dataset downloads, and deletion.
The generated dataset download is a small valid zip file containing request metadata, not a real Vuforia Engine Model Target dataset.
Model Target API routes accept any non-empty bearer token.
Model Target API routes require a syntactically JSON Web Token-shaped bearer token, such as the token returned by the mock OAuth2 route.
The mock does not verify token signatures, claims, expiry, or revocation.

Header cases
------------
Expand Down
1 change: 1 addition & 0 deletions newsfragments/3192.change
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve Model Target Web API mock authentication failure responses.
2 changes: 1 addition & 1 deletion src/mock_vws/_flask_server/vws.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _flask_request_data() -> RequestData:
method=request.method,
path=request.path,
headers=dict(request.headers),
body=request.data,
body=request.get_data(parse_form_data=False),
)


Expand Down
125 changes: 104 additions & 21 deletions src/mock_vws/_model_target_web_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

_ResponseType = tuple[int, dict[str, str], str | bytes]
_MAX_ADVANCED_MODEL_COUNT = 20
_JWT_DOT_COUNT = 2
_MOCK_MODEL_TARGET_CLIENT_ID = "client-id"
_MOCK_MODEL_TARGET_CLIENT_SECRET = "client-secret" # noqa: S105


@beartype
Expand Down Expand Up @@ -57,6 +60,16 @@ def _error_response(
)


@beartype
def _oauth2_error_response(
*,
status_code: HTTPStatus,
body: dict[str, str],
) -> _ResponseType:
"""Return an OAuth2 error response."""
return _json_response(status_code=status_code, body=body)


@beartype
def _get_header(request: RequestData, name: str) -> str | None:
"""Return a request header, case-insensitively."""
Expand All @@ -67,6 +80,28 @@ def _get_header(request: RequestData, name: str) -> str | None:
return None


@beartype
def _basic_auth_credentials(auth_header: str | None) -> tuple[str, str] | None:
"""Return HTTP Basic credentials from an authorization header."""
if auth_header is None or not auth_header.startswith("Basic "):
return None

encoded_credentials = auth_header.removeprefix("Basic ").strip()
try:
decoded_credentials = base64.b64decode(
s=encoded_credentials,
validate=True,
).decode(encoding="utf-8")
except ValueError:
return None

client_id, separator, client_secret = decoded_credentials.partition(":")
if not separator:
return None

return client_id, client_secret


@beartype
def _require_bearer_token(request: RequestData) -> _ResponseType | None:
"""Return an error response if the request has no bearer token."""
Expand All @@ -78,47 +113,95 @@ def _require_bearer_token(request: RequestData) -> _ResponseType | None:
message="no Bearer token",
target="jwt",
)
if not auth_header.removeprefix("Bearer ").strip():
bearer_token = auth_header.removeprefix("Bearer ").strip()
if not bearer_token:
return _error_response(
status_code=HTTPStatus.UNAUTHORIZED,
code="401",
message="no Bearer token",
target="jwt",
)
if bearer_token.count(".") != _JWT_DOT_COUNT:
return _error_response(
status_code=HTTPStatus.UNAUTHORIZED,
code="401",
message="invalid Bearer token",
message="Invalid JWT serialization: Missing dot delimiter(s)",
target="jwt",
)
return None


@beartype
def _fake_jwt(*, token_source: bytes) -> str:
"""Return a deterministic bearer token for the mock."""

def encode_part(value: dict[str, Any]) -> str:
"""Return a base64url-encoded token part."""
raw_part = json.dumps(
obj=value,
sort_keys=True,
separators=(",", ":"),
).encode(encoding="utf-8")
return (
base64.urlsafe_b64encode(s=raw_part)
.decode(
encoding="ascii",
)
.rstrip("=")
)

header = encode_part(value={"alg": "mock", "typ": "JWT"})
payload = encode_part(
value={
"aud": "vuforia-model-target",
"src": base64.urlsafe_b64encode(s=token_source)
.decode(
encoding="ascii",
)
.rstrip("="),
},
)
return f"{header}.{payload}.mock-signature"


@beartype
def oauth2_token(request: RequestData) -> _ResponseType:
"""Return a fake OAuth2 access token."""
auth_header = _get_header(request=request, name="Authorization")
form = parse_qs(qs=request.body.decode(encoding="utf-8"))
grant_type = form.get("grant_type", [""])[0]
has_basic_auth = auth_header is not None and auth_header.startswith(
"Basic ",
)
has_password_credentials = all(
form.get(field, [""])[0] for field in ("username", "password")
)
if grant_type not in {"", "client_credentials", "password"} or (
not has_basic_auth and not has_password_credentials
):
return _error_response(
grant_type = form.get("grant_type", ["client_credentials"])[0]
if grant_type != "client_credentials":
return _oauth2_error_response(
status_code=HTTPStatus.BAD_REQUEST,
code="BAD_REQUEST",
message="Invalid OAuth2 token request.",
target="grant_type",
body={"error": "unsupported_grant_type"},
)

basic_credentials = _basic_auth_credentials(auth_header=auth_header)
if basic_credentials is None:
return _oauth2_error_response(
status_code=HTTPStatus.UNAUTHORIZED,
body={
"error": "invalid_request",
"error_description": (
"Missing or invalid authorization header"
),
},
)

if basic_credentials != (
_MOCK_MODEL_TARGET_CLIENT_ID,
_MOCK_MODEL_TARGET_CLIENT_SECRET,
):
return _oauth2_error_response(
status_code=HTTPStatus.UNAUTHORIZED,
body={"error": "invalid_client"},
)

token_source = request.body or (auth_header or "").encode()
access_token = base64.urlsafe_b64encode(s=token_source).decode(
encoding="ascii",
)
access_token = access_token.rstrip("=") or "mock-vuforia-access-token"
return _json_response(
status_code=HTTPStatus.OK,
body={
"access_token": access_token,
"access_token": _fake_jwt(token_source=token_source),
"token_type": "bearer",
"expires_in": 3600,
},
Expand Down
2 changes: 1 addition & 1 deletion tests/mock_vws/test_flask_app_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class TestProcessingTime:

# There is a race condition in this test type - if tests start to
# fail, consider increasing the leeway.
LEEWAY = 0.5
LEEWAY = 1.0

def test_default(
self,
Expand Down
Loading
Loading