-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathauth_validators.py
More file actions
124 lines (102 loc) · 3.39 KB
/
auth_validators.py
File metadata and controls
124 lines (102 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""Authorization header validators to use in the mock."""
import logging
from collections.abc import Iterable, Mapping
from http import HTTPStatus
from beartype import beartype
from mock_vws._database_matchers import (
AnyDatabase,
get_database_matching_server_keys,
)
from mock_vws._services_validators.exceptions import (
AuthenticationFailureError,
FailError,
)
_LOGGER = logging.getLogger(name=__name__)
@beartype
def validate_auth_header_exists(*, request_headers: Mapping[str, str]) -> None:
    """Check that a request to a VWS endpoint carries an "Authorization"
    header.

    Args:
        request_headers: The headers sent with the request.

    Raises:
        AuthenticationFailureError: The "Authorization" header is missing.
    """
    # Guard clause: nothing to report when the header is present.
    if "Authorization" in request_headers:
        return

    _LOGGER.warning(msg="There is no authorization header.")
    raise AuthenticationFailureError
@beartype
def validate_access_key_exists(
    *,
    request_headers: Mapping[str, str],
    databases: Iterable[AnyDatabase],
) -> None:
    """Validate the authorization header includes an access key for a
    database.

    The header is parsed as ``<prefix> <access key>:<signature>``: exactly
    one ``:``, and exactly one space in the part before it.

    Args:
        request_headers: The headers sent with the request. An
            "Authorization" entry must be present.
        databases: All Vuforia databases.

    Raises:
        FailError: The header does not have the expected shape, or the access
            key does not match a given database.
    """
    header = request_headers["Authorization"]
    try:
        first_part, _ = header.split(sep=":")
        _, access_key = first_part.split(sep=" ")
    except ValueError as exc:
        # Tuple unpacking raises ``ValueError`` when a split does not yield
        # exactly two pieces. Report that as a bad request, like the other
        # malformed-header checks, rather than leaking a bare ``ValueError``.
        _LOGGER.warning(msg="The authorization header is malformed.")
        raise FailError(status_code=HTTPStatus.BAD_REQUEST) from exc

    access_key_is_known = any(
        access_key == database.server_access_key for database in databases
    )
    if access_key_is_known:
        return

    _LOGGER.warning(
        'The access key "%s" does not match a known database.',
        access_key,
    )
    raise FailError(status_code=HTTPStatus.BAD_REQUEST)
@beartype
def validate_auth_header_has_signature(
    *,
    request_headers: Mapping[str, str],
) -> None:
    """Check that the "Authorization" header carries a signature.

    A signature is present when the header contains exactly one ``:`` and
    the text after it is not empty.

    Args:
        request_headers: The headers sent with the request.

    Raises:
        FailError: The "Authorization" header does not include a signature.
    """
    pieces = request_headers["Authorization"].split(sep=":")
    # Exactly one ":" in the header means the split yields two pieces.
    expected_piece_count = 2

    if len(pieces) != expected_piece_count or not pieces[1]:
        _LOGGER.warning(
            msg="The authorization header does not include a signature.",
        )
        raise FailError(status_code=HTTPStatus.BAD_REQUEST)
@beartype
def validate_authorization(
    *,
    request_path: str,
    request_headers: Mapping[str, str],
    request_body: bytes,
    request_method: str,
    databases: Iterable[AnyDatabase],
) -> None:
    """Check that the request's authorization header matches a database.

    The lookup is delegated to ``get_database_matching_server_keys``; its
    result is discarded, only success or failure matters here.

    Args:
        request_path: The path of the request.
        request_headers: The headers sent with the request.
        request_body: The body of the request.
        request_method: The HTTP method of the request.
        databases: All Vuforia databases.

    Raises:
        AuthenticationFailureError: No database matches the given
            authorization header.
    """
    try:
        get_database_matching_server_keys(
            databases=databases,
            request_method=request_method,
            request_path=request_path,
            request_headers=request_headers,
            request_body=request_body,
        )
    except ValueError as lookup_error:
        # The matcher's ``ValueError`` is translated into the mock's
        # authentication failure, keeping the original as the cause.
        _LOGGER.warning(
            msg="No database matches the given authorization header.",
        )
        raise AuthenticationFailureError from lookup_error