Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 57 additions & 60 deletions plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,48 @@ static int pack_payload(int insert_id_extracted,
return ret;
}

/*
* should_skip_record
* Check if a record should be skipped due to invalid fields.
* Returns FLB_TRUE if the record should be dropped.
*
* log_errors: if FLB_TRUE, log why the record is being dropped.
* Set to FLB_FALSE during prescan to avoid duplicate logging.
*/
static int should_skip_record(struct flb_stackdriver *ctx,
msgpack_object *obj,
int log_errors)
{
insert_id_status in_status;
msgpack_object insert_id_obj;
msgpack_object *payload_labels_ptr;

/* Check insertId */
in_status = validate_insert_id(&insert_id_obj, obj);
if (in_status == INSERTID_INVALID) {
if (log_errors == FLB_TRUE) {
flb_plg_error(ctx->ins,
"Incorrect insertId received. "
"InsertId should be non-empty string.");
}
return FLB_TRUE;
}

/* Check labels type */
payload_labels_ptr = get_payload_labels(ctx, obj);
if (payload_labels_ptr != NULL &&
payload_labels_ptr->type != MSGPACK_OBJECT_MAP) {
if (log_errors == FLB_TRUE) {
flb_plg_error(ctx->ins,
"the type of payload labels should be map, "
"dropping record");
}
return FLB_TRUE;
}

return FLB_FALSE;
}

static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
int total_records,
const char *tag, int tag_len,
Expand Down Expand Up @@ -1868,20 +1910,14 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
}

/*
* Search each entry and validate insertId.
* Reject the entry if insertId is invalid.
* Search each entry and validate record fields.
* Reject entries with invalid insertId or non-map labels.
* If all the entries are rejected, stop formatting.
*
*/
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
/* Extract insertId */
in_status = validate_insert_id(&insert_id_obj, log_event.body);

if (in_status == INSERTID_INVALID) {
flb_plg_error(ctx->ins,
"Incorrect insertId received. InsertId should be non-empty string.");
if (should_skip_record(ctx, log_event.body, FLB_FALSE) == FLB_TRUE) {
array_size -= 1;
}
}
Comment on lines 1917 to 1923
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Propagate skipped-record count to the flush metrics.

array_size is reduced here, but the flush path still reports event_chunk->total_events as processed. After this change, a batch with locally skipped records will be counted as fully successful on HTTP 200, and dropped-record metrics will miss the records discarded by should_skip_record(). Please surface the emitted count to cb_stackdriver_flush() and use that for metrics.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/out_stackdriver/stackdriver.c` around lines 1917 - 1923, The loop
that decrements array_size when should_skip_record() returns true adjusts the
emitted count locally but cb_stackdriver_flush() still reports
event_chunk->total_events; update the code to propagate the actual
emitted/processed count to cb_stackdriver_flush() (e.g., compute emitted_count
by starting from event_chunk->total_events and decrementing for each skipped
record inside the flb_log_event_decoder_next()/should_skip_record() loop or
accumulate emitted_count directly) and change metric reporting in
cb_stackdriver_flush() to use this emitted_count (or a new parameter name you
add) instead of event_chunk->total_events so dropped records are reflected in
flush and dropped-record metrics.

Expand All @@ -1890,6 +1926,9 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,

/* Sounds like this should compare to -1 instead of zero */
if (array_size == 0) {
flb_plg_warn(ctx->ins,
"all %d entries skipped due to invalid insertId "
"or labels, dropping batch", total_records);
return NULL;
}

Expand Down Expand Up @@ -2308,6 +2347,12 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
obj = log_event.body;

/* Skip records with invalid fields */
if (should_skip_record(ctx, obj, FLB_TRUE) == FLB_TRUE) {
continue;
}

tms_status = extract_timestamp(obj, &log_event.timestamp);

/*
Expand Down Expand Up @@ -2372,33 +2417,14 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
log_name_extracted = FLB_TRUE;
}

/* Extract insertId */
/* Extract insertId (INVALID case already handled by should_skip_record) */
in_status = validate_insert_id(&insert_id_obj, obj);
if (in_status == INSERTID_VALID) {
insert_id_extracted = FLB_TRUE;
entry_size += 1;
}
else if (in_status == INSERTID_NOT_PRESENT) {
insert_id_extracted = FLB_FALSE;
}
else {
if (trace_extracted == FLB_TRUE) {
flb_sds_destroy(trace);
}

if (span_id_extracted == FLB_TRUE) {
flb_sds_destroy(span_id);
}

if (project_id_extracted == FLB_TRUE) {
flb_sds_destroy(project_id_key);
}

if (log_name_extracted == FLB_TRUE) {
flb_sds_destroy(log_name);
}

continue;
insert_id_extracted = FLB_FALSE;
}

/* Extract operation */
Expand Down Expand Up @@ -2441,37 +2467,8 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
entry_size += 1;
}

/* Extract payload labels */
/* Extract payload labels (non-map case already handled by should_skip_record) */
payload_labels_ptr = get_payload_labels(ctx, obj);
if (payload_labels_ptr != NULL &&
payload_labels_ptr->type != MSGPACK_OBJECT_MAP) {
flb_plg_error(ctx->ins, "the type of payload labels should be map");
flb_sds_destroy(operation_id);
flb_sds_destroy(operation_producer);
flb_sds_destroy(source_location_file);
flb_sds_destroy(source_location_function);

if (trace_extracted == FLB_TRUE) {
flb_sds_destroy(trace);
}

if (span_id_extracted == FLB_TRUE) {
flb_sds_destroy(span_id);
}

if (project_id_extracted == FLB_TRUE) {
flb_sds_destroy(project_id_key);
}

if (log_name_extracted == FLB_TRUE) {
flb_sds_destroy(log_name);
}

flb_log_event_decoder_destroy(&log_decoder);
msgpack_sbuffer_destroy(&mp_sbuf);

return NULL;
}

/* Number of parsed labels */
labels_size = mk_list_size(&ctx->config_labels);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"message":"bad record 1", "logging.googleapis.com/labels": "not_a_map"}
{"message":"bad record 2", "logging.googleapis.com/labels": 12345}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"message":"bad first record", "logging.googleapis.com/labels": "not_a_map"}
{"message":"valid second record"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"message":"valid record with labels", "logging.googleapis.com/labels": {"testA": "valA", "testB": "valB"}}
{"message":"bad labels - not a map", "logging.googleapis.com/labels": "not_a_map"}
{"message":"valid record without labels"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"message":"valid record no special fields"}
{"message":"bad insertId", "logging.googleapis.com/insertId": 123}
{"message":"bad labels", "logging.googleapis.com/labels": "not_a_map"}
{"message":"valid record with labels", "logging.googleapis.com/labels": {"testA": "valA"}}
72 changes: 72 additions & 0 deletions tests/runtime/data/stackdriver/stackdriver_test_labels.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,75 @@
"}" \
"}]"

/* labels value is a string instead of a map - should cause record to be dropped */
#define LABELS_NOT_A_MAP "[" \
"1591649196," \
"{" \
"\"logging.googleapis.com/labels\": \"not_a_map\"" \
"}]"

/* custom labels key with string value instead of map */
#define LABELS_NOT_A_MAP_CUSTOM_KEY "[" \
"1591649196," \
"{" \
"\"logging.googleapis.com/customlabels\": \"not_a_map\"" \
"}]"

/* invalid labels record with extracted fields (httpRequest, operation, */
/* sourceLocation, trace, spanId) to verify cleanup on skip path */
#define LABELS_NOT_A_MAP_WITH_FIELDS "[" \
"1591649196," \
"{" \
"\"logging.googleapis.com/labels\": \"not_a_map\"," \
"\"logging.googleapis.com/trace\": \"test_trace\"," \
"\"logging.googleapis.com/spanId\": \"test_span\"," \
"\"logging.googleapis.com/operation\": " \
"{" \
"\"id\": \"test_id\"," \
"\"producer\": \"test_producer\"," \
"\"first\": true," \
"\"last\": true" \
"}," \
"\"logging.googleapis.com/sourceLocation\": " \
"{" \
"\"file\": \"test_file\"," \
"\"line\": 123," \
"\"function\": \"test_function\"" \
"}," \
"\"logging.googleapis.com/http_request\": " \
"{" \
"\"requestMethod\": \"GET\"," \
"\"requestUrl\": \"https://example.com\"," \
"\"status\": 200" \
"}" \
"}]"

/* two-record batch: first record has invalid labels, second is valid */
#define BATCH_FIRST_RECORD_LABELS_NOT_A_MAP "[" \
"1591649196," \
"{" \
"\"logging.googleapis.com/labels\": \"not_a_map\"," \
"\"message\": \"bad first record\"" \
"}]"

#define BATCH_FIRST_RECORD_VALID "[" \
"1591649196," \
"{" \
"\"message\": \"valid second record\"" \
"}]"

/* all records have invalid labels */
#define BATCH_ALL_LABELS_NOT_A_MAP_1 "[" \
"1591649196," \
"{" \
"\"logging.googleapis.com/labels\": \"not_a_map\"," \
"\"message\": \"bad record 1\"" \
"}]"

#define BATCH_ALL_LABELS_NOT_A_MAP_2 "[" \
"1591649196," \
"{" \
"\"logging.googleapis.com/labels\": 12345," \
"\"message\": \"bad record 2\"" \
"}]"

Loading
Loading