Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Added

- Support non-JSON content types in `WriteAttachments` and `ReadAttachments`, [PR-125](https://github.com/reductstore/reduct-cpp/pull/125)

## 1.19.1 - 2026-04-21

### Fixed
Expand Down
98 changes: 24 additions & 74 deletions src/reduct/bucket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
#endif

#include <nlohmann/json.hpp>
#include <openssl/evp.h>

#include <algorithm>
#include <atomic>
#include <cctype>
#include <chrono>
Expand All @@ -36,43 +34,6 @@ using internal::IHttpClient;
using internal::ParseStatus;
using internal::QueryOptionsToJsonString;

namespace {

std::string Base64Encode(const std::string& input) {
const auto encoded_len = 4 * ((input.size() + 2) / 3);
std::string output(encoded_len + 1, '\0');
EVP_EncodeBlock(reinterpret_cast<unsigned char*>(output.data()), reinterpret_cast<const unsigned char*>(input.data()),
static_cast<int>(input.size()));
output.resize(encoded_len);
return output;
}

std::string Base64Decode(const std::string& input) {
const auto max_decoded_len = 3 * input.size() / 4;
std::string output(max_decoded_len, '\0');
int decoded_len =
EVP_DecodeBlock(reinterpret_cast<unsigned char*>(output.data()),
reinterpret_cast<const unsigned char*>(input.data()), static_cast<int>(input.size()));
// EVP_DecodeBlock doesn't account for padding, trim trailing zeros
if (input.size() >= 2 && input[input.size() - 1] == '=') decoded_len--;
if (input.size() >= 2 && input[input.size() - 2] == '=') decoded_len--;
output.resize(decoded_len);
return output;
}

bool IsJsonContentType(std::string_view content_type) {
auto pos = content_type.find(';');
auto ct = content_type.substr(0, pos);
while (!ct.empty() && ct.back() == ' ') ct.remove_suffix(1);
while (!ct.empty() && ct.front() == ' ') ct.remove_prefix(1);
std::string lower(ct);
std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower);
return lower == "application/json" || lower == "text/json" ||
(lower.size() >= 5 && lower.substr(lower.size() - 5) == "+json");
}

} // namespace

class Bucket : public IBucket {
using BatchType = internal::BatchType;

Expand Down Expand Up @@ -221,31 +182,22 @@ class Bucket : public IBucket {
return ProcessBatchV2(std::move(callback), BatchType::kUpdate);
}

Error WriteAttachments(std::string_view entry_name, const AttachmentMap& attachments,
std::string_view content_type) const noexcept override {
Error WriteAttachments(std::string_view entry_name,
const AttachmentMap& attachments) const noexcept override {
if (attachments.empty()) {
return Error::kOk;
}

const auto ct = content_type.empty() ? "application/json" : std::string(content_type);
const bool is_json = IsJsonContentType(ct);

Batch batch;
const auto meta_entry = fmt::format("{}/$meta", entry_name);
auto timestamp = std::chrono::time_point_cast<std::chrono::microseconds>(Time::clock::now());
for (const auto& [key, payload] : attachments) {
std::string data;
if (is_json) {
try {
[[maybe_unused]] auto parsed = nlohmann::json::parse(payload);
} catch (const std::exception& ex) {
return Error{.code = -1, .message = ex.what()};
}
data = payload;
} else {
data = Base64Decode(payload);
try {
[[maybe_unused]] auto parsed = nlohmann::json::parse(payload);
} catch (const std::exception& ex) {
return Error{.code = -1, .message = ex.what()};
}
batch.AddRecord(meta_entry, timestamp, data, ct, {{"key", key}});
batch.AddRecord(meta_entry, timestamp, payload, "application/json", {{"key", key}});
timestamp += std::chrono::microseconds(1);
}

Expand Down Expand Up @@ -274,17 +226,14 @@ class Bucket : public IBucket {
return false;
}

if (IsJsonContentType(record.content_type)) {
try {
[[maybe_unused]] auto parsed = nlohmann::json::parse(payload);
} catch (const std::exception& ex) {
callback_err = Error{.code = -1, .message = ex.what()};
return false;
}
attachments[key->second] = std::move(payload);
} else {
attachments[key->second] = Base64Encode(payload);
try {
[[maybe_unused]] auto parsed = nlohmann::json::parse(payload);
} catch (const std::exception& ex) {
callback_err = Error{.code = -1, .message = ex.what()};
return false;
}

attachments[key->second] = std::move(payload);
return true;
});

Expand Down Expand Up @@ -326,13 +275,13 @@ class Bucket : public IBucket {

Batch remove_batch;
const auto meta_entry = fmt::format("{}/$meta", entry_name);
auto query_err =
QueryV2({meta_entry}, std::nullopt, std::nullopt, std::move(options), [&remove_batch](const auto& record) {
auto labels = record.labels;
labels["remove"] = "true";
remove_batch.AddOnlyLabels(record.entry, record.timestamp, std::move(labels));
return true;
});
auto query_err = QueryV2({meta_entry}, std::nullopt, std::nullopt, std::move(options),
[&remove_batch](const auto& record) {
auto labels = record.labels;
labels["remove"] = "true";
remove_batch.AddOnlyLabels(record.entry, record.timestamp, std::move(labels));
return true;
});

if (query_err) {
return query_err;
Expand Down Expand Up @@ -897,8 +846,9 @@ class Bucket : public IBucket {
if (internal::IsCompatible("1.19", api_version) && !options.record_entry.has_value()) {
return {{},
Error{.code = -1,
.message = "record entry and timestamp must be provided for ReductStore API v1.19+; use "
"record_entry and record_timestamp"}};
.message =
"record entry and timestamp must be provided for ReductStore API v1.19+; use "
"record_entry and record_timestamp"}};
}

return {std::move(options), Error::kOk};
Expand Down
24 changes: 12 additions & 12 deletions src/reduct/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ class IBucket {
* @param attachments map of attachment keys to JSON payloads
* @return HTTP or communication error
*/
virtual Error WriteAttachments(std::string_view entry_name, const AttachmentMap& attachments,
std::string_view content_type = "") const noexcept = 0;
virtual Error WriteAttachments(std::string_view entry_name,
const AttachmentMap& attachments) const noexcept = 0;

/**
* @brief Read JSON attachments for an entry
Expand All @@ -400,19 +400,19 @@ class IBucket {
* @param attachment_keys attachment keys to remove
* @return HTTP or communication error
*/
virtual Error RemoveAttachments(std::string_view entry_name,
const std::set<std::string>& attachment_keys) const noexcept = 0;
virtual Error RemoveAttachments(std::string_view entry_name, const std::set<std::string>& attachment_keys) const
noexcept = 0;

/**
* Query options
*/
struct QueryOptions {
std::optional<std::string> when; ///< query condition
std::optional<bool> strict; ///< strict mode
std::optional<std::string> ext; ///< additional parameters for extensions
std::optional<std::string> when; ///< query condition
std::optional<bool> strict; ///< strict mode
std::optional<std::string> ext; ///< additional parameters for extensions
std::optional<std::chrono::milliseconds> ttl; ///< time to live
bool continuous = false; ///< continuous query. If true,
/// the method returns the latest record and waits for the next one
bool continuous = false; ///< continuous query. If true,
/// the method returns the latest record and waits for the next one
std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1000); ///< poll interval for continuous query
bool head_only = false; ///< read only metadata
};
Expand Down Expand Up @@ -543,9 +543,9 @@ class IBucket {
std::optional<Time> start;
std::optional<Time> stop;
QueryOptions query_options = {};
uint64_t record_index = 0; // legacy index selector (API < 1.19)
std::optional<std::string> record_entry = std::nullopt; // explicit record entry (API >= 1.19)
std::optional<Time> record_timestamp = std::nullopt; // explicit record timestamp (API >= 1.19)
uint64_t record_index = 0; // legacy index selector (API < 1.19)
std::optional<std::string> record_entry = std::nullopt; // explicit record entry (API >= 1.19)
std::optional<Time> record_timestamp = std::nullopt; // explicit record timestamp (API >= 1.19)
std::optional<Time> expire_at = std::nullopt;
std::optional<std::string> file_name = std::nullopt;
std::optional<std::string> base_url = std::nullopt;
Expand Down
37 changes: 2 additions & 35 deletions tests/reduct/entry_api_test.cc
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright 2022-2024 ReductSoftware UG

#include <catch2/catch.hpp>
#include <nlohmann/json.hpp>

#include <fstream>
#include <map>
#include <nlohmann/json.hpp>
#include <set>
#include <sstream>
#include <vector>
Expand Down Expand Up @@ -782,6 +782,7 @@ TEST_CASE("reduct::IBucket should remove entry attachments with numeric keys", "
REQUIRE(stored_after.empty());
}


TEST_CASE("reduct::IBucket should remove entry attachments with reserved keys", "[entry_api][1_19]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);
Expand All @@ -803,40 +804,6 @@ TEST_CASE("reduct::IBucket should remove entry attachments with reserved keys",
REQUIRE(nlohmann::json::parse(stored.at("meta-1")) == nlohmann::json::parse(attachments.at("meta-1")));
}

TEST_CASE("reduct::IBucket should write and read non-JSON attachments", "[entry_api][1_19]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);
REQUIRE(bucket);

// "3q2+7w==" is base64 for bytes {0xDE, 0xAD, 0xBE, 0xEF}
IBucket::AttachmentMap attachments{
{"binary-data", "3q2+7w=="},
};

REQUIRE(bucket->WriteAttachments("entry-1", attachments, "application/octet-stream") == Error::kOk);

auto [stored, err] = bucket->ReadAttachments("entry-1");
REQUIRE(err == Error::kOk);
REQUIRE(stored.size() == 1);
REQUIRE(stored.at("binary-data") == "3q2+7w==");
}

TEST_CASE("reduct::IBucket should write and read JSON attachments with default content type", "[entry_api][1_19]") {
Fixture ctx;
auto [bucket, _] = ctx.client->CreateBucket(kBucketName);
REQUIRE(bucket);

IBucket::AttachmentMap attachments{
{"meta-1", R"({"enabled":true,"values":[1,2,3]})"},
};

REQUIRE(bucket->WriteAttachments("entry-1", attachments) == Error::kOk);

auto [stored, err] = bucket->ReadAttachments("entry-1");
REQUIRE(err == Error::kOk);
REQUIRE(stored.size() == 1);
REQUIRE(nlohmann::json::parse(stored.at("meta-1")) == nlohmann::json::parse(attachments.at("meta-1")));
}

TEST_CASE("Batch should slice data", "[batch]") {
auto batch = IBucket::Batch();
Expand Down
Loading