Skip to content

Commit 3ab0baf

Browse files
authored
fix: Multiple fixes (#1582)
- remove invalid tenant dir - log sync errors - add tenant to filter
1 parent 6c58308 commit 3ab0baf

4 files changed

Lines changed: 35 additions & 9 deletions

File tree

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,9 @@ pub async fn put_stream(
157157
actix_web::http::header::HeaderValue::from_str(&userid).unwrap(),
158158
);
159159
}
160-
sync_streams_with_ingestors(headers, body, &stream_name, &tenant_id).await?;
160+
if let Err(e) = sync_streams_with_ingestors(headers, body, &stream_name, &tenant_id).await {
161+
tracing::error!("{e}");
162+
};
161163

162164
if is_update {
163165
Ok(("Log stream updated", StatusCode::OK))

src/handlers/http/modal/query/querier_rbac.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,9 @@ pub async fn post_user(
9292
// let created_role = user_roles.clone();
9393
Users.put_user(user.clone());
9494

95-
sync_user_creation(&req, user, &None, &tenant_id, &caller_userid).await?;
95+
if let Err(e) = sync_user_creation(&req, user, &None, &tenant_id, &caller_userid).await {
96+
tracing::error!("{e}");
97+
};
9698

9799
Ok(password)
98100
}
@@ -161,7 +163,11 @@ pub async fn delete_user(
161163
}
162164
put_metadata(&metadata, &tenant_id).await?;
163165

164-
sync_user_deletion_with_ingestors(&req, &userid, &tenant_id, &caller_userid).await?;
166+
if let Err(e) =
167+
sync_user_deletion_with_ingestors(&req, &userid, &tenant_id, &caller_userid).await
168+
{
169+
tracing::error!("{e}");
170+
};
165171

166172
// update in mem table
167173
Users.delete_user(&userid, &tenant_id);
@@ -380,7 +386,9 @@ pub async fn post_gen_password(
380386
put_metadata(&metadata, &tenant_id).await?;
381387
Users.change_password_hash(&username, &new_hash, &tenant_id);
382388

383-
sync_password_reset_with_ingestors(req, &username, &caller_userid).await?;
389+
if let Err(e) = sync_password_reset_with_ingestors(req, &username, &caller_userid).await {
390+
tracing::error!("{e}");
391+
};
384392

385393
Ok(new_password)
386394
}

src/handlers/http/users/filters.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pub async fn post(
7171
filter.filter_id = Some(filter_id.clone());
7272
filter.user_id = Some(user_id.clone());
7373
filter.version = Some(CURRENT_FILTER_VERSION.to_string());
74+
filter.tenant_id.clone_from(&tenant_id);
7475
PARSEABLE.metastore.put_filter(&filter, &tenant_id).await?;
7576
FILTERS.update(&filter, &tenant_id).await;
7677

@@ -99,6 +100,7 @@ pub async fn update(
99100
filter.filter_id = Some(filter_id.clone());
100101
filter.user_id = Some(user_id.clone());
101102
filter.version = Some(CURRENT_FILTER_VERSION.to_string());
103+
filter.tenant_id.clone_from(&tenant_id);
102104

103105
PARSEABLE.metastore.put_filter(&filter, &tenant_id).await?;
104106
FILTERS.update(&filter, &tenant_id).await;

src/parseable/mod.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,8 +1148,10 @@ impl Parseable {
11481148
.filter(|d| !d.starts_with("."))
11491149
.collect();
11501150

1151+
let mut recognized_tenants = vec![];
1152+
11511153
// validate the possible presence of tenant storage metadata
1152-
for tenant_id in dirs.iter() {
1154+
for tenant_id in dirs.into_iter() {
11531155
if let Some(meta) = PARSEABLE
11541156
.metastore
11551157
.get_parseable_metadata(&Some(tenant_id.clone()))
@@ -1159,11 +1161,23 @@ impl Parseable {
11591161
let metadata: StorageMetadata = serde_json::from_slice(&meta)?;
11601162

11611163
TENANT_METADATA.insert_tenant(tenant_id.clone(), metadata.clone());
1164+
recognized_tenants.push(tenant_id);
11621165
} else if !is_multi_tenant {
11631166
} else {
1164-
return Err(anyhow::Error::msg(format!(
1165-
"Found invalid tenant directory with multi-tenant mode- {tenant_id}.\nExiting."
1166-
)));
1167+
// found an invalid multi-tenant directory, delete
1168+
tracing::warn!(
1169+
"Found invalid tenant directory with multi-tenant mode- {tenant_id}.\nDeleting."
1170+
);
1171+
1172+
// delete from storage
1173+
if let Err(e) = obj_store
1174+
.delete_prefix(&RelativePathBuf::from_iter([&tenant_id]), &None)
1175+
.await
1176+
{
1177+
tracing::error!(
1178+
"Unable to delete invalid directory- {tenant_id} with error-\n{e}"
1179+
);
1180+
};
11671181
}
11681182
}
11691183

@@ -1172,7 +1186,7 @@ impl Parseable {
11721186
.tenants
11731187
.write()
11741188
.expect("Unable to write to in-memory tenant map");
1175-
t.extend(dirs);
1189+
t.extend(recognized_tenants);
11761190
Ok(())
11771191
} else {
11781192
Ok(())

0 commit comments

Comments
 (0)