Skip to content

Commit 8caed34

Browse files
committed
coderabbit suggestions
1 parent 13f6f22 commit 8caed34

11 files changed

Lines changed: 50 additions & 28 deletions

File tree

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ impl ParseableSinkProcessor {
6464
vec![log_source_entry],
6565
TelemetryType::default(),
6666
tenant_id,
67+
None,
6768
)
6869
.await?;
6970

src/correlation.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,11 @@ impl Correlations {
139139
.await?;
140140
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
141141
// Update in memory
142-
if let Some(corrs) = self.write().await.get_mut(tenant) {
143-
corrs.insert(correlation.id.to_owned(), correlation.clone());
144-
}
142+
self.write()
143+
.await
144+
.entry(tenant.to_string())
145+
.or_default()
146+
.insert(correlation.id.to_owned(), correlation.clone());
145147

146148
Ok(correlation)
147149
}

src/handlers/http/ingest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ pub async fn ingest(
120120
vec![log_source_entry.clone()],
121121
telemetry_type,
122122
&tenant_id,
123-
None
123+
None,
124124
)
125125
.await
126126
.map_err(|e| {
@@ -239,7 +239,7 @@ pub async fn setup_otel_stream(
239239
vec![log_source_entry.clone()],
240240
telemetry_type,
241241
&tenant_id,
242-
None
242+
None,
243243
)
244244
.await?;
245245
let mut time_partition = None;

src/handlers/http/middleware.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ where
168168
// if action is Ingest and multi-tenancy is on, then request MUST have tenant id
169169
// else check for the presence of tenant id using other details
170170

171-
// an optional error to track the presence of CORRECTtenant header in case of ingestion
171+
// an optional error to track the presence of CORRECT tenant header in case of ingestion
172172
let mut header_error = None;
173173
let user_and_tenant_id: Result<(Result<String, RBACError>, Option<String>), RBACError> =
174174
if PARSEABLE.options.is_multi_tenant() {
@@ -205,12 +205,6 @@ where
205205
t = tenant;
206206
}
207207
t
208-
// else {
209-
// header_error = Some(actix_web::Error::from(PostError::Header(
210-
// crate::utils::header_parsing::ParseHeaderError::InvalidTenantId,
211-
// )));
212-
// None
213-
// }
214208
};
215209
let userid = get_user_from_request(req.request());
216210
Ok((userid, tenant))

src/metastore/metastores/object_store_metastore.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,9 @@ impl Metastore for ObjectStoreMetastore {
514514

515515
/// Fetch all dashboards
516516
async fn get_dashboards(&self) -> Result<HashMap<String, Vec<Bytes>>, MetastoreError> {
517-
let mut dashboards = HashMap::new();
517+
let mut dashboards: HashMap<String, Vec<Bytes>> = HashMap::new();
518518
let base_paths = PARSEABLE.list_tenants().unwrap_or_else(|| vec!["".into()]);
519-
for mut tenant in base_paths {
519+
for tenant in base_paths {
520520
let tenant_id = &Some(tenant.clone());
521521
let users_dir = RelativePathBuf::from_iter([&tenant, USERS_ROOT_DIR]);
522522
for user in self
@@ -533,11 +533,16 @@ impl Metastore for ObjectStoreMetastore {
533533
tenant_id,
534534
)
535535
.await?;
536-
if tenant.is_empty() {
537-
tenant.clone_from(&DEFAULT_TENANT.to_string());
538-
}
539-
dashboards.insert(tenant.to_owned(), dashboard_bytes);
540-
// dashboards.extend(dashboard_bytes);
536+
537+
let tenant_key = if tenant.is_empty() {
538+
DEFAULT_TENANT.to_string()
539+
} else {
540+
tenant.clone()
541+
};
542+
dashboards
543+
.entry(tenant_key)
544+
.or_default()
545+
.extend(dashboard_bytes);
541546
}
542547
}
543548

src/parseable/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ impl Parseable {
473473
vec![log_source_entry.clone()],
474474
TelemetryType::Logs,
475475
&tenant_id,
476-
None
476+
None,
477477
)
478478
.await;
479479

@@ -512,6 +512,7 @@ impl Parseable {
512512
}
513513

514514
// Check if the stream exists and create a new stream if doesn't exist
515+
#[allow(clippy::too_many_arguments)]
515516
pub async fn create_stream_if_not_exists(
516517
&self,
517518
stream_name: &str,

src/prism/home/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
/*
24
* Parseable Server (C) 2022 - 2025 Parseable, Inc.
35
*
@@ -27,7 +29,7 @@ use crate::{
2729
event::format::{LogSource, LogSourceEntry},
2830
handlers::{TelemetryType, http::logstream::error::StreamError},
2931
metastore::MetastoreError,
30-
parseable::PARSEABLE,
32+
parseable::{DEFAULT_TENANT, PARSEABLE},
3133
rbac::{
3234
Users,
3335
map::{SessionKey, users},
@@ -155,7 +157,10 @@ pub async fn generate_home_response(
155157

156158
// Generate checklist and count triggered alerts
157159
let data_ingested = datasets.iter().any(|d| d.ingestion);
158-
let user_count = users().len();
160+
let user_count = users()
161+
.get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT))
162+
.unwrap_or(&HashMap::default())
163+
.len();
159164
let user_added = user_count > 1; // more than just the default admin user
160165

161166
// Calculate triggered alerts count

src/rbac/user.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,9 @@ impl UserGroup {
349349
pub fn validate(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
350350
let valid_name = is_valid_group_name(&self.name);
351351
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
352-
if read_user_groups().contains_key(&self.name) {
352+
if let Some(tenant_ug) = read_user_groups().get(tenant)
353+
&& tenant_ug.contains_key(&self.name)
354+
{
353355
return Err(RBACError::UserGroupExists(self.name.clone()));
354356
}
355357
let mut non_existent_roles = Vec::new();

src/storage/retention.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,13 @@ pub struct Task {
109109
}
110110

111111
impl Task {
112-
pub fn new(description: String, days: u32) -> Self {
112+
pub fn new(description: String, mut days: u32) -> Self {
113+
if days.eq(&0) {
114+
days = 7;
115+
tracing::warn!(
116+
"Using default 7 days for retention since an invalid value of 0 was provided by the user"
117+
);
118+
}
113119
let days = NonZeroU32::new(days).unwrap();
114120
Self {
115121
description,

src/tenants/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,18 @@ impl TenantMetadata {
7070
pub fn suspend_service(&self, tenant_id: &str, service: Service) {
7171
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
7272
tenant.suspended_services.insert(service);
73+
tenant.meta.suspended_services = Some(tenant.suspended_services.clone());
7374
}
7475
}
7576

7677
pub fn resume_service(&self, tenant_id: &str, service: Service) {
7778
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
7879
tenant.suspended_services.remove(&service);
80+
tenant.meta.suspended_services = if tenant.suspended_services.is_empty() {
81+
None
82+
} else {
83+
Some(tenant.suspended_services.clone())
84+
};
7985
}
8086
}
8187

0 commit comments

Comments
 (0)