1616
1717_ResponseType = tuple [int , dict [str , str ], str | bytes ]
1818_MAX_ADVANCED_MODEL_COUNT = 20
19+ _JWT_DOT_COUNT = 2
20+ _MOCK_MODEL_TARGET_CLIENT_ID = "client-id"
21+ _MOCK_MODEL_TARGET_CLIENT_SECRET = "client-secret" # noqa: S105
1922
2023
2124@beartype
@@ -57,6 +60,16 @@ def _error_response(
5760 )
5861
5962
63+ @beartype
64+ def _oauth2_error_response (
65+ * ,
66+ status_code : HTTPStatus ,
67+ body : dict [str , str ],
68+ ) -> _ResponseType :
69+ """Return an OAuth2 error response."""
70+ return _json_response (status_code = status_code , body = body )
71+
72+
6073@beartype
6174def _get_header (request : RequestData , name : str ) -> str | None :
6275 """Return a request header, case-insensitively."""
@@ -67,6 +80,28 @@ def _get_header(request: RequestData, name: str) -> str | None:
6780 return None
6881
6982
83+ @beartype
84+ def _basic_auth_credentials (auth_header : str | None ) -> tuple [str , str ] | None :
85+ """Return HTTP Basic credentials from an authorization header."""
86+ if auth_header is None or not auth_header .startswith ("Basic " ):
87+ return None
88+
89+ encoded_credentials = auth_header .removeprefix ("Basic " ).strip ()
90+ try :
91+ decoded_credentials = base64 .b64decode (
92+ s = encoded_credentials ,
93+ validate = True ,
94+ ).decode (encoding = "utf-8" )
95+ except ValueError :
96+ return None
97+
98+ client_id , separator , client_secret = decoded_credentials .partition (":" )
99+ if not separator :
100+ return None
101+
102+ return client_id , client_secret
103+
104+
70105@beartype
71106def _require_bearer_token (request : RequestData ) -> _ResponseType | None :
72107 """Return an error response if the request has no bearer token."""
@@ -78,47 +113,95 @@ def _require_bearer_token(request: RequestData) -> _ResponseType | None:
78113 message = "no Bearer token" ,
79114 target = "jwt" ,
80115 )
81- if not auth_header .removeprefix ("Bearer " ).strip ():
116+ bearer_token = auth_header .removeprefix ("Bearer " ).strip ()
117+ if not bearer_token :
118+ return _error_response (
119+ status_code = HTTPStatus .UNAUTHORIZED ,
120+ code = "401" ,
121+ message = "no Bearer token" ,
122+ target = "jwt" ,
123+ )
124+ if bearer_token .count ("." ) != _JWT_DOT_COUNT :
82125 return _error_response (
83126 status_code = HTTPStatus .UNAUTHORIZED ,
84127 code = "401" ,
85- message = "invalid Bearer token " ,
128+ message = "Invalid JWT serialization: Missing dot delimiter(s) " ,
86129 target = "jwt" ,
87130 )
88131 return None
89132
90133
134+ @beartype
135+ def _fake_jwt (* , token_source : bytes ) -> str :
136+ """Return a deterministic bearer token for the mock."""
137+
138+ def encode_part (value : dict [str , Any ]) -> str :
139+ """Return a base64url-encoded token part."""
140+ raw_part = json .dumps (
141+ obj = value ,
142+ sort_keys = True ,
143+ separators = ("," , ":" ),
144+ ).encode (encoding = "utf-8" )
145+ return (
146+ base64 .urlsafe_b64encode (s = raw_part )
147+ .decode (
148+ encoding = "ascii" ,
149+ )
150+ .rstrip ("=" )
151+ )
152+
153+ header = encode_part (value = {"alg" : "mock" , "typ" : "JWT" })
154+ payload = encode_part (
155+ value = {
156+ "aud" : "vuforia-model-target" ,
157+ "src" : base64 .urlsafe_b64encode (s = token_source )
158+ .decode (
159+ encoding = "ascii" ,
160+ )
161+ .rstrip ("=" ),
162+ },
163+ )
164+ return f"{ header } .{ payload } .mock-signature"
165+
166+
91167@beartype
92168def oauth2_token (request : RequestData ) -> _ResponseType :
93169 """Return a fake OAuth2 access token."""
94170 auth_header = _get_header (request = request , name = "Authorization" )
95171 form = parse_qs (qs = request .body .decode (encoding = "utf-8" ))
96- grant_type = form .get ("grant_type" , ["" ])[0 ]
97- has_basic_auth = auth_header is not None and auth_header .startswith (
98- "Basic " ,
99- )
100- has_password_credentials = all (
101- form .get (field , ["" ])[0 ] for field in ("username" , "password" )
102- )
103- if grant_type not in {"" , "client_credentials" , "password" } or (
104- not has_basic_auth and not has_password_credentials
105- ):
106- return _error_response (
172+ grant_type = form .get ("grant_type" , ["client_credentials" ])[0 ]
173+ if grant_type != "client_credentials" :
174+ return _oauth2_error_response (
107175 status_code = HTTPStatus .BAD_REQUEST ,
108- code = "BAD_REQUEST" ,
109- message = "Invalid OAuth2 token request." ,
110- target = "grant_type" ,
176+ body = {"error" : "unsupported_grant_type" },
177+ )
178+
179+ basic_credentials = _basic_auth_credentials (auth_header = auth_header )
180+ if basic_credentials is None :
181+ return _oauth2_error_response (
182+ status_code = HTTPStatus .UNAUTHORIZED ,
183+ body = {
184+ "error" : "invalid_request" ,
185+ "error_description" : (
186+ "Missing or invalid authorization header"
187+ ),
188+ },
189+ )
190+
191+ if basic_credentials != (
192+ _MOCK_MODEL_TARGET_CLIENT_ID ,
193+ _MOCK_MODEL_TARGET_CLIENT_SECRET ,
194+ ):
195+ return _oauth2_error_response (
196+ status_code = HTTPStatus .UNAUTHORIZED ,
197+ body = {"error" : "invalid_client" },
111198 )
112199
113200 token_source = request .body or (auth_header or "" ).encode ()
114- access_token = base64 .urlsafe_b64encode (s = token_source ).decode (
115- encoding = "ascii" ,
116- )
117- access_token = access_token .rstrip ("=" ) or "mock-vuforia-access-token"
118201 return _json_response (
119202 status_code = HTTPStatus .OK ,
120203 body = {
121- "access_token" : access_token ,
204+ "access_token" : _fake_jwt ( token_source = token_source ) ,
122205 "token_type" : "bearer" ,
123206 "expires_in" : 3600 ,
124207 },
0 commit comments