Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/fluent-bit/flb_coro.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ struct flb_coro {
#define FLB_CORO_STACK_SIZE_BYTE ((3 * STACK_FACTOR * PTHREAD_STACK_MIN) / 2)
#endif

#if FLB_HAVE_SANITIZE_ADDRESS
#undef FLB_CORO_STACK_SIZE_BYTE
#define FLB_CORO_STACK_SIZE_BYTE (128 * 1024) /* Extend coro stack size for ASan */
#endif

#define FLB_CORO_DATA(coro) (((char *) coro) + sizeof(struct flb_coro))

static FLB_INLINE void flb_coro_yield(struct flb_coro *coro, int ended)
Expand Down
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_time.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static inline void flb_time_from_uint64(struct flb_time *dst, uint64_t value)

static inline void flb_time_from_double(struct flb_time *dst, double d)
{
dst->tm.tv_sec = (int) d;
dst->tm.tv_sec = (time_t) d;
dst->tm.tv_nsec = (long) ((d - dst->tm.tv_sec) * 1000000000L);
}

Expand Down
3 changes: 2 additions & 1 deletion plugins/out_stdout/stdout.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <cprofiles/cprof_decode_msgpack.h>

#include <msgpack.h>
#include <inttypes.h>
#include "stdout.h"


Expand Down Expand Up @@ -300,7 +301,7 @@ static void cb_stdout_flush(struct flb_event_chunk *event_chunk,

printf("[%zd] %s: [[", cnt++, event_chunk->tag);

printf("%"PRId32".%09lu, ", (int32_t) log_event.timestamp.tm.tv_sec,
printf("%"PRId64".%09lu, ", (int64_t) log_event.timestamp.tm.tv_sec,
log_event.timestamp.tm.tv_nsec);

msgpack_object_print(stdout, *log_event.metadata);
Expand Down
61 changes: 47 additions & 14 deletions src/flb_log_event_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <fluent-bit/flb_compat.h>
#include <fluent-bit/flb_log.h>

#include <inttypes.h>

#define FLB_LOG_EVENT_DECODER_MAX_RECURSION_DEPTH 1000 /* Safety limit for recursion */

static int create_empty_map(struct flb_log_event_decoder *context) {
Expand Down Expand Up @@ -177,6 +179,45 @@ void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context)
}
}

static inline int flb_log_event_decoder_decode_ext_time(msgpack_object *input,
struct flb_time *output)
{
uint32_t sec;
uint32_t nsec;

sec = FLB_UINT32_TO_HOST_BYTE_ORDER(
FLB_ALIGNED_DWORD_READ(
(unsigned char *) &input->via.ext.ptr[0]));

nsec = FLB_UINT32_TO_HOST_BYTE_ORDER(
FLB_ALIGNED_DWORD_READ(
(unsigned char *) &input->via.ext.ptr[4]));

/*
* EventTime stores seconds as unsigned 32-bit.
* Keep -1/-2 semantics for group markers, allow normal post-2038
* timestamps, and preserve handling for intentionally crafted negative
* marker-like values used by tests/corruption checks (e.g. -3, -10,
* -1000), which are encoded near UINT32_MAX.
*/
if (sec == (uint32_t) -1) {
output->tm.tv_sec = FLB_LOG_EVENT_GROUP_START;
}
else if (sec == (uint32_t) -2) {
output->tm.tv_sec = FLB_LOG_EVENT_GROUP_END;
}
else if (sec >= 0xFFFFFC00U) {
output->tm.tv_sec = (time_t) ((int32_t) sec);
}
else {
output->tm.tv_sec = (time_t) sec;
}

output->tm.tv_nsec = (uint32_t) nsec;

return FLB_EVENT_DECODER_SUCCESS;
}

int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
struct flb_time *output)
{
Expand All @@ -194,15 +235,7 @@ int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
}

output->tm.tv_sec =
(int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
FLB_ALIGNED_DWORD_READ(
(unsigned char *) &input->via.ext.ptr[0]));

output->tm.tv_nsec =
(int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
FLB_ALIGNED_DWORD_READ(
(unsigned char *) &input->via.ext.ptr[4]));
return flb_log_event_decoder_decode_ext_time(input, output);
}
else {
return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
Expand Down Expand Up @@ -313,7 +346,7 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
int result;
int record_type;
size_t previous_offset;
int32_t invalid_timestamp;
int64_t invalid_timestamp;

if (context == NULL) {
return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT;
Expand Down Expand Up @@ -368,8 +401,8 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
* to avoid losing valid group metadata if corruption occurs mid-group.
* Skip the record and continue processing.
*/
invalid_timestamp = (int32_t) event->timestamp.tm.tv_sec;
flb_debug("[decoder] Invalid group marker timestamp (%d), skipping record. "
invalid_timestamp = (int64_t) event->timestamp.tm.tv_sec;
flb_debug("[decoder] Invalid group marker timestamp (%" PRId64 "), skipping record. "
"Group state preserved.", invalid_timestamp);

/* Increment recursion depth before recursive call */
Expand Down Expand Up @@ -457,9 +490,9 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,

int flb_log_event_decoder_get_record_type(struct flb_log_event *event, int32_t *type)
{
int32_t s;
time_t s;

s = (int32_t) event->timestamp.tm.tv_sec;
s = event->timestamp.tm.tv_sec;

if (s >= 0) {
*type = FLB_LOG_EVENT_NORMAL;
Expand Down
35 changes: 34 additions & 1 deletion tests/internal/log_event_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,40 @@ void decoder_corrupted_group_timestamps()
flb_log_event_decoder_destroy(&dec);
msgpack_sbuffer_destroy(&sbuf2);

/* Test Case 3: Very negative timestamp - should skip */
/* Test Case 3: Post-2038 timestamp should remain a normal record */
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);

/* 2040-01-03 03:15:10 UTC */
flb_time_set(&tm1, 2209072510LL, 808241446);

msgpack_pack_array(&pck, 2);
msgpack_pack_array(&pck, 2);
pack_event_time(&pck, &tm1);
msgpack_pack_map(&pck, 0);
msgpack_pack_map(&pck, 1);
msgpack_pack_str(&pck, 3);
msgpack_pack_str_body(&pck, "log", 3);
msgpack_pack_str(&pck, 4);
msgpack_pack_str_body(&pck, "2040", 4);

ret = flb_log_event_decoder_init(&dec, (char *)sbuf.data, sbuf.size);
TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS);

ret = flb_log_event_decoder_read_groups(&dec, FLB_TRUE);
TEST_CHECK(ret == 0);

ret = flb_log_event_decoder_next(&dec, &event);
TEST_CHECK(ret == FLB_EVENT_DECODER_SUCCESS);
ret = flb_log_event_decoder_get_record_type(&event, &decoded_record_type);
TEST_CHECK(ret == 0);
TEST_CHECK(decoded_record_type == FLB_LOG_EVENT_NORMAL);
TEST_CHECK(flb_time_equal(&tm1, &event.timestamp));

flb_log_event_decoder_destroy(&dec);
msgpack_sbuffer_destroy(&sbuf);

/* Test Case 4: Very negative timestamp - should skip */
msgpack_sbuffer_init(&sbuf);
msgpack_packer_init(&pck, &sbuf, msgpack_sbuffer_write);

Expand Down
Loading