Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmake_modules/IcebergBuildUtils.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ function(add_iceberg_lib LIB_NAME)
hidden
VISIBILITY_INLINES_HIDDEN 1)

if(MSVC_TOOLCHAIN)
target_compile_options(${LIB_NAME}_shared PRIVATE /bigobj)
endif()

install(TARGETS ${LIB_NAME}_shared
EXPORT iceberg_targets
ARCHIVE DESTINATION ${INSTALL_ARCHIVE_DIR}
Expand Down Expand Up @@ -220,6 +224,10 @@ function(add_iceberg_lib LIB_NAME)
target_compile_definitions(${LIB_NAME}_static PUBLIC ${VISIBILITY_NAME}_STATIC)
endif()

if(MSVC_TOOLCHAIN)
target_compile_options(${LIB_NAME}_static PRIVATE /bigobj)
endif()

install(TARGETS ${LIB_NAME}_static
EXPORT iceberg_targets
ARCHIVE DESTINATION ${INSTALL_ARCHIVE_DIR}
Expand Down
34 changes: 28 additions & 6 deletions src/iceberg/catalog/rest/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,33 @@ Status HandleFailureResponse(const cpr::Response& response,
} // namespace

void HttpClient::PrepareSession(
const std::string& path, const std::unordered_map<std::string, std::string>& params,
const std::string& path, HttpMethod method,
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers) {
session_->SetUrl(cpr::Url{path});
session_->SetParameters(GetParameters(params));
session_->RemoveContent();
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
// manually enforce HTTP GET to clear it.
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
switch (method) {
case HttpMethod::kGet:
session_->PrepareGet();
break;
case HttpMethod::kPost:
session_->PreparePost();
break;
case HttpMethod::kPut:
session_->PreparePut();
break;
case HttpMethod::kDelete:
session_->PrepareDelete();
break;
case HttpMethod::kHead:
session_->PrepareHead();
break;
}
auto final_headers = MergeHeaders(default_headers_, headers);
session_->SetHeader(final_headers);
}
Expand All @@ -163,7 +185,7 @@ Result<HttpResponse> HttpClient::Get(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, params, headers);
PrepareSession(path, HttpMethod::kGet, params, headers);
response = session_->Get();
}

Expand All @@ -180,7 +202,7 @@ Result<HttpResponse> HttpClient::Post(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, /*params=*/{}, headers);
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, headers);
session_->SetBody(cpr::Body{body});
response = session_->Post();
}
Expand All @@ -205,7 +227,7 @@ Result<HttpResponse> HttpClient::PostForm(
auto form_headers = headers;
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;

PrepareSession(path, /*params=*/{}, form_headers);
PrepareSession(path, HttpMethod::kPost, /*params=*/{}, form_headers);
std::vector<cpr::Pair> pair_list;
pair_list.reserve(form_data.size());
for (const auto& [key, val] : form_data) {
Expand All @@ -228,7 +250,7 @@ Result<HttpResponse> HttpClient::Head(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, /*params=*/{}, headers);
PrepareSession(path, HttpMethod::kHead, /*params=*/{}, headers);
response = session_->Head();
}

Expand All @@ -245,7 +267,7 @@ Result<HttpResponse> HttpClient::Delete(
cpr::Response response;
{
std::lock_guard guard(session_mutex_);
PrepareSession(path, params, headers);
PrepareSession(path, HttpMethod::kDelete, params, headers);
response = session_->Delete();
}

Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/catalog/rest/http_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <string>
#include <unordered_map>

#include "iceberg/catalog/rest/endpoint.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/type_fwd.h"
#include "iceberg/result.h"
Expand Down Expand Up @@ -109,7 +110,7 @@ class ICEBERG_REST_EXPORT HttpClient {
const ErrorHandler& error_handler);

private:
void PrepareSession(const std::string& path,
void PrepareSession(const std::string& path, HttpMethod method,
const std::unordered_map<std::string, std::string>& params,
const std::unordered_map<std::string, std::string>& headers);

Expand Down
75 changes: 75 additions & 0 deletions src/iceberg/catalog/rest/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "iceberg/partition_spec.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -69,6 +71,8 @@ constexpr std::string_view kType = "type";
constexpr std::string_view kCode = "code";
constexpr std::string_view kStack = "stack";
constexpr std::string_view kError = "error";
constexpr std::string_view kIdentifier = "identifier";
constexpr std::string_view kRequirements = "requirements";

} // namespace

Expand Down Expand Up @@ -390,6 +394,75 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
return request;
}

// CommitTableRequest serialization
nlohmann::json ToJson(const CommitTableRequest& request) {
nlohmann::json json;
if (!request.identifier.name.empty()) {
json[kIdentifier] = ToJson(request.identifier);
}

nlohmann::json requirements_json = nlohmann::json::array();
for (const auto& req : request.requirements) {
requirements_json.push_back(ToJson(*req));
}
json[kRequirements] = std::move(requirements_json);

nlohmann::json updates_json = nlohmann::json::array();
for (const auto& update : request.updates) {
updates_json.push_back(ToJson(*update));
}
json[kUpdates] = std::move(updates_json);

return json;
}

Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& json) {
CommitTableRequest request;
if (json.contains(kIdentifier)) {
ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
GetJsonValue<nlohmann::json>(json, kIdentifier));
ICEBERG_ASSIGN_OR_RAISE(request.identifier, TableIdentifierFromJson(identifier_json));
}

ICEBERG_ASSIGN_OR_RAISE(auto requirements_json,
GetJsonValue<nlohmann::json>(json, kRequirements));
for (const auto& req_json : requirements_json) {
ICEBERG_ASSIGN_OR_RAISE(auto requirement, TableRequirementFromJson(req_json));
request.requirements.push_back(std::move(requirement));
}

ICEBERG_ASSIGN_OR_RAISE(auto updates_json,
GetJsonValue<nlohmann::json>(json, kUpdates));
for (const auto& update_json : updates_json) {
ICEBERG_ASSIGN_OR_RAISE(auto update, TableUpdateFromJson(update_json));
request.updates.push_back(std::move(update));
}

ICEBERG_RETURN_UNEXPECTED(request.Validate());
return request;
}

// CommitTableResponse serialization
nlohmann::json ToJson(const CommitTableResponse& response) {
nlohmann::json json;
json[kMetadataLocation] = response.metadata_location;
if (response.metadata) {
json[kMetadata] = ToJson(*response.metadata);
}
return json;
}

Result<CommitTableResponse> CommitTableResponseFromJson(const nlohmann::json& json) {
CommitTableResponse response;
ICEBERG_ASSIGN_OR_RAISE(response.metadata_location,
GetJsonValue<std::string>(json, kMetadataLocation));
ICEBERG_ASSIGN_OR_RAISE(auto metadata_json,
GetJsonValue<nlohmann::json>(json, kMetadata));
ICEBERG_ASSIGN_OR_RAISE(response.metadata, TableMetadataFromJson(metadata_json));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

#define ICEBERG_DEFINE_FROM_JSON(Model) \
template <> \
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
Expand All @@ -409,5 +482,7 @@ ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)

} // namespace iceberg::rest
2 changes: 2 additions & 0 deletions src/iceberg/catalog/rest/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)

#undef ICEBERG_DECLARE_JSON_SERDE

Expand Down
98 changes: 84 additions & 14 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/table.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/macros.h"

namespace iceberg::rest {
Expand Down Expand Up @@ -175,7 +177,7 @@ Result<std::vector<Namespace>> RestCatalog::ListNamespaces(const Namespace& ns)
if (list_response.next_page_token.empty()) {
return result;
}
next_token = list_response.next_page_token;
next_token = std::move(list_response.next_page_token);
}
return result;
}
Expand Down Expand Up @@ -244,9 +246,30 @@ Status RestCatalog::UpdateNamespaceProperties(
return {};
}

Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
[[maybe_unused]] const Namespace& ns) const {
return NotImplemented("Not implemented");
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::ListTables());

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
std::vector<TableIdentifier> result;
std::string next_token;
while (true) {
std::unordered_map<std::string, std::string> params;
if (!next_token.empty()) {
params[kQueryParamPageToken] = next_token;
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
result.insert(result.end(), list_response.identifiers.begin(),
list_response.identifiers.end());
if (list_response.next_page_token.empty()) {
return result;
}
next_token = std::move(list_response.next_page_token);
}
return result;
}

Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
Expand Down Expand Up @@ -280,10 +303,33 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
}

Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
return NotImplemented("Not implemented");
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::UpdateTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

CommitTableRequest request{.identifier = identifier};
request.requirements.reserve(requirements.size());
for (const auto& req : requirements) {
request.requirements.push_back(req->Clone());
}
request.updates.reserve(updates.size());
for (const auto& update : updates) {
request.updates.push_back(update->Clone());
}

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));

return Table::Make(identifier, std::move(commit_response.metadata),
std::move(commit_response.metadata_location), file_io_,
shared_from_this());
}

Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
Expand Down Expand Up @@ -321,9 +367,17 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
}

Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
[[maybe_unused]] const TableIdentifier& to) {
return NotImplemented("Not implemented");
Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());

RenameTableRequest request{.source = from, .destination = to};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

return {};
}

Result<std::string> RestCatalog::LoadTableInternal(
Expand All @@ -350,9 +404,25 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
}

Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::string& metadata_file_location) {
return NotImplemented("Not implemented");
const TableIdentifier& identifier, const std::string& metadata_file_location) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));

RegisterTableRequest request{
.name = identifier.name,
.metadata_location = metadata_file_location,
};

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
shared_from_this());
}

} // namespace iceberg::rest
Loading
Loading