Skip to content

Commit 4a3e23c

Browse files
add tenant_id params in new handlers
1 parent 87ff8e0 commit 4a3e23c

3 files changed

Lines changed: 28 additions & 13 deletions

File tree

src/handlers/http/datasets.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919
use std::collections::HashSet;
2020

2121
use actix_web::http::StatusCode;
22-
use actix_web::{HttpResponse, web};
22+
use actix_web::{HttpRequest, HttpResponse, web};
2323
use serde::{Deserialize, Serialize};
2424

25+
use crate::utils::get_tenant_id_from_request;
2526
use crate::{
2627
handlers::DatasetTag,
2728
parseable::PARSEABLE,
@@ -39,12 +40,13 @@ struct CorrelatedDataset {
3940
/// GET /api/v1/datasets/{name}/correlated
4041
/// Returns all datasets sharing at least one tag or label with the named dataset.
4142
pub async fn get_correlated_datasets(
43+
req: HttpRequest,
4244
path: web::Path<String>,
4345
) -> Result<HttpResponse, DatasetsError> {
4446
let dataset_name = path.into_inner();
45-
47+
let tenant_id = get_tenant_id_from_request(&req);
4648
let stream = PARSEABLE
47-
.get_stream(&dataset_name)
49+
.get_stream(&dataset_name, &tenant_id)
4850
.map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
4951

5052
let target_tags: HashSet<DatasetTag> = stream.get_dataset_tags().into_iter().collect();
@@ -54,14 +56,14 @@ pub async fn get_correlated_datasets(
5456
return Ok(HttpResponse::Ok().json(Vec::<CorrelatedDataset>::new()));
5557
}
5658

57-
let all_streams = PARSEABLE.streams.list();
59+
let all_streams = PARSEABLE.streams.list(&tenant_id);
5860
let mut correlated = Vec::new();
5961

6062
for name in all_streams {
6163
if name == dataset_name {
6264
continue;
6365
}
64-
if let Ok(s) = PARSEABLE.get_stream(&name) {
66+
if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) {
6567
// Skip internal streams
6668
if s.get_stream_type() == StreamType::Internal {
6769
continue;
@@ -89,16 +91,20 @@ pub async fn get_correlated_datasets(
8991

9092
/// GET /api/v1/datasets/tags/{tag}
9193
/// Returns all datasets that have the specified tag.
92-
pub async fn get_datasets_by_tag(path: web::Path<String>) -> Result<HttpResponse, DatasetsError> {
94+
pub async fn get_datasets_by_tag(
95+
req: HttpRequest,
96+
path: web::Path<String>,
97+
) -> Result<HttpResponse, DatasetsError> {
98+
let tenant_id = get_tenant_id_from_request(&req);
9399
let tag_str = path.into_inner();
94100
let tag =
95101
DatasetTag::try_from(tag_str.as_str()).map_err(|_| DatasetsError::InvalidTag(tag_str))?;
96102

97-
let all_streams = PARSEABLE.streams.list();
103+
let all_streams = PARSEABLE.streams.list(&tenant_id);
98104
let mut matching = Vec::new();
99105

100106
for name in all_streams {
101-
if let Ok(s) = PARSEABLE.get_stream(&name) {
107+
if let Ok(s) = PARSEABLE.get_stream(&name, &tenant_id) {
102108
if s.get_stream_type() == StreamType::Internal {
103109
continue;
104110
}
@@ -121,14 +127,16 @@ pub struct PutDatasetMetadataBody {
121127
/// Replaces the dataset's tags and/or labels.
122128
/// Only fields present in the body are updated; absent fields are left unchanged.
123129
pub async fn put_dataset_metadata(
130+
req: HttpRequest,
124131
path: web::Path<String>,
125132
body: web::Json<PutDatasetMetadataBody>,
126133
) -> Result<HttpResponse, DatasetsError> {
127134
let dataset_name = path.into_inner();
128135
let body = body.into_inner();
136+
let tenant_id = get_tenant_id_from_request(&req);
129137

130138
let stream = PARSEABLE
131-
.get_stream(&dataset_name)
139+
.get_stream(&dataset_name, &tenant_id)
132140
.map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
133141

134142
let final_tags = match body.tags {
@@ -151,7 +159,12 @@ pub async fn put_dataset_metadata(
151159
// Update storage first, then in-memory
152160
let storage = PARSEABLE.storage.get_object_store();
153161
storage
154-
.update_dataset_tags_and_labels_in_stream(&dataset_name, &final_tags, &final_labels)
162+
.update_dataset_tags_and_labels_in_stream(
163+
&dataset_name,
164+
&final_tags,
165+
&final_labels,
166+
&tenant_id,
167+
)
155168
.await
156169
.map_err(DatasetsError::Storage)?;
157170

src/parseable/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,8 @@ impl Parseable {
475475
vec![log_source_entry.clone()],
476476
TelemetryType::Logs,
477477
&tenant_id,
478-
None,
478+
vec![],
479+
vec![]
479480
)
480481
.await;
481482

src/storage/object_storage.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,19 +501,20 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
501501
stream_name: &str,
502502
tags: &[DatasetTag],
503503
labels: &[String],
504+
tenant_id: &Option<String>,
504505
) -> Result<(), ObjectStorageError> {
505506
let mut format: ObjectStoreFormat = serde_json::from_slice(
506507
&PARSEABLE
507508
.metastore
508-
.get_stream_json(stream_name, false)
509+
.get_stream_json(stream_name, false, tenant_id)
509510
.await
510511
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?,
511512
)?;
512513
format.dataset_tags = tags.to_owned();
513514
format.dataset_labels = labels.to_owned();
514515
PARSEABLE
515516
.metastore
516-
.put_stream_json(&format, stream_name)
517+
.put_stream_json(&format, stream_name, tenant_id)
517518
.await
518519
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;
519520

0 commit comments

Comments
 (0)