Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## ?.?.? - Unreleased

* Removed support for file system credentials caching.

## 4.0.0 - 2026-01-22

* Added support for Python 3.14.
Expand Down
23 changes: 8 additions & 15 deletions okdata/sdk/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
)
from okdata.sdk.auth.credentials.password_grant import TokenServiceProvider
from okdata.sdk.exceptions import ApiAuthenticateError
from okdata.sdk.file_cache import FileCache

log = logging.getLogger()

Expand All @@ -25,7 +24,15 @@ class Authenticate:
_expires_at = None
_refresh_expires_at = None

# TODO: Remove keyword argument `file_cache` in a later release.
def __init__(self, config, token_provider=None, file_cache=None):
if file_cache is not None:
log.warning(
"Keyword argument `file_cache` to "
"`okdata.sdk.auth.auth.Authenticate` is deprecated and will "
"be removed in a later release of okdata-sdk."
)

self.token_provider = token_provider
if not self.token_provider:
try:
Expand All @@ -36,10 +43,6 @@ def __init__(self, config, token_provider=None, file_cache=None):
except StopIteration:
log.info("No valid auth strategies available")

self.file_cache = file_cache
if not self.file_cache:
self.file_cache = FileCache(config)

def _resolve_token_provider(self, config):
# Add more TokenProviders to accept different login methods
strategies = [ClientCredentialsProvider, TokenServiceProvider]
Expand Down Expand Up @@ -69,15 +72,6 @@ def login(self, force=False):
if not self.token_provider:
return

cached = self.file_cache.read_credentials()
if cached:
self._access_token = cached["access_token"]
self._refresh_token = cached.get("refresh_token")
if expires_at := cached.get("expires_at"):
self._expires_at = datetime.fromisoformat(expires_at)
if refresh_expires_at := cached.get("refresh_expires_at"):
self._refresh_expires_at = datetime.fromisoformat(refresh_expires_at)

if self._access_token and not _is_expired(self._expires_at):
log.info("Token not expired, skipping")
return
Expand Down Expand Up @@ -109,7 +103,6 @@ def refresh_access_token(self):
)

self._access_token = tokens["access_token"]
self.file_cache.write_credentials(credentials=str(self))

def __repr__(self):
return json.dumps(
Expand Down
34 changes: 0 additions & 34 deletions okdata/sdk/file_cache.py

This file was deleted.

29 changes: 1 addition & 28 deletions okdata/sdk/io_utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,8 @@
import os
import errno
import os
from pathlib import Path


def write_to_okdata_cache(content, filename, failure_count=0):
okdata_cache_path = Path(f"{os.environ['HOME']}/.okdata/cache")

if failure_count == 2:
print(f"Could not write credentials to {okdata_cache_path}/{filename}")
return

if okdata_cache_path.exists():
with open(f"{okdata_cache_path}/{filename}", "w+") as file:
file.write(content)
else:
create_dir(okdata_cache_path)
write_to_okdata_cache(content, filename, failure_count + 1)


def create_dir(path):
try:
os.makedirs(path)
Expand All @@ -26,18 +11,6 @@ def create_dir(path):
raise


def read_from_okdata_cache(filename):
okdata_cache_path = Path(f"{os.environ['HOME']}/.okdata/cache")

try:
file = open(f"{okdata_cache_path}/{filename}", "r")
except IOError:
return None
else:
with file:
return file.read()


def write_file_content(file_name, path, content):
if not Path(path).exists():
create_dir(path)
Expand Down
170 changes: 10 additions & 160 deletions tests/auth/auth_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@
import re

import pytest
from freezegun import freeze_time

from okdata.sdk.auth.auth import Authenticate
from okdata.sdk.auth.credentials.client_credentials import ClientCredentialsProvider
from okdata.sdk.config import Config
from okdata.sdk.exceptions import ApiAuthenticateError
from freezegun import freeze_time

from tests.auth.client_credentials_test_utils import (
expired_time,
from_cache_expired_token,
from_cache_not_expired_token,
not_expired_time,
not_expired_token,
utc_now,
)
from tests.test_utils import (
Expand All @@ -29,33 +25,12 @@
token_endpoint = "https://login.oslo.kommune.no/auth/realms/api-catalog/protocol/openid-connect/token"


@pytest.fixture(scope="function")
def mock_home_dir(monkeypatch, tmp_path):
monkeypatch.setenv("HOME", str(tmp_path))


@freeze_time(utc_now)
class TestAuthenticate:
def test_authenticate_cache_disabled(self, requests_mock, mock_home_dir):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = False

response = json.dumps(client_credentials_response)
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)

auth.login()
assert auth._access_token == client_credentials_response["access_token"]
assert auth._refresh_token == client_credentials_response["refresh_token"]

def test_authenticat_no_cache(self, requests_mock, mock_home_dir):
def test_authenticate(self, requests_mock):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True

response = json.dumps(client_credentials_response)
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)
Expand All @@ -64,99 +39,19 @@ def test_authenticat_no_cache(self, requests_mock, mock_home_dir):
assert auth._access_token == client_credentials_response["access_token"]
assert auth._refresh_token == client_credentials_response["refresh_token"]

def test_authenticate_cached_credentials(self, mock_home_dir):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True
cached_credentials = {
"provider": "ClientCredentialsProvider",
"access_token": from_cache_not_expired_token,
"refresh_token": from_cache_not_expired_token,
"expires_at": not_expired_time.isoformat(),
"refresh_expires_at": not_expired_time.isoformat(),
}

auth.file_cache.write_credentials(json.dumps(cached_credentials))
auth.login()
assert auth._access_token == cached_credentials["access_token"]
assert auth._refresh_token == cached_credentials["refresh_token"]

def test_authenticate_refresh_credentials(self, requests_mock, mock_home_dir):
def test_authenticate_refresh_credentials(self, requests_mock):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True

cached_credentials = {
"provider": "ClientCredentialsProvider",
"access_token": from_cache_not_expired_token,
"refresh_token": from_cache_not_expired_token,
"expires_at": not_expired_time.isoformat(),
"refresh_expires_at": not_expired_time.isoformat(),
}

auth.file_cache.write_credentials(json.dumps(cached_credentials))

response = json.dumps(client_credentials_response)
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)

auth.login()
assert auth._access_token == cached_credentials["access_token"]
assert auth._refresh_token == cached_credentials["refresh_token"]

def test_authenticate_expired_tokens(self, requests_mock, mock_home_dir):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True

cached_credentials = {
"provider": "TokenServiceProvider",
"access_token": from_cache_expired_token,
"refresh_token": from_cache_expired_token,
"expires_at": expired_time.isoformat(),
"refresh_expires_at": expired_time.isoformat(),
}
assert auth._access_token == not_expired_token
assert auth._refresh_token == not_expired_token

auth.file_cache.write_credentials(json.dumps(cached_credentials))

response = json.dumps(client_credentials_response)
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)

auth.login()
print(from_cache_not_expired_token)
print(from_cache_expired_token)
assert auth._access_token == client_credentials_response["access_token"]
assert auth._refresh_token == client_credentials_response["access_token"]

def test_authenticate_expired_access_token(self, requests_mock, mock_home_dir):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True

cached_credentials = {
"provider": "TokenServiceProvider",
"access_token": from_cache_expired_token,
"refresh_token": from_cache_not_expired_token,
"expires_at": expired_time.isoformat(),
"refresh_expires_at": not_expired_time.isoformat(),
}

auth.file_cache.write_credentials(json.dumps(cached_credentials))

response = json.dumps(client_credentials_response)
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)

auth.login()
assert auth._access_token == from_cache_not_expired_token
assert auth._refresh_token == cached_credentials["refresh_token"]

def test_authenticate_fail(self, requests_mock, mock_home_dir):
def test_authenticate_fail(self, requests_mock):
client_credentials_provider = ClientCredentialsProvider(
config, client_id="wrong_id"
)
Expand All @@ -168,63 +63,18 @@ def test_authenticate_fail(self, requests_mock, mock_home_dir):
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)

try:
with pytest.raises(ApiAuthenticateError):
auth.login()
except ApiAuthenticateError:
assert True

def test_refresh_inactive_session(self, requests_mock, mock_home_dir):
def test_refresh_no_refresh_token(self, requests_mock):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True

cached_credentials = {
"provider": "TokenServiceProvider",
"access_token": from_cache_expired_token,
"refresh_token": from_cache_not_expired_token,
"expires_at": expired_time.isoformat(),
"refresh_expires_at": not_expired_time.isoformat(),
}

auth.file_cache.write_credentials(json.dumps(cached_credentials))

error_msg = {
"error": "invalid_grant",
"error_description": "Session not active",
}
refresh_response = {"text": json.dumps(error_msg), "status_code": 400}
login_response = {
"text": json.dumps(client_credentials_response),
"status_code": 200,
}
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, [refresh_response, login_response])

auth.login()

assert auth._access_token == from_cache_not_expired_token
assert auth._refresh_token == cached_credentials["refresh_token"]

def test_refresh_no_refresh_token(self, requests_mock, mock_home_dir):
client_credentials_provider = ClientCredentialsProvider(config)
auth = Authenticate(config=config, token_provider=client_credentials_provider)

auth.file_cache.credentials_cache_enabled = True

cached_credentials = {
"provider": "TokenServiceProvider",
"access_token": from_cache_expired_token,
"expires_at": expired_time.isoformat(),
}

auth.file_cache.write_credentials(json.dumps(cached_credentials))

response = json.dumps(client_credentials_response_no_refresh)
matcher = re.compile(token_endpoint)
requests_mock.register_uri("POST", matcher, text=response, status_code=200)

auth.login()

assert auth._access_token == from_cache_not_expired_token
assert auth._access_token == not_expired_token
assert auth._refresh_token is None
Loading
Loading