
add system defined tags and free form labels to datasets #1553

Merged
nikhilsinhaparseable merged 16 commits into parseablehq:main from nikhilsinhaparseable:dataset-tags-lables
Mar 28, 2026

Conversation

@nikhilsinhaparseable
Contributor

@nikhilsinhaparseable commented Feb 21, 2026

  • PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation
  • Includes tags and labels in the home API response and the info response

Summary by CodeRabbit

  • New Features

    • Support multiple dataset tags and labels per stream; new request headers for tags/labels
    • Nested logical groups for alert conditions, enabling grouped boolean expressions
  • Improvements

    • OTEL ingestion and dataset APIs accept and propagate dataset tags/labels
    • Dataset listing is tenant-scoped
    • Storage can persist and update dataset tags and labels
  • Bug Fixes

    • Alert filter builder defaults missing operators to AND and validates empty expressions

@coderabbitai
Contributor

coderabbitai bot commented Feb 21, 2026

Walkthrough

Replaces a single optional dataset_tag with dataset_tags: Vec<DatasetTag> and dataset_labels: Vec<String>, updating metadata, stream-creation APIs, header parsing, object storage, prism/home responses, several callsites, and alert condition handling to support multi-value tags/labels and nested condition groups.

Changes

Cohort / File(s) / Summary

  • Core Metadata & Defaults (src/metadata.rs, src/storage/mod.rs, src/migration/mod.rs)
    Replaced dataset_tag: Option<DatasetTag> with dataset_tags: Vec<DatasetTag> and added dataset_labels: Vec<String> across metadata structs, defaults, and migration deserialization.
  • Stream Creation & Parseable API (src/parseable/mod.rs, src/parseable/streams.rs, src/connectors/kafka/processor.rs)
    Signatures updated: create_stream_if_not_exists/create_stream now accept dataset_tags and dataset_labels. Stream accessors added for tags/labels; call sites updated to pass vec![] where None was used.
  • Object Storage & Merge Logic (src/storage/object_storage.rs, including create_stream_from_ingestor)
    Added ObjectStorage::update_dataset_tags_and_labels_in_stream. Merging ingestor metadata now aggregates and deduplicates dataset_tags and dataset_labels.
  • HTTP Handlers & Header Parsing (src/handlers/mod.rs, src/handlers/http/ingest.rs, src/handlers/http/modal/utils/logstream_utils.rs, src/handlers/http/prism_logstream.rs)
    Added DATASET_TAGS_KEY and DATASET_LABELS_KEY. PutStreamHeaders now parses dataset_tags: Vec<DatasetTag> and dataset_labels: Vec<String>. setup_otel_stream and OTEL handlers accept and pass tags/labels; prism dataset retrieval was made tenant-scoped.
  • Prism / Home Response (src/prism/home/mod.rs, src/prism/logstream/mod.rs)
    DataSet now exposes tags: Vec<DatasetTag> and labels: Vec<String> (serde omits them when empty). The home response aggregates and deduplicates tags/labels. get_datasets now accepts a tenant_id parameter.
  • Field Stats & Other Call Sites (src/storage/field_stats.rs)
    Updated PARSEABLE.create_stream_if_not_exists calls to pass explicit vec![] for the new parameters; surrounding stats logic unchanged.
  • Alerts System (src/alerts/alert_structs.rs, src/alerts/alerts_utils.rs)
    Conditions now supports nested groups (Option<Vec<Conditions>>); condition_config defaults to empty. Filter/message generation was refactored to recursively process groups and default missing operators.

Sequence Diagram(s)

sequenceDiagram
    participant Client as "HTTP Handler / Ingest"
    participant Parseable as "PARSEABLE (parseable/mod)"
    participant ObjectStore as "ObjectStorage / Metastore"
    participant Stream as "Stream Metadata"

    Client->>Parseable: setup_otel_stream(stream_name, tenant_id, dataset_tags, dataset_labels)
    Parseable->>ObjectStore: create_stream_if_not_exists(stream_name, tenant_id, tags, labels)
    ObjectStore->>ObjectStore: load ObjectStoreFormat(stream_name, tenant_id)
    alt stream exists
        ObjectStore->>ObjectStore: merge tags/labels, dedupe
        ObjectStore->>ObjectStore: persist updated ObjectStoreFormat
    else stream missing
        ObjectStore->>ObjectStore: create ObjectStoreFormat with tags/labels
        ObjectStore->>ObjectStore: persist new ObjectStoreFormat
    end
    ObjectStore->>Stream: return persisted metadata (dataset_tags, dataset_labels)
    Parseable->>Client: return creation/result

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐇 I swapped one tag for many in a hop,
Collected labels as I danced on top,
Groups nested like burrows, snug and round,
Streams dedupe, tags and labels found,
A rabbit cheers: multi-tags — hop, hop, pop!

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check — ⚠️ Warning. The PR description is incomplete: it lacks the structured template sections (issue reference, description, rationale, testing checklist, comments, documentation). Resolution: expand the description to follow the template, including the issue reference, the rationale for the tags-vs-labels design, the testing performed, and documentation of the new headers and their behavior.
  • Docstring coverage — ⚠️ Warning. Docstring coverage is 77.08%, below the required 80.00% threshold. Resolution: add docstrings to the functions that are missing them.

✅ Passed checks (1 passed)

  • Title check — ✅ Passed. The title accurately summarizes the main change: adding system-defined tags and free-form labels to datasets through the PUT endpoint and API responses.


Contributor

coderabbitai bot left a comment


🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

480-489: Consider a builder pattern for stream creation parameters.

The #[allow(clippy::too_many_arguments)] annotation is a reasonable workaround, but with 7+ parameters, this function is becoming unwieldy. For a future improvement, consider introducing a StreamCreationParams struct or builder pattern to group these related parameters.

This is a minor suggestion and doesn't block the current PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/mod.rs` around lines 480 - 489, The function
create_stream_if_not_exists has many parameters; refactor by introducing a
StreamCreationParams struct (or a builder StreamCreationParamsBuilder) to group
stream_name, stream_type, custom_partition, log_source, telemetry_type,
dataset_tags, and dataset_labels; update create_stream_if_not_exists signature
to accept a single StreamCreationParams (or builder output) and adapt internal
usage accordingly, add a conversion constructor or builder methods for easy
construction where callers currently pass those seven+ args, and preserve
existing behavior and validation inside the new struct or builder.

coderabbitai[bot]
coderabbitai bot previously approved these changes Feb 21, 2026
Contributor

coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

480-527: Consider bundling stream creation parameters into a struct.

The function now accepts 8 parameters (including self), which triggers the clippy::too_many_arguments lint. While the #[allow] attribute suppresses it, this pattern repeats across create_stream_if_not_exists, create_stream, and create_update_stream.

A dedicated struct (e.g., StreamCreationParams or StreamConfig) would:

  • Improve readability at call sites
  • Make future parameter additions easier
  • Provide a natural place for default values
💡 Example struct-based approach
pub struct StreamCreationParams {
    pub stream_type: StreamType,
    pub custom_partition: Option<String>,
    pub log_source: Vec<LogSourceEntry>,
    pub telemetry_type: TelemetryType,
    pub dataset_tags: Vec<DatasetTag>,
    pub dataset_labels: Vec<String>,
}

impl Default for StreamCreationParams {
    fn default() -> Self {
        Self {
            stream_type: StreamType::UserDefined,
            custom_partition: None,
            log_source: vec![],
            telemetry_type: TelemetryType::Logs,
            dataset_tags: vec![],
            dataset_labels: vec![],
        }
    }
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/mod.rs` around lines 480 - 527, Create a StreamCreationParams
(or StreamConfig) struct to bundle the multiple stream creation arguments and
refactor create_stream_if_not_exists, create_stream, and create_update_stream to
accept that struct instead of the long parameter list; update the call site in
create_stream_if_not_exists to build a StreamCreationParams (using Default for
sensible defaults) and pass it to create_stream, adjust create_stream signature
to destructure or reference the struct fields (stream_type, custom_partition,
log_source, telemetry_type, dataset_tags, dataset_labels), remove the
#[allow(clippy::too_many_arguments)] on those functions, and add
conversions/constructors where needed so existing call sites can migrate with
minimal changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 163-169: The labels vector built in the PUT handler (where
new_labels is created from body.into_inner().labels) must trim each label and
filter out empty or whitespace-only strings before deduplicating; update the
pipeline that builds new_labels to map each label through .trim(), filter out
strings that are empty after trimming, then collect into a HashSet to dedupe and
back into a Vec (preserving whatever ordering you need). Ensure you modify the
code around the new_labels construction so labels are normalized (trimmed) and
empty/whitespace-only entries are removed prior to deduplication.
- Around line 31-186: Handlers get_correlated_datasets, get_datasets_by_tag,
put_dataset_tags and put_dataset_labels currently use PARSEABLE.streams/list,
PARSEABLE.get_stream and storage APIs without tenant context; update each
handler to resolve the tenant from the request (e.g., call
get_tenant_id_from_request or read the normalized tenant header injected by
middleware), then scope all stream listing and lookups to that tenant (filter
list results or call tenant-aware APIs) and ensure any
storage.update_dataset_tags_and_labels_in_stream and PARSEABLE.get_stream calls
include/are called with the resolved tenant or have the tenant header
overwritten server-side so clients cannot spoof it; ensure the same tenant
resolution is applied to both read (listing/get) and write
(put_dataset_tags/put_dataset_labels) flows.


Contributor

coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 139-151: The current read-modify-write in put_dataset_tags uses
PARSEABLE.get_stream(...), calls stream.get_dataset_labels(), then
storage.update_dataset_tags_and_labels_in_stream(...) and finally
stream.set_dataset_tags(...), which allows TOCTOU race with concurrent
put_dataset_labels; fix by serializing updates per-stream (or by providing a
combined tags+labels atomic endpoint). Add a per-stream mutex/async lock on the
stream (e.g., a metadata_lock() or similar on the stream returned by
PARSEABLE.get_stream) and acquire it around the read-modify-write sequence that
uses stream.get_dataset_labels,
storage.update_dataset_tags_and_labels_in_stream, and stream.set_dataset_tags so
concurrent tag/label updates for the same stream are executed sequentially.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 41-88: get_correlated_datasets currently iterates the global
PARSEABLE.streams.list() and PARSEABLE.get_stream(...) without restricting to
the tenant, causing cross-tenant exposure; update the handler to derive the
tenant context (e.g., from request auth/headers or a tenant path param) and then
restrict both the initial stream lookup (PARSEABLE.get_stream(&dataset_name))
and the loop over streams to the same tenant: either call a tenant-scoped
listing API (e.g., PARSEABLE.streams.list_for_tenant(tenant) if available) or
filter the results of PARSEABLE.streams.list() by a tenant identity exposed on
each stream (e.g., s.get_tenant() == tenant) before skipping and comparing;
ensure the check for the target dataset also validates it belongs to the tenant
so only tenant-scoped streams are considered.

Contributor

coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

150-156: ⚠️ Potential issue | 🟡 Minor

Normalize label input in PUT body before dedupe.

Line 150-Line 156 deduplicates raw strings but does not trim/filter whitespace-only labels, so empty labels can still be persisted.

Suggested fix
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 150 - 156, Normalize and filter
incoming labels before deduplication: when building final_labels from
body.labels (the match arm handling Some(labels)), map each label to trimmed
string and filter out labels that are empty or whitespace-only (e.g.,
labels.into_iter().map(|s| s.trim().to_string()).filter(|s| !s.is_empty())...),
then collect into a HashSet to dedupe and back into the desired collection; keep
the None branch using stream.get_dataset_labels() unchanged.

142-172: ⚠️ Potential issue | 🔴 Critical

Concurrent partial metadata updates can still lose data.

Line 142-Line 172 does a read-modify-write using current in-memory values for absent fields, so two concurrent requests (tags-only and labels-only) can overwrite each other with stale counterparts.

Suggested direction
 pub async fn put_dataset_metadata(...) -> Result<HttpResponse, DatasetsError> {
     ...
     let stream = PARSEABLE
         .get_stream(&dataset_name, &tenant_id)
         .map_err(|_| DatasetsError::DatasetNotFound(dataset_name.clone()))?;
+
+    // Serialize metadata updates for this stream (or use atomic storage merge).
+    // let _guard = stream.metadata_lock().await;

     let final_tags = match body.tags { ... };
     let final_labels = match body.labels { ... };

     storage
         .update_dataset_tags_and_labels_in_stream(...)
         .await
         .map_err(DatasetsError::Storage)?;

     stream.set_dataset_tags(final_tags.clone());
     stream.set_dataset_labels(final_labels.clone());

If a stream-scoped lock is unavailable, prefer a storage-level atomic patch API that updates only provided fields server-side.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 142 - 172, Current code computes
final_tags/final_labels from in-memory stream values and then does a
read-modify-write, which lets concurrent requests overwrite each other; instead
either acquire the stream-scoped lock around the read-modify-write (use
stream.get_dataset_tags/get_dataset_labels and
stream.set_dataset_tags/set_dataset_labels inside the lock) or, preferably,
change the update to an atomic storage-side patch that accepts
Option<HashSet<_>> and only updates provided fields (modify the call to
storage.update_dataset_tags_and_labels_in_stream to pass Option types and
implement server-side merge), removing the reliance on in-memory reads to
prevent lost updates.
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

982-988: Prefer atomic update for tags+labels to avoid transient mixed state.

Line 982 and Line 986 update related metadata in two separate writes. A single setter that updates both fields under one lock would avoid short-lived inconsistent reads.

♻️ Suggested refactor
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 988, The two setters
set_dataset_tags and set_dataset_labels perform separate metadata.write() calls
causing transient inconsistent state; add a new atomic setter (e.g.,
set_dataset_tags_and_labels(&self, tags: Vec<DatasetTag>, labels: Vec<String>))
that acquires metadata.write() once and assigns both dataset_tags and
dataset_labels inside the same lock, then update call sites to use the new
method (or have the existing setters delegate to it) so modifications never
occur in two separate writes; reference the metadata RwLock field and the
existing set_dataset_tags/set_dataset_labels method names when making the
change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 222-227: The route currently mounts only PUT /{name} using
http::datasets::put_dataset_metadata (authorized with Action::CreateStream), but
the API requires distinct replacement endpoints for tags and labels; add two
separate routes for "/{name}/tags" and "/{name}/labels" each using web::put()
and wired to the appropriate handlers (e.g., http::datasets::put_dataset_tags
and http::datasets::put_dataset_labels) with the same
authorize_for_resource(Action::CreateStream) policy, or if those handlers do not
exist, split http::datasets::put_dataset_metadata into two functions and route
them accordingly so clients can call PUT /datasets/{name}/tags and PUT
/datasets/{name}/labels.

In `@src/handlers/mod.rs`:
- Around line 95-114: The TryFrom<&str> for DatasetTag currently only accepts
the new literals (e.g., "agent-monitoring", "k8s-monitoring") causing older
values to fail; update the DatasetTag enum and its impl TryFrom to accept legacy
names by adding serde aliases on the enum variants (e.g., add #[serde(alias =
"agent-observability")] and #[serde(alias = "k8s-observability")] to
DatasetTag::AgentMonitoring and ::K8sMonitoring) and extend the match in impl
TryFrom (the function try_from) to also match the old strings
("agent-observability", "k8s-observability") mapping them to the corresponding
variants, keeping the existing error message unchanged.

In `@src/storage/mod.rs`:
- Around line 134-137: The struct currently only reads dataset_tags so legacy
single-key dataset_tag values are dropped; update deserialization/migration to
map dataset_tag -> dataset_tags by adding a compatibility conversion: in
src/storage/mod.rs ensure deserialization recognizes a legacy dataset_tag (alias
or custom Deserialize) and appends it into the dataset_tags Vec, or add a
migration step that, when bumping
CURRENT_OBJECT_STORE_VERSION/CURRENT_SCHEMA_VERSION, transforms any legacy
dataset_tag value into dataset_tags before validation; target the symbols
dataset_tags, dataset_tag, CURRENT_OBJECT_STORE_VERSION and
CURRENT_SCHEMA_VERSION when making this change so older metadata is preserved
and participates in tag-based APIs.


ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 419f53d and 4a3e23c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (15)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/handlers/http/ingest.rs

Contributor

coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/server.rs`:
- Around line 210-220: The two route registrations conflict because
"/tags/{tag}" will match "/tags/correlated" before "/{name}/correlated", causing
get_correlated_datasets to be unreachable for a dataset named "tags"; change the
correlated endpoint path to avoid structural collision (for example replace
"/{name}/correlated" with "/correlated/{name}" in the route registration where
web::get().to(http::datasets::get_correlated_datasets).authorize_for_resource(Action::GetStreamInfo)
is set) and update the path extractor in the get_correlated_datasets handler and
any clients/docs referencing the old path so they use the new
"/correlated/{name}" form.
- Around line 200-227: The post_datasets handler currently derives tenant from
BasicAuth via extract_session_key_from_req; change it to read the tenant using
get_tenant_id_from_request (the same middleware-normalized header used by
get_info and the other dataset handlers) to enforce tenant scoping and prevent
header spoofing: locate the post_datasets function in http::prism_logstream,
remove or stop using extract_session_key_from_req for tenant resolution, call
get_tenant_id_from_request(req) (or equivalent helper used elsewhere) and pass
that tenant downstream to any dataset creation/validation logic so it matches
the behavior of get_datasets_by_tag, get_correlated_datasets, and
put_dataset_metadata.

ℹ️ Review info

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4a3e23c and 4a2b764.

📒 Files selected for processing (1)
  • src/handlers/http/modal/server.rs

Contributor

coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

197-202: ⚠️ Potential issue | 🟡 Minor

Normalize request-body labels before storing them.

This path still stores label bodies verbatim, so blank labels and whitespace variants can persist and skew correlation results.

Suggested fix
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 197 - 202, The current
final_labels assignment uses body.labels verbatim and can retain blank or
whitespace-only variants; update the transformation so that when handling
Some(labels) you trim each label (e.g., call trim on the string), discard empty
results (filter out labels where trimmed.is_empty()), and then deduplicate by
collecting into a HashSet before converting back to the Vec used by
final_labels; apply this change to the final_labels computation that consumes
body.labels so stored labels are normalized (trimmed and non-empty).

189-216: ⚠️ Potential issue | 🔴 Critical

Partial PUTs still have a lost-update race.

Combining tags and labels into one storage call does not make this atomic because omitted fields are still read from current in-memory state first. A concurrent {"tags": ...} request and {"labels": ...} request can each write back a stale copy of the other field and clobber one update. Either make PUT a full replacement that requires both arrays, or serialize this read/merge/write per stream.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 189 - 216, The current PUT
handler builds final_tags and final_labels by reading in-memory via
stream.get_dataset_tags()/get_dataset_labels() and then calls
storage.update_dataset_tags_and_labels_in_stream(...), which creates a
lost-update race when concurrent partial PUTs update only one field; to fix,
either require full replacement (validate that body.tags and body.labels are
both Some and reject partial PUTs) or serialize read/merge/write per stream
(introduce a per-stream mutex/lock when computing final_tags/final_labels and
calling update_dataset_tags_and_labels_in_stream) so concurrent handlers cannot
interleave; locate and modify the code that constructs final_tags/final_labels
and the call to update_dataset_tags_and_labels_in_stream to implement one of
these two strategies, using identifiers final_tags, final_labels,
stream.get_dataset_tags/get_dataset_labels, and
update_dataset_tags_and_labels_in_stream to find the right spot.
🧹 Nitpick comments (2)
src/handlers/mod.rs (2)

153-161: Same non-deterministic ordering concern for parse_dataset_labels.

Similar to parse_dataset_tags, the label ordering is non-deterministic after HashSet deduplication. This may affect API response consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 153 - 161, parse_dataset_labels currently
deduplicates via HashSet which yields non-deterministic ordering; change it to
produce a deterministic, stable order by using either a BTreeSet for
deduplication or collect into a Vec then sort before returning. Update the
function parse_dataset_labels (and mirror the same approach used for
parse_dataset_tags if present) to trim and filter as now, then dedupe
deterministically (BTreeSet::from_iter or dedupe Vec + sort) and return a
Vec<String> with a stable sorted order.

131-151: Non-deterministic ordering in parse_dataset_tags.

The function collects into a HashSet for deduplication, then converts to Vec. This results in non-deterministic ordering of the returned tags. If consistent ordering matters for API responses or comparison purposes, consider sorting or using IndexSet from the indexmap crate.

// Current: non-deterministic order
.collect::<HashSet<_>>()
.into_iter()
.collect()

// Alternative: deterministic order
.collect::<HashSet<_>>()
.into_iter()
.sorted() // requires Ord impl or sorted_by
.collect()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 151, parse_dataset_tags currently
deduplicates via a HashSet then collects into a Vec which yields
non-deterministic ordering; change it to produce a deterministic order by either
(a) collecting into the HashSet for dedupe and then sorting the iterator
(requiring DatasetTag: Ord or using a comparator) before collecting into Vec, or
(b) replace the HashSet with an IndexSet from the indexmap crate to preserve
insertion order; update the function parse_dataset_tags (and any uses)
accordingly to ensure stable, deterministic ordering of the returned
Vec<DatasetTag>.
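
The BTreeSet variant of this fix can be sketched as a self-contained example. It is simplified to plain strings, since `DatasetTag`'s definition is not shown here; `BTreeSet` both dedupes and yields a stable lexicographic order in one step:

```rust
use std::collections::BTreeSet;

// Sketch only: `parse_tags` stands in for parse_dataset_tags, using plain
// strings instead of DatasetTag. BTreeSet dedupes and sorts in one step,
// so repeated calls with the same header always return the same Vec.
fn parse_tags(header_value: &str) -> Vec<String> {
    header_value
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect::<BTreeSet<_>>()
        .into_iter()
        .collect()
}

fn main() {
    let tags = parse_tags(" b ,a,, b ,c");
    assert_eq!(tags, vec!["a", "b", "c"]);
    println!("{tags:?}");
}
```

For the real `Vec<DatasetTag>` return type this requires `DatasetTag: Ord`; if insertion order matters more than sorted order, `IndexSet` from the indexmap crate is the usual alternative.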
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 68-70: The 404 is returned too early because
PARSEABLE.get_stream(&dataset_name, &tenant_id) only checks the in-memory
registry; before mapping to DatasetsError::DatasetNotFound you must attempt the
same storage-hydration used by Parseable's logstream handlers to load the
dataset from object storage and populate the in-memory registry, then retry
get_stream; apply this change to the current occurrence and the similar one at
the later block (around lines 185-187) so that you only return
DatasetsError::DatasetNotFound if the stream is still absent after trying the
storage-load path.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/ingest.rs
  • src/parseable/streams.rs
  • src/handlers/http/modal/utils/logstream_utils.rs


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/prism/home/mod.rs (1)

205-209: ⚠️ Potential issue | 🟠 Major

Pick a canonical metadata copy for tags and labels.

get_all_stream_jsons can yield multiple ObjectStoreFormat entries for one dataset, but this response copies dataset_tags and dataset_labels from stream_jsons[0] only. Since tags/labels are now mutable through PUT /datasets/{name}, the home API can return stale metadata depending on which copy the metastore returns first. Read an explicit authoritative stream.json here instead of the first entry.

Also applies to: 240-251

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/home/mod.rs` around lines 205 - 209, The code currently calls
PARSEABLE.metastore.get_all_stream_jsons and then copies
dataset_tags/dataset_labels from stream_jsons[0], which can be stale; update
get_stream_metadata to instead fetch the canonical/authoritative stream.json
(not rely on the first entry) and extract dataset_tags and dataset_labels from
that single authoritative result (e.g., use a metastore method that returns the
canonical stream.json or a get_stream_json/get_authoritative_stream_json
variant), and apply the same fix to the other block that currently reads
stream_jsons[0] (the second occurrence mentioned at lines 240-251) so both
places use the explicit authoritative stream.json when building
StreamMetadataResponse.
src/prism/logstream/mod.rs (1)

231-245: ⚠️ Potential issue | 🟠 Major

Enforce the full per-stream permission set before building PrismDatasetResponse.

This path eventually returns schema, stats, retention, and counts, but the only dataset-level gate in process_stream is Action::ListStream. A user who can list a dataset can therefore still receive data that should sit behind GetSchema/Query or other stronger actions. Validate the full action set per stream before calling build_dataset_response, or trim the response to only fields covered by the existing check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/logstream/mod.rs` around lines 231 - 245, The code currently only
gates datasets by Action::ListStream in get_datasets/process_stream but still
builds full PrismDatasetResponse (schema, stats, retention, counts); fix by
enforcing per-stream permission checks for the full action set (e.g.,
Action::GetSchema, Action::Query, Action::GetRetention, Action::GetCounts, etc.)
before calling build_dataset_response — either (A) perform explicit permission
checks for those actions inside get_datasets (or have process_stream return the
set of allowed actions) and only include fields allowed by those actions, or (B)
change build_dataset_response to accept an allowed_actions flag and trim out
schema/stats/retention/counts when the caller lacks the corresponding
permission; apply this change around get_datasets, process_stream, and
build_dataset_response so responses never expose data beyond the allowed
per-stream actions.
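
Option (B) above — trimming the response to the caller's allowed actions — might look like this minimal sketch. The `Action` variants and response fields are illustrative stand-ins, not the real Parseable rbac or prism types:

```rust
// Illustrative types only; the real Action enum and PrismDatasetResponse
// live in Parseable's rbac and prism modules.
#[derive(PartialEq)]
enum Action {
    ListStream,
    GetSchema,
    GetStats,
}

struct DatasetResponse {
    name: String,
    schema: Option<String>, // populated only when GetSchema is allowed
    stats: Option<u64>,     // populated only when GetStats is allowed
}

fn build_dataset_response(name: &str, allowed: &[Action]) -> DatasetResponse {
    DatasetResponse {
        name: name.to_string(),
        schema: allowed
            .contains(&Action::GetSchema)
            .then(|| "{\"fields\": []}".to_string()),
        stats: allowed.contains(&Action::GetStats).then_some(42),
    }
}

fn main() {
    // A caller holding only ListStream sees the name but no schema or stats.
    let resp = build_dataset_response("app_logs", &[Action::ListStream]);
    assert_eq!(resp.name, "app_logs");
    assert!(resp.schema.is_none());
    assert!(resp.stats.is_none());
}
```

The design point is that each field is gated at construction time, so a missing permission can never leak data through a later serialization step.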
♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

197-203: ⚠️ Potential issue | 🟡 Minor

Normalize labels before deduplication and persistence.

Line 197-203 dedupes labels but does not trim/filter empty values, so whitespace-only labels can still be stored and matched inconsistently.

💡 Proposed fix
-    let final_labels = match body.labels {
-        Some(labels) => labels
-            .into_iter()
-            .collect::<HashSet<_>>()
-            .into_iter()
-            .collect(),
+    let final_labels = match body.labels {
+        Some(labels) => labels
+            .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
+            .collect::<HashSet<_>>()
+            .into_iter()
+            .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 197 - 203, final_labels is
deduped but not normalized, allowing whitespace-only or untrimmed labels to
persist; update the handling of body.labels (and the branch that falls back to
stream.get_dataset_labels() if needed) to map each label through trim(), filter
out labels that are empty after trimming, then collect into a HashSet to
deduplicate and finally into the expected collection type used later (use the
same symbol final_labels). Ensure both the Some(labels) branch and any code path
that constructs labels from stream.get_dataset_labels() apply the same
trim+filter+dedupe normalization.
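
The trim + filter + dedupe pipeline can be sketched as follows, preserving first-seen order via a seen-set so no indexmap dependency is needed; the function name is illustrative:

```rust
use std::collections::HashSet;

// Sketch of the normalization the review asks for: trim each label, drop
// empties, and dedupe. A seen-set keeps first-seen order without pulling
// in the indexmap crate.
fn normalize_labels(labels: Vec<String>) -> Vec<String> {
    let mut seen = HashSet::new();
    labels
        .into_iter()
        .map(|label| label.trim().to_string())
        .filter(|label| !label.is_empty())
        .filter(|label| seen.insert(label.clone()))
        .collect()
}

fn main() {
    let input = vec![
        " prod ".to_string(),
        "".to_string(),
        "prod".to_string(),
        "eu".to_string(),
    ];
    assert_eq!(normalize_labels(input), vec!["prod", "eu"]);
}
```

Applying the same helper to both the `Some(labels)` branch and the `stream.get_dataset_labels()` fallback keeps the PUT path consistent with the header-based creation flow.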

68-70: ⚠️ Potential issue | 🟠 Major

Defer DatasetNotFound until storage hydration is attempted.

Line 68-70 and Line 185-187 map in-memory get_stream misses straight to 404. In query/standalone flows, this can incorrectly 404 datasets that exist in object storage but are not hydrated yet. Please apply the same storage-hydration existence pattern used in logstream handlers before returning not found.

Based on learnings: stream existence checks should attempt storage-backed hydration in query/standalone mode before returning not found.

Also applies to: 185-187

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 68 - 70, The current code calls
PARSEABLE.get_stream(&dataset_name, &tenant_id) and immediately maps any error
to DatasetsError::DatasetNotFound (e.g., in the block around
PARSEABLE.get_stream and the similar check at lines 185-187), which causes a
premature 404 for datasets that exist in object storage but aren't hydrated;
instead, follow the logstream handler pattern: on get_stream error, attempt
storage-backed hydration (invoke the same hydrate/storage-check routine used by
logstream handlers for query/standalone flows) and only return
DatasetsError::DatasetNotFound if hydration confirms the dataset is absent or
hydration fails definitively; update the code paths around PARSEABLE.get_stream
and the duplicated check to defer mapping to DatasetNotFound until after the
hydration attempt completes.
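
The "hydrate before 404" pattern the review references can be sketched with simplified stand-in types — check the in-memory registry, fall back to a storage load, and return NotFound only when both miss:

```rust
use std::collections::HashMap;

// Simplified stand-ins: the real code consults Parseable's stream registry
// and object storage; strings stand in for stream metadata here.
#[derive(Debug, PartialEq)]
enum LookupError {
    NotFound,
}

struct Registry {
    in_memory: HashMap<String, String>, // hydrated stream metadata
    storage: HashMap<String, String>,   // stands in for object storage
}

impl Registry {
    fn get_or_hydrate(&mut self, name: &str) -> Result<&String, LookupError> {
        if !self.in_memory.contains_key(name) {
            match self.storage.get(name) {
                // Hydrate the in-memory registry from storage, then serve it.
                Some(meta) => {
                    let meta = meta.clone();
                    self.in_memory.insert(name.to_string(), meta);
                }
                None => return Err(LookupError::NotFound),
            }
        }
        Ok(self.in_memory.get(name).expect("just checked or inserted"))
    }
}

fn main() {
    let mut registry = Registry {
        in_memory: HashMap::new(),
        storage: HashMap::from([("cold_stream".to_string(), "{}".to_string())]),
    };
    // Present only in storage: hydrated instead of returning 404.
    assert!(registry.get_or_hydrate("cold_stream").is_ok());
    // Absent everywhere: NotFound only after the storage check.
    assert_eq!(registry.get_or_hydrate("missing"), Err(LookupError::NotFound));
}
```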
🧹 Nitpick comments (2)
src/prism/home/mod.rs (1)

63-66: Keep tags and labels in the serialized shape even when empty.

With skip_serializing_if = "Vec::is_empty", datasets that simply have no metadata look the same as responses from servers that do not support these fields at all. Returning [] keeps the home-response schema stable for clients.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/home/mod.rs` around lines 63 - 66, The struct fields `tags:
Vec<DatasetTag>` and `labels: Vec<String>` in src/prism/home/mod.rs should
always be present in serialized output even when empty; remove the
`skip_serializing_if = "Vec::is_empty"` attribute while keeping
`#[serde(default)]` so empty vectors serialize as `[]`. Locate the `tags` and
`labels` field definitions in the struct (the lines with `tags: Vec<DatasetTag>`
and `labels: Vec<String>`) and delete only the `skip_serializing_if` portion of
their serde attributes, leaving `default` intact to still provide an empty Vec
when missing during deserialization.
src/parseable/streams.rs (1)

982-988: Add a single setter for dataset metadata.

The new dataset update flow has to call set_dataset_tags and set_dataset_labels separately, so readers can observe a mixed state between the two writes. A combined setter would keep the in-memory view coherent and avoid the second lock acquisition.

♻️ Suggested refactor
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
+
     pub fn set_dataset_tags(&self, tags: Vec<DatasetTag>) {
         self.metadata.write().expect(LOCK_EXPECT).dataset_tags = tags;
     }
 
     pub fn set_dataset_labels(&self, labels: Vec<String>) {
         self.metadata.write().expect(LOCK_EXPECT).dataset_labels = labels;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 988, Introduce a single atomic
setter that updates both tags and labels under one write lock to avoid
intermediate inconsistent reads: add a new method (e.g. set_dataset_metadata or
set_dataset_tags_and_labels) that takes (tags: Vec<DatasetTag>, labels:
Vec<String>) and performs one self.metadata.write().expect(LOCK_EXPECT)
assignment setting both dataset_tags and dataset_labels in the same critical
section, and update call sites to use this combined setter instead of calling
set_dataset_tags and set_dataset_labels separately; keep the existing individual
setters only if backward compatibility is required but prefer the combined
setter to prevent two separate lock acquisitions and mixed-state visibility.
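
The combined setter's point — both fields change under a single write guard — can be shown with a stripped-down stream type (tags as strings and the expect message are simplifications of the real code):

```rust
use std::sync::RwLock;

// Simplified stand-ins: the real Stream keeps dataset_tags as Vec<DatasetTag>
// and uses the LOCK_EXPECT message; strings are used here for brevity.
struct Metadata {
    dataset_tags: Vec<String>,
    dataset_labels: Vec<String>,
}

struct Stream {
    metadata: RwLock<Metadata>,
}

impl Stream {
    // One write-lock acquisition updates both fields, so a reader can never
    // observe tags from one update paired with labels from another.
    fn set_dataset_metadata(&self, tags: Vec<String>, labels: Vec<String>) {
        let mut md = self.metadata.write().expect("metadata lock poisoned");
        md.dataset_tags = tags;
        md.dataset_labels = labels;
    }
}

fn main() {
    let stream = Stream {
        metadata: RwLock::new(Metadata {
            dataset_tags: vec![],
            dataset_labels: vec![],
        }),
    };
    stream.set_dataset_metadata(vec!["web".to_string()], vec!["prod".to_string()]);
    let md = stream.metadata.read().unwrap();
    assert_eq!(md.dataset_tags, vec!["web"]);
    assert_eq!(md.dataset_labels, vec!["prod"]);
}
```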
ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: e591f47e-21dd-4410-8761-13a3ec03a40b

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/storage/field_stats.rs
  • src/migration/mod.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/ingest.rs
  • src/parseable/mod.rs
  • src/handlers/mod.rs


@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (1)
src/handlers/mod.rs (1)

131-151: Non-deterministic ordering in parsed tags/labels.

The parse_dataset_tags and parse_dataset_labels functions collect into a HashSet for deduplication then convert to Vec. Since HashSet iteration order is non-deterministic, the resulting vector order may vary across calls with the same input. This could cause issues with:

  • Consistent API responses
  • Testing
  • Diff-based change detection

If ordering matters for any downstream use case (e.g., serialization stability), consider sorting the result or using IndexSet from the indexmap crate to preserve insertion order while deduplicating.

♻️ Optional fix to ensure deterministic ordering
 pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
-    header_value
+    let mut tags: Vec<DatasetTag> = header_value
         .split(',')
         .filter_map(|s| {
             let trimmed = s.trim();
             if trimmed.is_empty() {
                 None
             } else {
                 match DatasetTag::try_from(trimmed) {
                     Ok(tag) => Some(tag),
                     Err(err) => {
                         warn!("Invalid dataset tag '{trimmed}': {err}");
                         None
                     }
                 }
             }
         })
         .collect::<HashSet<_>>()
         .into_iter()
-        .collect()
+        .collect();
+    tags.sort_by(|a, b| a.to_string().cmp(&b.to_string()));
+    tags
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 151, The functions parse_dataset_tags
(and parse_dataset_labels) deduplicate via HashSet then collect into Vec, which
yields non-deterministic ordering; update these functions to produce
deterministic output by either (a) replacing the HashSet dedupe with an IndexSet
from the indexmap crate to preserve insertion order while removing duplicates,
or (b) after collecting into a Vec, sort the Vec deterministically (e.g.,
lexicographically by tag/label string or by a stable Ord implementation) before
returning; apply the same change to both parse_dataset_tags and
parse_dataset_labels so repeated calls with the same input yield a stable
ordering.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 9baefa2e-2d8a-4e4b-8fe1-ab8ef9c04ab3

📥 Commits

Reviewing files that changed from the base of the PR and between 4a2b764 and 6cab92e.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (2)
  • src/handlers/http/mod.rs
  • src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/connectors/kafka/processor.rs
  • src/storage/field_stats.rs
  • src/storage/object_storage.rs
  • src/handlers/http/modal/server.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

211-217: ⚠️ Potential issue | 🟡 Minor

Normalize labels before deduplication (trim + drop empty).

Current label handling can persist empty/whitespace-only labels and produce noisy correlation results.

Proposed fix
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 211 - 217, The label handling for
final_labels (when reading body.labels or falling back to
stream.get_dataset_labels()) must normalize values before deduplication: trim
whitespace from each label and drop any empty/whitespace-only strings, then
collect into a HashSet to dedupe and back into the final collection. Update the
Some(labels) branch (body.labels) to map each label through trim + to_string,
filter out empties, then dedupe; and apply the same normalization/filtering to
the fallback stream.get_dataset_labels() result so both paths produce cleaned,
non-empty, deduplicated labels.

203-218: ⚠️ Potential issue | 🔴 Critical

Partial-update merge is still vulnerable to lost updates under concurrency.

None => existing + single write means concurrent requests (one tags-only, one labels-only) can overwrite each other’s field with stale values. This is still a TOCTOU/lost-write bug.

Suggested direction

Serialize the read-merge-write path per stream (mutex/lock), or move merge semantics into storage as an atomic patch operation so omitted fields are preserved against the latest persisted state.

Also applies to: 220-233

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 203 - 218, The current
read-merge-write for final_tags/final_labels uses body.tags/body.labels with
fallback to stream.get_dataset_tags()/get_dataset_labels(), which allows TOCTOU
lost-updates under concurrent partial updates; to fix, serialize the
read-merge-write for a given stream (e.g., acquire a per-stream mutex/lock
around reading stream.get_dataset_*(), merging with body.tags/body.labels, and
writing) or implement an atomic patch in storage that merges omitted fields
server-side so updates to tags and labels cannot overwrite each other; update
the code paths that compute final_tags and final_labels (and the duplicated
logic at lines ~220-233) to use the chosen serialized/atomic approach.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 192-201: The handler put_dataset_metadata currently relies on
middleware that doesn't extract the `{name}` path param, allowing a
resource-level authorization bypass; add an explicit per-dataset RBAC check
inside put_dataset_metadata using the same pattern as get_correlated_datasets:
call Users.authorize (or the existing Users instance used in handlers) with
Action::CreateStream and a concrete resource context built from the dataset_name
and tenant_id (or equivalent Resource::Stream/StreamIdentifier used by your RBAC
layer) and return an authorization error if the call denies access;
alternatively, fix auth_resource_context to extract the `name` path parameter
before calling Users.authorize so context_resource is Some(name) for resource
checks.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: c0f20c84-897c-4668-a2bd-eff10b483cf9

📥 Commits

Reviewing files that changed from the base of the PR and between 6cab92e and 9fa8536.

📒 Files selected for processing (1)
  • src/handlers/http/datasets.rs


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (2)
src/handlers/http/datasets.rs (2)

213-240: ⚠️ Potential issue | 🔴 Critical

Partial PUTs can still clobber each other under concurrency.

Combining tags and labels into one endpoint removed the cross-endpoint race, but the None => stream.get_dataset_*() merge keeps a read-modify-write window here. If one request updates only tags while another updates only labels, both can read the same old state and the later write will restore one field to a stale value. Please serialize metadata updates per stream, or move the merge into a single atomic storage-side update/CAS.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 213 - 240, The current
read-modify-write uses body.tags/body.labels with
stream.get_dataset_tags()/stream.get_dataset_labels() then calls
update_dataset_tags_and_labels_in_stream, which allows concurrent partial-PUTs
to clobber each other; fix by moving the merge/compare-and-swap into the storage
layer (make update_dataset_tags_and_labels_in_stream perform an atomic
read-merge-CAS using tenant_id and dataset_name) or by adding per-stream
serialization around this handler (e.g., a per-stream mutex) so updates to
final_tags/final_labels cannot interleave; modify the storage method or
introduce a per-stream lock and remove the local stream.get_dataset_* merge to
ensure atomicity.
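The per-stream serialization option can be sketched with std primitives alone. This is a minimal illustration, not Parseable's actual types: `Store`, `StreamMeta`, and the string-typed tags are hypothetical stand-ins, and the point is only that the partial-update merge happens while holding the stream's own mutex, so two concurrent partial PUTs cannot clobber each other.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for persisted stream metadata.
#[derive(Default, Clone, Debug, PartialEq)]
struct StreamMeta {
    tags: Vec<String>,
    labels: Vec<String>,
}

#[derive(Default)]
struct Store {
    // One mutex per stream serializes read-modify-write cycles.
    streams: Mutex<HashMap<String, Arc<Mutex<StreamMeta>>>>,
}

impl Store {
    fn entry(&self, name: &str) -> Arc<Mutex<StreamMeta>> {
        let mut map = self.streams.lock().unwrap();
        map.entry(name.to_string()).or_default().clone()
    }

    // Partial update: `None` keeps the current value; the merge with the
    // existing state happens entirely under the per-stream lock.
    fn update(&self, name: &str, tags: Option<Vec<String>>, labels: Option<Vec<String>>) {
        let slot = self.entry(name);
        let mut meta = slot.lock().unwrap();
        if let Some(t) = tags {
            meta.tags = t;
        }
        if let Some(l) = labels {
            meta.labels = l;
        }
    }

    fn get(&self, name: &str) -> StreamMeta {
        self.entry(name).lock().unwrap().clone()
    }
}

fn main() {
    let store = Store::default();
    store.update("app", Some(vec!["apm".to_string()]), None);
    store.update("app", None, Some(vec!["prod".to_string()]));
    // Both partial updates survive because each merge reads current state
    // under the lock rather than a stale snapshot.
    println!("{:?}", store.get("app"));
}
```

A CAS in the storage layer achieves the same end without in-process locks and also covers multi-node writers, which the per-process mutex above does not.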

221-227: ⚠️ Potential issue | 🟡 Minor

Trim and drop blank labels before persisting.

This path still accepts "", whitespace-only labels, and " foo " as distinct labels, so correlation can diverge from the header-based creation flow.

Suggested fix
     let final_labels = match body.labels {
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
         None => stream.get_dataset_labels(),
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 221 - 227, Normalize incoming
labels by trimming whitespace and dropping empty strings before collecting into
the final set: in the Some(labels) branch where final_labels is built, replace
the current collect pipeline with one that maps each label through
trim().to_string(), filters out empty strings (after trim), then collects into a
HashSet to deduplicate and back into the desired collection. Also ensure the
same normalization is applied if you use stream.get_dataset_labels() or any
other source so header-based and body-based flows match.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 213-240: The current read-modify-write uses body.tags/body.labels
with stream.get_dataset_tags()/stream.get_dataset_labels() then calls
update_dataset_tags_and_labels_in_stream, which allows concurrent partial-PUTs
to clobber each other; fix by moving the merge/compare-and-swap into the storage
layer (make update_dataset_tags_and_labels_in_stream perform an atomic
read-merge-CAS using tenant_id and dataset_name) or by adding per-stream
serialization around this handler (e.g., a per-stream mutex) so updates to
final_tags/final_labels cannot interleave; modify the storage method or
introduce a per-stream lock and remove the local stream.get_dataset_* merge to
ensure atomicity.
- Around line 221-227: Normalize incoming labels by trimming whitespace and
dropping empty strings before collecting into the final set: in the Some(labels)
branch where final_labels is built, replace the current collect pipeline with
one that maps each label through trim().to_string(), filters out empty strings
(after trim), then collects into a HashSet to deduplicate and back into the
desired collection. Also ensure the same normalization is applied if you use
stream.get_dataset_labels() or any other source so header-based and body-based
flows match.

ℹ️ Review info

Run ID: c8ea088e-5c7e-4c3a-b5e1-e9cf1aee7f18

📥 Commits

Reviewing files that changed from the base of the PR and between 9fa8536 and c37a57d.

📒 Files selected for processing (1)
  • src/handlers/http/datasets.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
src/handlers/http/datasets.rs (1)

221-228: Consider normalizing label inputs to prevent whitespace/empty entries.

The current implementation deduplicates labels via HashSet but doesn't trim whitespace or filter empty strings. This could persist malformed labels like " " or "" if sent by clients.

Suggested improvement
 let final_labels = match body.labels {
     Some(labels) => labels
         .into_iter()
+        .map(|label| label.trim().to_string())
+        .filter(|label| !label.is_empty())
         .collect::<HashSet<_>>()
         .into_iter()
         .collect(),
     None => stream.get_dataset_labels(),
 };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 221 - 228, Normalize and filter
incoming labels before deduplication so whitespace-only or empty strings aren't
persisted: in the branch that handles body.labels (where final_labels is
computed) trim each label (e.g., via s.trim()), drop any empty results, then
deduplicate into a HashSet and collect back to the final collection; also apply
the same normalization/filtering to the labels returned by
stream.get_dataset_labels() in the None branch so both paths produce cleaned
labels. Use the existing symbols final_labels, body.labels, and
stream.get_dataset_labels() to locate and update the logic.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/modal/utils/logstream_utils.rs`:
- Around line 77-82: The current assignment to dataset_tags
(headers.get(DATASET_TAGS_KEY).or_else(||
headers.get(DATASET_TAG_KEY)).and_then(|v|
v.to_str().ok()).map(parse_dataset_tags).unwrap_or_default()) silently ignores
malformed tags; change this to a fallible parse: attempt to_str(), call a
fallible parser (or change parse_dataset_tags to return Result<Vec<_>, _>), and
if parsing returns Err return an HTTP 400 bad request with a clear error message
instead of using Vec::new(); keep the header lookup sequence (DATASET_TAGS_KEY
then DATASET_TAG_KEY) but propagate parse errors up the handler so dataset_tags
is only set on successful parse.

In `@src/storage/object_storage.rs`:
- Around line 499-519: The update_dataset_tags_and_labels_in_stream helper
currently only writes the canonical stream JSON, which leaves ingestor/copy
variants stale; modify update_dataset_tags_and_labels_in_stream to fetch all
persisted stream JSON variants via the metastore (use get_all_stream_jsons or
equivalent), iterate each ObjectStoreFormat, update dataset_tags and
dataset_labels on every variant, and persist each back (call put_stream_json for
each variant) so all saved copies stay in sync; reference functions/types:
update_dataset_tags_and_labels_in_stream,
PARSEABLE.metastore.get_all_stream_jsons, ObjectStoreFormat, put_stream_json,
and ensure this prevents stale reads by matching how create_stream_from_ingestor
and get_stream_metadata select entries.

---

Nitpick comments:
In `@src/handlers/http/datasets.rs`:
- Around line 221-228: Normalize and filter incoming labels before deduplication
so whitespace-only or empty strings aren't persisted: in the branch that handles
body.labels (where final_labels is computed) trim each label (e.g., via
s.trim()), drop any empty results, then deduplicate into a HashSet and collect
back to the final collection; also apply the same normalization/filtering to the
labels returned by stream.get_dataset_labels() in the None branch so both paths
produce cleaned labels. Use the existing symbols final_labels, body.labels, and
stream.get_dataset_labels() to locate and update the logic.

ℹ️ Review info

Run ID: 660eea0a-7846-4da6-98ad-564df216bd71

📥 Commits

Reviewing files that changed from the base of the PR and between c37a57d and f395dea.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (2)
  • src/handlers/http/mod.rs
  • src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/handlers/http/prism_logstream.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/ingest.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs


@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/mod.rs (1)

92-113: ⚠️ Potential issue | 🟠 Major

Align DatasetTag::APM serialization with TryFrom and Display.

#[serde(rename_all = "kebab-case")] serializes APM as a-p-m, but TryFrom<&str> and Display both use apm. This breaks round-trip consistency and causes deserialization failures when serde-generated JSON is re-parsed.

🔧 Suggested fix
 #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
 #[serde(rename_all = "kebab-case")]
 pub enum DatasetTag {
     AgentObservability,
     K8sObservability,
     DatabaseObservability,
-    APM,
+    #[serde(rename = "apm")]
+    APM,
     ServiceMap,
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 92 - 113, The DatasetTag enum's serde
kebab-case rule turns the APM variant into "a-p-m", breaking round-trip with
TryFrom and Display which expect "apm"; update the enum by overriding
serialization for the APM variant (e.g., add #[serde(rename = "apm")] to the APM
variant) so serde emits/accepts "apm" while keeping #[serde(rename_all =
"kebab-case")] for the rest; verify DatasetTag::APM, the TryFrom<&str>
implementation, and Display continue to use "apm".
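The round-trip concern is visible even without serde. In this std-only sketch (the enum and impls are illustrative, not the crate's actual definitions), `Display` and `TryFrom` are kept in agreement on `"apm"`; a naive kebab-case derivation of the variant name would instead emit `"a-p-m"` and break the assertion in `main`.

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq)]
enum DatasetTag {
    K8sObservability,
    Apm,
}

impl fmt::Display for DatasetTag {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let s = match self {
            DatasetTag::K8sObservability => "k8s-observability",
            DatasetTag::Apm => "apm", // must match TryFrom below, not "a-p-m"
        };
        write!(f, "{s}")
    }
}

impl TryFrom<&str> for DatasetTag {
    type Error = String;
    fn try_from(s: &str) -> Result<Self, Self::Error> {
        match s {
            "k8s-observability" => Ok(DatasetTag::K8sObservability),
            "apm" => Ok(DatasetTag::Apm),
            other => Err(format!("invalid dataset tag '{other}'")),
        }
    }
}

fn main() {
    for tag in [DatasetTag::K8sObservability, DatasetTag::Apm] {
        let rendered = tag.to_string();
        // Round trip: Display output must parse back to the same variant.
        assert_eq!(DatasetTag::try_from(rendered.as_str()), Ok(tag));
    }
    println!("round trip ok");
}
```

With serde in the picture, the equivalent fix is a per-variant `#[serde(rename = "apm")]` override layered on top of the container-level `rename_all`, as the suggested diff shows.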
♻️ Duplicate comments (1)
src/storage/object_storage.rs (1)

499-519: ⚠️ Potential issue | 🟠 Major

Update every persisted stream.json variant, not just the canonical copy.

This helper only rewrites the canonical metadata. create_stream_from_ingestor() later rebuilds from ingestor-side JSON, and src/prism/home/mod.rs::get_stream_metadata() still reads get_all_stream_jsons(...), so stale ingestor copies can resurrect or surface old dataset_tags / dataset_labels. Either update all persisted variants here or make those readers source dataset metadata from the canonical stream.json only.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 499 - 519, The helper
update_dataset_tags_and_labels_in_stream currently only rewrites the canonical
stream JSON; instead fetch all persisted variants (use the metastore method that
returns all stream JSON variants, e.g., get_all_stream_jsons or equivalent on
PARSEABLE.metastore), deserialize each into ObjectStoreFormat, set
format.dataset_tags = tags.to_owned() and format.dataset_labels =
labels.to_owned() for every variant, then write each back with the metastore
put_stream_json (or the corresponding put for each variant) and propagate errors
as ObjectStorageError::MetastoreError just like the existing logic;
alternatively, if you prefer the other approach, change readers (e.g.,
get_stream_metadata/get_all_stream_jsons consumers) to source dataset
tags/labels exclusively from the canonical stream.json instead of ingestor
copies so stale variants cannot resurrect old metadata.
🧹 Nitpick comments (1)
src/handlers/mod.rs (1)

131-160: Preserve the caller's order when deduplicating tags and labels.

Collecting into HashSet and then back into Vec produces arbitrary iteration order. Since these arrays are persisted and surfaced by APIs, the same request can come back in a different order.

♻️ Suggested fix
 pub fn parse_dataset_tags(header_value: &str) -> Vec<DatasetTag> {
+    let mut seen = HashSet::new();
     header_value
         .split(',')
         .filter_map(|s| {
             let trimmed = s.trim();
             if trimmed.is_empty() {
                 None
             } else {
                 match DatasetTag::try_from(trimmed) {
                     Ok(tag) => Some(tag),
                     Err(err) => {
                         warn!("Invalid dataset tag '{trimmed}': {err}");
                         None
                     }
                 }
             }
         })
-        .collect::<HashSet<_>>()
-        .into_iter()
+        .filter(|tag| seen.insert(*tag))
         .collect()
 }
 
 pub fn parse_dataset_labels(header_value: &str) -> Vec<String> {
+    let mut seen = HashSet::new();
     header_value
         .split(',')
-        .map(|s| s.trim().to_string())
-        .filter(|s| !s.is_empty())
-        .collect::<HashSet<_>>()
-        .into_iter()
-        .collect()
+        .map(str::trim)
+        .filter(|s| !s.is_empty())
+        .filter(|label| seen.insert((*label).to_owned()))
+        .map(str::to_owned)
+        .collect()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/mod.rs` around lines 131 - 160, The functions parse_dataset_tags
and parse_dataset_labels currently deduplicate by collecting into a HashSet
which loses the caller's order; change both to preserve input order by iterating
through the split items, trimming each, skipping empties, and using a HashSet
called seen to track duplicates while pushing the first-seen items into a result
Vec; for parse_dataset_tags apply DatasetTag::try_from on each trimmed item and
log/skip invalid tags as before, but only insert a tag into seen/result when it
hasn't been seen yet; for parse_dataset_labels do the same for the trimmed
strings so the returned Vec retains the original order while removing
duplicates.
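The order-preserving variant for the labels case is a small change; this standalone sketch reuses the function name for illustration and tracks first-seen entries with a side `HashSet` instead of collecting through one.

```rust
use std::collections::HashSet;

// Split a comma-separated header value into trimmed, non-empty labels,
// deduplicated while preserving the caller's order.
fn parse_dataset_labels(header_value: &str) -> Vec<String> {
    let mut seen = HashSet::new();
    header_value
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .filter(|s| seen.insert((*s).to_owned()))
        .map(str::to_owned)
        .collect()
}

fn main() {
    // Duplicates keep their first position; blanks are dropped.
    println!("{:?}", parse_dataset_labels("b, a,  ,b,c"));
}
```

Because the result is persisted and surfaced by APIs, stable ordering also makes responses deterministic across identical requests, which simplifies client-side diffing and tests.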
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@src/handlers/mod.rs`:
- Around line 92-113: The DatasetTag enum's serde kebab-case rule turns the APM
variant into "a-p-m", breaking round-trip with TryFrom and Display which expect
"apm"; update the enum by overriding serialization for the APM variant (e.g.,
add #[serde(rename = "apm")] to the APM variant) so serde emits/accepts "apm"
while keeping #[serde(rename_all = "kebab-case")] for the rest; verify
DatasetTag::APM, the TryFrom<&str> implementation, and Display continue to use
"apm".

---

Duplicate comments:
In `@src/storage/object_storage.rs`:
- Around line 499-519: The helper update_dataset_tags_and_labels_in_stream
currently only rewrites the canonical stream JSON; instead fetch all persisted
variants (use the metastore method that returns all stream JSON variants, e.g.,
get_all_stream_jsons or equivalent on PARSEABLE.metastore), deserialize each
into ObjectStoreFormat, set format.dataset_tags = tags.to_owned() and
format.dataset_labels = labels.to_owned() for every variant, then write each
back with the metastore put_stream_json (or the corresponding put for each
variant) and propagate errors as ObjectStorageError::MetastoreError just like
the existing logic; alternatively, if you prefer the other approach, change
readers (e.g., get_stream_metadata/get_all_stream_jsons consumers) to source
dataset tags/labels exclusively from the canonical stream.json instead of
ingestor copies so stale variants cannot resurrect old metadata.

---

Nitpick comments:
In `@src/handlers/mod.rs`:
- Around line 131-160: The functions parse_dataset_tags and parse_dataset_labels
currently deduplicate by collecting into a HashSet which loses the caller's
order; change both to preserve input order by iterating through the split items,
trimming each, skipping empties, and using a HashSet called seen to track
duplicates while pushing the first-seen items into a result Vec; for
parse_dataset_tags apply DatasetTag::try_from on each trimmed item and log/skip
invalid tags as before, but only insert a tag into seen/result when it hasn't
been seen yet; for parse_dataset_labels do the same for the trimmed strings so
the returned Vec retains the original order while removing duplicates.

ℹ️ Review info

Run ID: 81ad78f3-283d-4c33-bff4-a8dcb0a1423b

📥 Commits

Reviewing files that changed from the base of the PR and between f395dea and dc37227.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (1)
  • src/handlers/http/datasets.rs
🚧 Files skipped from review as they are similar to previous changes (7)
  • src/handlers/http/mod.rs
  • src/connectors/kafka/processor.rs
  • src/handlers/http/prism_logstream.rs
  • src/storage/field_stats.rs
  • src/metadata.rs
  • src/handlers/http/ingest.rs
  • src/parseable/streams.rs

- PUT /api/v1/logstream/{name} accepts X-P-Dataset-Tags and X-P-Dataset-Labels headers (comma-separated) on stream creation
- PUT /api/prism/v1/datasets/{name} - update tags and labels
- GET /api/prism/v1/datasets/{name}/correlated - find datasets sharing tags or labels
- GET /api/prism/v1/datasets/tags/{tag} - find all datasets with a specific tag
- include tags and labels in home api response

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (4)
src/storage/object_storage.rs (1)

500-523: ⚠️ Potential issue | 🟠 Major

Persist replacements to every saved stream.json variant.

This helper still rewrites only the canonical stream.json. In distributed mode the ingestor copies stay stale, but create_stream_from_ingestor() below and src/prism/home/mod.rs::get_stream_metadata() read those copies too. A later reload—or even the home response after a clear—can resurrect removed tags/labels. Please either update every persisted variant here or make those read paths prefer the canonical JSON for these fields.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/storage/object_storage.rs` around lines 500 - 523, The helper
update_dataset_tags_and_labels_in_stream currently only rewrites the canonical
stream.json, leaving ingestor/distributed variants stale; modify it to persist
the updated dataset_tags and dataset_labels to every stored stream.json variant
(e.g., call the metastore method that lists/returns all persisted stream JSON
variants for stream_name and tenant_id, then loop each variant: deserialize, set
format.dataset_tags/format.dataset_labels, and call put_stream_json for that
variant) so copies read by create_stream_from_ingestor and get_stream_metadata
reflect the changes; alternatively, if a metastore API exists to atomically
update all variants, use it instead.
src/handlers/http/datasets.rs (3)

86-123: ⚠️ Potential issue | 🟠 Major

Hydrate candidates before scanning their metadata.

Both endpoints still continue on PARSEABLE.get_stream(...) failures. On nodes that lazily load streams, an existing dataset that is only in object storage at that moment just disappears from these results. Reuse check_or_load_stream (or enumerate storage-backed metadata) for each candidate before reading tags/labels. Based on learnings: In Parseable's logstream handlers, stream existence checks must load from storage when a stream is absent from memory.

Also applies to: 148-168

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 86 - 123, The loop currently
skips candidates when PARSEABLE.get_stream(&name, &tenant_id) returns Err, which
hides streams that exist only in object storage; modify the flow to ensure
streams are loaded from storage before reading metadata by calling
PARSEABLE.check_or_load_stream(&name, &tenant_id) (or the equivalent
storage-enumeration helper) for each candidate and only then call
PARSEABLE.get_stream(&name, &tenant_id) to read tags/labels and log_source;
apply the same change to the other occurrence (the block referenced at lines
148-168) so all metadata reads use check_or_load_stream first to avoid silently
dropping storage-backed streams.

221-226: ⚠️ Potential issue | 🟡 Minor

Trim and drop blank labels before deduping.

Whitespace-only labels are still persisted, and exact-match correlation will treat "prod" and " prod " as different labels.

♻️ Suggested normalization
         Some(labels) => labels
             .into_iter()
+            .map(|label| label.trim().to_string())
+            .filter(|label| !label.is_empty())
             .collect::<HashSet<_>>()
             .into_iter()
             .collect(),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 221 - 226, final_labels currently
collects body.labels without normalization, so whitespace-only labels and labels
with surrounding spaces are persisted as distinct values; update the
transformation of body.labels (the final_labels construction) to: iterate over
each label from body.labels, trim surrounding whitespace (use trim), drop labels
that are empty after trimming, then dedupe by collecting into a HashSet and
convert back to the desired collection type—ensure you modify the chain that
starts from body.labels => .into_iter() so it applies .map(|l|
l.trim().to_string()) and a .filter(|l| !l.is_empty()) before the HashSet
collection.

213-240: ⚠️ Potential issue | 🔴 Critical

Partial PUTs still have a lost-update race.

Combining tags and labels into one handler doesn't remove the TOCTOU here: a tags-only request snapshots the current labels, a concurrent labels-only request snapshots the current tags, and the last write restores a stale copy of the other field. If partial updates stay supported, this merge needs to happen under a per-stream lock or in storage against the latest persisted metadata.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 213 - 240, The current
partial-PUT flow snapshots tags and labels separately (using
stream.get_dataset_tags / get_dataset_labels) then calls
update_dataset_tags_and_labels_in_stream, which causes a lost-update race for
concurrent partial updates; fix by performing the merge under a per-stream lock
or delegating the merge to storage so it reads-and-updates atomically: acquire a
lock keyed by the stream (or call a new atomic storage method) before computing
final_tags/final_labels and calling
PARSEABLE.storage.get_object_store().update_dataset_tags_and_labels_in_stream so
the storage update sees the latest persisted metadata and prevents TOCTOU races
(update code paths around final_tags, final_labels, stream, and
update_dataset_tags_and_labels_in_stream accordingly).
🧹 Nitpick comments (1)
src/parseable/streams.rs (1)

982-987: Prefer one setter for dataset metadata.

put_dataset_metadata updates tags and labels as one logical operation, but these setters force two separate write-lock acquisitions. That leaves a small window where readers can observe mixed old/new state.

🔧 Suggested direction
+    pub fn set_dataset_metadata(&self, tags: Vec<DatasetTag>, labels: Vec<String>) {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        metadata.dataset_tags = tags;
+        metadata.dataset_labels = labels;
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/parseable/streams.rs` around lines 982 - 987, Currently set_dataset_tags
and set_dataset_labels each acquire the write lock separately, allowing readers
to see a mixed state; add a single setter (e.g., set_dataset_metadata or
update_dataset_metadata) that takes both tags: Vec<DatasetTag> and labels:
Vec<String> and updates metadata.dataset_tags and metadata.dataset_labels inside
one write() lock (use the same LOCK_EXPECT path); then replace or have
set_dataset_tags and set_dataset_labels call this new set_dataset_metadata to
ensure a single write-lock acquisition and preserve atomic update semantics used
by put_dataset_metadata.
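The single-setter shape under one write guard looks like this stand-alone sketch; `Stream` and `Metadata` are simplified illustrative types, not the real structs, and `expect` stands in for the crate's `LOCK_EXPECT` message.

```rust
use std::sync::RwLock;

#[derive(Default)]
struct Metadata {
    dataset_tags: Vec<String>,
    dataset_labels: Vec<String>,
}

struct Stream {
    metadata: RwLock<Metadata>,
}

impl Stream {
    // One write-lock acquisition updates both fields, so readers never
    // observe new tags paired with stale labels (or vice versa).
    fn set_dataset_metadata(&self, tags: Vec<String>, labels: Vec<String>) {
        let mut m = self.metadata.write().expect("metadata lock poisoned");
        m.dataset_tags = tags;
        m.dataset_labels = labels;
    }

    fn snapshot(&self) -> (Vec<String>, Vec<String>) {
        let m = self.metadata.read().expect("metadata lock poisoned");
        (m.dataset_tags.clone(), m.dataset_labels.clone())
    }
}

fn main() {
    let stream = Stream { metadata: RwLock::new(Metadata::default()) };
    stream.set_dataset_metadata(vec!["apm".to_string()], vec!["prod".to_string()]);
    println!("{:?}", stream.snapshot());
}
```

The existing `set_dataset_tags`/`set_dataset_labels` can then delegate to this setter (or be retired), so every caller gets the atomic path for free.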
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/handlers/http/datasets.rs`:
- Around line 259-276: The error_response currently returns self.to_string()
which can leak internal storage/metastore details for DatasetsError::Storage;
update the actix_web::ResponseError::error_response implementation to
pattern-match DatasetsError::Storage (matching the ObjectStorageError variant),
log the underlying error details server-side (e.g. via log::error! or
tracing::error!) including any MetastoreError::to_detail() where applicable, and
return a generic JSON message like {"error":"internal server error"} or
{"error":"storage error"} for that branch while preserving existing behavior for
other DatasetsError variants.

---

Duplicate comments:
In `@src/handlers/http/datasets.rs`:
- Around line 86-123: The loop currently skips candidates when
PARSEABLE.get_stream(&name, &tenant_id) returns Err, which hides streams that
exist only in object storage; modify the flow to ensure streams are loaded from
storage before reading metadata by calling PARSEABLE.check_or_load_stream(&name,
&tenant_id) (or the equivalent storage-enumeration helper) for each candidate
and only then call PARSEABLE.get_stream(&name, &tenant_id) to read tags/labels
and log_source; apply the same change to the other occurrence (the block
referenced at lines 148-168) so all metadata reads use check_or_load_stream
first to avoid silently dropping storage-backed streams.
- Around line 221-226: final_labels currently collects body.labels without
normalization, so whitespace-only labels and labels with surrounding spaces are
persisted as distinct values; update the transformation of body.labels (the
final_labels construction) to: iterate over each label from body.labels, trim
surrounding whitespace (use trim), drop labels that are empty after trimming,
then dedupe by collecting into a HashSet and convert back to the desired
collection type—ensure you modify the chain that starts from body.labels =>
.into_iter() so it applies .map(|l| l.trim().to_string()) and a .filter(|l|
!l.is_empty()) before the HashSet collection.
- Around line 213-240: The current partial-PUT flow snapshots tags and labels
separately (using stream.get_dataset_tags / get_dataset_labels) then calls
update_dataset_tags_and_labels_in_stream, which causes a lost-update race for
concurrent partial updates; fix by performing the merge under a per-stream lock
or delegating the merge to storage so it reads-and-updates atomically: acquire a
lock keyed by the stream (or call a new atomic storage method) before computing
final_tags/final_labels and calling
PARSEABLE.storage.get_object_store().update_dataset_tags_and_labels_in_stream so
the storage update sees the latest persisted metadata and prevents TOCTOU races
(update code paths around final_tags, final_labels, stream, and
update_dataset_tags_and_labels_in_stream accordingly).

In `@src/storage/object_storage.rs`:
- Around line 500-523: The helper update_dataset_tags_and_labels_in_stream
currently only rewrites the canonical stream.json, leaving ingestor/distributed
variants stale; modify it to persist the updated dataset_tags and dataset_labels
to every stored stream.json variant (e.g., call the metastore method that
lists/returns all persisted stream JSON variants for stream_name and tenant_id,
then loop each variant: deserialize, set
format.dataset_tags/format.dataset_labels, and call put_stream_json for that
variant) so copies read by create_stream_from_ingestor and get_stream_metadata
reflect the changes; alternatively, if a metastore API exists to atomically
update all variants, use it instead.

---

Nitpick comments:
In `@src/parseable/streams.rs`:
- Around line 982-987: Currently set_dataset_tags and set_dataset_labels each
acquire the write lock separately, allowing readers to see a mixed state; add a
single setter (e.g., set_dataset_metadata or update_dataset_metadata) that takes
both tags: Vec<DatasetTag> and labels: Vec<String> and updates
metadata.dataset_tags and metadata.dataset_labels inside one write() lock (use
the same LOCK_EXPECT path); then replace or have set_dataset_tags and
set_dataset_labels call this new set_dataset_metadata to ensure a single
write-lock acquisition and preserve atomic update semantics used by
put_dataset_metadata.

ℹ️ Review info

Run ID: 75cbad3c-acad-4f94-946d-ac14eafc4542

📥 Commits

Reviewing files that changed from the base of the PR and between f395dea and cacb58c.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (2)
  • src/handlers/http/mod.rs
  • src/connectors/kafka/processor.rs
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/handlers/http/prism_logstream.rs
  • src/storage/field_stats.rs
  • src/handlers/http/ingest.rs
  • src/parseable/mod.rs
  • src/handlers/mod.rs


@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
src/handlers/http/datasets.rs (1)

263-278: ⚠️ Potential issue | 🟠 Major

Don't echo raw storage failures to API clients.

The DatasetsError::Storage variant serializes the underlying ObjectStorageError via self.to_string(), which can expose internal backend paths and metastore context on 500 responses. Return a generic client message and log the underlying error server-side.


🛡️ Suggested response sanitization
     fn error_response(&self) -> HttpResponse {
+        let error_message = match self {
+            DatasetsError::Storage(err) => {
+                tracing::error!(?err, "dataset API storage failure");
+                "Internal storage error".to_string()
+            }
+            _ => self.to_string(),
+        };
+
         HttpResponse::build(self.status_code()).json(serde_json::json!({
-            "error": self.to_string()
+            "error": error_message
         }))
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/handlers/http/datasets.rs` around lines 263 - 278, The ResponseError impl
for DatasetsError currently exposes internal storage details via
self.to_string() in error_response; update error_response to match on self and
for the DatasetsError::Storage(_) variant return a generic 500 JSON message
(e.g. {"error":"internal server error"}) while logging the underlying
ObjectStorageError server-side (use the original error value from
DatasetsError::Storage to log with the app logger), keeping other variants'
messages unchanged; reference the impl block for actix_web::ResponseError, the
status_code() method, error_response(), and the DatasetsError::Storage variant
to locate and change the behavior.
🧹 Nitpick comments (1)
src/prism/logstream/mod.rs (1)

231-245: Optional tenant/session consistency guard is recommended for future non-HTTP callers only.

The HTTP caller (src/handlers/http/prism_logstream.rs:50) already derives both session_key and tenant_id from the same middleware-normalized request, so consistency is guaranteed in the current HTTP path. If non-HTTP internal callers are introduced in the future, adding a defensive tenant/session mismatch check at the get_datasets boundary would be a good hardening measure to prevent accidental tenant scope violations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/prism/logstream/mod.rs` around lines 231 - 245, Add a defensive
consistency check at the start of get_datasets to ensure the optional tenant_id
parameter matches the tenant embedded in the provided SessionKey (abort early if
both are present but different). Specifically, in get_datasets (before using
self.streams or calling process_stream), extract the tenant identifier from
SessionKey (e.g., via the SessionKey getter used elsewhere in the codebase such
as key.tenant_id() / key.tenant()), and if both tenant_id and the extracted
tenant are Some but their values differ, return an appropriate PrismLogstreamError
(create or reuse a TenantSessionMismatch/InvalidSession error variant). This
ensures future non-HTTP callers cannot pass mismatched tenant/session pairs.
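A minimal sketch of such a guard, assuming a `SessionKey` type that can yield its tenant (the real accessor name in the codebase may differ; `SessionKey`, `tenant_id()`, and `GuardError` here are illustrative stand-ins):

```rust
// Stand-in for the real SessionKey; only the tenant accessor matters here.
struct SessionKey {
    tenant: Option<String>,
}

impl SessionKey {
    fn tenant_id(&self) -> Option<&str> {
        self.tenant.as_deref()
    }
}

#[derive(Debug, PartialEq)]
enum GuardError {
    TenantSessionMismatch,
}

// Reject calls where an explicit tenant_id disagrees with the session's tenant.
// When either side is absent, the call is allowed through unchanged.
fn check_tenant_consistency(
    key: &SessionKey,
    tenant_id: Option<&str>,
) -> Result<(), GuardError> {
    match (key.tenant_id(), tenant_id) {
        (Some(session_tenant), Some(requested)) if session_tenant != requested => {
            Err(GuardError::TenantSessionMismatch)
        }
        _ => Ok(()),
    }
}

fn main() {
    let key = SessionKey { tenant: Some("acme".into()) };
    assert!(check_tenant_consistency(&key, Some("acme")).is_ok());
    assert_eq!(
        check_tenant_consistency(&key, Some("other")),
        Err(GuardError::TenantSessionMismatch)
    );
    // Missing on either side is allowed, matching the "only if both present" rule.
    assert!(check_tenant_consistency(&key, None).is_ok());
    println!("ok");
}
```

Calling this at the top of get_datasets would make tenant scoping explicit at the boundary rather than relying on every caller passing a consistent pair.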

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 435b73b2-9ad0-4244-87b9-de4e62fd576b

📥 Commits

Reviewing files that changed from the base of the PR and between f395dea and cacb58c.

📒 Files selected for processing (17)
  • src/connectors/kafka/processor.rs
  • src/handlers/http/datasets.rs
  • src/handlers/http/ingest.rs
  • src/handlers/http/mod.rs
  • src/handlers/http/modal/server.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/handlers/http/prism_logstream.rs
  • src/handlers/mod.rs
  • src/metadata.rs
  • src/migration/mod.rs
  • src/parseable/mod.rs
  • src/parseable/streams.rs
  • src/prism/home/mod.rs
  • src/prism/logstream/mod.rs
  • src/storage/field_stats.rs
  • src/storage/mod.rs
  • src/storage/object_storage.rs
✅ Files skipped from review due to trivial changes (1)
  • src/parseable/streams.rs
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/handlers/http/prism_logstream.rs
  • src/connectors/kafka/processor.rs
  • src/storage/field_stats.rs
  • src/handlers/http/mod.rs
  • src/prism/home/mod.rs
👮 Files not reviewed due to content moderation or server errors (8)
  • src/migration/mod.rs
  • src/handlers/http/ingest.rs
  • src/handlers/mod.rs
  • src/handlers/http/modal/server.rs
  • src/metadata.rs
  • src/handlers/http/modal/utils/logstream_utils.rs
  • src/storage/object_storage.rs
  • src/storage/mod.rs

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 25, 2026
praveen5959
praveen5959 previously approved these changes Mar 28, 2026
Contributor @coderabbitai (bot) left a comment

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/alerts/alert_structs.rs`:
- Around line 194-198: generate_filter_message currently assumes
self.condition_config has at least two entries and ignores the new groups field;
update generate_filter_message to guard against empty/singleton condition_config
by checking length before indexing (use get(0)/get(1) or iterate) and produce a
sensible message when none or only one condition exists, and also traverse and
include self.groups (Option<Vec<Conditions>>) by recursing or iterating to
format nested condition groups (e.g., join sub-group messages with AND/OR as
appropriate) so that both empty/short vectors and nested groups are handled
without panics.

In `@src/alerts/alerts_utils.rs`:
- Around line 391-399: get_filter_string currently treats where_clause.operator
as required and returns an error when None; to preserve backward compatibility
with callers that supply a single condition, change get_filter_string to handle
operator being None by treating it as LogicalOperator::And (i.e., default to
"AND") instead of returning an error. Locate the operator access in
get_filter_string (the where_clause.operator handling) and replace the
ok_or_else error path with a .unwrap_or(&LogicalOperator::And) or equivalent so
the joiner uses " AND " when operator is None; ensure this behavior matches
generate_filter_message's handling of single conditions.
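The defaulting behavior the comment asks for can be sketched in isolation; `LogicalOperator` and `WhereClause` below are simplified stand-ins for the real alert types, with conditions pre-rendered to strings for brevity:

```rust
#[derive(Clone, Copy)]
enum LogicalOperator {
    And,
    Or,
}

impl LogicalOperator {
    fn joiner(self) -> &'static str {
        match self {
            LogicalOperator::And => " AND ",
            LogicalOperator::Or => " OR ",
        }
    }
}

struct WhereClause {
    operator: Option<LogicalOperator>,
    conditions: Vec<String>, // pre-rendered "column op value" fragments
}

// Join conditions, defaulting a missing operator to AND instead of erroring,
// so legacy single-condition payloads keep working.
fn filter_string(clause: &WhereClause) -> String {
    let op = clause.operator.unwrap_or(LogicalOperator::And);
    clause.conditions.join(op.joiner())
}

fn main() {
    let single = WhereClause {
        operator: None,
        conditions: vec!["status = 500".into()],
    };
    assert_eq!(filter_string(&single), "status = 500");

    let pair = WhereClause {
        operator: None,
        conditions: vec!["status = 500".into(), "latency > 10".into()],
    };
    assert_eq!(filter_string(&pair), "status = 500 AND latency > 10");
    println!("ok");
}
```

With a single condition the joiner is never used, so the default is harmless there, and multi-condition payloads without an explicit operator fall back to AND rather than failing.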

ℹ️ Review info
Run ID: 1430c8cc-d705-432d-a408-da157509ead7

📥 Commits

Reviewing files that changed from the base of the PR and between cacb58c and 5e1f1a6.

📒 Files selected for processing (2)
  • src/alerts/alert_structs.rs
  • src/alerts/alerts_utils.rs

coderabbitai[bot]
coderabbitai bot previously approved these changes Mar 28, 2026
Contributor @coderabbitai (bot) left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/alerts/alert_structs.rs`:
- Around line 220-226: The current mapping in group_parts includes empty group
messages because generate_filter_message() can return "", producing "()" in
output; change the logic that builds group_parts (the block using
self.groups.as_deref().unwrap_or_default().iter().map(...).collect()) to skip
empty generated messages—e.g., call generate_filter_message(), check if it's
empty, and only push/map non-empty results (or use filter_map) before wrapping
with format!("({})"), so nested groups with no conditions/sub-groups are omitted
from group_parts.
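The filter_map-based skip can be sketched as follows; `Conditions` is a pared-down stand-in for the real struct, with leaf conditions pre-rendered to strings and the joiner fixed to AND for brevity:

```rust
#[derive(Default)]
struct Conditions {
    condition_config: Vec<String>, // pre-rendered condition fragments
    groups: Option<Vec<Conditions>>,
}

impl Conditions {
    // Recursively build the human-readable filter message, skipping empty
    // nested groups so they never render as "()".
    fn generate_filter_message(&self) -> String {
        let mut parts: Vec<String> = self.condition_config.clone();
        let group_parts: Vec<String> = self
            .groups
            .as_deref()
            .unwrap_or_default()
            .iter()
            .filter_map(|g| {
                let msg = g.generate_filter_message();
                (!msg.is_empty()).then(|| format!("({msg})"))
            })
            .collect();
        parts.extend(group_parts);
        parts.join(" AND ")
    }
}

fn main() {
    let empty_group = Conditions::default();
    let cond = Conditions {
        condition_config: vec!["status = 500".into()],
        groups: Some(vec![empty_group]),
    };
    // The empty nested group is dropped rather than rendered as "()".
    assert_eq!(cond.generate_filter_message(), "status = 500");
    println!("ok");
}
```

The `filter_map` call is the whole fix: generating the child message first and only then deciding whether to parenthesize means a group with no conditions and no sub-groups contributes nothing to the output.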

ℹ️ Review info
Run ID: a6cfa564-b6d4-4e32-a9de-c1c1242bf6cc

📥 Commits

Reviewing files that changed from the base of the PR and between fc39896 and 586d81e.

📒 Files selected for processing (1)
  • src/alerts/alert_structs.rs

Contributor @coderabbitai (bot) left a comment

🧹 Nitpick comments (1)
src/alerts/alert_structs.rs (1)

201-240: Consider sharing the recursive walk with get_filter_string.

generate_filter_message now mirrors the same condition_config/groups traversal and default-operator handling as src/alerts/alerts_utils.rs:390-424. Pulling that tree walk behind a small shared helper would reduce drift the next time filter semantics change.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/alerts/alert_structs.rs` around lines 201 - 240, Extract the recursive
traversal and string-building logic shared by generate_filter_message and
get_filter_string into a small helper (e.g., build_filter_parts or
compose_filter_string) that accepts a reference to the filter/group struct and
returns the Vec<String> or final String; move format_condition into the shared
helper scope or call it from there so both generate_filter_message and
get_filter_string reuse the same walk; ensure the helper implements the same
behaviors: use self.operator.unwrap_or(&LogicalOperator::And) for default
operator, iterate condition_config -> format_condition, recurse into groups
skipping empty messages, wrap nested group messages in parentheses, and join
parts with the operator separator (producing "" for zero parts, single part
as-is, and bracketed joined form for multiple parts), then have
generate_filter_message and get_filter_string call this helper to remove
duplication and keep semantics consistent.
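One way the proposed shared walk could look, as a sketch with stand-in types rather than the actual alert structs (`build_filter_parts` and `compose_filter_string` are illustrative names from the comment, not existing functions):

```rust
#[derive(Clone, Copy)]
enum LogicalOperator {
    And,
    Or,
}

struct Conditions {
    operator: Option<LogicalOperator>,
    condition_config: Vec<String>, // pre-rendered "column op value" fragments
    groups: Option<Vec<Conditions>>,
}

// Single tree walk shared by both renderers: collect leaf conditions plus
// every non-empty nested group, the latter wrapped in parentheses.
fn build_filter_parts(c: &Conditions) -> Vec<String> {
    let mut parts = c.condition_config.clone();
    for group in c.groups.as_deref().unwrap_or_default() {
        let rendered = compose_filter_string(group);
        if !rendered.is_empty() {
            parts.push(format!("({rendered})"));
        }
    }
    parts
}

fn compose_filter_string(c: &Conditions) -> String {
    // A missing operator defaults to AND, matching get_filter_string.
    let joiner = match c.operator.unwrap_or(LogicalOperator::And) {
        LogicalOperator::And => " AND ",
        LogicalOperator::Or => " OR ",
    };
    build_filter_parts(c).join(joiner)
}

fn main() {
    let nested = Conditions {
        operator: Some(LogicalOperator::Or),
        condition_config: vec!["status = 500".into(), "status = 503".into()],
        groups: None,
    };
    let root = Conditions {
        operator: None, // defaults to AND
        condition_config: vec!["app = 'web'".into()],
        groups: Some(vec![nested]),
    };
    assert_eq!(
        compose_filter_string(&root),
        "app = 'web' AND (status = 500 OR status = 503)"
    );
    println!("ok");
}
```

Both generate_filter_message and get_filter_string would then be thin wrappers over this one traversal, so the default-operator rule and the empty-group skip are defined exactly once.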

ℹ️ Review info
Run ID: 43154d66-9321-4183-8758-f60f5622a78c

📥 Commits

Reviewing files that changed from the base of the PR and between 586d81e and a91dc03.

📒 Files selected for processing (1)
  • src/alerts/alert_structs.rs

@nikhilsinhaparseable nikhilsinhaparseable merged commit 0b83b8a into parseablehq:main Mar 28, 2026
12 checks passed
