Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/12203_journald_globbing_matches.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added support for globbing in the `journald` source's `include_matches` and `exclude_matches` options.

authors: AmitPr
126 changes: 115 additions & 11 deletions src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{StreamExt, poll, stream::BoxStream, task::Poll};
use glob::{Pattern, PatternError};
use nix::{
sys::signal::{Signal, kill},
unistd::Pid,
Expand Down Expand Up @@ -89,6 +90,12 @@ enum BuildError {
value,
))]
DuplicatedMatches { field: String, value: String },
#[snafu(display("Invalid glob pattern {:?} for field {:?}: {}", pattern, field, source))]
InvalidGlobPattern {
field: String,
pattern: String,
source: PatternError,
},
#[snafu(display(
"`current_boot_only: false` not supported for systemd versions 250 through 257 (got {}).",
systemd_version
Expand All @@ -97,6 +104,7 @@ enum BuildError {
}

type Matches = HashMap<String, HashSet<String>>;
type CompiledMatches = HashMap<String, Vec<Pattern>>;

/// Configuration for the `journald` source.
#[configurable_component(source("journald", "Collect logs from JournalD."))]
Expand Down Expand Up @@ -137,6 +145,9 @@ pub struct JournaldConfig {
/// If empty or not present, all journal fields are accepted.
///
/// If `include_units` is specified, it is merged into this list.
///
/// Values support glob patterns (e.g., `my-app-*` matches `my-app-foo`).
/// Special characters: `*` (any sequence), `?` (any char), `[...]` (char class).
#[serde(default)]
#[configurable(metadata(
docs::additional_props_description = "The set of field values to match in journal entries that are to be included."
Expand All @@ -148,6 +159,9 @@ pub struct JournaldConfig {
/// excludes the entry from this source.
///
/// If `exclude_units` is specified, it is merged into this list.
///
/// Values support glob patterns (e.g., `my-app-*` matches `my-app-foo`).
/// Special characters: `*` (any sequence), `?` (any char), `[...]` (char class).
#[serde(default)]
#[configurable(metadata(
docs::additional_props_description = "The set of field values to match in journal entries that are to be excluded."
Expand Down Expand Up @@ -367,6 +381,9 @@ impl SourceConfig for JournaldConfig {
return Err(BuildError::DuplicatedMatches { field, value }.into());
}

let include_matches = compile_matches(&include_matches)?;
let exclude_matches = compile_matches(&exclude_matches)?;

let mut checkpoint_path = data_dir;
checkpoint_path.push(CHECKPOINT_FILENAME);

Expand Down Expand Up @@ -429,8 +446,8 @@ impl SourceConfig for JournaldConfig {
}

struct JournaldSource {
include_matches: Matches,
exclude_matches: Matches,
include_matches: CompiledMatches,
exclude_matches: CompiledMatches,
checkpoint_path: PathBuf,
batch_size: usize,
remap_priority: bool,
Expand Down Expand Up @@ -1008,7 +1025,7 @@ fn remap_priority(priority: &mut JsonValue) {
}
}

fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bool {
fn filter_matches(record: &Record, includes: &CompiledMatches, excludes: &CompiledMatches) -> bool {
match (includes.is_empty(), excludes.is_empty()) {
(true, true) => false,
(false, true) => !contains_match(record, includes),
Expand All @@ -1017,14 +1034,13 @@ fn filter_matches(record: &Record, includes: &Matches, excludes: &Matches) -> bo
}
}

fn contains_match(record: &Record, matches: &Matches) -> bool {
let f = move |(field, value)| {
fn contains_match(record: &Record, matches: &CompiledMatches) -> bool {
record.iter().any(|(field, value)| {
matches
.get(field)
.map(|x| x.contains(value))
.map(|patterns| patterns.iter().any(|p| p.matches(value)))
.unwrap_or(false)
};
record.iter().any(f)
})
}

fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(String, String)> {
Expand All @@ -1043,6 +1059,24 @@ fn find_duplicate_match(a_matches: &Matches, b_matches: &Matches) -> Option<(Str
None
}

fn compile_matches(matches: &Matches) -> Result<CompiledMatches, BuildError> {
let mut compiled = HashMap::new();
for (field, values) in matches {
let patterns: Result<Vec<Pattern>, _> = values
.iter()
.map(|v| {
Pattern::new(v).map_err(|source| BuildError::InvalidGlobPattern {
field: field.clone(),
pattern: v.clone(),
source,
})
})
.collect();
compiled.insert(field.clone(), patterns?);
}
Ok(compiled)
}

enum Finalizer {
Sync(SharedCheckpointer),
Async(OrderedFinalizer<String>),
Expand Down Expand Up @@ -1307,6 +1341,10 @@ mod tests {
matches
}

fn compile_test_matches(matches: &Matches) -> CompiledMatches {
compile_matches(matches).expect("test patterns should be valid")
}

#[tokio::test]
async fn reads_journal() {
let received = run_with_units(&[], &[], None).await;
Expand Down Expand Up @@ -1488,9 +1526,9 @@ mod tests {

#[test]
fn filter_matches_works_correctly() {
let empty: Matches = HashMap::new();
let includes = create_unit_matches(vec!["one", "two"]);
let excludes = create_unit_matches(vec!["foo", "bar"]);
let empty: CompiledMatches = HashMap::new();
let includes = compile_test_matches(&create_unit_matches(vec!["one", "two"]));
let excludes = compile_test_matches(&create_unit_matches(vec!["foo", "bar"]));

let zero = HashMap::new();
assert!(!filter_matches(&zero, &empty, &empty));
Expand Down Expand Up @@ -1822,4 +1860,70 @@ mod tests {

matches_schema(&config, LogNamespace::Legacy)
}

#[tokio::test]
async fn includes_matches_with_glob() {
// Test that *.service glob matches service units
let include_matches = create_matches(vec![(SYSTEMD_UNIT, "*.service")]);
let received = run_journal(include_matches, HashMap::new(), None, false).await;
// Should match: unit.service, badunit.service, syslog.service, NetworkManager.service
assert_eq!(received.len(), 4);
// Verify we got the expected service units
let messages: Vec<_> = received.iter().map(|e| message(e)).collect();
assert!(messages.contains(&Value::Bytes("unit message".into())));
assert!(messages.contains(&Value::Bytes("¿Hello?".into()))); // badunit.service
}

#[tokio::test]
async fn excludes_matches_with_glob() {
// Test that bad* glob excludes badunit.service
let exclude_matches = create_matches(vec![(SYSTEMD_UNIT, "bad*")]);
let received = run_journal(HashMap::new(), exclude_matches, None, false).await;
// Should exclude badunit.service (1 entry), leaving 7 entries
assert_eq!(received.len(), 7);
// Verify badunit.service message is NOT in results
let messages: Vec<_> = received.iter().map(|e| message(e)).collect();
assert!(!messages.contains(&Value::Bytes("¿Hello?".into())));
}

#[test]
fn contains_match_with_glob_works() {
// Test glob pattern matching in contains_match
let matches = compile_test_matches(&create_matches(vec![
(SYSTEMD_UNIT, "*.service"),
("PRIORITY", "ERR"),
]));

// Should match: unit.service matches *.service
let mut record1 = HashMap::new();
record1.insert(String::from(SYSTEMD_UNIT), String::from("unit.service"));
assert!(contains_match(&record1, &matches));

// Should match: any.service matches *.service
let mut record2 = HashMap::new();
record2.insert(String::from(SYSTEMD_UNIT), String::from("any.service"));
assert!(contains_match(&record2, &matches));

// Should NOT match: sysinit.target does not match *.service
let mut record3 = HashMap::new();
record3.insert(String::from(SYSTEMD_UNIT), String::from("sysinit.target"));
assert!(!contains_match(&record3, &matches));

// Should match: PRIORITY=ERR matches exactly
let mut record4 = HashMap::new();
record4.insert(String::from("PRIORITY"), String::from("ERR"));
assert!(contains_match(&record4, &matches));

// Test ? wildcard: matches single character
let matches_question =
compile_test_matches(&create_matches(vec![(SYSTEMD_UNIT, "uni?.service")]));
let mut record5 = HashMap::new();
record5.insert(String::from(SYSTEMD_UNIT), String::from("unit.service"));
assert!(contains_match(&record5, &matches_question));

// Test [abc] character class
let matches_class =
compile_test_matches(&create_matches(vec![(SYSTEMD_UNIT, "[ub]nit.service")]));
assert!(contains_match(&record5, &matches_class)); // matches 'unit.service'
}
}
Loading