Skip to content

Commit 48eae01

Browse files
committed
in_opentelemetry: add a function to store metadata
This function is to store fields other than body as metadata. The fields are defined at https://opentelemetry.io/docs/specs/otel/logs/data-model/#definitions-used-in-this-document Signed-off-by: Takahiro Yamashita <nokute78@gmail.com>
1 parent 0f0e64f commit 48eae01

1 file changed

Lines changed: 140 additions & 43 deletions

File tree

plugins/in_opentelemetry/opentelemetry_prot.c

Lines changed: 140 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -387,17 +387,148 @@ static int otlp_pack_any_value(msgpack_packer *mp_pck,
387387
return result;
388388
}
389389

390+
/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition */
391+
static int otel_pack_v1_metadata(msgpack_packer *mp_pck,
392+
struct Opentelemetry__Proto__Logs__V1__LogRecord *log_record,
393+
Opentelemetry__Proto__Resource__V1__Resource *resource,
394+
Opentelemetry__Proto__Common__V1__InstrumentationScope *scope)
395+
{
396+
struct flb_mp_map_header mh;
397+
struct flb_mp_map_header scope_mh;
398+
int ret;
399+
flb_mp_map_header_init(&mh, mp_pck);
400+
401+
flb_mp_map_header_append(&mh);
402+
msgpack_pack_str(mp_pck, 17);
403+
msgpack_pack_str_body(mp_pck, "ObservedTimestamp", 17);
404+
msgpack_pack_uint64(mp_pck, log_record->observed_time_unix_nano);
405+
406+
/* Value of 0 indicates unknown or missing timestamp. */
407+
if (log_record->time_unix_nano != 0) {
408+
flb_mp_map_header_append(&mh);
409+
msgpack_pack_str(mp_pck, 9);
410+
msgpack_pack_str_body(mp_pck, "Timestamp", 9);
411+
msgpack_pack_uint64(mp_pck, log_record->time_unix_nano);
412+
}
413+
414+
/* https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber */
415+
if (log_record->severity_number >= 1 && log_record->severity_number <= 24) {
416+
flb_mp_map_header_append(&mh);
417+
msgpack_pack_str(mp_pck, 14);
418+
msgpack_pack_str_body(mp_pck, "SeverityNumber", 14);
419+
msgpack_pack_uint64(mp_pck, log_record->severity_number);
420+
}
421+
422+
if (log_record->severity_text != NULL && strlen(log_record->severity_text) > 0) {
423+
flb_mp_map_header_append(&mh);
424+
msgpack_pack_str(mp_pck, 12);
425+
msgpack_pack_str_body(mp_pck, "SeverityText", 12);
426+
msgpack_pack_str(mp_pck, strlen(log_record->severity_text));
427+
msgpack_pack_str_body(mp_pck, log_record->severity_text, strlen(log_record->severity_text));
428+
}
429+
430+
if (log_record->n_attributes > 0) {
431+
flb_mp_map_header_append(&mh);
432+
msgpack_pack_str(mp_pck, 10);
433+
msgpack_pack_str_body(mp_pck, "Attributes", 10);
434+
ret = otel_pack_kvarray(mp_pck,
435+
log_record->attributes,
436+
log_record->n_attributes);
437+
if (ret != 0) {
438+
return ret;
439+
}
440+
}
441+
442+
if (log_record->trace_id.len > 0) {
443+
flb_mp_map_header_append(&mh);
444+
msgpack_pack_str(mp_pck, 7);
445+
msgpack_pack_str_body(mp_pck, "TraceId", 7);
446+
ret = otel_pack_bytes(mp_pck, log_record->trace_id);
447+
if (ret != 0) {
448+
return ret;
449+
}
450+
}
451+
452+
if (log_record->span_id.len > 0) {
453+
flb_mp_map_header_append(&mh);
454+
msgpack_pack_str(mp_pck, 6);
455+
msgpack_pack_str_body(mp_pck, "SpanId", 6);
456+
ret = otel_pack_bytes(mp_pck, log_record->span_id);
457+
if (ret != 0) {
458+
return ret;
459+
}
460+
}
461+
462+
flb_mp_map_header_append(&mh);
463+
msgpack_pack_str(mp_pck, 10);
464+
msgpack_pack_str_body(mp_pck, "TraceFlags", 10);
465+
msgpack_pack_uint8(mp_pck, (uint8_t)log_record->flags & 0xff);
466+
467+
468+
469+
if (resource != NULL && resource->n_attributes > 0 && resource->attributes) {
470+
flb_mp_map_header_append(&mh);
471+
msgpack_pack_str(mp_pck, 8);
472+
msgpack_pack_str_body(mp_pck, "Resource", 8);
473+
474+
ret = otel_pack_kvarray(mp_pck,
475+
resource->attributes,
476+
resource->n_attributes);
477+
if (ret != 0) {
478+
return ret;
479+
}
480+
}
481+
482+
if (scope != NULL && (scope->name || scope->version || scope->n_attributes > 0)) {
483+
flb_mp_map_header_append(&mh);
484+
msgpack_pack_str(mp_pck, 20);
485+
msgpack_pack_str_body(mp_pck, "InstrumentationScope", 20);
486+
487+
flb_mp_map_header_init(&scope_mh, mp_pck);
488+
if (scope->name != NULL && strlen(scope->name) > 0) {
489+
flb_mp_map_header_append(&scope_mh);
490+
msgpack_pack_str(mp_pck, 4);
491+
msgpack_pack_str_body(mp_pck, "Name", 4);
492+
msgpack_pack_str(mp_pck, strlen(scope->name));
493+
msgpack_pack_str_body(mp_pck, scope->name, strlen(scope->name));
494+
}
495+
if (scope->version != NULL && strlen(scope->version) > 0) {
496+
flb_mp_map_header_append(&scope_mh);
497+
msgpack_pack_str(mp_pck, 7);
498+
msgpack_pack_str_body(mp_pck, "Version", 7);
499+
msgpack_pack_str(mp_pck, strlen(scope->version));
500+
msgpack_pack_str_body(mp_pck, scope->version, strlen(scope->version));
501+
}
502+
if (scope->n_attributes > 0 && scope->attributes) {
503+
msgpack_pack_str(mp_pck, 10);
504+
msgpack_pack_str_body(mp_pck, "Attributes", 10);
505+
ret = otel_pack_kvarray(mp_pck,
506+
scope->attributes,
507+
scope->n_attributes);
508+
if (ret != 0) {
509+
return ret;
510+
}
511+
}
512+
513+
flb_mp_map_header_end(&scope_mh);
514+
}
515+
516+
flb_mp_map_header_end(&mh);
517+
return 0;
518+
}
519+
390520
static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
391521
uint8_t *in_buf,
392522
size_t in_size)
393523
{
394524
int ret;
395525
msgpack_packer packer;
396526
msgpack_sbuffer buffer;
527+
msgpack_packer meta_packer;
528+
msgpack_sbuffer meta_buffer;
397529
int resource_logs_index;
398530
int scope_log_index;
399531
int log_record_index;
400-
struct flb_mp_map_header mh;
401532

402533
Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
403534
Opentelemetry__Proto__Logs__V1__ScopeLogs **scope_logs;
@@ -409,6 +540,8 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
409540

410541
msgpack_sbuffer_init(&buffer);
411542
msgpack_packer_init(&packer, &buffer, msgpack_sbuffer_write);
543+
msgpack_sbuffer_init(&meta_buffer);
544+
msgpack_packer_init(&meta_packer, &meta_buffer, msgpack_sbuffer_write);
412545

413546
input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
414547
if (input_logs == NULL) {
@@ -453,55 +586,18 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
453586
}
454587

455588
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
456-
flb_mp_map_header_init(&mh, &packer);
457-
458-
/* pack resource */
459-
flb_mp_map_header_append(&mh);
460-
msgpack_pack_str(&packer, 8);
461-
msgpack_pack_str_body(&packer, "resource", 8);
462-
if (resource != NULL) {
463-
msgpack_pack_map(&packer, 1);
464-
465-
msgpack_pack_str(&packer, 10);
466-
msgpack_pack_str_body(&packer, "attributes", 10);
467-
ret = otel_pack_kvarray(
468-
&packer,
469-
resource->attributes,
470-
resource->n_attributes);
471-
}
472-
else {
473-
msgpack_pack_map(&packer, 0);
474-
}
475-
476-
if (ret != 0) {
477-
flb_error("[otel] Failed to convert log resource attributes");
478-
goto binary_payload_to_msgpack_end;
479-
}
480-
481-
/* pack logRecords */
482-
flb_mp_map_header_append(&mh);
483-
msgpack_pack_str(&packer, 10);
484-
msgpack_pack_str_body(&packer, "logRecords", 10);
485-
486-
msgpack_pack_map(&packer, 1);
487-
msgpack_pack_str(&packer, 10);
488-
msgpack_pack_str_body(&packer, "attributes", 10);
489-
ret = otel_pack_kvarray(
490-
&packer,
491-
log_records[log_record_index]->attributes,
492-
log_records[log_record_index]->n_attributes);
493-
589+
msgpack_sbuffer_clear(&meta_buffer);
590+
ret = otel_pack_v1_metadata(&meta_packer, log_records[log_record_index], resource, scope_log->scope);
494591
if (ret != 0) {
495-
flb_error("[otel] Failed to convert log record attributes");
592+
flb_error("[otel] Failed to convert log record");
496593

497594
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
498595
}
499596
else {
500-
flb_mp_map_header_end(&mh);
501597
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
502598
encoder,
503-
buffer.data,
504-
buffer.size);
599+
meta_buffer.data,
600+
meta_buffer.size);
505601
}
506602

507603
msgpack_sbuffer_clear(&buffer);
@@ -549,6 +645,7 @@ static int binary_payload_to_msgpack(struct flb_log_event_encoder *encoder,
549645

550646
binary_payload_to_msgpack_end:
551647
msgpack_sbuffer_destroy(&buffer);
648+
msgpack_sbuffer_destroy(&meta_buffer);
552649
if (input_logs) {
553650
opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
554651
input_logs, NULL);

0 commit comments

Comments
 (0)