96 changes: 93 additions & 3 deletions doc/operations/ledger_snapshot.rst
@@ -67,12 +67,102 @@ Uncommitted snapshot files, i.e. those whose evidence has not yet been committed
Join or Recover From Snapshot
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Joining nodes will request a snapshot from the target service to accelerate their join. This behaviour is controlled by the ``command.join.fetch_recent_snapshot`` configuration option, and enabled by default. This removes the need for a shared read-only snapshot mount, and corresponding operator actions to keep it up-to-date. Instead the joiner will send a sequence of HTTP requests, potentially following redirect responses to find the current primary and request a specific snapshot, to download a recent snapshot which should allow them to join rapidly. Any suffix after a snapshot (including the entire ledger, in the rare cases where no snapshot can be found) will be replicated to that node via the consensus protocol.
Joining nodes will request a snapshot from the target service to accelerate their join. This behaviour is controlled by the ``command.join.fetch_recent_snapshot`` configuration option, and enabled by default. This removes the need for a shared read-only snapshot mount, and corresponding operator actions to keep it up-to-date. Instead, if the joiner is told by the network that the snapshot they have locally is too old, the joiner will send a sequence of HTTP requests, potentially following redirect responses to find the current primary and request a specific snapshot, to download a recent snapshot which should allow them to join rapidly. Any suffix after a snapshot (including the entire ledger, in the rare cases where no snapshot can be found) will be replicated to that node via the consensus protocol.

The legacy behaviour without ``fetch_recent_snapshot`` relies on a shared read-only directory. On start-up, the new node will search both ``snapshot.directory`` and ``read_only_directory`` to find the latest committed snapshot file. Operators are responsible for populating these with recent snapshots emitted by the service, and making them available (such as via a shared read-only mount) on joining nodes.

It is important to note that new nodes cannot join a service if the snapshot they start from is older than the snapshot the primary node started from. For example, if a new node resumes from a snapshot generated at ``seqno 50`` and joins from a (primary) node that originally resumed from a snapshot at ``seqno 100``, the new node will throw a ``StartupSeqnoIsOld`` error shortly after starting up. It is expected that operators copy the *latest* committed snapshot file to new nodes before start up.

In particular, there is a hard lower bound on the age of the snapshot that a joining node can start from. It must be at least as recent as the snapshot that the primary node started from, otherwise the primary will return a ``StartupSeqnoIsOld`` error to the joining node.
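
The lower bound can be pictured as a single comparison. The following is a minimal sketch only: ``startup_seqno_is_acceptable`` is a hypothetical helper invented for illustration, not a function in CCF, and it assumes the check compares the two startup seqnos directly.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper, not part of CCF: models the rule that the joiner's
// startup snapshot must be at least as recent as the primary's.
constexpr bool startup_seqno_is_acceptable(
  std::uint64_t joiner_startup_seqno, std::uint64_t primary_startup_seqno)
{
  return joiner_startup_seqno >= primary_startup_seqno;
}

inline void example_seqno_check()
{
  // Joiner resumed from seqno 50, primary from seqno 100: the primary
  // would answer with StartupSeqnoIsOld.
  assert(!startup_seqno_is_acceptable(50, 100));
  // A snapshot at the same seqno, or a newer one, is acceptable.
  assert(startup_seqno_is_acceptable(100, 100));
  assert(startup_seqno_is_acceptable(150, 100));
}
```

The example values mirror the ``seqno 50`` versus ``seqno 100`` case described above.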

The following flowchart summarises the join procedure when ``command.join.fetch_recent_snapshot`` is enabled:

.. mermaid::

flowchart TD
startup["Startup"]

select.init_assign("startup_seqno = 0")
select.next_local["Choose highest local snapshot file A-B.committed"]
select.verify{"Validate snapshot"}
select.valid("startup_seqno = A")
select.delete("Delete invalid snapshot")

test_mode{"mode?"}

join.complete("Joined")
join.failure["Join failure"]
join.redirect("target_rpc_address = response.headers['Location']")
join.response{"Response type?"}
join.send("Send join request to target_rpc_address, including startup_seqno")
join.timeout("Timeout")

fetch.apply("startup_seqno = A")
fetch.begin("peer_address = target_rpc_address")
fetch.redirect("peer_address = response.headers['Location']")
fetch.response{"Response type?"}
fetch.send("{peer_address} GET /snapshot?since={startup_seqno}")
fetch.fetch("{peer_address} GET /snapshot/{snapshot_name}")

recovery.process("Complete recovery")
recovery.public("Deserialise snapshot public state")

postjoin.service_open{"Service open?"}
postjoin.public("Deserialise snapshot public state")

wait_for_open("Wait for service to open")
secrets_available("Deserialise snapshot private state")
done["Done"]

startup --> select.init_assign
subgraph Select local snapshot
select.init_assign --> select.next_local
select.next_local --> select.verify
select.verify -->|Valid| select.valid
select.verify -->|Invalid| select.delete
select.delete --> select.next_local
end
select.valid --> test_mode
select.next_local -->|No more snapshots| test_mode

test_mode -->|mode == Join| join.send
subgraph Join
join.timeout -.-> join.send
join.send -->|Response received| join.response
join.response -->|200 OK| join.complete
join.response -->|30x Redirect| join.redirect
join.redirect --> join.send

join.response -->|StartupSeqnoIsOld| fetch.begin
subgraph FetchSnapshot
fetch.begin --> fetch.send
fetch.send -->|Response received| fetch.response
fetch.response -->|2xx Success| fetch.fetch
fetch.fetch --> fetch.apply
fetch.response -->|30x Redirect| fetch.redirect
fetch.redirect --> fetch.send
end
fetch.apply --> join.timeout
fetch.send -.-> join.timeout
fetch.response -->|Other error| join.timeout

join.response -.->|Timeout| join.timeout
end
join.response -->|Other error| join.failure

join.complete --> postjoin.service_open

subgraph PostJoin
postjoin.service_open -->|No| postjoin.public
end
postjoin.service_open -->|Yes| secrets_available
postjoin.public --> wait_for_open

test_mode -->|mode == Recovery| recovery.public
subgraph Recovery
recovery.public -.-> recovery.process
end
recovery.process -.-> wait_for_open

wait_for_open --> secrets_available
secrets_available --> done
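
The Join and FetchSnapshot parts of the flowchart can be read as a retry loop. The sketch below is illustrative only: ``Transport``, ``JoinStatus``, ``JoinOutcome`` and ``run_join`` are hypothetical names rather than CCF APIs, redirect handling during the fetch is assumed to happen inside the ``fetch_snapshot`` callback, and the ``max_rounds`` bound is an assumption added so that the sketch terminates.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// Hypothetical types modelling the flowchart; none of these are CCF APIs.
enum class JoinStatus { Ok, Redirect, StartupSeqnoIsOld, Timeout, OtherError };

struct JoinResponse
{
  JoinStatus status;
  std::string location; // only meaningful when status == Redirect
};

struct Transport
{
  // Sends a join request, including startup_seqno, to the given address.
  std::function<JoinResponse(const std::string&, std::uint64_t)> send_join;
  // Tries to download a snapshot newer than the given seqno (following any
  // redirects internally); returns the new startup seqno on success.
  std::function<std::optional<std::uint64_t>(
    const std::string&, std::uint64_t)>
    fetch_snapshot;
};

enum class JoinOutcome { Joined, Failed, GaveUp };

inline JoinOutcome run_join(
  const Transport& transport,
  std::string target_rpc_address,
  std::uint64_t& startup_seqno,
  std::size_t max_rounds) // assumed bound so that the sketch terminates
{
  assert(transport.send_join && transport.fetch_snapshot);
  for (std::size_t round = 0; round < max_rounds; ++round)
  {
    const JoinResponse response =
      transport.send_join(target_rpc_address, startup_seqno);
    switch (response.status)
    {
      case JoinStatus::Ok:
        return JoinOutcome::Joined;
      case JoinStatus::Redirect:
        target_rpc_address = response.location;
        break;
      case JoinStatus::StartupSeqnoIsOld:
        // Fetch a newer snapshot; on failure, retry the join unchanged.
        if (const auto fetched =
              transport.fetch_snapshot(target_rpc_address, startup_seqno))
        {
          startup_seqno = *fetched;
        }
        break;
      case JoinStatus::Timeout:
        break; // a real node would wait for the retry timeout here
      case JoinStatus::OtherError:
        return JoinOutcome::Failed;
    }
  }
  return JoinOutcome::GaveUp;
}
```

Passing the transport as a pair of callbacks keeps the loop testable without a network: a test can supply lambdas that return a redirect, then ``StartupSeqnoIsOld``, then success, and check that ``startup_seqno`` was updated along the way.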
Historical Transactions
~~~~~~~~~~~~~~~~~~~~~~~

8 changes: 6 additions & 2 deletions include/ccf/node/startup_config.h
@@ -144,9 +144,13 @@ namespace ccf
struct Join
{
ccf::NodeInfoNetwork::NetAddress target_rpc_address;
ccf::ds::TimeString retry_timeout = {"1000ms"};
ccf::ds::TimeString retry_timeout;
std::vector<uint8_t> service_cert;
bool follow_redirect = true;
bool follow_redirect{};
bool fetch_recent_snapshot{};
size_t fetch_snapshot_max_attempts{};
ccf::ds::TimeString fetch_snapshot_retry_interval;
ccf::ds::SizeString fetch_snapshot_max_size;
Comment on lines +150 to +153
The snapshot-related join config needs to be passed "into the enclave", so gets duplicated into this struct. I don't think it's worth duplicating any of the default values, so removed those. Hopefully we can flatten these soon, see #7565.

};
Join join = {};
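
A consequence of dropping the defaults, as the review comment above describes, is that the fields are value-initialised (``false`` / ``0``) until the host copies its configuration across. The sketch below illustrates this with hypothetical stand-in structs (``HostJoinConfig``, ``EnclaveJoinConfig``, ``populate``); the ``true`` defaults follow the old ``follow_redirect`` default and the documented ``fetch_recent_snapshot`` default, while the value ``3`` for the attempt count is an invented placeholder.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-ins for the host-side and enclave-side config structs.
struct HostJoinConfig
{
  // Defaults live on the host side only.
  bool follow_redirect = true;
  bool fetch_recent_snapshot = true;
  std::size_t fetch_snapshot_max_attempts = 3; // placeholder value
};

struct EnclaveJoinConfig
{
  // Value-initialised members: false and 0 until explicitly populated.
  bool follow_redirect{};
  bool fetch_recent_snapshot{};
  std::size_t fetch_snapshot_max_attempts{};
};

// Copies every field across, so the enclave-side struct needs no defaults.
inline EnclaveJoinConfig populate(const HostJoinConfig& host)
{
  EnclaveJoinConfig enclave;
  enclave.follow_redirect = host.follow_redirect;
  enclave.fetch_recent_snapshot = host.fetch_recent_snapshot;
  enclave.fetch_snapshot_max_attempts = host.fetch_snapshot_max_attempts;
  return enclave;
}

inline void example_populate()
{
  const EnclaveJoinConfig unpopulated;
  assert(!unpopulated.follow_redirect);

  const EnclaveJoinConfig populated = populate(HostJoinConfig{});
  assert(populated.follow_redirect);
}
```

The trade-off is that any code path which forgets to populate the struct silently gets ``false`` and ``0`` rather than the documented defaults.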

4 changes: 1 addition & 3 deletions src/enclave/enclave.h
@@ -172,7 +172,6 @@ namespace ccf
CreateNodeStatus create_new_node(
StartType start_type_,
const ccf::StartupConfig& ccf_config_,
std::vector<uint8_t>&& startup_snapshot,
std::vector<uint8_t>& node_cert,
std::vector<uint8_t>& service_cert)
{
@@ -196,8 +195,7 @@ namespace ccf
{
LOG_TRACE_FMT(
"Creating node with start_type {}", start_type_to_str(start_type));
create_info =
node->create(start_type, ccf_config_, std::move(startup_snapshot));
create_info = node->create(start_type, ccf_config_);
}
catch (const std::exception& e)
{
1 change: 0 additions & 1 deletion src/enclave/entry_points.h
@@ -12,7 +12,6 @@ namespace ccf
CreateNodeStatus enclave_create_node(
const EnclaveConfig& enclave_config,
const ccf::StartupConfig& ccf_config,
std::vector<uint8_t>&& startup_snapshot,
std::vector<uint8_t>& node_cert,
std::vector<uint8_t>& service_cert,
StartType start_type,
7 changes: 1 addition & 6 deletions src/enclave/main.cpp
@@ -27,7 +27,6 @@ namespace ccf
CreateNodeStatus enclave_create_node(
const EnclaveConfig& enclave_config,
const ccf::StartupConfig& ccf_config,
std::vector<uint8_t>&& startup_snapshot,
std::vector<uint8_t>& node_cert,
std::vector<uint8_t>& service_cert,
StartType start_type,
@@ -130,11 +129,7 @@ namespace ccf
try
{
status = enclave->create_new_node(
start_type,
ccf_config,
std::move(startup_snapshot),
node_cert,
service_cert);
start_type, ccf_config, node_cert, service_cert);
}
catch (...)
{
84 changes: 8 additions & 76 deletions src/host/run.cpp
@@ -37,8 +37,6 @@
#include "pal/quote_generation.h"
#include "rpc_connections.h"
#include "sig_term.h"
#include "snapshots/fetch.h"
#include "snapshots/filenames.h"
#include "snapshots/snapshot_manager.h"
#include "tcp.h"
#include "ticker.h"
@@ -420,6 +418,14 @@ namespace ccf
startup_config.join.service_cert =
files::slurp(config.command.service_certificate_file);
startup_config.join.follow_redirect = config.command.join.follow_redirect;
startup_config.join.fetch_recent_snapshot =
config.command.join.fetch_recent_snapshot;
startup_config.join.fetch_snapshot_max_attempts =
config.command.join.fetch_snapshot_max_attempts;
startup_config.join.fetch_snapshot_retry_interval =
config.command.join.fetch_snapshot_retry_interval;
startup_config.join.fetch_snapshot_max_size =
config.command.join.fetch_snapshot_max_size;
}

void populate_config_for_recover(
@@ -452,81 +458,12 @@ namespace ccf
config.command.recover.self_healing_open;
}

std::vector<uint8_t> load_startup_snapshot(
const host::CCHostConfig& config, snapshots::SnapshotManager& snapshots)
{
std::vector<uint8_t> startup_snapshot = {};

if (
config.command.type != StartType::Join &&
config.command.type != StartType::Recover)
{
return startup_snapshot;
}

auto latest_local_snapshot = snapshots.find_latest_committed_snapshot();

if (
config.command.type == StartType::Join &&
config.command.join.fetch_recent_snapshot)
{
// Try to fetch a recent snapshot from peer
auto latest_peer_snapshot = snapshots::fetch_from_peer(
config.command.join.target_rpc_address,
config.command.service_certificate_file,
config.command.join.fetch_snapshot_max_attempts,
config.command.join.fetch_snapshot_retry_interval.count_ms(),
config.command.join.fetch_snapshot_max_size.count_bytes());

if (latest_peer_snapshot.has_value())
{
LOG_INFO_FMT(
"Received snapshot {} from peer (size: {}) - writing this to "
"disk "
"and using for join startup",
latest_peer_snapshot->snapshot_name,
latest_peer_snapshot->snapshot_data.size());

const auto dst_path = fs::path(config.snapshots.directory) /
fs::path(latest_peer_snapshot->snapshot_name);
if (files::exists(dst_path))
{
LOG_FAIL_FMT(
"Overwriting existing snapshot at {} with data retrieved from "
"peer",
dst_path);
}
files::dump(latest_peer_snapshot->snapshot_data, dst_path);
startup_snapshot = latest_peer_snapshot->snapshot_data;
}
}

if (startup_snapshot.empty() && latest_local_snapshot.has_value())
{
auto& [snapshot_dir, snapshot_file] = latest_local_snapshot.value();
startup_snapshot = files::slurp(snapshot_dir / snapshot_file);

LOG_INFO_FMT(
"Found latest local snapshot file: {} (size: {})",
snapshot_dir / snapshot_file,
startup_snapshot.size());
}
else if (startup_snapshot.empty())
{
LOG_INFO_FMT(
"No snapshot found: Node will replay all historical transactions");
}

return startup_snapshot;
}

std::optional<size_t> create_enclave_node(
const host::CCHostConfig& config,
messaging::BufferProcessor& buffer_processor,
ringbuffer::Circuit& circuit,
EnclaveConfig& enclave_config,
ccf::StartupConfig& startup_config,
std::vector<uint8_t> startup_snapshot,
std::vector<uint8_t>& node_cert,
std::vector<uint8_t>& service_cert,
ccf::LoggerLevel log_level,
@@ -547,7 +484,6 @@ namespace ccf
auto create_status = enclave_create_node(
enclave_config,
startup_config,
std::move(startup_snapshot),
node_cert,
service_cert,
config.command.type,
@@ -851,9 +787,6 @@ namespace ccf
return static_cast<int>(CLI::ExitCodes::ValidationError);
}

// Load startup snapshot if needed
auto startup_snapshot = load_startup_snapshot(config, snapshots);

// Used by GET /node/network/nodes/self to return rpc interfaces
// prior to the KV being updated
startup_config.network.rpc_interfaces = config.network.rpc_interfaces;
@@ -865,7 +798,6 @@ namespace ccf
circuit,
enclave_config,
startup_config,
std::move(startup_snapshot),
node_cert,
service_cert,
log_level,
9 changes: 6 additions & 3 deletions src/host/test/ledger.cpp
@@ -1622,7 +1622,8 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot"))
auto latest_committed_snapshot =
snapshots.find_latest_committed_snapshot();
REQUIRE(latest_committed_snapshot.has_value());
const auto& snapshot = latest_committed_snapshot->second;
REQUIRE(latest_committed_snapshot->parent_path() == snapshot_dir);
const auto& snapshot = latest_committed_snapshot->filename();
REQUIRE(get_snapshot_idx_from_file_name(snapshot) == i);
last_snapshot_idx = i;
REQUIRE(get_snapshot_evidence_idx_from_file_name(snapshot) == i + 1);
@@ -1641,7 +1642,8 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot"))

auto latest_committed_snapshot = snapshots.find_latest_committed_snapshot();
REQUIRE(latest_committed_snapshot.has_value());
const auto& snapshot = latest_committed_snapshot->second;
REQUIRE(latest_committed_snapshot->parent_path() == snapshot_dir_read_only);
const auto& snapshot = latest_committed_snapshot->filename();
REQUIRE(get_snapshot_idx_from_file_name(snapshot) == last_snapshot_idx);
}

@@ -1655,7 +1657,8 @@ TEST_CASE("Generate and commit snapshots" * doctest::test_suite("snapshot"))

auto latest_committed_snapshot = snapshots.find_latest_committed_snapshot();
REQUIRE(latest_committed_snapshot.has_value());
const auto& snapshot = latest_committed_snapshot->second;
REQUIRE(latest_committed_snapshot->parent_path() == snapshot_dir);
const auto& snapshot = latest_committed_snapshot->filename();
REQUIRE(get_snapshot_idx_from_file_name(snapshot) == new_snapshot_idx);
}
}
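
The updated assertions above use ``parent_path()`` and ``filename()`` on the returned value, which suggests the lookup now yields a single path rather than a directory/file pair. As a rough illustration of that decomposition using only ``std::filesystem``, here is a minimal sketch; ``SnapshotLocation`` and ``split_snapshot_path`` are hypothetical names, and the file name in the example is invented rather than taken from CCF's naming scheme.

```cpp
#include <cassert>
#include <filesystem>

namespace fs = std::filesystem;

// Hypothetical helper: splits a full snapshot path into the directory it
// lives in and its file name, mirroring the two checks in the test.
struct SnapshotLocation
{
  fs::path directory;
  fs::path file_name;
};

inline SnapshotLocation split_snapshot_path(const fs::path& full_path)
{
  return {full_path.parent_path(), full_path.filename()};
}

inline void example_split()
{
  // Invented file name, for illustration only.
  const fs::path full = fs::path("snapshots") / "example_snapshot.committed";
  const SnapshotLocation loc = split_snapshot_path(full);
  assert(loc.directory == fs::path("snapshots"));
  assert(loc.file_name == fs::path("example_snapshot.committed"));
}
```

Returning one path keeps the directory and file name from drifting apart, and callers can still recover either component on demand.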