Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ compio = { version = "=0.19.0", features = [
"fs",
] }
compio-buf = "0.8.2"
# Pin compio-driver >= 0.11.2 to fix musl compilation (compio-rs/compio#668)
compio-driver = "0.12.1"
compio-quic = "=0.8.0"
compio-ws = "=0.4.0"
Expand Down
176 changes: 102 additions & 74 deletions core/consensus/src/metadata_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,44 @@ use crate::client_table::{ClientTable, REGISTER_REQUEST_ID, RequestStatus};
use crate::{Consensus, Pipeline, PipelineEntry, VsrConsensus};
use iggy_binary_protocol::{EvictionHeader, EvictionReason, HEADER_SIZE};
use message_bus::MessageBus;
use server_common::Message;
use server_common::iobuf::Frozen;
use server_common::{MESSAGE_ALIGN, Message};
use std::cell::RefCell;

/// What [`request_preflight`] decided, without touching the wire itself.
///
/// The preflight runs on shard 0 (the metadata owner), but the client's
/// transport connection lives on its home shard. Sending a resend/eviction
/// from here would route by the VSR consensus `client_id`, whose top bits are
/// random and carry no home-shard routing -- so it (almost) never reaches the
/// client. Returning the decision instead lets the home shard
/// (`submit_request_in_process` -> `handle_client_request`) emit the frame by
/// transport id, exactly like a fresh commit.
pub enum PreflightOutcome {
/// New (client, request): dispatch a fresh prepare through consensus.
Dispatch,
/// Duplicate retry: resend this cached committed reply (wire bytes).
Replay(Frozen<MESSAGE_ALIGN>),
/// Session gone (`NoSession`) or rotated past the retry (`SessionTooLow`):
/// the client must be told with an eviction frame.
Evict(EvictionReason),
/// Absorbed with nothing to send: in-flight prepare, not-caught-up
/// primary, stale/gap retry, or a client-bug newer session.
Drop,
}

/// Request preflight (metadata only): session validation, dedup, in-flight check.
///
/// # Returns
/// `true` -> dispatch through consensus. `false` -> absorbed here
/// (cached reply resent / duplicate dropped / eviction sent).
#[allow(clippy::future_not_send)]
pub async fn request_preflight<B, P>(
/// Pure decision -- emits no frames (see [`PreflightOutcome`]). Callers turn
/// the outcome into a reply: the home-shard path resends by transport id, the
/// message-plane paths fall back to [`apply_preflight_consensus_plane`].
pub fn request_preflight<B, P>(
consensus: &VsrConsensus<B, P>,
client_table: &RefCell<ClientTable>,
client_id: u128,
session: u64,
request: u64,
) -> bool
) -> PreflightOutcome
where
B: MessageBus,
P: Pipeline<Entry = PipelineEntry>,
Expand All @@ -55,7 +77,7 @@ where
request,
"request_preflight: in-flight prepare, drop"
);
return false;
return PreflightOutcome::Drop;
}

// Catch-up gate: stale ClientTable on a new primary could return `New`
Expand All @@ -73,77 +95,82 @@ where
commit_max = consensus.commit_max(),
"request_preflight: not caught up, drop"
);
return false;
return PreflightOutcome::Drop;
}

let status = client_table
.borrow()
.check_request(client_id, session, request);
match status {
// Frozen-backed cache -> refcount handoff to the home shard, no copy.
RequestStatus::Duplicate(cached_reply) => {
// TODO(vsr-resend-misroute): `client_id` here is the VSR
// consensus id (the random u128 the SDK mints), but
// `send_to_client` routes by the top 16 bits, which only carry
// home-shard bits for *transport* ids. A VSR id's top bits are
// random, so this resend routes to a garbage shard (~never the
// owning one, even single-shard) -> no registry slot -> dropped.
// The client then never receives the cached reply, retries the
// same (client, request), hits Duplicate again, and livelocks
// until the session times out. shard 0 cannot reconstruct the
// VSR->transport mapping, so the fix is to return the Duplicate
// outcome to the home shard and let it resend the cached body by
// transport id (mirroring the fresh-commit path in
// `submit_request_in_process` / `handle_client_request`). Must
// land before VSR client traffic goes past tier-1; not reached by
// the single-shard tcp happy path the tier-1 suite covers.
// Best-effort resend (client may have disconnected). Frozen-backed
// cache -> refcount handoff, no copy.
let _ = consensus
.message_bus()
.send_to_client(client_id, cached_reply.into_wire_bytes())
.await;
false
}
RequestStatus::NoSession => {
// Session evicted under capacity pressure. SAFETY: catch-up
// gate makes this replica authoritative for session truth.
// TODO(vsr-resend-misroute): `send_eviction_to_client` resends by
// the VSR consensus `client_id` and misroutes exactly like the
// Duplicate arm above -- the client never learns it was evicted.
// Same fix: route the eviction through the home shard by transport
// id. See the Duplicate arm for the full rationale.
send_eviction_to_client(consensus, client_id, EvictionReason::NoSession).await;
false
PreflightOutcome::Replay(cached_reply.into_wire_bytes())
}
// Session evicted under capacity pressure. SAFETY: catch-up gate makes
// this replica authoritative for session truth.
RequestStatus::NoSession => PreflightOutcome::Evict(EvictionReason::NoSession),
RequestStatus::SessionMismatch { expected, received } => {
// expected > received: stale session (rotated post-eviction) -> terminal eviction.
// expected < received: client bug; silent drop, log.
// SAFETY: catch-up gate makes this replica authoritative.
if expected > received {
// TODO(vsr-resend-misroute): same VSR-id misroute as the
// Duplicate / NoSession arms -- the eviction is sent by the
// consensus client_id and never reaches the client. See the
// Duplicate arm for the full rationale + fix.
send_eviction_to_client(consensus, client_id, EvictionReason::SessionTooLow).await;
PreflightOutcome::Evict(EvictionReason::SessionTooLow)
} else {
// Catch-up gate rules out network race; newer-than-issued
// session = client bug. Error log,
// no eviction (transient bug must not kill session), no
// rate limit (per-event).
// session = client bug. Error log, no eviction (transient bug
// must not kill session), no rate limit (per-event).
tracing::error!(
client_id,
expected,
received,
"request_preflight: ignoring newer session (client bug)"
);
PreflightOutcome::Drop
}
false
}
// Client bug; recovered by client retry. Silent drop.
RequestStatus::Stale
| RequestStatus::RequestGap { .. }
| RequestStatus::AlreadyRegistered { .. } => false,
RequestStatus::New => true,
| RequestStatus::AlreadyRegistered { .. } => PreflightOutcome::Drop,
RequestStatus::New => PreflightOutcome::Dispatch,
}
}

/// Message-plane fallback for [`request_preflight`]: best-effort resend by the
/// consensus `client_id`.
///
/// The wire-path ingress (`on_request`) and the queued retry drain have no
/// home-shard transport context to route through, so they apply the outcome
/// here. Returns `true` iff the caller should dispatch a fresh prepare.
///
/// Delivery is best-effort: a VSR consensus id's top bits are random, so
/// `send_to_client` may not reach the client. The in-process client path
/// (`submit_request_in_process`) instead carries the outcome back to the home
/// shard and resends by transport id.
#[allow(clippy::future_not_send)]
pub async fn apply_preflight_consensus_plane<B, P>(
consensus: &VsrConsensus<B, P>,
outcome: PreflightOutcome,
client_id: u128,
) -> bool
where
B: MessageBus,
P: Pipeline<Entry = PipelineEntry>,
{
match outcome {
PreflightOutcome::Dispatch => true,
PreflightOutcome::Replay(reply) => {
let _ = consensus
.message_bus()
.send_to_client(client_id, reply)
.await;
false
}
PreflightOutcome::Evict(reason) => {
send_eviction_to_client(consensus, client_id, reason).await;
false
}
PreflightOutcome::Drop => false,
}
}

Expand Down Expand Up @@ -339,7 +366,6 @@ mod tests {
use crate::{CLIENTS_TABLE_MAX, LocalPipeline};
use iggy_binary_protocol::{Command2, Operation, ReplyHeader};
use message_bus::SendError;
use server_common::{MESSAGE_ALIGN, iobuf::Frozen};

/// Production-sized `ClientTable`.
fn fresh_client_table() -> RefCell<ClientTable> {
Expand Down Expand Up @@ -464,12 +490,16 @@ mod tests {

let client_id: u128 = 0xCAFE;

let result = futures::executor::block_on(request_preflight(
let result = futures::executor::block_on(apply_preflight_consensus_plane(
&consensus,
&client_table,
request_preflight(
&consensus,
&client_table,
client_id,
10, // session
1, // request
),
client_id,
10, // session
1, // request
));
assert!(!result, "NoSession short-circuits");

Expand Down Expand Up @@ -503,12 +533,10 @@ mod tests {
.commit_register(client_id, initial_reply, |_| false);

// Older retry (17 < 99): stale-session case.
let result = futures::executor::block_on(request_preflight(
let result = futures::executor::block_on(apply_preflight_consensus_plane(
&consensus,
&client_table,
request_preflight(&consensus, &client_table, client_id, 17, 1),
client_id,
17,
1,
));
assert!(!result, "SessionMismatch short-circuits");

Expand Down Expand Up @@ -541,12 +569,10 @@ mod tests {
.commit_register(client_id, initial_reply, |_| false);

// Client claims newer session (99 > 17), client bug.
let result = futures::executor::block_on(request_preflight(
let result = futures::executor::block_on(apply_preflight_consensus_plane(
&consensus,
&client_table,
request_preflight(&consensus, &client_table, client_id, 99, 1),
client_id,
99,
1,
));
assert!(!result, "SessionMismatch short-circuits");

Expand All @@ -569,12 +595,16 @@ mod tests {

let client_id: u128 = 0xCAFE;

let result = futures::executor::block_on(request_preflight(
let result = futures::executor::block_on(apply_preflight_consensus_plane(
&consensus,
&client_table,
request_preflight(
&consensus,
&client_table,
client_id,
10, // session
1, // request
),
client_id,
10, // session
1, // request
));
assert!(!result, "NoSession short-circuits");

Expand Down Expand Up @@ -605,12 +635,10 @@ mod tests {
.borrow_mut()
.commit_reply(client_id, session, advanced);

let result = futures::executor::block_on(request_preflight(
let result = futures::executor::block_on(apply_preflight_consensus_plane(
&consensus,
&client_table,
request_preflight(&consensus, &client_table, client_id, session, 3), // stale
client_id,
session,
3, // stale
));
assert!(!result);

Expand Down
86 changes: 86 additions & 0 deletions core/integration/tests/sdk/clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

//! VSR connected-client reads: `get_me` (own connection, from the local
//! `SessionManager`) and `get_clients` (cluster-wide, scatter-gathered
//! across all shards). Run on a 3-node cluster so the gather genuinely
//! crosses shards.
//!
//! TODO(remove-when-scenarios-vsr): these are a focused stopgap. The
//! canonical coverage lives in `server::scenarios::authentication_scenario`
//! (`get_clients`/`get_me`/`get_client`), but that whole suite is
//! `#[cfg(not(feature = "vsr"))]` because it also does real
//! `send_messages` / `poll_messages`, which are not wired under VSR yet
//! (data plane / partition reconciliation). Once the data plane lands and
//! the `server` scenario suite is un-gated under VSR, it subsumes these
//! tests -- delete this file then.

#![cfg(feature = "vsr")]

use iggy::prelude::*;
use integration::iggy_harness;

#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic])]
async fn get_me_returns_own_connection(harness: &TestHarness) {
let client = harness.new_client().await.unwrap();
client
.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
.await
.unwrap();

let me = client.get_me().await.expect("get_me");
// Sourced from the per-shard SessionManager: authenticated user id +
// a real transport label + a non-empty peer address.
assert_eq!(me.user_id, Some(0), "root is user id 0 in server-ng");
assert!(!me.transport.is_empty(), "transport label must be set");
assert!(!me.address.is_empty(), "peer address must be set");

client.logout_user().await.unwrap();
}

#[iggy_harness(test_client_transport = [Tcp, WebSocket, Quic])]
async fn get_clients_gathers_all_connections(harness: &TestHarness) {
// Two extra clients alongside the harness client; on a 3-node cluster
// their connections are spread across shards, so a complete list
// exercises the cross-shard gather.
let a = harness.new_client().await.unwrap();
a.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
.await
.unwrap();
let b = harness.new_client().await.unwrap();
b.login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD)
.await
.unwrap();

let clients = a.get_clients().await.expect("get_clients");
// At least the two we just logged in (the gather may also include
// other harness/bookkeeping connections).
assert!(
clients.len() >= 2,
"expected >= 2 connected clients, got {}",
clients.len()
);
// Records are real, not the old empty stub: transport + address set.
for info in &clients {
assert!(!info.transport.is_empty(), "transport label must be set");
assert!(!info.address.is_empty(), "peer address must be set");
}

a.logout_user().await.unwrap();
b.logout_user().await.unwrap();
}
2 changes: 2 additions & 0 deletions core/integration/tests/sdk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* under the License.
*/

#[cfg(feature = "vsr")]
mod clients;
mod hello_world;
#[cfg(not(feature = "vsr"))]
mod producer;
Expand Down
Loading
Loading