Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,25 @@ jobs:
python3 - <<'PY'
import re, sys
PATH = "/tmp/ublk_drv.ko"
MAGIC = b"~Module signature appended~\n"
def strip_sig(buf):
# Patching the body invalidates the module's appended signature.
# A module whose signature is present-but-invalid is rejected with
# EKEYREJECTED even when enforcement is OFF (module_sig_check
# returns the verify error unconditionally for bad sigs; only a
# *missing* sig is tolerated). The runner has sig_enforce=N and no
# lockdown, so strip the now-stale sig and the kernel loads the
# unsigned patched module (tainted, fine for CI).
if bytes(buf[-len(MAGIC):]) != MAGIC:
print("no appended signature to strip", file=sys.stderr)
return buf
# sig_len is the last be32 of the 12-byte struct module_signature,
# which sits immediately before MAGIC.
sig_len = int.from_bytes(buf[-len(MAGIC)-4:-len(MAGIC)], "big")
total = sig_len + 12 + len(MAGIC)
del buf[-total:]
print(f"stripped module signature ({total} bytes)", file=sys.stderr)
return buf
data = bytearray(open(PATH, "rb").read())
matches = list(re.finditer(b'\x39\xf0\x72\xd6', bytes(data)))
if len(matches) == 0:
Expand All @@ -173,6 +192,7 @@ jobs:
sys.exit(1)
off = matches[0].start() + 2
data[off:off+2] = b'\x90\x90'
data = strip_sig(data)
open(PATH, "wb").write(bytes(data))
print(f"patched offset {hex(off)} (jb -> nop nop)")
PY
Expand Down Expand Up @@ -239,6 +259,25 @@ jobs:
python3 - <<'PY'
import re, sys
PATH = "/tmp/ublk_drv.ko"
MAGIC = b"~Module signature appended~\n"
def strip_sig(buf):
# Patching the body invalidates the module's appended signature.
# A module whose signature is present-but-invalid is rejected with
# EKEYREJECTED even when enforcement is OFF (module_sig_check
# returns the verify error unconditionally for bad sigs; only a
# *missing* sig is tolerated). The runner has sig_enforce=N and no
# lockdown, so strip the now-stale sig and the kernel loads the
# unsigned patched module (tainted, fine for CI).
if bytes(buf[-len(MAGIC):]) != MAGIC:
print("no appended signature to strip", file=sys.stderr)
return buf
# sig_len is the last be32 of the 12-byte struct module_signature,
# which sits immediately before MAGIC.
sig_len = int.from_bytes(buf[-len(MAGIC)-4:-len(MAGIC)], "big")
total = sig_len + 12 + len(MAGIC)
del buf[-total:]
print(f"stripped module signature ({total} bytes)", file=sys.stderr)
return buf
data = bytearray(open(PATH, "rb").read())
matches = list(re.finditer(b'\x39\xf0\x72\xd6', bytes(data)))
if len(matches) == 0:
Expand All @@ -253,6 +292,7 @@ jobs:
sys.exit(1)
off = matches[0].start() + 2
data[off:off+2] = b'\x90\x90'
data = strip_sig(data)
open(PATH, "wb").write(bytes(data))
print(f"patched offset {hex(off)}")
PY
Expand Down Expand Up @@ -324,6 +364,25 @@ jobs:
python3 - <<'PY'
import re, sys
PATH = "/tmp/ublk_drv.ko"
MAGIC = b"~Module signature appended~\n"
def strip_sig(buf):
# Patching the body invalidates the module's appended signature.
# A module whose signature is present-but-invalid is rejected with
# EKEYREJECTED even when enforcement is OFF (module_sig_check
# returns the verify error unconditionally for bad sigs; only a
# *missing* sig is tolerated). The runner has sig_enforce=N and no
# lockdown, so strip the now-stale sig and the kernel loads the
# unsigned patched module (tainted, fine for CI).
if bytes(buf[-len(MAGIC):]) != MAGIC:
print("no appended signature to strip", file=sys.stderr)
return buf
# sig_len is the last be32 of the 12-byte struct module_signature,
# which sits immediately before MAGIC.
sig_len = int.from_bytes(buf[-len(MAGIC)-4:-len(MAGIC)], "big")
total = sig_len + 12 + len(MAGIC)
del buf[-total:]
print(f"stripped module signature ({total} bytes)", file=sys.stderr)
return buf
data = bytearray(open(PATH, "rb").read())
matches = list(re.finditer(b'\x39\xf0\x72\xd6', bytes(data)))
if len(matches) == 0:
Expand All @@ -338,6 +397,7 @@ jobs:
sys.exit(1)
off = matches[0].start() + 2
data[off:off+2] = b'\x90\x90'
data = strip_sig(data)
open(PATH, "wb").write(bytes(data))
print(f"patched offset {hex(off)}")
PY
Expand All @@ -348,7 +408,15 @@ jobs:
- name: Load kernel modules
run: |
sudo modprobe nbd || echo "nbd not available — nbd kernel tests will skip"
sudo modprobe ublk_drv || echo "ublk_drv not available — ublk tests will skip"
# ublk is required for this job. Fail loudly rather than letting the
# tests silently skip (which previously masked the signed-module
# rejection for weeks).
sudo modprobe ublk_drv
if [ ! -e /dev/ublk-control ]; then
echo "::error::/dev/ublk-control missing after modprobe ublk_drv"
sudo dmesg | grep -iE 'ublk|signature|lockdown' | tail -20 || true
exit 1
fi
- name: Install ublk udev rule
run: |
sudo cp deploy/udev/99-glidefs-ublk.rules /etc/udev/rules.d/99-glidefs-ublk.rules
Expand Down
17 changes: 13 additions & 4 deletions ext4/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ impl<W: Read + Write + Seek> Writer<W> {

// Track the newly created directory's inode
let (_, new_ino, _) = self.lookup(&current_path, true)?;
current_ino = new_ino.unwrap();
current_ino = new_ino.expect("lookup(must_exist=true) returns Err when absent, so child_ino is Some here");
}
Ok(())
}
Expand Down Expand Up @@ -870,7 +870,7 @@ impl<W: Read + Write + Seek> Writer<W> {
}

let (_, old_ino, _) = self.lookup(oldname, true)?;
let old_ino = old_ino.unwrap();
let old_ino = old_ino.expect("lookup(must_exist=true) returns Err when absent, so child_ino is Some here");
let old_file = self.get_inode(old_ino).unwrap();
if old_file.mode & TYPE_MASK == format::S_IFDIR {
return Err(io::Error::other(
Expand Down Expand Up @@ -899,7 +899,7 @@ impl<W: Read + Write + Seek> Writer<W> {
pub fn stat(&mut self, name: &str) -> io::Result<File> {
self.finish_inode()?;
let (_, node_ino, _) = self.lookup(name, true)?;
let node_ino = node_ino.unwrap();
let node_ino = node_ino.expect("lookup(must_exist=true) returns Err when absent, so child_ino is Some here");
let node = self.get_inode(node_ino).unwrap();
let mut f = File {
size: node.size,
Expand Down Expand Up @@ -935,7 +935,16 @@ impl<W: Read + Write + Seek> Writer<W> {
format!("{name}: cannot retrieve link information"),
));
}
f.linkname = String::from_utf8_lossy(&node.data).to_string();
let n = (node.size as usize).min(node.data.len());
let target = &node.data[..n];
// `linkname` is a `String`, so a non-UTF-8 target cannot round-trip
// through it. Surface that as an error rather than silently
// corrupting the path with U+FFFD replacement characters.
f.linkname = std::str::from_utf8(target)
.map_err(|e| {
io::Error::other(format!("{name}: symlink target is not valid UTF-8: {e}"))
})?
.to_string();
}
Ok(f)
}
Expand Down
14 changes: 9 additions & 5 deletions glidefs/src/block/block_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,15 +683,19 @@ impl SequenceNumber {

/// Atomically increment and return the new value.
///
/// Uses Relaxed ordering because sequence numbers only need to be
/// monotonically increasing, not synchronized with other memory.
/// Uses `AcqRel` on the successful update (and `Acquire` on the read for
/// the retry) so a value advanced by one thread is guaranteed visible to
/// the next caller — including across the handoff `advance_to` boundary on
/// weakly-ordered targets (aarch64). A purely `Relaxed` counter could let
/// a post-handoff write reuse a sequence number `advance_to` already
/// skipped past, breaking WAL replay ordering.
/// Saturates at `u64::MAX` instead of wrapping, which would violate
/// the monotonicity invariant that WAL replay depends on.
#[inline]
pub fn next(&self) -> u64 {
match self
.0
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
if v == u64::MAX {
None
} else {
Expand All @@ -712,7 +716,7 @@ impl SequenceNumber {
/// Read the current value without incrementing.
#[inline]
pub fn current(&self) -> u64 {
self.0.load(Ordering::Relaxed)
self.0.load(Ordering::Acquire)
}

/// Atomically advance the counter to at least `target`. Used by the
Expand All @@ -724,7 +728,7 @@ impl SequenceNumber {
pub fn advance_to(&self, target: u64) {
let _ = self
.0
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
.fetch_update(Ordering::AcqRel, Ordering::Acquire, |v| {
if v < target { Some(target) } else { None }
});
}
Expand Down
17 changes: 10 additions & 7 deletions glidefs/src/block/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,24 @@ impl BlockHandler {

/// Check if this handler is readonly.
pub fn is_readonly(&self) -> bool {
self.readonly.load(Ordering::Relaxed)
self.readonly.load(Ordering::Acquire)
}

/// Set the readonly flag.
/// Used by promote_export to allow writes after migration.
pub fn set_readonly(&self, readonly: bool) {
self.readonly.store(readonly, Ordering::Relaxed);
// Release-paired with the Acquire loads in the op gates so the flag
// transition is promptly visible to other threads (e.g. weakly-ordered
// aarch64 cloud instances), not just on x86's strong ordering.
self.readonly.store(readonly, Ordering::Release);
}

/// True iff a mutating handler previously panicked. While set, every
/// op short-circuits to `CommandError::IoError`. See the field doc on
/// `degraded` for the safety rationale.
#[inline]
pub fn is_degraded(&self) -> bool {
self.degraded.load(Ordering::Relaxed)
self.degraded.load(Ordering::Acquire)
}

/// Mark this export as degraded after a panic in a mutating handler.
Expand All @@ -302,7 +305,7 @@ impl BlockHandler {
/// Inlined so the happy path is a single atomic load.
#[inline]
fn check_not_degraded(&self) -> CommandResult<()> {
if self.degraded.load(Ordering::Relaxed) {
if self.degraded.load(Ordering::Acquire) {
Err(CommandError::IoError)
} else {
Ok(())
Expand All @@ -315,10 +318,10 @@ impl BlockHandler {
/// Inlined so the happy path is two atomic loads.
#[inline]
fn check_writable(&self) -> CommandResult<()> {
if self.degraded.load(Ordering::Relaxed) {
if self.degraded.load(Ordering::Acquire) {
return Err(CommandError::IoError);
}
if self.frozen.load(Ordering::Relaxed) {
if self.frozen.load(Ordering::Acquire) {
return Err(CommandError::Frozen);
}
Ok(())
Expand All @@ -327,7 +330,7 @@ impl BlockHandler {
/// True iff this handler has been frozen for handoff.
#[inline]
pub fn is_frozen(&self) -> bool {
self.frozen.load(Ordering::Relaxed)
self.frozen.load(Ordering::Acquire)
}

/// Freeze this handler. After this returns, all mutating ops on this
Expand Down
9 changes: 8 additions & 1 deletion glidefs/src/block/ublk/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ pub struct WorkerBufferPool {
pub backpressure_waits: AtomicU64,
}

unsafe impl Send for WorkerBufferPool {}
// NOTE: `WorkerBufferPool` is deliberately NOT `Send`. It contains `RefCell`
// (no internal synchronization) and a raw `*mut u8` region, and it is only
// ever held via `Rc` — in the per-thread `WORKER_POOL` thread-local and in
// `PoolSlot`/`AcquireFut`. `Rc` is itself `!Send`, so every holder is already
// pinned to its originating worker thread. The futures that touch the pool run
// on the worker's single-threaded executor (`Executor::spawn` carries no `Send`
// bound). An `unsafe impl Send` here would contradict the `RefCell` invariant
// and silently permit a cross-thread move that races `free`/`waiters`.

impl WorkerBufferPool {
fn new() -> std::io::Result<Self> {
Expand Down
25 changes: 15 additions & 10 deletions glidefs/src/block/write_cache/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,9 @@ impl WriteCache<Active> {
drop(self.inner.flushing_file.lock().take());
}
let flushing_path = self.inner.config.flushing_path();
match std::fs::remove_file(&flushing_path) {
// tokio::fs to avoid blocking the executor on the journal
// commit that an ext4 unlink can incur.
match tokio::fs::remove_file(&flushing_path).await {
Ok(()) => {
info!("removed orphaned flushing file");
}
Expand Down Expand Up @@ -903,7 +905,7 @@ impl WriteCache<Active> {
self.inner.rotation_seq.store(0, Ordering::Release);
drop(self.inner.flushing_file.lock().take());
let flushing_path = self.inner.config.flushing_path();
match std::fs::remove_file(&flushing_path) {
match tokio::fs::remove_file(&flushing_path).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => warn!(error = %e, "failed to remove empty flushing file"),
Expand Down Expand Up @@ -1038,9 +1040,12 @@ impl WriteCache<Active> {
// unrecoverable blocks have already been marked NOT_PRESENT above.
drop(self.inner.flushing_file.lock().take());
let flushing_path = self.inner.config.flushing_path();
if flushing_path.exists() {
let _ = std::fs::remove_file(&flushing_path);
}
// Synchronous remove: this is the cold takeover-recovery path (runs
// once after a flush failure), and it sits in the same lexical scope
// as the just-dropped data_file write guard — keeping it sync avoids
// an await inside that scope. remove_file on a missing path returns
// NotFound, which we ignore.
let _ = std::fs::remove_file(&flushing_path);
// Only transition blocks whose data was successfully recovered.
for &idx in &recovered {
self.inner.transition_to_dirty(idx);
Expand Down Expand Up @@ -1489,8 +1494,8 @@ impl WriteCache<Active> {
drop(self.inner.flushing_file.lock().take());
if checkpoint_ok {
let flushing_path = self.inner.config.flushing_path();
if flushing_path.exists()
&& let Err(e) = std::fs::remove_file(&flushing_path)
if let Err(e) = tokio::fs::remove_file(&flushing_path).await
&& e.kind() != std::io::ErrorKind::NotFound
{
warn!(error = %e, "failed to remove flushing file after atomic flush");
}
Expand Down Expand Up @@ -1570,10 +1575,10 @@ impl WriteCache<Active> {
// longer needed as a crash-safety net. Delete it so the next flush
// cycle can rotate the data file.
let flushing_path = self.inner.config.flushing_path();
if flushing_path.exists()
&& let Err(e) = std::fs::remove_file(&flushing_path)
if let Err(e) = tokio::fs::remove_file(&flushing_path).await
&& e.kind() != std::io::ErrorKind::NotFound
{
warn!(error = %e, "failed to remove flushing file after manifest sync");
warn!(error = %e, "failed to remove flushing file after manifest sync");
}
Ok(())
}
Expand Down
21 changes: 12 additions & 9 deletions glidefs/src/circuit_breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ pub enum FailurePolicy {
},
/// N failures within the window opens the circuit.
/// Failures older than the window are forgotten.
// Retained, documented, and test-covered failure policy (see module docs
// and `test_windowed_*`). Production currently wires only `Consecutive`
// (router.rs); this variant awaits an edge-proxy caller. dead_code is
// allowed until then rather than deleting a complete, tested feature.
#[allow(dead_code)]
Windowed {
/// Number of failures within the window before opening.
Expand Down Expand Up @@ -131,6 +135,9 @@ impl CircuitBreakerConfig {
}

/// Create a config with windowed failure detection.
// Constructor for the retained `Windowed` policy above; exercised by tests,
// not yet called from production. Justified-dead until an edge-proxy path
// adopts windowed detection.
#[allow(dead_code)]
pub fn windowed(threshold: u32, window: Duration) -> Self {
Self {
Expand All @@ -150,15 +157,6 @@ impl CircuitBreakerConfig {
self.half_open_permits = permits;
self
}

/// Get the failure threshold from the policy.
#[allow(dead_code)]
fn threshold(&self) -> u32 {
match &self.failure_policy {
FailurePolicy::Consecutive { threshold } => *threshold,
FailurePolicy::Windowed { threshold, .. } => *threshold,
}
}
}

/// Lock-free circuit breaker.
Expand Down Expand Up @@ -512,6 +510,8 @@ impl CircuitBreaker {
}

/// Reset the circuit breaker to closed state.
// Public operational primitive (manual recovery / admin override). Kept on
// the API surface deliberately even though no in-tree caller invokes it yet.
#[allow(dead_code)]
pub fn reset(&self) {
self.window_start.store(0, Ordering::Release);
Expand All @@ -520,6 +520,9 @@ impl CircuitBreaker {
}

/// Force the circuit to a specific state (for testing/admin).
// Test/admin helper. Callers live in the test module, so under the
// `test-utils` feature alone (tests not compiled) this reads as dead —
// hence the allow.
#[cfg(any(test, loom, feature = "test-utils"))]
#[allow(dead_code)]
pub fn force_state(&self, new_state: CircuitState) {
Expand Down
Loading
Loading