Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 130 additions & 4 deletions pyiceberg/catalog/bigquery_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@
from google.oauth2 import service_account

from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog, PropertiesUpdateSummary
from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError
from pyiceberg.exceptions import (
CommitFailedException,
CommitStateUnknownException,
NamespaceAlreadyExistsError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
Expand Down Expand Up @@ -229,7 +236,88 @@ def drop_table(self, identifier: str | Identifier) -> None:
def commit_table(
    self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
    """Commit staged metadata changes for *table* to the BigQuery metastore.

    A new metadata file is staged and written first; the commit is then
    published by either updating the existing BigQuery table entry or
    creating a new one. BigQuery errors are mapped onto Iceberg exceptions,
    and any failure triggers a server-side status re-check so that a commit
    that actually landed is not reported as failed.

    Args:
        table: The table being committed.
        requirements: Commit requirements that must hold against the current
            table state.
        updates: Metadata updates to apply.

    Returns:
        CommitTableResponse carrying the committed metadata and its location.

    Raises:
        NoSuchTableError: If the identifier cannot be split into dataset/table.
        CommitFailedException: If the table disappeared or was updated by
            another process during the commit.
        NoSuchNamespaceError: If creating a table in a dataset that does not exist.
        TableAlreadyExistsError: If creating a table that already exists.
        CommitStateUnknownException: If the commit outcome cannot be determined.
    """
    table_identifier = table.name()
    dataset_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
    table_ref = TableReference(
        dataset_ref=DatasetReference(project=self.project_id, dataset_id=dataset_name),
        table_id=table_name,
    )

    # Fetch the current BigQuery table (if any) so we know whether this is a
    # create or an update, and which metadata we would be replacing.
    current_bq_table: BQTable | None
    current_table: Table | None
    try:
        current_bq_table = self.client.get_table(table_ref)
    except NotFound:
        current_bq_table = None
        current_table = None
    else:
        current_table = self._convert_bigquery_table_to_iceberg_table(table_identifier, current_bq_table)

    updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
    # No-op commit: metadata unchanged, so skip writing a new metadata file.
    if current_table and updated_staged_table.metadata == current_table.metadata:
        return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

    # Write the new metadata file before touching BigQuery so the pointer
    # swap below only ever references metadata that already exists.
    self._write_metadata(
        metadata=updated_staged_table.metadata,
        io=updated_staged_table.io,
        metadata_path=updated_staged_table.metadata_location,
    )

    # Failures are captured instead of raised immediately so the `finally`
    # block can re-check whether the commit actually took effect server-side.
    commit_error: Exception | None = None
    try:
        if current_bq_table and current_table:
            # Update path: swap the metadata-location pointer on the existing entry.
            current_bq_table.external_catalog_table_options = self._create_external_catalog_table_options(
                updated_staged_table.metadata.location,
                self._create_table_parameters(
                    metadata_file_location=updated_staged_table.metadata_location,
                    table_metadata=updated_staged_table.metadata,
                    previous_metadata_location=current_table.metadata_location,
                ),
            )
            self.client.update_table(current_bq_table, ["external_catalog_table_options"])
        else:
            # Create path: register a brand-new BigQuery table entry.
            self.client.create_table(
                self._make_new_table(
                    updated_staged_table.metadata,
                    updated_staged_table.metadata_location,
                    table_ref,
                )
            )
    except NotFound as e:
        # On the update path a NotFound means the table vanished mid-commit;
        # on the create path it means the target dataset does not exist.
        commit_error = (
            CommitFailedException(f"Table does not exist: {dataset_name}.{table_name}")
            if current_table
            else NoSuchNamespaceError(f"Namespace does not exist: {dataset_name}")
        )
        commit_error.__cause__ = e
    except Conflict as e:
        # On the update path a Conflict indicates a concurrent writer won;
        # on the create path it means the table already exists.
        commit_error = (
            CommitFailedException(f"Table has been updated by another process: {dataset_name}.{table_name}")
            if current_table
            else TableAlreadyExistsError(f"Table {table_name} already exists")
        )
        commit_error.__cause__ = e
    except Exception as e:
        commit_error = e
    finally:
        if commit_error:
            # The request may have succeeded despite the error (e.g. a retry
            # after a dropped response) — consult the server before deciding.
            commit_status = self._check_bigquery_commit_status(table_ref, updated_staged_table.metadata_location)
            if commit_status == "SUCCESS":
                commit_error = None
            elif commit_status == "UNKNOWN":
                raise CommitStateUnknownException(
                    f"Commit state unknown for table {dataset_name}.{table_name}"
                ) from commit_error

    if commit_error:
        raise commit_error

    # Only after a confirmed commit is it safe to prune superseded metadata files.
    if current_table:
        self._delete_old_metadata(updated_staged_table.io, current_table.metadata, updated_staged_table.metadata)

    return CommitTableResponse(
        metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
    )

def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table:
    """Rename a table. Not supported by the BigQuery metastore catalog."""
    raise NotImplementedError
Expand Down Expand Up @@ -381,11 +469,20 @@ def _convert_bigquery_table_to_iceberg_table(self, identifier: str | Identifier,
catalog=self,
)

def _create_table_parameters(self, metadata_file_location: str, table_metadata: TableMetadata) -> dict[str, Any]:
parameters: dict[str, Any] = table_metadata.properties
def _create_table_parameters(
self,
metadata_file_location: str,
table_metadata: TableMetadata,
previous_metadata_location: str | None = None,
) -> dict[str, Any]:
parameters: dict[str, Any] = dict(table_metadata.properties)
if table_metadata.table_uuid:
parameters["uuid"] = str(table_metadata.table_uuid)
parameters[METADATA_LOCATION_PROP] = metadata_file_location
if previous_metadata_location:
parameters[PREVIOUS_METADATA_LOCATION_PROP] = previous_metadata_location
else:
parameters.pop(PREVIOUS_METADATA_LOCATION_PROP, None)
parameters[TABLE_TYPE_PROP] = ICEBERG_TABLE_TYPE_VALUE
parameters["EXTERNAL"] = True

Expand All @@ -405,6 +502,35 @@ def _create_table_parameters(self, metadata_file_location: str, table_metadata:

return parameters

def _check_bigquery_commit_status(self, table_ref: TableReference, new_metadata_location: str) -> str:
    """Resolve whether a commit of *new_metadata_location* actually landed.

    Reads the table back from BigQuery and compares its recorded metadata
    pointer (and metadata history) against the location we tried to commit.

    Returns:
        "SUCCESS" if the new location is current or appears in the table's
        metadata history, "FAILURE" if it demonstrably did not land, and
        "UNKNOWN" if the check itself could not be completed.
    """
    try:
        remote_table = self.client.get_table(table_ref)
        options = remote_table.external_catalog_table_options
        params = options.parameters if options and options.parameters else {}

        committed_location = params.get(METADATA_LOCATION_PROP)
        if committed_location == new_metadata_location:
            return "SUCCESS"
        if not committed_location:
            # No metadata pointer at all: our commit certainly did not land.
            return "FAILURE"

        # Another location is current; our commit may still have succeeded
        # earlier and been superseded — look through the metadata history.
        file_io = self._load_file_io(location=committed_location)
        committed_metadata = FromInputFile.table_metadata(file_io.new_input(committed_location))

        historical_locations = {entry.metadata_file for entry in committed_metadata.metadata_log}
        prior_location = params.get(PREVIOUS_METADATA_LOCATION_PROP)
        if prior_location:
            historical_locations.add(prior_location)

        if new_metadata_location in historical_locations:
            return "SUCCESS"
        return "FAILURE"
    except NotFound:
        # Table is gone: the commit cannot have taken effect.
        return "FAILURE"
    except Exception:
        # Could not complete the check; report ambiguity to the caller.
        return "UNKNOWN"

def _default_storage_location(self, location: str | None, dataset_ref: DatasetReference) -> str | None:
if location:
return location
Expand Down
127 changes: 126 additions & 1 deletion tests/catalog/test_bigquery_metastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
import os
from unittest.mock import MagicMock

import pytest
from google.api_core.exceptions import NotFound
from google.cloud.bigquery import Dataset, DatasetReference, Table, TableReference
from google.cloud.bigquery.external_config import ExternalCatalogDatasetOptions, ExternalCatalogTableOptions
from pytest_mock import MockFixture

from pyiceberg.catalog.bigquery_metastore import ICEBERG_TABLE_TYPE_VALUE, TABLE_TYPE_PROP, BigQueryMetastoreCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.exceptions import CommitStateUnknownException, NoSuchTableError
from pyiceberg.schema import Schema


Expand Down Expand Up @@ -178,3 +179,127 @@ def test_list_namespaces(mocker: MockFixture) -> None:
assert ("dataset1",) in namespaces
assert ("dataset2",) in namespaces
client_mock.list_datasets.assert_called_once()


def test_commit_table_create_path_uses_create_table(mocker: MockFixture) -> None:
    """When the table does not yet exist, commit_table must create it."""
    bq_client = MagicMock()
    bq_client.get_table.side_effect = NotFound("missing")
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=bq_client)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"})

    iceberg_table = MagicMock()
    iceberg_table.name.return_value = ("my-dataset", "my-table")

    staged_table = MagicMock()
    staged_table.metadata = MagicMock()
    staged_table.metadata_location = "gs://bucket/db/table/metadata/00001.metadata.json"
    staged_table.io = MagicMock()

    mocker.patch.object(catalog, "_update_and_stage_table", return_value=staged_table)
    mocker.patch.object(catalog, "_write_metadata")
    mocker.patch.object(catalog, "_make_new_table", return_value=MagicMock())

    fake_response = MagicMock()
    fake_response.metadata_location = staged_table.metadata_location
    mocker.patch("pyiceberg.catalog.bigquery_metastore.CommitTableResponse", return_value=fake_response)

    result = catalog.commit_table(iceberg_table, requirements=(), updates=())

    bq_client.create_table.assert_called_once()
    bq_client.update_table.assert_not_called()
    assert result.metadata_location == staged_table.metadata_location


def test_commit_table_update_path_uses_update_table(mocker: MockFixture) -> None:
    """When the table already exists, commit_table must update the pointer in place.

    Verifies the update path calls ``update_table`` with only the
    ``external_catalog_table_options`` field, never ``create_table``, and
    that superseded metadata is cleaned up afterwards.
    """
    client_mock = MagicMock()
    # get_table succeeds, signalling the table already exists (update path).
    current_bq_table = MagicMock()
    client_mock.get_table.return_value = current_bq_table
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"})
    table = MagicMock()
    table.name.return_value = ("my-dataset", "my-table")

    # Current Iceberg view of the table, derived from the BigQuery entry.
    current_table = MagicMock()
    current_table.metadata = MagicMock()
    current_table.metadata_location = "gs://bucket/db/table/metadata/00000.metadata.json"
    mocker.patch.object(catalog, "_convert_bigquery_table_to_iceberg_table", return_value=current_table)

    # Staged result of applying the updates: a new metadata location.
    staged = MagicMock()
    staged.metadata = MagicMock()
    staged.metadata.location = "gs://bucket/db/table"
    staged.metadata_location = "gs://bucket/db/table/metadata/00001.metadata.json"
    staged.io = MagicMock()
    mocker.patch.object(catalog, "_update_and_stage_table", return_value=staged)
    mocker.patch.object(catalog, "_write_metadata")
    mocker.patch.object(catalog, "_create_table_parameters", return_value={"metadata_location": staged.metadata_location})
    mocker.patch.object(catalog, "_create_external_catalog_table_options", return_value=MagicMock())
    delete_old_metadata = mocker.patch.object(catalog, "_delete_old_metadata")
    commit_response = MagicMock()
    commit_response.metadata_location = staged.metadata_location
    mocker.patch("pyiceberg.catalog.bigquery_metastore.CommitTableResponse", return_value=commit_response)

    response = catalog.commit_table(table, requirements=(), updates=())

    # Only the options field may be updated, and no table may be created.
    client_mock.update_table.assert_called_once_with(current_bq_table, ["external_catalog_table_options"])
    client_mock.create_table.assert_not_called()
    delete_old_metadata.assert_called_once_with(staged.io, current_table.metadata, staged.metadata)
    assert response.metadata_location == staged.metadata_location


def test_commit_table_raises_unknown_when_commit_status_is_unknown(mocker: MockFixture) -> None:
    """An unexpected update failure with an UNKNOWN status check must raise
    CommitStateUnknownException rather than the original error."""
    client_mock = MagicMock()
    current_bq_table = MagicMock()
    client_mock.get_table.return_value = current_bq_table
    # Force an unexpected failure on the update call itself.
    client_mock.update_table.side_effect = RuntimeError("boom")
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=client_mock)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"})
    table = MagicMock()
    table.name.return_value = ("my-dataset", "my-table")

    current_table = MagicMock()
    current_table.metadata = MagicMock()
    current_table.metadata_location = "gs://bucket/db/table/metadata/00000.metadata.json"
    mocker.patch.object(catalog, "_convert_bigquery_table_to_iceberg_table", return_value=current_table)

    staged = MagicMock()
    staged.metadata = MagicMock()
    staged.metadata.location = "gs://bucket/db/table"
    staged.metadata_location = "gs://bucket/db/table/metadata/00001.metadata.json"
    staged.io = MagicMock()
    mocker.patch.object(catalog, "_update_and_stage_table", return_value=staged)
    mocker.patch.object(catalog, "_write_metadata")
    mocker.patch.object(catalog, "_create_table_parameters", return_value={"metadata_location": staged.metadata_location})
    mocker.patch.object(catalog, "_create_external_catalog_table_options", return_value=MagicMock())
    # The status re-check cannot determine the outcome.
    mocker.patch.object(catalog, "_check_bigquery_commit_status", return_value="UNKNOWN")

    with pytest.raises(CommitStateUnknownException):
        catalog.commit_table(table, requirements=(), updates=())


def test_check_bigquery_commit_status_returns_success_when_metadata_in_history(mocker: MockFixture) -> None:
    """A metadata location present in the table's metadata log counts as SUCCESS."""
    bq_client = MagicMock()
    remote_table = MagicMock()
    # The table currently points at a *newer* metadata file than the one we committed.
    remote_table.external_catalog_table_options = MagicMock(
        parameters={"metadata_location": "gs://bucket/db/table/metadata/00002.metadata.json"}
    )
    bq_client.get_table.return_value = remote_table
    mocker.patch("pyiceberg.catalog.bigquery_metastore.Client", return_value=bq_client)
    mocker.patch.dict(os.environ, values={"PYICEBERG_LEGACY_CURRENT_SNAPSHOT_ID": "True"})

    catalog = BigQueryMetastoreCatalog("test_catalog", **{"gcp.bigquery.project-id": "my-project"})
    catalog._load_file_io = MagicMock(return_value=MagicMock())  # type: ignore[method-assign]

    # The current metadata's history contains the location we committed.
    metadata_mock = MagicMock()
    metadata_mock.metadata_log = [MagicMock(metadata_file="gs://bucket/db/table/metadata/00001.metadata.json")]
    mocker.patch("pyiceberg.catalog.bigquery_metastore.FromInputFile.table_metadata", return_value=metadata_mock)

    table_ref = TableReference(
        dataset_ref=DatasetReference(project="my-project", dataset_id="my-dataset"), table_id="my-table"
    )
    outcome = catalog._check_bigquery_commit_status(table_ref, "gs://bucket/db/table/metadata/00001.metadata.json")

    assert outcome == "SUCCESS"