[Store] feat: Implement a unified storage interface to simplify integration and extension #1185
Conversation
…le of key-value data, including creation, reading, and synchronizing metadata with the master.
…or metadata changes
# Conflicts:
#   mooncake-store/include/master_service.h
#   mooncake-store/include/storage_backend.h
#   mooncake-store/src/master_client.cpp
#   mooncake-store/src/master_service.cpp
#   mooncake-store/src/storage_backend.cpp
# Conflicts:
#   mooncake-store/include/real_client.h
#   mooncake-store/src/real_client.cpp
#   mooncake-store/src/real_client_main.cpp
…nto file-storage-dev
…nto file-storage-dev
…nto file-storage-dev
Summary of Changes
This pull request significantly refactors the storage layer by introducing a unified interface for storage backends. This architectural change allows the …
Code Review
This pull request introduces a new StorageBackendInterface to abstract different storage implementations, specifically adding a StorageBackendAdaptor for a file-per-key storage mechanism alongside the existing BucketStorageBackend. The FileStorageConfig struct was moved to storage_backend.h and extended with backend-specific parameters like storage_backend_desciptor, fsdir, and enable_eviction, allowing FileStorage to dynamically select and initialize the appropriate backend. The BucketIterator logic was refactored and integrated directly into BucketStorageBackend's ScanMeta, HandleNext, and HasNext methods.
Review comments primarily focused on improving code robustness and maintainability:
- ensuring FileStorageConfig::Validate() correctly propagates validation failures;
- making StorageBackendAdaptor::ScanMeta()'s directory traversal more flexible;
- adding missing override keywords to virtual function implementations in StorageBackendAdaptor and BucketStorageBackend;
- addressing concerns about FileStorageConfig containing backend-specific fields.
Additionally, comments noted that StorageBackendInterface's type_descriptor and config_ members should be private, that StorageBackendAdaptor::ScanMeta's total_size calculation might be inconsistent with StorageObjectMetadata, and that FileStorageConfig parameters in ScanMeta methods should be const references if not modified.
mooncake-store/src/file_storage.cpp (outdated diff)
    }
    if (storage_backend_desciptor == "FilePerKeyBackend") {
        auto full_path = std::filesystem::path(storage_filepath) / fsdir;
        ValidatePath(full_path.string());
The return value of ValidatePath(full_path.string()) is not checked here. If ValidatePath returns false, the Validate() function will still proceed and return true, which could lead to an invalid configuration being accepted. It should return false if ValidatePath fails.
if (!ValidatePath(full_path.string())) {
    return false;
}

for (char d1 = 'a'; d1 <= 'p'; ++d1) {
    for (char d2 = 'a'; d2 <= 'p'; ++d2) {
The loops for (char d1 = 'a'; d1 <= 'p'; ++d1) and for (char d2 = 'a'; d2 <= 'p'; ++d2) hardcode the directory structure based on the ResolvePath logic. This creates a tight coupling between ScanMeta and ResolvePath. If the hashing scheme or character range in ResolvePath ever changes, this scanning logic will break. It would be more robust to derive the iteration range dynamically or use a more abstract way to traverse the directory structure if possible.
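One way to decouple the two sides is to derive the range from shared constants. Below is a rough sketch; the names kShardRangeBegin, ShardDirName, and CountShardDirs are hypothetical illustrations, not code from this PR:

```cpp
#include <cassert>
#include <string>

// Hypothetical shared constants: both ResolvePath and ScanMeta would derive
// the shard-directory character range from one place, so a change to the
// hashing scheme only touches these two values.
constexpr char kShardRangeBegin = 'a';
constexpr char kShardRangeEnd = 'p';  // inclusive: 16 shard characters

// Sketch of a shard-directory name builder usable by both sides.
inline std::string ShardDirName(char d1, char d2) {
    return std::string{d1} + "/" + d2;
}

// ScanMeta-side traversal expressed against the shared constants instead of
// hardcoded 'a'..'p' literals.
inline int CountShardDirs() {
    int count = 0;
    for (char d1 = kShardRangeBegin; d1 <= kShardRangeEnd; ++d1) {
        for (char d2 = kShardRangeBegin; d2 <= kShardRangeEnd; ++d2) {
            ++count;
        }
    }
    return count;
}
```

With 'a'..'p' inclusive this yields 16 × 16 = 256 shard directories, matching the current hardcoded loops.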
tl::expected<void, ErrorCode> BatchLoad(
    const std::unordered_map<std::string, Slice>& batched_slices);
tl::expected<bool, ErrorCode> IsExist(const std::string& key);
// Limits for scanning and iteration operations, required by bucket backend only
int64_t bucket_iterator_keys_limit =
    20000;  // Max number of keys returned per Scan call
metadatas.emplace_back(StorageObjectMetadata{
    -1, 0, static_cast<int64_t>(kv.key.size()),
    static_cast<int64_t>(kv.value.size())
});
The bucket_id is set to -1 for StorageObjectMetadata. While this might be an acceptable placeholder for the StorageBackendAdaptor (which doesn't use buckets), it's a magic number. Consider defining a named constant (e.g., kNoBucketId) to improve readability and indicate its specific meaning.
metadatas.emplace_back(StorageObjectMetadata{
    /*bucket_id=*/-1, 0, static_cast<int64_t>(kv.key.size()),
    static_cast<int64_t>(kv.value.size())
});

}

tl::expected<void, ErrorCode> StorageBackendAdaptor::ScanMeta(
    FileStorageConfig& cfg,
The FileStorageConfig& cfg parameter is passed by non-const reference to ScanMeta. This implies that the function might modify the configuration object. However, scanning metadata is typically a read-only operation. If cfg is not intended to be modified, it should be passed as const FileStorageConfig& cfg to ensure immutability and improve safety.
tl::expected<void, ErrorCode> StorageBackendAdaptor::ScanMeta(
    const FileStorageConfig& cfg,

// Path where data files are stored on disk
std::string storage_filepath = "/data/file_storage";

// Subdirectory name, required by file per key backend only
struct_pb::from_pb(kv, buf);

total_keys++;
total_size += buf.size();
The total_size += buf.size(); line adds the size of the serialized KV object (buf) to total_size. However, the StorageObjectMetadata (created on line 1089) stores kv.value.size(). This discrepancy means total_size will reflect the serialized size (including key and protobuf overhead) rather than just the value size, which might not align with the intended metric for total_size if it's meant to represent only the data payload.
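A minimal sketch of why the two metrics diverge, using a made-up FakeKV type and an illustrative framing overhead (nothing here mirrors the actual struct_pb encoding):

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical stand-in for the serialized KV record: the on-disk buffer
// carries key + value + encoding overhead, while StorageObjectMetadata
// records only the value payload.
struct FakeKV {
    std::string key;
    std::string value;
};

// Serialized size includes key bytes and (made-up) framing overhead, which
// is what `total_size += buf.size()` effectively accumulates.
inline int64_t SerializedSize(const FakeKV& kv) {
    constexpr int64_t kFramingOverhead = 4;  // illustrative, not protobuf-exact
    return static_cast<int64_t>(kv.key.size() + kv.value.size()) +
           kFramingOverhead;
}

// Accounting consistent with StorageObjectMetadata: count value bytes only.
inline int64_t PayloadSize(const FakeKV& kv) {
    return static_cast<int64_t>(kv.value.size());
}
```

For a 1-byte key and 5-byte value, the serialized size (10 here) exceeds the payload size (5), so the two totals drift apart as keys accumulate.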
    return enable_offloading;
}

tl::expected<void, ErrorCode> BucketStorageBackend::ScanMeta(FileStorageConfig& config_, const std::function<ErrorCode(const std::vector<std::string>& keys,
The FileStorageConfig& config_ parameter is passed by non-const reference to ScanMeta. Similar to the StorageBackendAdaptor's ScanMeta, if the configuration is not intended to be modified during the scan, it should be passed as const FileStorageConfig& config_ to enforce read-only access.
tl::expected<void, ErrorCode> BucketStorageBackend::ScanMeta(const FileStorageConfig& config_, const std::function<ErrorCode(const std::vector<std::string>& keys,

@maheshrbapatu Could you help take a look? Thanks
bool enable_eviction = true;

// Size of the local client-side buffer (used for caching or batching)
int64_t local_buffer_size = 1280 * 1024 * 1024;  // ~1.2 GB
Update the constants here if possible:
kKB = 1024
kMB = kKB * 1024
kGB = kMB * 1024
And all of these can be made constexpr.
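A sketch of what those constants could look like (the names, and kDefaultLocalBufferSize in particular, are assumptions rather than existing codebase identifiers):

```cpp
#include <cassert>
#include <cstdint>

// Suggested named size constants, usable from a shared header; values are
// the usual binary multiples.
constexpr int64_t kKB = 1024;
constexpr int64_t kMB = kKB * 1024;
constexpr int64_t kGB = kMB * 1024;

// The buffer default then reads as intent rather than raw arithmetic.
constexpr int64_t kDefaultLocalBufferSize = 1280 * kMB;  // ~1.2 GB (1.25 GiB)
```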
enum class FileMode { Read, Write };

struct FileStorageConfig {
    // type of the storage backend
Correct me if I am wrong; just a suggestion: maybe we can break this big structure into child structures.
struct FilePerKeyConfig {
    std::string fsdir = "file_per_key_dir";
    bool enable_eviction = true;
    bool Validate() const {
        return !fsdir.empty();
    }
};
struct BucketBackendConfig {
    int64_t bucket_iterator_keys_limit = 20'000;  // tunable
    int64_t bucket_size_limit = 256 * kMB;        // tunable but bounded
    bool Validate() const {
        if (bucket_iterator_keys_limit <= 0) return false;
        if (bucket_size_limit <= 0 || bucket_size_limit > kMaxBucketSize) return false;
        return true;
    }
};
struct FileStorageConfig {
    // Type: "BucketBackend" or "FilePerKeyBackend"
    std::string storage_backend_descriptor = "BucketBackend";
    // Root storage path
    std::string storage_filepath = "/data/file_storage";
    // Optional backend-specific configs
    std::optional<FilePerKeyConfig> file_per_key;
    std::optional<BucketBackendConfig> bucket_backend;
    int64_t local_buffer_size = 1280 * kMB;  // ~1.2GB, tunable
    int64_t total_keys_limit = kMaxTotalKeys;
    int64_t total_size_limit = kMaxTotalStorageSize;
    uint32_t heartbeat_interval_seconds = 10;
    bool Validate();
    bool ValidatePath(const std::string& path);
    static FileStorageConfig FromEnvironment();
    static std::string GetEnvStringOr(const char* name,
                                      const std::string& default_value);
    template <typename T>
    static T GetEnvOr(const char* name, T default_value);
};
bool FileStorageConfig::Validate() const {
    if (storage_filepath.empty()) {

bool FileStorageConfig::ValidatePath(std::string path) const {
Also, how about providing a default implementation for this in the interface?
Do we expect different implementations of ValidatePath in the BucketStorageBackend and FilePerObject backends?
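A rough sketch of what a base-class default could look like; the class names, and the specific path rules shown, are hypothetical illustrations rather than the actual interface:

```cpp
#include <cassert>
#include <string>

// Hypothetical interface sketch: ValidatePath gets a default implementation
// in the base class, so backends only override it when their path rules
// actually differ.
class StorageBackendInterfaceSketch {
   public:
    virtual ~StorageBackendInterfaceSketch() = default;

    // Default rule (illustrative): a path is acceptable if it is non-empty
    // and absolute.
    virtual bool ValidatePath(const std::string& path) const {
        return !path.empty() && path.front() == '/';
    }
};

// A backend happy with the default needs no override at all.
class BucketBackendSketch : public StorageBackendInterfaceSketch {};

// A backend with stricter rules overrides only this one method.
class FilePerKeySketch : public StorageBackendInterfaceSketch {
   public:
    bool ValidatePath(const std::string& path) const override {
        // Illustrative extra rule: also forbid trailing slashes.
        return StorageBackendInterfaceSketch::ValidatePath(path) &&
               path.back() != '/';
    }
};
```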
}

bool FileStorageConfig::Validate() const {
    if (storage_backend_descriptor != "FilePerKeyBackend" &&
An enum might be a good idea here, and we can keep extending it as needed for new backends:
enum class StorageBackendType {
    kFilePerKey,
    kBucket
};
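A small helper could bridge the existing descriptor strings to the enum during migration; a sketch where ParseBackendType is a hypothetical name and only the two descriptor strings from this PR are recognized:

```cpp
#include <cassert>
#include <optional>
#include <string>

enum class StorageBackendType { kFilePerKey, kBucket };

// Hypothetical helper: parse the existing descriptor strings into the enum
// once, at config-load time, so the rest of the code switches on the enum.
inline std::optional<StorageBackendType> ParseBackendType(
    const std::string& descriptor) {
    if (descriptor == "FilePerKeyBackend") {
        return StorageBackendType::kFilePerKey;
    }
    if (descriptor == "BucketBackend") {
        return StorageBackendType::kBucket;
    }
    return std::nullopt;  // unknown descriptor: caller rejects the config
}
```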
if (!ValidatePath(storage_filepath)) {
    return false;
}
if (storage_backend_descriptor == "FilePerKeyBackend") {
Also, a switch-case might be better over here:
bool FileStorageConfig::Validate() const {
if (!ValidatePath(storage_filepath)) {
LOG(ERROR) << "Invalid storage_filepath: " << storage_filepath;
return false;
}
if (total_keys_limit <= 0) {
LOG(ERROR) << "total_keys_limit must be > 0";
return false;
}
if (local_buffer_size <= 0) {
LOG(ERROR) << "local_buffer_size must be > 0";
return false;
}
switch (backend_type) {
case StorageBackendType::kFilePerKey: {
auto full_path =
std::filesystem::path(storage_filepath) / fsdir;
if (!ValidatePath(full_path.string())) {
LOG(ERROR) << "Invalid FilePerKey full path: " << full_path;
return false;
}
break;
}
case StorageBackendType::kBucket: {
if (bucket_keys_limit <= 0) {
LOG(ERROR) << "bucket_keys_limit must be > 0";
return false;
}
if (bucket_size_limit <= 0) {
LOG(ERROR) << "bucket_size_limit must be > 0";
return false;
}
break;
}
default:
LOG(ERROR) << "Unknown storage backend type";
return false;
}
return true;
}
if (!config.Validate()) {
    throw std::invalid_argument("Invalid FileStorage configuration");
}
if (config.storage_backend_descriptor == "BucketBackend") {
Can we create a central factory pattern for creating this storage backend?
std::shared_ptr<IStorageBackend>
CreateStorageBackend(const FileStorageConfig& config) {
switch (config.backend_type) {
case StorageBackendType::kBucket:
return std::make_shared<BucketStorageBackend>(config);
case StorageBackendType::kFilePerKey:
return std::make_shared<StorageBackendAdaptor>(config);
default:
LOG(FATAL) << "Unsupported backend type";
}
}
And we can initialize it using the factory pattern rather than using strings:
FileStorage::FileStorage(std::shared_ptr<Client> client,
const std::string& local_rpc_addr,
const FileStorageConfig& config)
: client_(client),
local_rpc_addr_(local_rpc_addr),
config_(config),
client_buffer_allocator_(
ClientBufferAllocator::create(config.local_buffer_size, "")) {
if (!config_.Validate()) {
throw std::invalid_argument("Invalid FileStorage configuration");
}
storage_backend_ = CreateStorageBackend(config_);
}
        out.append(reinterpret_cast<const char*>(s.ptr), s.size);
    }
    return out;
}
Add an empty line here.
int64_t total_size GUARDED_BY(mutex_);

struct KV {
Can we add a constructor for this?
Also, KV seems like an ambiguous name for production code.
struct KVEntry {
std::string key; // K tensor or its storage identifier
std::string value; // V tensor or its storage block
KVEntry() = default;
KVEntry(std::string k, std::string v)
: key(std::move(k)), value(std::move(v)) {}
YLT_REFL(KVEntry, key, value);
};
metadatas.reserve(batch_object.size());
keys.reserve(batch_object.size());
MutexLocker lock(&mutex_);
for (auto object : batch_object) {
This currently makes an unnecessary copy of object:
for (const auto& object : batch_object)
std::string kv_buf;
struct_pb::to_pb(kv, kv_buf);
auto store_result = storage_backend_->StoreObject(path, kv_buf);
Do we really want to acquire and hold the mutex for the entire duration of the IO operation? What is the exact critical section here? Holding the lock across the IO will cause scalability issues. If the critical section is only the counters, we can do this:
{
MutexLocker lock(&mutex_);
total_keys++;
total_size += kv_buf.size();
}
A unified storage backend interface has been implemented. It is called by the FileStorage class, which manages the upper-level put and load activities. Currently, two types of storage backend inherit from the interface: BucketStorageBackend and StorageBackendAdaptor (the file-per-key backend).
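To make the shape of that design concrete, here is a minimal sketch of such an interface driven through a base pointer. The method names are illustrative, loosely based on operations mentioned in the review (StoreObject, IsExist), and the in-memory backend is a toy stand-in, not the upstream implementation:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Minimal sketch of a unified storage backend interface.
class IStorageBackendSketch {
   public:
    virtual ~IStorageBackendSketch() = default;
    virtual bool StoreObject(const std::string& key,
                             const std::string& buf) = 0;
    virtual bool IsExist(const std::string& key) const = 0;
};

// Toy in-memory stand-in for the file-per-key backend.
class FilePerKeyBackendSketch : public IStorageBackendSketch {
   public:
    bool StoreObject(const std::string& key, const std::string& buf) override {
        data_[key] = buf;
        return true;
    }
    bool IsExist(const std::string& key) const override {
        return data_.count(key) != 0;
    }

   private:
    std::unordered_map<std::string, std::string> data_;
};

// FileStorage would drive any backend through the base reference like this,
// unaware of which concrete backend was configured.
inline bool RoundTripFilePerKey() {
    FilePerKeyBackendSketch backend;
    IStorageBackendSketch& iface = backend;
    return iface.StoreObject("k", "v") && iface.IsExist("k") &&
           !iface.IsExist("missing");
}
```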