Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/iceberg/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
#include <unordered_set>
#include <vector>

#include "iceberg/catalog/rest/types.h"
#include "iceberg/result.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_scan.h"
#include "iceberg/type_fwd.h"

namespace iceberg {
Expand Down Expand Up @@ -188,6 +190,44 @@ class ICEBERG_EXPORT Catalog {
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
virtual Result<std::shared_ptr<Table>> RegisterTable(
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;

/// \brief Initiate a scan planning operation for the given table.
///
/// \param table The table to scan.
/// \param context The scan context containing snapshot, filter, and other options.
/// \return A PlanTableScanResponse with the plan status and initial scan tasks.
virtual Result<rest::PlanTableScanResponse> PlanTableScan(
const Table& table, const internal::TableScanContext& context) {
return NotImplemented("PlanTableScan is not supported by this catalog");
}

/// \brief Fetch the current status and results of an asynchronous scan plan.
///
/// \param table The table being scanned.
/// \param plan_id The plan ID returned by PlanTableScan.
/// \return A FetchPlanningResultResponse with the current plan status and tasks.
virtual Result<rest::FetchPlanningResultResponse> FetchPlanningResult(
const Table& table, const std::string& plan_id) {
return NotImplemented("FetchPlanningResult is not supported by this catalog");
}

/// \brief Cancel an in-progress scan planning operation.
///
/// \param table The table being scanned.
/// \param plan_id The plan ID returned by PlanTableScan.
virtual Status CancelPlanning(const Table& table, const std::string& plan_id) {
return NotImplemented("CancelPlanning is not supported by this catalog");
}

/// \brief Fetch the scan tasks for a given plan task token.
///
/// \param table The table being scanned.
/// \param plan_task The plan task token returned in a scan plan response.
/// \return A FetchScanTasksResponse with the file scan tasks.
virtual Result<rest::FetchScanTasksResponse> FetchScanTasks(
const Table& table, const std::string& plan_task) {
return NotImplemented("FetchScanTasks is not supported by this catalog");
}
};

} // namespace iceberg
20 changes: 20 additions & 0 deletions src/iceberg/catalog/rest/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,26 @@ class ICEBERG_REST_EXPORT Endpoint {
return {HttpMethod::kPost, "/v1/{prefix}/transactions/commit"};
}

// Scan planning endpoints
static Endpoint PlanTableScan() {
return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"};
}

static Endpoint FetchPlanningResult() {
return {HttpMethod::kGet,
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"};
}

static Endpoint CancelPlanning() {
return {HttpMethod::kDelete,
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"};
}

static Endpoint FetchScanTasks() {
return {HttpMethod::kPost,
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"};
}

private:
Endpoint(HttpMethod method, std::string_view path) : method_(method), path_(path) {}

Expand Down
31 changes: 31 additions & 0 deletions src/iceberg/catalog/rest/error_handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ namespace {
constexpr std::string_view kIllegalArgumentException = "IllegalArgumentException";
constexpr std::string_view kNoSuchNamespaceException = "NoSuchNamespaceException";
constexpr std::string_view kNamespaceNotEmptyException = "NamespaceNotEmptyException";
constexpr std::string_view kNoSuchTableException = "NoSuchTableException";
constexpr std::string_view kNoSuchPlanIdException = "NoSuchPlanIdException";
constexpr std::string_view kNoSuchPlanTaskException = "NoSuchPlanTaskException";

} // namespace

Expand Down Expand Up @@ -183,4 +186,32 @@ Status ViewCommitErrorHandler::Accept(const ErrorResponse& error) const {
return DefaultErrorHandler::Accept(error);
}

const std::shared_ptr<ScanPlanErrorHandler>& ScanPlanErrorHandler::Instance() {
static const std::shared_ptr<ScanPlanErrorHandler> instance{new ScanPlanErrorHandler()};
return instance;
}

Status ScanPlanErrorHandler::Accept(const ErrorResponse& error) const {
switch (error.code) {
case 404:
if (error.type == kNoSuchNamespaceException) {
return NoSuchNamespace(error.message);
}
if (error.type == kNoSuchTableException) {
return NoSuchTable(error.message);
}
if (error.type == kNoSuchPlanIdException) {
return NoSuchPlanId(error.message);
}
if (error.type == kNoSuchPlanTaskException) {
return NoSuchPlanTask(error.message);
}
return NotFound(error.message);
case 406:
return NotSupported(error.message);
}

return DefaultErrorHandler::Accept(error);
}

} // namespace iceberg::rest
11 changes: 11 additions & 0 deletions src/iceberg/catalog/rest/error_handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,15 @@ class ICEBERG_REST_EXPORT ViewCommitErrorHandler final : public DefaultErrorHand
constexpr ViewCommitErrorHandler() = default;
};

/// \brief Scan plan operation error handler.
class ICEBERG_REST_EXPORT ScanPlanErrorHandler final : public DefaultErrorHandler {
public:
static const std::shared_ptr<ScanPlanErrorHandler>& Instance();

Status Accept(const ErrorResponse& error) const override;

private:
constexpr ScanPlanErrorHandler() = default;
};

} // namespace iceberg::rest
125 changes: 125 additions & 0 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

#include "iceberg/catalog/rest/json_serde_internal.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/expression/json_serde_internal.h"
#include "iceberg/json_serde_internal.h"
#include "iceberg/partition_spec.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_requirement.h"
Expand Down Expand Up @@ -78,6 +80,21 @@ constexpr std::string_view kExpiresIn = "expires_in";
constexpr std::string_view kIssuedTokenType = "issued_token_type";
constexpr std::string_view kRefreshToken = "refresh_token";
constexpr std::string_view kOAuthScope = "scope";
constexpr std::string_view kPlanStatus = "status";
constexpr std::string_view kPlanId = "plan-id";
constexpr std::string_view kPlanTasks = "plan-tasks";
constexpr std::string_view kFileScanTasks = "file-scan-tasks";
constexpr std::string_view kDeleteFiles = "delete-files";
constexpr std::string_view kSnapshotId = "snapshot-id";
constexpr std::string_view kSelect = "select";
constexpr std::string_view kFilter = "filter";
constexpr std::string_view kCaseSensitive = "case-sensitive";
constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema";
constexpr std::string_view kStartSnapshotId = "start-snapshot-id";
constexpr std::string_view kEndSnapshotId = "end-snapshot-id";
constexpr std::string_view kStatsFields = "stats-fields";
constexpr std::string_view kMinRowsRequired = "min-rows-required";
constexpr std::string_view kPlanTask = "plan-task";

} // namespace

Expand Down Expand Up @@ -506,6 +523,114 @@ Result<OAuthTokenResponse> OAuthTokenResponseFromJson(const nlohmann::json& json
return response;
}

Result<nlohmann::json> ToJson(const PlanTableScanRequest& request) {
nlohmann::json json;
if (request.snapshot_id.has_value()) {
json[kSnapshotId] = request.snapshot_id.value();
}
if (!request.select.empty()) {
json[kSelect] = request.select;
}
if (request.filter) {
ICEBERG_ASSIGN_OR_RAISE(auto filter_json, iceberg::ToJson(*request.filter));
json[kFilter] = std::move(filter_json);
}
json[kCaseSensitive] = request.case_sensitive;
json[kUseSnapshotSchema] = request.use_snapshot_schema;
if (request.start_snapshot_id.has_value()) {
json[kStartSnapshotId] = request.start_snapshot_id.value();
}
if (request.end_snapshot_id.has_value()) {
json[kEndSnapshotId] = request.end_snapshot_id.value();
}
if (!request.statsFields.empty()) {
json[kStatsFields] = request.statsFields;
}
if (request.min_rows_required.has_value()) {
json[kMinRowsRequired] = request.min_rows_required.value();
}
return json;
}

nlohmann::json ToJson(const FetchScanTasksRequest& request) {
nlohmann::json json;
json[kPlanTask] = request.planTask;
return json;
}

Status BaseScanTaskResponseFromJson(
const nlohmann::json& json, BaseScanTaskResponse* response,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema) {
// 1. plan_tasks
ICEBERG_ASSIGN_OR_RAISE(
response->plan_tasks,
GetJsonValueOrDefault<std::vector<std::string>>(json, kPlanTasks));

// 2. delete_files
ICEBERG_ASSIGN_OR_RAISE(
auto delete_files_json,
GetJsonValueOrDefault<nlohmann::json>(json, kDeleteFiles, nlohmann::json::array()));
for (const auto& entry_json : delete_files_json) {
ICEBERG_ASSIGN_OR_RAISE(auto delete_file,
DataFileFromJson(entry_json, partition_specs_by_id, schema));
response->delete_files.push_back(std::move(delete_file));
}

// 3. file_scan_tasks
ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json,
GetJsonValueOrDefault<nlohmann::json>(json, kFileScanTasks,
nlohmann::json::array()));
ICEBERG_ASSIGN_OR_RAISE(
response->file_scan_tasks,
FileScanTasksFromJson(file_scan_tasks_json, response->delete_files,
partition_specs_by_id, schema));
return {};
}

Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema) {
PlanTableScanResponse response;
ICEBERG_ASSIGN_OR_RAISE(response.plan_status,
GetJsonValue<std::string>(json, kPlanStatus));
ICEBERG_ASSIGN_OR_RAISE(response.plan_id,
GetJsonValueOrDefault<std::string>(json, kPlanId));
ICEBERG_RETURN_UNEXPECTED(
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

Result<FetchPlanningResultResponse> FetchPlanningResultResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema) {
FetchPlanningResultResponse response;
ICEBERG_ASSIGN_OR_RAISE(response.plan_status,
GetJsonValue<std::string>(json, kPlanStatus));
ICEBERG_RETURN_UNEXPECTED(
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema) {
FetchScanTasksResponse response;
ICEBERG_RETURN_UNEXPECTED(
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

#define ICEBERG_DEFINE_FROM_JSON(Model) \
template <> \
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
Expand Down
26 changes: 26 additions & 0 deletions src/iceberg/catalog/rest/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

#pragma once

#include <unordered_map>

#include <nlohmann/json_fwd.hpp>

#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"

/// \file iceberg/catalog/rest/json_serde_internal.h
/// JSON serialization and deserialization for Iceberg REST Catalog API types.
Expand Down Expand Up @@ -62,4 +66,26 @@ ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)

#undef ICEBERG_DECLARE_JSON_SERDE

ICEBERG_REST_EXPORT Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema);

ICEBERG_REST_EXPORT Result<FetchPlanningResultResponse>
FetchPlanningResultResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema);

ICEBERG_REST_EXPORT Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
const nlohmann::json& json,
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
partition_specs_by_id,
const Schema& schema);

ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const PlanTableScanRequest& request);
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request);

} // namespace iceberg::rest
22 changes: 22 additions & 0 deletions src/iceberg/catalog/rest/resource_paths.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,26 @@ Result<std::string> ResourcePaths::CommitTransaction() const {
return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_);
}

Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident) const {
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_,
encoded_namespace, encoded_table_name);
}

Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident,
const std::string& plan_id) const {
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_,
encoded_namespace, encoded_table_name, plan_id);
}

Result<std::string> ResourcePaths::ScanTask(const TableIdentifier& ident) const {
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
return std::format("{}/v1/{}namespaces/{}/tables/{}/tasks", base_uri_, prefix_,
encoded_namespace, encoded_table_name);
}

} // namespace iceberg::rest
13 changes: 13 additions & 0 deletions src/iceberg/catalog/rest/resource_paths.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ class ICEBERG_REST_EXPORT ResourcePaths {
/// \brief Get the /v1/{prefix}/transactions/commit endpoint path.
Result<std::string> CommitTransaction() const;

/// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan endpoint
/// path.
Result<std::string> ScanPlan(const TableIdentifier& ident) const;

/// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan_id}
/// endpoint path.
Result<std::string> ScanPlan(const TableIdentifier& ident,
const std::string& plan_id) const;

/// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks endpoint
/// path.
Result<std::string> ScanTask(const TableIdentifier& ident) const;

private:
ResourcePaths(std::string base_uri, const std::string& prefix);

Expand Down
Loading
Loading