Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions LICENSE-3rdparty.csv

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions quickwit/license-tool.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@

# `ring` has a custom license that is mostly "ISC-style" but parts of it also fall under OpenSSL licensing.
"ring-0.17.8" = { license = "ISC AND Custom" }

# openssl-macros doesn't publish a repository URL on crates.io but it's part of the rust-openssl project.
"openssl-macros-0.1.1" = { license = "MIT OR Apache-2.0", origin = "https://github.com/sfackler/rust-openssl" }
3 changes: 2 additions & 1 deletion quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ async fn test_cmd_update_index() {
index_metadata.index_config.retention_policy_opt,
Some(RetentionPolicy {
retention_period: String::from("1 week"),
evaluation_schedule: String::from("daily")
evaluation_schedule: String::from("daily"),
jitter: None,
})
);

Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ itertools = { workspace = true }
json_comments = { workspace = true }
new_string_template = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
187 changes: 180 additions & 7 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use quickwit_common::uri::Uri;
use quickwit_common::{is_true, true_fn};
use quickwit_doc_mapper::{DocMapper, DocMapperBuilder, DocMapping};
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
use rand::{Rng, distr, rng};
use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
pub use serialize::{load_index_config_from_user_config, load_index_config_update};
use siphasher::sip::SipHasher;
use tracing::warn;
Expand Down Expand Up @@ -221,6 +222,18 @@ pub struct RetentionPolicy {
#[serde(default = "RetentionPolicy::default_schedule")]
#[serde(rename = "schedule")]
pub evaluation_schedule: String,

/// A jitter to apply to the schedule. The policy will be evaluated between 0 and `jitter`
/// (inclusive) after the scheduled time. When many indexes use the same schedule, this can be
/// used to spread the load instead of causing a very bursty load.
///
/// If unset, a default jitter of `min(1 hour, next_next_evaluation - next_evaluation)` is
/// applied. Said otherwise, an operation may start any time between the next time it's
/// scheduled, and the time after that, but no later than 1h after the scheduled time.
#[serde(default, deserialize_with = "parse_human_duration_opt")]
#[serde(serialize_with = "serialize_duration_opt")]
#[serde(skip_serializing_if = "Option::is_none")]
pub jitter: Option<Duration>,
}

impl RetentionPolicy {
Expand All @@ -237,7 +250,7 @@ impl RetentionPolicy {
})
}

pub fn evaluation_schedule(&self) -> anyhow::Result<Schedule> {
fn evaluation_schedule(&self) -> anyhow::Result<Schedule> {
let evaluation_schedule = prepend_at_char(&self.evaluation_schedule);

Schedule::from_str(&evaluation_schedule).with_context(|| {
Expand All @@ -250,13 +263,28 @@ impl RetentionPolicy {

pub fn duration_until_next_evaluation(&self) -> anyhow::Result<Duration> {
let schedule = self.evaluation_schedule()?;
let future_date = schedule
.upcoming(Utc)
let mut schedule_iter = schedule.upcoming(Utc);
let future_date = schedule_iter
.next()
.expect("Failed to obtain next evaluation date.");
let duration = (future_date - Utc::now())
let mut duration = (future_date - Utc::now())
.to_std()
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
let jitter_max_secs = self
.jitter
.unwrap_or_else(|| {
if let Some(next_next_date) = schedule_iter.next() {
let time_between_schedules = next_next_date - future_date;
Duration::from_secs(time_between_schedules.num_seconds().clamp(0, 3600) as u64)
} else {
// we don't know when the schedule is. That's odd. Let's allow no jitter
warn!("found retention policy schedule with no next execution");
Duration::ZERO
}
})
.as_secs();
let jitter = rng().sample::<u64, _>(distr::StandardUniform) % (jitter_max_secs + 1);
duration += Duration::from_secs(jitter);
Ok(duration)
}

Expand All @@ -267,6 +295,27 @@ impl RetentionPolicy {
}
}

fn parse_human_duration_opt<'de, D>(deserializer: D) -> Result<Option<Duration>, D::Error>
where D: Deserializer<'de> {
let value: Option<String> = Deserialize::deserialize(deserializer)?;
match value {
None => Ok(None),
Some(s) => humantime::parse_duration(&s).map(Some).map_err(|error| {
de::Error::custom(format!(
"failed to parse human-readable duration `{s}`: {error:?}",
))
}),
}
}

/// Serde helper serializing an optional duration in `humantime` format.
///
/// `None` is emitted through `serialize_none`; `Some` is emitted as the string produced by
/// `humantime::format_duration`.
fn serialize_duration_opt<S>(value: &Option<Duration>, s: S) -> Result<S::Ok, S::Error>
where S: Serializer {
    if let Some(duration) = value {
        let formatted = humantime::format_duration(*duration).to_string();
        s.serialize_str(&formatted)
    } else {
        s.serialize_none()
    }
}

/// Prepends an `@` char at the start of the cron expression if necessary:
/// `hourly` -> `@hourly`
fn prepend_at_char(schedule: &str) -> String {
Expand Down Expand Up @@ -503,6 +552,7 @@ impl crate::TestableForRegression for IndexConfig {
let retention_policy_opt = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter: None,
});
IndexConfig {
index_id: "my-index".to_string(),
Expand Down Expand Up @@ -675,6 +725,7 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter: None,
};
assert_eq!(
index_config.retention_policy_opt.unwrap(),
Expand Down Expand Up @@ -854,6 +905,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
let retention_policy_yaml = serde_yaml::to_string(&retention_policy).unwrap();
assert_eq!(
Expand All @@ -874,6 +926,7 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
assert_eq!(retention_policy, expected_retention_policy);
}
Expand All @@ -888,17 +941,42 @@ mod tests {
let expected_retention_policy = RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "daily".to_string(),
jitter: None,
};
assert_eq!(retention_policy, expected_retention_policy);
}
}

#[test]
fn test_retention_policy_jitter_deserialization() {
    // `schedule` is omitted here and falls back to its serde default; only `jitter` is checked.
    let yaml = r#"
period: 90 days
jitter: 30 minutes
"#;
    let parsed: RetentionPolicy = serde_yaml::from_str(yaml).unwrap();
    let expected_jitter = Duration::from_secs(30 * 60);
    assert_eq!(parsed.jitter, Some(expected_jitter));
}

#[test]
fn test_retention_policy_jitter_serialization_roundtrip() {
    // Serializing then deserializing a policy with an explicit jitter must give back an equal
    // value.
    let original = RetentionPolicy {
        retention_period: "90 days".to_string(),
        evaluation_schedule: "hourly".to_string(),
        jitter: Some(Duration::from_secs(30 * 60)),
    };
    let yaml = serde_yaml::to_string(&original).unwrap();
    let roundtripped = serde_yaml::from_str::<RetentionPolicy>(&yaml).unwrap();
    assert_eq!(roundtripped, original);
}

#[test]
fn test_parse_retention_policy_period() {
{
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
assert_eq!(
retention_policy.retention_period().unwrap(),
Expand All @@ -908,6 +986,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "foo".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
assert_eq!(
retention_policy.retention_period().unwrap_err().to_string(),
Expand All @@ -932,6 +1011,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "@hourly".to_string(),
jitter: None,
};
assert_eq!(
retention_policy.evaluation_schedule().unwrap(),
Expand All @@ -942,6 +1022,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
assert_eq!(
retention_policy.evaluation_schedule().unwrap(),
Expand All @@ -952,6 +1033,7 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "0 * * * * *".to_string(),
jitter: None,
};
let evaluation_schedule = retention_policy.evaluation_schedule().unwrap();
assert_eq!(evaluation_schedule.seconds().count(), 1);
Expand All @@ -965,20 +1047,23 @@ mod tests {
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
retention_policy.validate().unwrap();
}
{
let retention_policy = RetentionPolicy {
retention_period: "foo".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
};
retention_policy.validate().unwrap_err();
}
{
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: "foo".to_string(),
jitter: None,
};
retention_policy.validate().unwrap_err();
}
Expand All @@ -987,10 +1072,11 @@ mod tests {
#[test]
fn test_retention_schedule_duration() {
let schedule_test_helper_fn = |schedule_str: &str| {
let hourly_schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
let retention_policy = RetentionPolicy {
retention_period: "1 hour".to_string(),
evaluation_schedule: schedule_str.to_string(),
jitter: Some(Duration::ZERO),
};

let next_evaluation_duration = chrono::Duration::nanoseconds(
Expand All @@ -1000,7 +1086,7 @@ mod tests {
.as_nanos() as i64,
);
let next_evaluation_date = Utc::now() + next_evaluation_duration;
let expected_date = hourly_schedule.upcoming(Utc).next().unwrap();
let expected_date = schedule.upcoming(Utc).next().unwrap();
assert_eq!(next_evaluation_date.timestamp(), expected_date.timestamp());
};

Expand All @@ -1011,6 +1097,93 @@ mod tests {
schedule_test_helper_fn("* * * ? * ?");
}

#[test]
fn test_retention_schedule_duration_with_jitter() {
    // Maximum jitter configured on the policy under test.
    const MAX_JITTER_SECS: u32 = 30 * 60;

    // Checks that, for the given schedule, the computed delay lands within
    // `[next occurrence, next occurrence + MAX_JITTER_SECS]` and that a non-zero jitter is
    // observed at least once.
    let schedule_test_helper_fn = |schedule_str: &str| {
        let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
        let retention_policy = RetentionPolicy {
            retention_period: "1 hour".to_string(),
            evaluation_schedule: schedule_str.to_string(),
            jitter: Some(Duration::from_secs(u64::from(MAX_JITTER_SECS))),
        };

        for _ in 0..11 {
            // We run this a few times in case we are unlucky and pick a null jitter. With a
            // 30 minute jitter this happens once in 1801 tries, so 11 unlucky tries in a row
            // are not a realistic source of flakiness.
            let next_evaluation_duration = chrono::Duration::nanoseconds(
                retention_policy
                    .duration_until_next_evaluation()
                    .unwrap()
                    .as_nanos() as i64,
            );
            let next_evaluation_date = Utc::now() + next_evaluation_duration;
            // Query the schedule once so both bounds are derived from the same occurrence.
            let expected_date_early = schedule.upcoming(Utc).next().unwrap();
            let expected_date_late =
                expected_date_early + chrono::Duration::seconds(i64::from(MAX_JITTER_SECS));
            assert!(next_evaluation_date.timestamp() >= expected_date_early.timestamp());
            assert!(next_evaluation_date.timestamp() <= expected_date_late.timestamp());
            if next_evaluation_date.timestamp() != expected_date_early.timestamp() {
                return;
            }
        }
        panic!("got no jitter at all on multiple successive runs")
    };

    schedule_test_helper_fn("hourly");
    schedule_test_helper_fn("daily");
    schedule_test_helper_fn("weekly");
    schedule_test_helper_fn("monthly");
    schedule_test_helper_fn("* * * ? * ?");
}

#[test]
fn test_retention_schedule_duration_with_default_jitter() {
    // Checks that, when no jitter is configured, the computed delay lands within
    // `[next occurrence, next occurrence + default jitter]` and that a non-zero jitter is
    // observed at least once.
    let schedule_test_helper_fn = |schedule_str: &str| {
        let schedule = Schedule::from_str(&prepend_at_char(schedule_str)).unwrap();
        let retention_policy = RetentionPolicy {
            retention_period: "1 hour".to_string(),
            evaluation_schedule: schedule_str.to_string(),
            jitter: None,
        };
        let max_1s_delay = schedule_str.starts_with('*');
        let (limit, max_delay) = if max_1s_delay {
            // The every-second schedule only allows two start dates (0 or 1 second of
            // jitter), so a null jitter is drawn half of the time: to make the test reliable,
            // try a few more times.
            (128, 1)
        } else {
            // The default jitter is capped at one hour: a null jitter is drawn once in 3601
            // tries, so 11 attempts are plenty.
            (11, 3600)
        };
        for _ in 0..limit {
            let next_evaluation_duration = chrono::Duration::nanoseconds(
                retention_policy
                    .duration_until_next_evaluation()
                    .unwrap()
                    .as_nanos() as i64,
            );
            let next_evaluation_date = Utc::now() + next_evaluation_duration;
            // Query the schedule once so both bounds are derived from the same occurrence.
            let expected_date_early = schedule.upcoming(Utc).next().unwrap();
            let expected_date_late = expected_date_early + chrono::Duration::seconds(max_delay);
            assert!(next_evaluation_date.timestamp() >= expected_date_early.timestamp());
            assert!(next_evaluation_date.timestamp() <= expected_date_late.timestamp());
            if next_evaluation_date.timestamp() != expected_date_early.timestamp() {
                return;
            }
        }
        panic!("got no jitter at all on multiple successive runs")
    };

    schedule_test_helper_fn("hourly");
    schedule_test_helper_fn("daily");
    schedule_test_helper_fn("weekly");
    schedule_test_helper_fn("monthly");
    schedule_test_helper_fn("* * * ? * ?");
}

#[test]
fn test_ingest_settings_serde() {
let settings = IngestSettings {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ mod test {
invalid_index_config.retention_policy_opt = Some(RetentionPolicy {
retention_period: "90 days".to_string(),
evaluation_schedule: "hourly".to_string(),
jitter: None,
});
let validation_err = invalid_index_config
.build_and_validate(None)
Expand Down
Loading
Loading