21 commits
edc1285
feat(sinks): add per-sink `authoritative` acknowledgement field
connoryy Apr 22, 2026
0247ac2
feat(sinks): implement RFC 6517 authoritative sink semantics
connoryy Apr 23, 2026
f50aa97
Move finalizer stripping from sink-level to fan-out-level
connoryy Apr 24, 2026
df3ec28
Address review feedback on authoritative sink acks
connoryy Apr 27, 2026
31828f7
test: add compute_authoritative_components and backwards compat tests
connoryy Apr 27, 2026
ad83b95
test(topology): add e2e test proving authoritative sink controls ack …
connoryy Apr 27, 2026
fa18d76
fix: add authors to changelog fragment
connoryy Apr 28, 2026
c11c9dd
style: fix formatting
connoryy Apr 28, 2026
ff4754a
chore(ci): make nightly S3 verify resilient to CDN staleness (#25259)
pront Apr 28, 2026
6f38579
fix(vrl): restore stdlib functions in CLI and playground (#25310)
pront Apr 28, 2026
23016ad
chore(internal docs): fix release issue templates (#25318)
pront Apr 28, 2026
96ad9ed
fix(website): improve docs search ranking for component pages (#25319)
thomasqueirozb Apr 28, 2026
ce6ca43
chore(codecs): centralize `events_dropped` emission for batch encodin…
pront Apr 28, 2026
d6cdf03
fix(releasing): enable codecs-parquet in all release feature sets (#2…
pront Apr 28, 2026
233a35c
fix(ci): correct cross-build artifact name and path (#25282)
thomasqueirozb Apr 28, 2026
d1fa4b0
Merge branch 'master' into connor/upstream-authoritative-acks
connoryy Apr 29, 2026
d47d3e6
fix: clippy map_unwrap_or in fanout replace handler
connoryy Apr 29, 2026
13da597
Merge branch 'master' of github.com:vectordotdev/vector into connor/u…
connoryy Apr 29, 2026
24e9db5
Merge remote-tracking branch 'connoryy/connor/upstream-authoritative-…
connoryy Apr 29, 2026
f609fea
fix(topology): require enabled for authoritative sinks in BFS
connoryy Apr 29, 2026
6257497
fix(topology): preserve legacy ack behavior for pipelines without aut…
connoryy Apr 29, 2026
5 changes: 3 additions & 2 deletions .github/ISSUE_TEMPLATE/minor-release.md
@@ -71,7 +71,8 @@ Automated steps include:
# On the day of release

- [ ] Make sure the release branch is in sync with origin/master and has only one squashed commit with all commits from the prepare branch. If you made a PR from the prepare branch into the release branch this should already be the case.
- [ ] `git checkout "${RELEASE_BRANCH}"`
- [ ] `git fetch origin`
- [ ] `git checkout "${RELEASE_BRANCH}" && git pull --ff-only origin "${RELEASE_BRANCH}"`
- [ ] `git show --stat HEAD` - This should show the squashed prepare commit.
- [ ] Ensure release date in `website/cue/reference/releases/0.XX.Y.cue` matches current date.
- If this needs to be updated commit and squash it in the release branch.
@@ -88,7 +89,7 @@ Automated steps include:
- [ ] Wait for release workflow to complete.
- Discoverable via [release.yml](https://github.com/vectordotdev/vector/actions/workflows/release.yml)
- [ ] Reset the `website` branch to the `HEAD` of the release branch to update https://vector.dev
- [ ] `git switch website && git reset --hard origin/"${RELEASE_BRANCH}" && git push`
- [ ] `git fetch origin && git switch website && git reset --hard origin/"${RELEASE_BRANCH}" && git push --force-with-lease`
- [ ] Confirm that the release changelog was published to https://vector.dev/releases/
- Refer to the internal releasing doc to monitor the deployment.
- [ ] Release Linux packages. Refer to the internal releasing doc.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/patch-release.md
@@ -62,5 +62,5 @@ export PREP_BRANCH=prepare-v-0-"${CURRENT_MINOR_VERSION}"-"${NEW_PATCH_VERSION}"
- Follow the [instructions at the top of the mirror.yaml file](https://github.com/DataDog/images/blob/fbf12868e90d52e513ebca0389610dea8a3c7e1a/mirror.yaml#L33-L49).
- [ ] Cherry-pick any release commits from the release branch that are not on `master`, to `master`
- [ ] Reset the `website` branch to the `HEAD` of the release branch to update https://vector.dev
- [ ] `git checkout website && git reset --hard origin/"${RELEASE_BRANCH}" && git push`
- [ ] `git fetch origin && git checkout website && git reset --hard origin/"${RELEASE_BRANCH}" && git push --force-with-lease`
- [ ] Kick-off post-mortems for any regressions resolved by the release
4 changes: 2 additions & 2 deletions .github/workflows/cross.yml
@@ -57,5 +57,5 @@ jobs:
- run: make cross-build-${{ matrix.target }}
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: "vector-debug-${{ matrix.target }}"
path: "./target/${{ matrix.target }}/debug/vector"
name: "vector-${{ matrix.target }}"
path: "./target/${{ matrix.target }}/release/vector"
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -519,7 +519,7 @@ vendored = ["rdkafka?/gssapi-vendored"]

# Default features for *-pc-windows-msvc
# TODO: Enable SASL https://github.com/vectordotdev/vector/pull/3081#issuecomment-659298042
base = ["api", "enrichment-tables", "sinks", "sources", "transforms", "secrets", "vrl/stdlib"]
base = ["api", "enrichment-tables", "sinks", "sources", "transforms", "secrets", "vrl/stdlib", "codecs-parquet"]
enable-api-client = ["base", "api-client"]
enable-unix = ["enable-api-client", "sources-dnstap", "unix"]

3 changes: 3 additions & 0 deletions changelog.d/25199_batch_encoding_events_dropped.fix.md
@@ -0,0 +1,3 @@
Sinks using batch encoding (Parquet, Arrow IPC) now consistently emit `ComponentEventsDropped` for every encode failure path. Previously some `build_record_batch` failures (notably type mismatches) dropped events silently. A new `EncoderRecordBatchError` internal event also reports `component_errors_total` with `error_code="arrow_json_decode"` or `"arrow_record_batch_creation"` at `stage="sending"` for granular alerting.

authors: pront
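
The shape of this change is worth a small illustration. The sketch below is hypothetical and self-contained (the types stand in for Vector's real event, encoder, and internal-event types); it only shows the pattern the fragment describes: individual encode failure paths return errors instead of emitting the dropped-events metric themselves, and a single call site accounts for every failure.

```rust
// Hypothetical stand-ins for Vector's encoder and telemetry types; this only
// sketches the "caller emits events_dropped once" shape described above.
#[derive(Debug)]
struct EncodeError {
    error_code: &'static str,
}

fn build_record_batch(values: &[String]) -> Result<Vec<u8>, EncodeError> {
    // Any failure path (type mismatch, JSON decode error, ...) just returns
    // the error; it does not emit the dropped-events metric itself.
    if values.iter().any(|v| v.is_empty()) {
        return Err(EncodeError { error_code: "arrow_json_decode" });
    }
    Ok(values.join("\n").into_bytes())
}

fn emit_events_dropped(count: usize, reason: &str) {
    // Single emission point standing in for the dropped-events internal event.
    eprintln!("events dropped: count={count}, reason={reason}");
}

fn encode(values: Vec<String>) -> Option<Vec<u8>> {
    let count = values.len();
    match build_record_batch(&values) {
        Ok(bytes) => Some(bytes),
        // Every encode failure funnels through this one accounting point.
        Err(e) => {
            emit_events_dropped(count, e.error_code);
            None
        }
    }
}

fn main() {
    assert!(encode(vec!["ok".into(), "".into()]).is_none());
    assert!(encode(vec!["ok".into()]).is_some());
}
```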
2 changes: 2 additions & 0 deletions changelog.d/25267_vrl_get_env_var_playground.fix.md
@@ -0,0 +1,2 @@
Restored the full VRL stdlib, including `get_env_var`, in the standalone VRL CLI and web playground by default.
authors: pront
3 changes: 3 additions & 0 deletions changelog.d/25313_enable_codecs_parquet_in_releases.fix.md
@@ -0,0 +1,3 @@
Parquet encoding in the `aws_s3` sink (`batch_encoding`) now works out of the box in the official release binaries. Previously it required compiling Vector from source with the `codecs-parquet` feature.

authors: pront
3 changes: 3 additions & 0 deletions changelog.d/authoritative_sink_acks.enhancement.md
@@ -0,0 +1,3 @@
Added an `authoritative` field to the sink acknowledgements configuration. The field defaults to `false`, and the feature only activates when at least one sink explicitly sets `authoritative: true`. When active, only **authoritative** sinks block source acknowledgements; non-authoritative sinks have their event finalizers stripped, so the source can acknowledge events as soon as all authoritative sinks have processed them. This prevents non-critical sinks (console, metrics, etc.) from holding up acknowledgements that should depend only on critical sinks. When no sink sets `authoritative: true`, behavior is identical to previous versions.

authors: connoryy
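
A minimal sketch of the selection rule this fragment describes (illustrative names only, not Vector's actual types or its `compute_authoritative_components` implementation): when no sink opts in, every sink keeps its finalizers and gates source acknowledgements, preserving legacy behavior; once any sink sets `authoritative: true`, only those sinks do.

```rust
/// Illustrative only: a sink name paired with its `acknowledgements.authoritative` flag.
struct SinkAck<'a> {
    name: &'a str,
    authoritative: bool,
}

/// Returns the sinks whose acknowledgement must arrive before the source acks.
/// If no sink opted in, every sink participates (legacy behavior).
fn acknowledging_sinks<'a>(sinks: &'a [SinkAck<'a>]) -> Vec<&'a str> {
    let any_authoritative = sinks.iter().any(|s| s.authoritative);
    sinks
        .iter()
        .filter(|s| !any_authoritative || s.authoritative)
        .map(|s| s.name)
        .collect()
}

fn main() {
    let sinks = [
        SinkAck { name: "s3_archive", authoritative: true },
        SinkAck { name: "console_debug", authoritative: false },
    ];
    // With one authoritative sink configured, only it gates source acks.
    assert_eq!(acknowledging_sinks(&sinks), vec!["s3_archive"]);
}
```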
29 changes: 24 additions & 5 deletions lib/codecs/src/encoding/format/arrow.rs
@@ -40,12 +40,12 @@ pub struct ArrowStreamSerializerConfig {

/// Allow null values for non-nullable fields in the schema.
///
/// When enabled, missing or incompatible values will be encoded as null even for fields
/// When enabled, missing or incompatible values are encoded as null, even for fields
/// marked as non-nullable in the Arrow schema. This is useful when working with downstream
/// systems that can handle null values through defaults, computed columns, or other mechanisms.
///
/// When disabled (default), missing values for non-nullable fields will cause encoding errors,
/// ensuring all required data is present before sending to the sink.
/// When disabled (default), missing values for non-nullable fields result in encoding errors. This
/// helps ensure all required data is present before it is sent to the sink.
#[serde(default)]
#[configurable(derived)]
pub allow_nullable_fields: bool,
@@ -322,7 +322,6 @@ pub(crate) fn build_record_batch(
});
vector_common::internal_event::emit(crate::internal_events::EncoderNullConstraintError {
error: &error,
count: values.len(),
});
return Err(ArrowEncodingError::NullConstraint {
field_name: missing.join(", "),
@@ -331,12 +330,32 @@

let mut decoder = ReaderBuilder::new(schema)
.build_decoder()
.inspect_err(|e| {
vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
error: e,
error_code: "arrow_record_batch_creation",
});
})
.context(RecordBatchCreationSnafu)?;

decoder.serialize(values).context(ArrowJsonDecodeSnafu)?;
decoder
.serialize(values)
.inspect_err(|e| {
vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
error: e,
error_code: "arrow_json_decode",
});
})
.context(ArrowJsonDecodeSnafu)?;

decoder
.flush()
.inspect_err(|e| {
vector_common::internal_event::emit(crate::internal_events::EncoderRecordBatchError {
error: e,
error_code: "arrow_json_decode",
});
})
.context(ArrowJsonDecodeSnafu)?
.ok_or(ArrowEncodingError::NoEvents)
}
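
The hunk above leans on `Result::inspect_err` so the serializer can emit a telemetry event for each failure without consuming the error before it is converted and propagated. A minimal, self-contained sketch of that pattern (the emission function is a hypothetical stand-in for Vector's internal events):

```rust
use std::num::ParseIntError;

// Hypothetical stand-in for emitting an internal error event and incrementing
// an error counter with a specific error_code.
fn emit_error(error_code: &str, error: &dyn std::fmt::Display) {
    eprintln!("error event (error_code={error_code}): {error}");
}

fn parse_compression_level(raw: &str) -> Result<u8, ParseIntError> {
    raw.parse::<u8>()
        // inspect_err borrows the error, emits telemetry, and passes the
        // Result through unchanged, so `?` or `.context(...)` still applies.
        .inspect_err(|e| emit_error("level_parse", e))
}

fn main() {
    assert!(parse_compression_level("not a number").is_err());
    assert_eq!(parse_compression_level("9").unwrap(), 9);
}
```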
70 changes: 39 additions & 31 deletions lib/codecs/src/encoding/format/parquet.rs
@@ -35,17 +35,20 @@ type EventsDroppedError = ComponentEventsDropped<'static, UNINTENTIONAL>;
/// Compression algorithm and optional level for archive objects.
#[configurable_component]
#[derive(Default, Copy, Clone, Debug, PartialEq)]
#[configurable(metadata(
docs::enum_tag_description = "Compression codec applied per column page inside the Parquet file."
))]
#[serde(tag = "algorithm", rename_all = "snake_case")]
pub enum ParquetCompression {
/// Zstd compression. Level must be between 1 and 21.
Zstd {
/// Compression level (1–21). This is the range Vector currently supports; higher values compress more but are slower.
/// Compression level (1–21). This is the range Vector supports; higher values compress more but are slower.
#[configurable(validation(range(min = 1, max = 21)))]
level: u8,
},
/// Gzip compression. Level must be between 1 and 9.
Gzip {
/// Compression level (1–9). This is the range Vector currently supports; higher values compress more but are slower.
/// Compression level (1–9). This is the range Vector supports; higher values compress more but are slower.
#[configurable(validation(range(min = 1, max = 9)))]
level: u8,
},
@@ -276,39 +279,29 @@ impl ParquetSerializer {

/// Writes `record_batch` into `buffer` as a complete Parquet file.
///
/// On failure, emits an [`ArrowWriterError`] internal event (which also
/// increments `component_errors_total` and emits the events-dropped metric)
/// before returning the error.
/// On failure, emits an [`ArrowWriterError`] internal event (which
/// increments `component_errors_total`) before returning the error.
/// The caller is responsible for emitting `events_dropped`.
fn write_record_batch(
&self,
record_batch: &RecordBatch,
buffer: &mut BytesMut,
event_count: usize,
writer_props: &WriterProperties,
) -> Result<(), parquet::errors::ParquetError> {
let mut writer = ArrowWriter::try_new(
buffer.writer(),
Arc::clone(record_batch.schema_ref()),
Some((*self.writer_props).clone()),
Some(writer_props.clone()),
)
.inspect_err(|e| {
emit(ArrowWriterError {
error: e,
batch_count: event_count,
});
emit(ArrowWriterError { error: e });
})?;

writer.write(record_batch).inspect_err(|e| {
emit(ArrowWriterError {
error: e,
batch_count: event_count,
});
emit(ArrowWriterError { error: e });
})?;

writer.close().inspect_err(|e| {
emit(ArrowWriterError {
error: e,
batch_count: event_count,
});
emit(ArrowWriterError { error: e });
})?;

Ok(())
@@ -326,10 +319,7 @@ impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
let json_values = match vector_log_events_to_json_values(&events) {
Ok(values) => values,
Err(e) => {
emit(JsonSerializationError {
error: &e,
batch_count: events.len(),
});
emit(JsonSerializationError { error: &e });
return Err(Box::new(e));
}
};
@@ -358,7 +348,6 @@ impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
{
for top_level in object_map.keys() {
if !self.schema_field_names.contains(top_level.as_str()) {
self.events_dropped_handle.emit(Count(events.len()));
return Err(Box::new(ArrowEncodingError::SchemaFetchError {
message: format!(
"Strict schema mode: event contains field '{top_level}' not in schema",
@@ -381,8 +370,7 @@ impl tokio_util::codec::Encoder<Vec<Event>> for ParquetSerializer {
let record_batch =
build_record_batch(Arc::clone(&self.schema), &json_values).map_err(Box::new)?;

self.write_record_batch(&record_batch, buffer, json_values.len())
.map_err(Box::new)?;
Self::write_record_batch(&record_batch, buffer, &self.writer_props).map_err(Box::new)?;

Ok(())
}
@@ -394,10 +382,7 @@ impl ParquetSchemaGenerator {
pub fn infer_schema(events: &[serde_json::Value]) -> Result<Schema, Error> {
let schema = infer_json_schema_from_iterator(events.iter().map(Ok::<_, ArrowError>))
.map_err(|e| {
emit(SchemaGenerationError {
error: &e,
batch_count: events.len(),
});
emit(SchemaGenerationError { error: &e });
Error::new(ErrorKind::InvalidData, e.to_string())
})?;

@@ -916,6 +901,29 @@ mod tests {
assert_eq!(columns, vec!["name"]);
}

#[test]
fn test_parquet_type_mismatch_returns_error() {
let schema_path =
write_temp_schema("type_mismatch", "message logs {\n required int64 name;\n}");

let mut serializer = ParquetSerializer::new(ParquetSerializerConfig {
schema_file: Some(schema_path),
schema_mode: ParquetSchemaMode::Relaxed,
..Default::default()
})
.expect("Failed to create serializer");

let events = vec![create_event(vec![("name", "not_an_integer")])];
let mut buffer = BytesMut::new();
let result = serializer.encode(events, &mut buffer);
assert!(result.is_err(), "Type mismatch should return an error");
let err = result.unwrap_err().to_string();
assert!(
err.contains("Int64"),
"Error should mention the expected type, got: {err}"
);
}

#[test]
fn test_parquet_schema_file_binary_without_string_annotation_rejected() {
// Native Parquet "binary" without (STRING) annotation resolves to Arrow Binary,