Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aas_http_client/classes/client/aas_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def set_token(self) -> str | None:
return self._cached_token.access_token

# Obtain new token
token_data = get_token(self.auth_settings.o_auth)
token_data = get_token(self.auth_settings.o_auth, self.ssl_verify)

if token_data and token_data.access_token:
# Cache the token data
Expand Down
22 changes: 15 additions & 7 deletions aas_http_client/classes/client/implementations/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,27 @@ def __init__(self, access_token: str, token_type: str, token_expiry: float):
self.token_expiry: float = token_expiry


def get_token(o_auth_configuration: OAuth) -> TokenData | None:
def get_token(o_auth_configuration: OAuth, ssl_verify: bool) -> TokenData | None:
"""Get token based on the provided OAuth configuration.

:param auth_configuration: Authentication configuration
:param ssl_verify: Whether to verify SSL certificates, defaults to True
:return: Access token or None if an error occurred
"""
if o_auth_configuration.grant_type == "password":
token = get_token_by_password(
o_auth_configuration.token_url,
o_auth_configuration.client_id,
o_auth_configuration.get_client_secret(),
ssl_verify=ssl_verify,
)

elif o_auth_configuration.is_active() and o_auth_configuration.grant_type == "client_credentials":
token = get_token_by_basic_auth(
o_auth_configuration.token_url,
o_auth_configuration.client_id,
o_auth_configuration.get_client_secret(),
ssl_verify=ssl_verify,
)

if not token:
Expand All @@ -63,46 +66,51 @@ def get_token(o_auth_configuration: OAuth) -> TokenData | None:
return token


def get_token_by_basic_auth(endpoint: str, username: str, password: str, timeout=200) -> TokenData | None:
def get_token_by_basic_auth(endpoint: str, username: str, password: str, timeout=200, ssl_verify=True) -> TokenData | None:
"""Get token from a specific authentication service provider by basic authentication.

:param endpoint: Get token endpoint for the authentication service provider
:param username: Username for the authentication service provider
:param password: Password for the authentication service provider
:param timeout: Timeout for the API calls, defaults to 200
:param ssl_verify: Whether to verify SSL certificates, defaults to True
:return: Access token or None if an error occurred
"""
data = {"grant_type": "client_credentials"}

auth = HTTPBasicAuth(username, password)

return __get_token_from_endpoint(endpoint, data, auth, timeout)
return __get_token_from_endpoint(endpoint, data, auth, timeout, ssl_verify)


def get_token_by_password(endpoint: str, username: str, password: str, timeout=200) -> TokenData | None:
def get_token_by_password(endpoint: str, username: str, password: str, timeout=200, ssl_verify=True) -> TokenData | None:
"""Get token from a specific authentication service provider by username and password.

:param endpoint: Get token endpoint for the authentication service provider
:param username: Username for the authentication service provider
:param password: Password for the authentication service provider
:param timeout: Timeout for the API calls, defaults to 200
:param ssl_verify: Whether to verify SSL certificates, defaults to True
:return: Access token or None if an error occurred
"""
data = {"grant_type": "password", "username": username, "password": password}

return __get_token_from_endpoint(endpoint, data, None, timeout)
return __get_token_from_endpoint(endpoint, data, None, timeout, ssl_verify)


def __get_token_from_endpoint(endpoint: str, data: dict[str, str], auth: HTTPBasicAuth | None = None, timeout: int = 200) -> TokenData | None:
def __get_token_from_endpoint(
endpoint: str, data: dict[str, str], auth: HTTPBasicAuth | None = None, timeout: int = 200, ssl_verify: bool = True
) -> TokenData | None:
"""Get token from a specific authentication service provider.

:param endpoint: Get token endpoint for the authentication service provider
:param data: Data for the authentication service provider
:param timeout: Timeout for the API calls, defaults to 200
:param ssl_verify: Whether to verify SSL certificates, defaults to True
:return: Access token or None if an error occurred
"""
try:
response = requests.post(endpoint, auth=auth, data=data, timeout=timeout)
response = requests.post(endpoint, auth=auth, data=data, timeout=timeout, verify=ssl_verify)
_logger.debug(f"Call REST API url '{response.url}'")

if response.status_code != 200:
Expand Down