-
Notifications
You must be signed in to change notification settings - Fork 0
feat: add replication factor support and chunk location registration #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,7 @@ impl MonoceClient { | |
| } | ||
| } | ||
|
|
||
| pub async fn allocate_chunks(&self, chunks: &[(Cid, u64)]) -> Result<AllocationResponse> { | ||
| pub async fn allocate_chunks(&self, chunks: &[(Cid, u64)], replication_factor: u32) -> Result<AllocationResponse> { | ||
| let url = format!("{}/api/v1/sessions", self.endpoint); | ||
| let request = AllocateRequest { | ||
| chunks: chunks.iter() | ||
|
|
@@ -27,6 +27,7 @@ impl MonoceClient { | |
| size: *size, | ||
| }) | ||
| .collect(), | ||
| replication_factor, | ||
| }; | ||
|
|
||
| let response = self | ||
|
|
@@ -61,6 +62,29 @@ impl MonoceClient { | |
| .error_for_status() | ||
| .map_err(map_reqwest_err)?; | ||
|
|
||
| // Register chunk location with gateway | ||
| self.register_chunk_location(&cid.to_hex(), node_addr).await?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| pub async fn register_chunk_location(&self, cid_hex: &str, node_addr: &str) -> Result<()> { | ||
|
||
| let url = format!("{}/api/v1/chunks/{}/locations", self.endpoint, cid_hex); | ||
| let node_address = if node_addr.starts_with("http://") || node_addr.starts_with("https://") { | ||
| node_addr.to_string() | ||
| } else { | ||
| format!("http://{}", node_addr) | ||
| }; | ||
|
Comment on lines
+73
to
+77
|
||
|
|
||
| self.http | ||
| .post(&url) | ||
| .json(&serde_json::json!({"node_address": node_address})) | ||
| .send() | ||
| .await | ||
| .map_err(map_reqwest_err)? | ||
| .error_for_status() | ||
| .map_err(map_reqwest_err)?; | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
|
|
@@ -264,6 +288,7 @@ impl MonoceClient { | |
| #[derive(Serialize)] | ||
| struct AllocateRequest { | ||
| chunks: Vec<ChunkAllocInfo>, | ||
| replication_factor: u32, | ||
| } | ||
|
|
||
| #[derive(Serialize)] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,7 +10,7 @@ use std::path::Path; | |
| use crate::chunker::{Chunker, DEFAULT_CHUNK_SIZE}; | ||
| use crate::client::{MonoceClient, NamespaceEntry}; | ||
|
|
||
| pub async fn put(endpoint: &str, path: &str, name: Option<&str>) -> Result<()> { | ||
| pub async fn put(endpoint: &str, path: &str, name: Option<&str>, replication_factor: u32) -> Result<()> { | ||
|
||
| let file = File::open(path).context("failed to open file")?; | ||
| let file_size = file.metadata()?.len(); | ||
| let file_name = Path::new(path) | ||
|
|
@@ -32,7 +32,7 @@ pub async fn put(endpoint: &str, path: &str, name: Option<&str>) -> Result<()> { | |
| .map(|c| (c.cid.clone(), c.size)) | ||
| .collect(); | ||
| let allocation = client | ||
| .allocate_chunks(&chunk_info) | ||
| .allocate_chunks(&chunk_info, replication_factor) | ||
| .await | ||
| .context("failed to allocate chunks")?; | ||
|
|
||
|
|
@@ -155,19 +155,19 @@ pub async fn get(endpoint: &str, target: &str, output: Option<&str>) -> Result<( | |
| println!("Downloading chunks..."); | ||
|
|
||
| for (i, chunk_ref) in manifest.chunks.iter().enumerate() { | ||
| let chunk_cid = chunk_ref.chunk_cid.to_string(); | ||
| let chunk_cid_hex = chunk_ref.chunk_cid.to_hex(); | ||
| let locations = client | ||
| .get_chunk_locations(&chunk_cid) | ||
| .get_chunk_locations(&chunk_cid_hex) | ||
| .await | ||
| .context("failed to get chunk locations")?; | ||
|
|
||
| if locations.is_empty() { | ||
| anyhow::bail!("no nodes available for chunk {}", chunk_cid); | ||
| anyhow::bail!("no nodes available for chunk {}", chunk_cid_hex); | ||
| } | ||
|
|
||
| let mut downloaded = false; | ||
| for node_addr in &locations { | ||
| match client.download_chunk(&chunk_cid, node_addr).await { | ||
| match client.download_chunk(&chunk_cid_hex, node_addr).await { | ||
| Ok(data) => { | ||
| writer.write_all(&data)?; | ||
| downloaded = true; | ||
|
|
@@ -180,7 +180,7 @@ pub async fn get(endpoint: &str, target: &str, output: Option<&str>) -> Result<( | |
| } | ||
|
|
||
| if !downloaded { | ||
| anyhow::bail!("failed to download chunk {} from any node", chunk_cid); | ||
| anyhow::bail!("failed to download chunk {} from any node", chunk_cid_hex); | ||
| } | ||
|
|
||
| print!("\r [{}/{}] chunks", i + 1, manifest.chunks.len()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,6 +22,8 @@ enum Commands { | |||||||||||||||||
| path: String, | ||||||||||||||||||
| #[arg(long, help = "Target namespace path")] | ||||||||||||||||||
| name: Option<String>, | ||||||||||||||||||
| #[arg(short = 'r', long, default_value = "1", help = "Replication factor")] | ||||||||||||||||||
|
||||||||||||||||||
| #[arg(short = 'r', long, default_value = "1", help = "Replication factor")] | |
| #[arg( | |
| short = 'r', | |
| long, | |
| default_value_t = 1u32, | |
| value_parser = clap::value_parser!(u32).range(1..=100), | |
| help = "Replication factor (1-100)" | |
| )] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new public method register_chunk_location lacks documentation. Consider adding a doc comment explaining its purpose, parameters, and expected behavior, especially since it's a public API that could be used by other parts of the codebase.