Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 9 additions & 92 deletions components/nimbus/src/stateful/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ use crate::error::{NimbusError, Result, debug, info, warn};
//
// ⚠️ Warning : Altering the type of `DB_VERSION` would itself require a DB migration. ⚠️
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";

/// The current database version.
pub(crate) const DB_VERSION: u16 = 3;

/// The minimum database version that will be migrated.
///
/// If the version is below this threshold, the database will be reset.
pub(crate) const DB_MIN_VERSION: u16 = 2;

const RKV_MAX_DBS: u32 = 6;

pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
Expand Down Expand Up @@ -355,7 +363,7 @@ impl Database {
return Ok(());
}

if current_version == 0 || current_version > DB_VERSION {
if current_version == 0 || current_version > DB_VERSION || current_version < DB_MIN_VERSION {
info!(
"maybe_upgrade: current_version: {}, DB_VERSION: {}; wiping most stores",
current_version, DB_VERSION
Expand All @@ -368,18 +376,6 @@ impl Database {
return Ok(());
}

if current_version == 1 {
info!("Migrating database from v1 to v2");
if let Err(e) = self.migrate_v1_to_v2(&mut writer) {
error_support::report_error!(
"nimbus-database-migration",
"Error migrating database v1 to v2: {:?}. Wiping experiments and enrollments",
e
);
self.clear_experiments_and_enrollments(&mut writer)?;
}
}

if current_version == 2 {
info!("Migrating database from v2 to v3");
if let Err(e) = self.migrate_v2_to_v3(&mut writer) {
Expand Down Expand Up @@ -418,85 +414,6 @@ impl Database {
Ok(())
}

/// Migrates a v1 database to v2
///
/// Note that any Err returns from this function (including stuff
/// propagated up via the ? operator) will cause maybe_update (our caller)
/// to assume that this is unrecoverable and wipe the database, removing
/// people from any existing enrollments and blowing away their experiment
/// history, so that they don't get left in an inconsistent state.
fn migrate_v1_to_v2(&self, writer: &mut Writer) -> Result<()> {
info!("Upgrading from version 1 to version 2");

// use try_collect_all to read everything except records that serde
// returns deserialization errors on. Some logging of those errors
// happens, but it's not ideal.
let reader = self.read()?;

// XXX write a test to verify that we don't need to gc any
// enrollments that don't have experiments because the experiments
// were discarded either during try_collect_all (these wouldn't have been
// detected during the filtering phase) or during the filtering phase
// itself. The test needs to run evolve_experiments, as that should
// correctly drop any orphans, even if the migrators aren't perfect.

let enrollments: Vec<ExperimentEnrollment> =
self.enrollment_store.try_collect_all(&reader)?;
let experiments: Vec<Experiment> = self.experiment_store.try_collect_all(&reader)?;

// figure out which experiments have records that need to be dropped
// and log that we're going to drop them and why
let empty_string = "".to_string();
let slugs_with_experiment_issues: HashSet<String> = experiments
.iter()
.filter_map(
|e| {
let branch_with_empty_feature_ids =
e.branches.iter().find(|b| b.feature.is_none() || b.feature.as_ref().unwrap().feature_id.is_empty());
if branch_with_empty_feature_ids.is_some() {
warn!("{:?} experiment has branch missing a feature prop; experiment & enrollment will be discarded", &e.slug);
Some(e.slug.to_owned())
} else if e.feature_ids.is_empty() || e.feature_ids.contains(&empty_string) {
warn!("{:?} experiment has invalid feature_ids array; experiment & enrollment will be discarded", &e.slug);
Some(e.slug.to_owned())
} else {
None
}
})
.collect();
let slugs_to_discard: HashSet<_> = slugs_with_experiment_issues;

// filter out experiments to be dropped
let updated_experiments: Vec<Experiment> = experiments
.into_iter()
.filter(|e| !slugs_to_discard.contains(&e.slug))
.collect();
debug!("updated experiments = {:?}", updated_experiments);

// filter out enrollments to be dropped
let updated_enrollments: Vec<ExperimentEnrollment> = enrollments
.into_iter()
.filter(|e| !slugs_to_discard.contains(&e.slug))
.collect();
debug!("updated enrollments = {:?}", updated_enrollments);

// rewrite both stores
self.experiment_store.clear(writer)?;
for experiment in updated_experiments {
self.experiment_store
.put(writer, &experiment.slug, &experiment)?;
}

self.enrollment_store.clear(writer)?;
for enrollment in updated_enrollments {
self.enrollment_store
.put(writer, &enrollment.slug, &enrollment)?;
}
debug!("exiting migrate_v1_to_v2");

Ok(())
}

/// Migrates a v2 database to v3
///
/// Separates global user participation into experiments and rollouts participation.
Expand Down