Skip to content

Commit ba65619

Browse files
010SohamFokkokevinjqliu
authored
Core: Pass REST auth manager to S3 signer (#2846)
What does this change do? - RestCatalog now passes its AuthManager into FileIO so downstream components can reuse a live token. - S3V4RestSigner now calls the AuthManager’s auth_header() when no static token is provided, ensuring the signer gets a fresh bearer token. - Added a unit test to verify the signer pulls the Authorization header from an AuthManager. Why is this needed? - After the AuthManager refactor, the signer no longer received a token, causing remote signing to 401 for REST catalog users (e.g., Lakekeeper/MinIO). This restores token propagation and refresh. How was this tested? - make lint - make test - uv run python -m pytest tests/io/test_fsspec.py -k auth_manager -v Closes #2544 --------- Co-authored-by: Soham <010Soham@users.noreply.github.com> Co-authored-by: Fokko Driesprong <fokko@apache.org> Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com>
1 parent 9468ae5 commit ba65619

File tree

4 files changed

+80
-15
lines changed

4 files changed

+80
-15
lines changed

pyiceberg/catalog/rest/__init__.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,8 @@
2727
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt
2828

2929
from pyiceberg import __version__
30-
from pyiceberg.catalog import (
31-
BOTOCORE_SESSION,
32-
TOKEN,
33-
URI,
34-
WAREHOUSE_LOCATION,
35-
Catalog,
36-
PropertiesUpdateSummary,
37-
)
38-
from pyiceberg.catalog.rest.auth import AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
30+
from pyiceberg.catalog import BOTOCORE_SESSION, TOKEN, URI, WAREHOUSE_LOCATION, Catalog, PropertiesUpdateSummary
31+
from pyiceberg.catalog.rest.auth import AUTH_MANAGER, AuthManager, AuthManagerAdapter, AuthManagerFactory, LegacyOAuth2AuthManager
3932
from pyiceberg.catalog.rest.response import _handle_non_200_response
4033
from pyiceberg.catalog.rest.scan_planning import (
4134
FetchScanTasksRequest,
@@ -61,7 +54,7 @@
6154
TableAlreadyExistsError,
6255
UnauthorizedError,
6356
)
64-
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
57+
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, FileIO, load_file_io
6558
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
6659
from pyiceberg.schema import Schema, assign_fresh_schema_ids
6760
from pyiceberg.table import (
@@ -335,6 +328,7 @@ class ListViewsResponse(IcebergBaseModel):
335328
class RestCatalog(Catalog):
336329
uri: str
337330
_session: Session
331+
_auth_manager: AuthManager | None
338332
_supported_endpoints: set[Endpoint]
339333

340334
def __init__(self, name: str, **properties: str):
@@ -347,6 +341,7 @@ def __init__(self, name: str, **properties: str):
347341
properties: Properties that are passed along to the configuration.
348342
"""
349343
super().__init__(name, **properties)
344+
self._auth_manager: AuthManager | None = None
350345
self.uri = properties[URI]
351346
self._fetch_config()
352347
self._session = self._create_session()
@@ -381,16 +376,24 @@ def _create_session(self) -> Session:
381376
if auth_type != CUSTOM and auth_impl:
382377
raise ValueError("auth.impl can only be specified when using custom auth.type")
383378

384-
session.auth = AuthManagerAdapter(AuthManagerFactory.create(auth_impl or auth_type, auth_type_config))
379+
self._auth_manager = AuthManagerFactory.create(auth_impl or auth_type, auth_type_config)
380+
session.auth = AuthManagerAdapter(self._auth_manager)
385381
else:
386-
session.auth = AuthManagerAdapter(self._create_legacy_oauth2_auth_manager(session))
382+
self._auth_manager = self._create_legacy_oauth2_auth_manager(session)
383+
session.auth = AuthManagerAdapter(self._auth_manager)
387384

388385
# Configure SigV4 Request Signing
389386
if property_as_bool(self.properties, SIGV4, False):
390387
self._init_sigv4(session)
391388

392389
return session
393390

391+
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
392+
merged_properties = {**self.properties, **properties}
393+
if self._auth_manager:
394+
merged_properties[AUTH_MANAGER] = self._auth_manager
395+
return load_file_io(merged_properties, location)
396+
394397
def supports_server_side_planning(self) -> bool:
395398
"""Check if the catalog supports server-side scan planning."""
396399
return Capability.V1_SUBMIT_TABLE_SCAN_PLAN in self._supported_endpoints and property_as_bool(

pyiceberg/catalog/rest/auth.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
from pyiceberg.catalog.rest.response import TokenResponse, _handle_non_200_response
3232
from pyiceberg.exceptions import OAuthError
3333

34+
AUTH_MANAGER = "auth.manager"
35+
3436
COLON = ":"
3537
logger = logging.getLogger(__name__)
3638

pyiceberg/io/fsspec.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from requests import HTTPError
3838

3939
from pyiceberg.catalog import TOKEN, URI
40+
from pyiceberg.catalog.rest.auth import AUTH_MANAGER
4041
from pyiceberg.exceptions import SignError
4142
from pyiceberg.io import (
4243
ADLS_ACCOUNT_HOST,
@@ -121,9 +122,17 @@ def __call__(self, request: "AWSRequest", **_: Any) -> None:
121122
signer_url = self.properties.get(S3_SIGNER_URI, self.properties[URI]).rstrip("/") # type: ignore
122123
signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)
123124

124-
signer_headers = {}
125-
if token := self.properties.get(TOKEN):
126-
signer_headers = {"Authorization": f"Bearer {token}"}
125+
signer_headers: dict[str, str] = {}
126+
127+
auth_header: str | None = None
128+
if auth_manager := self.properties.get(AUTH_MANAGER):
129+
auth_header = auth_manager.auth_header()
130+
elif token := self.properties.get(TOKEN):
131+
auth_header = f"Bearer {token}"
132+
133+
if auth_header:
134+
signer_headers["Authorization"] = auth_header
135+
127136
signer_headers.update(get_header_properties(self.properties))
128137

129138
signer_body = {

tests/io/test_fsspec.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from fsspec.spec import AbstractFileSystem
2929
from requests_mock import Mocker
3030

31+
from pyiceberg.catalog.rest.auth import AUTH_MANAGER
3132
from pyiceberg.exceptions import SignError
3233
from pyiceberg.io import fsspec
3334
from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
@@ -948,3 +949,53 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) -> None:
948949
"""Failed to sign request 401: {'method': 'HEAD', 'region': 'us-west-2', 'uri': 'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro', 'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}"""
949950
in str(exc_info.value)
950951
)
952+
953+
954+
def test_s3v4_rest_signer_uses_auth_manager(requests_mock: Mocker) -> None:
955+
new_uri = "https://bucket/metadata/snap-signed.avro"
956+
requests_mock.post(
957+
f"{TEST_URI}/v1/aws/s3/sign",
958+
json={
959+
"uri": new_uri,
960+
"headers": {
961+
"Authorization": ["AWS4-HMAC-SHA256 Credential=ASIA.../s3/aws4_request, SignedHeaders=host, Signature=abc"],
962+
"Host": ["bucket.s3.us-west-2.amazonaws.com"],
963+
},
964+
"extensions": {},
965+
},
966+
status_code=200,
967+
)
968+
969+
request = AWSRequest(
970+
method="HEAD",
971+
url="https://bucket/metadata/snap-foo.avro",
972+
headers={"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0"},
973+
data=b"",
974+
params={},
975+
auth_path="/metadata/snap-foo.avro",
976+
)
977+
request.context = {
978+
"client_region": "us-west-2",
979+
"has_streaming_input": False,
980+
"auth_type": None,
981+
"signing": {"bucket": "bucket"},
982+
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
983+
}
984+
985+
class DummyAuthManager:
986+
def __init__(self) -> None:
987+
self.calls = 0
988+
989+
def auth_header(self) -> str:
990+
self.calls += 1
991+
return "Bearer via-manager"
992+
993+
auth_manager = DummyAuthManager()
994+
995+
signer = S3V4RestSigner(properties={AUTH_MANAGER: auth_manager, "uri": TEST_URI})
996+
signer(request)
997+
998+
assert auth_manager.calls == 1
999+
assert requests_mock.last_request is not None
1000+
assert requests_mock.last_request.headers["Authorization"] == "Bearer via-manager"
1001+
assert request.url == new_uri

0 commit comments

Comments
 (0)