Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 48 additions & 15 deletions crates/recording/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use cap_cursor_capture::CursorCropBounds;
use cap_cursor_info::CursorShape;
use cap_project::{
CursorClickEvent, CursorEvents, CursorMoveEvent, KeyPressEvent, KeyboardEvents, XY,
CursorClickEvent, CursorMoveEvent, KeyPressEvent, KeyboardEvents, XY,
};
use cap_timestamp::Timestamps;
use futures::{FutureExt, future::Shared};
use serde::Serialize;
use std::{
collections::HashMap,
fs::File,
io::{BufWriter, Write},
path::{Path, PathBuf},
time::Instant,
};
Expand Down Expand Up @@ -50,19 +53,38 @@ impl CursorActor {

const CURSOR_FLUSH_INTERVAL_SECS: u64 = 5;

#[derive(Serialize)]
struct CursorEventsSnapshot<'a> {
clicks: &'a [CursorClickEvent],
moves: &'a [CursorMoveEvent],
}

fn flush_cursor_data(output_path: &Path, moves: &[CursorMoveEvent], clicks: &[CursorClickEvent]) {
let events = CursorEvents {
clicks: clicks.to_vec(),
moves: moves.to_vec(),
};
if let Ok(json) = serde_json::to_string_pretty(&events)
&& let Err(e) = std::fs::write(output_path, json)
{
tracing::error!(
"Failed to write cursor data to {}: {}",
output_path.display(),
e
);
let events = CursorEventsSnapshot { clicks, moves };
match File::create(output_path) {
Ok(file) => {
let mut writer = BufWriter::new(file);
if let Err(e) = serde_json::to_writer(&mut writer, &events) {
tracing::error!(
"Failed to serialize cursor data to {}: {}",
output_path.display(),
e
);
} else if let Err(e) = writer.flush() {
tracing::error!(
"Failed to flush cursor data to {}: {}",
output_path.display(),
e
);
}
Comment on lines +66 to +79
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Silent flush failure on BufWriter drop. serde_json::to_writer serializes in chunks via Write::write_all, so BufWriter's internal buffer typically still holds the last few KB when serialization finishes. Rust's BufWriter drop calls flush() but discards the error — if it fails (e.g. disk full), the cursor file is silently truncated. An explicit writer.flush() before the block ends captures and logs that error.

Suggested change
let mut writer = BufWriter::new(file);
if let Err(e) = serde_json::to_writer(&mut writer, &events) {
tracing::error!(
"Failed to serialize cursor data to {}: {}",
output_path.display(),
e
);
}
let mut writer = BufWriter::new(file);
if let Err(e) = serde_json::to_writer(&mut writer, &events) {
tracing::error!(
"Failed to serialize cursor data to {}: {}",
output_path.display(),
e
);
} else if let Err(e) = writer.flush() {
tracing::error!(
"Failed to flush cursor data to {}: {}",
output_path.display(),
e
);
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/recording/src/cursor.rs
Line: 66-73

Comment:
Silent flush failure on BufWriter drop. `serde_json::to_writer` serializes in chunks via `Write::write_all`, so BufWriter's internal buffer typically still holds the last few KB when serialization finishes. Rust's `BufWriter` drop calls `flush()` but discards the error — if it fails (e.g. disk full), the cursor file is silently truncated. An explicit `writer.flush()` before the block ends captures and logs that error.

```suggestion
            let mut writer = BufWriter::new(file);
            if let Err(e) = serde_json::to_writer(&mut writer, &events) {
                tracing::error!(
                    "Failed to serialize cursor data to {}: {}",
                    output_path.display(),
                    e
                );
            } else if let Err(e) = writer.flush() {
                tracing::error!(
                    "Failed to flush cursor data to {}: {}",
                    output_path.display(),
                    e
                );
            }
```

How can I resolve this? If you propose a fix, please make it concise.

}
Err(e) => {
tracing::error!(
"Failed to write cursor data to {}: {}",
output_path.display(),
e
);
}
}
}

Expand Down Expand Up @@ -227,6 +249,8 @@ pub fn spawn_cursor_recorder(
let mut last_flush = Instant::now();
let flush_interval = Duration::from_secs(CURSOR_FLUSH_INTERVAL_SECS);
let mut last_cursor_id: Option<String> = None;
let mut last_flushed_cursor_counts: Option<(usize, usize)> = None;
let mut last_flushed_keyboard_count: Option<usize> = None;

loop {
let sleep = tokio::time::sleep(Duration::from_millis(16));
Expand Down Expand Up @@ -361,11 +385,20 @@ pub fn spawn_cursor_recorder(
last_keys = current_keys;

if last_flush.elapsed() >= flush_interval {
let cursor_counts = (response.moves.len(), response.clicks.len());
let keyboard_count = response.keyboard_presses.len();

if let Some(ref path) = incremental_outputs.cursor {
flush_cursor_data(path, &response.moves, &response.clicks);
if last_flushed_cursor_counts != Some(cursor_counts) {
flush_cursor_data(path, &response.moves, &response.clicks);
last_flushed_cursor_counts = Some(cursor_counts);
}
}
if let Some(ref kb_path) = incremental_outputs.keyboard {
flush_keyboard_data(kb_path, &response.keyboard_presses);
if last_flushed_keyboard_count != Some(keyboard_count) {
flush_keyboard_data(kb_path, &response.keyboard_presses);
last_flushed_keyboard_count = Some(keyboard_count);
}
}
last_flush = Instant::now();
}
Expand Down
113 changes: 82 additions & 31 deletions crates/recording/src/output_pipeline/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
use tracing::*;

const DEFAULT_MP4_MUXER_BUFFER_SIZE: usize = 60;
const DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT: usize = 240;
const DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT: usize = 96;
const DEFAULT_MP4_AUDIO_FINISH_TIMEOUT: Duration = Duration::from_secs(2);
const DEFAULT_MP4_AUDIO_FINISH_TIMEOUT_INSTANT: Duration = Duration::from_secs(8);

Expand All @@ -51,14 +51,15 @@ fn get_available_disk_space_mb(path: &std::path::Path) -> Option<u64> {
}

fn get_mp4_muxer_buffer_size(instant_mode: bool) -> usize {
std::env::var("CAP_MP4_MUXER_BUFFER_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(if instant_mode {
DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT
} else {
DEFAULT_MP4_MUXER_BUFFER_SIZE
})
let parse_env = |name: &str| std::env::var(name).ok().and_then(|s| s.parse().ok());

if instant_mode {
parse_env("CAP_MP4_MUXER_BUFFER_SIZE_INSTANT")
.or_else(|| parse_env("CAP_MP4_MUXER_BUFFER_SIZE"))
.unwrap_or(DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT)
} else {
parse_env("CAP_MP4_MUXER_BUFFER_SIZE").unwrap_or(DEFAULT_MP4_MUXER_BUFFER_SIZE)
}
}

fn get_mp4_audio_finish_timeout(instant_mode: bool) -> Duration {
Expand Down Expand Up @@ -1197,6 +1198,22 @@ mod tests {
mod mp4_muxer_buffer_size {
use super::*;

static MP4_MUXER_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

fn with_muxer_env_lock(f: impl FnOnce()) {
let _guard = MP4_MUXER_ENV_LOCK
.lock()
.expect("mp4 muxer env lock should not be poisoned");
f();
}

fn clear_muxer_env_overrides() {
unsafe {
std::env::remove_var("CAP_MP4_MUXER_BUFFER_SIZE");
std::env::remove_var("CAP_MP4_MUXER_BUFFER_SIZE_INSTANT");
}
}

#[test]
fn instant_mode_buffer_is_larger_than_normal() {
let instant = DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT;
Expand All @@ -1210,8 +1227,8 @@ mod tests {
}

#[test]
fn instant_mode_default_is_240() {
assert_eq!(DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT, 240);
fn instant_mode_default_is_96() {
assert_eq!(DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT, 96);
}

#[test]
Expand All @@ -1221,30 +1238,64 @@ mod tests {

#[test]
fn env_override_takes_precedence() {
unsafe {
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE", "500");
}
let normal = get_mp4_muxer_buffer_size(false);
let instant = get_mp4_muxer_buffer_size(true);
unsafe {
std::env::remove_var("CAP_MP4_MUXER_BUFFER_SIZE");
}
assert_eq!(normal, 500);
assert_eq!(instant, 500);
with_muxer_env_lock(|| {
clear_muxer_env_overrides();
unsafe {
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE", "500");
}
let normal = get_mp4_muxer_buffer_size(false);
let instant = get_mp4_muxer_buffer_size(true);
clear_muxer_env_overrides();
assert_eq!(normal, 500);
assert_eq!(instant, 500);
});
}

#[test]
fn instant_env_override_takes_precedence_over_global_override() {
with_muxer_env_lock(|| {
clear_muxer_env_overrides();
unsafe {
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE", "500");
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE_INSTANT", "120");
}
let normal = get_mp4_muxer_buffer_size(false);
let instant = get_mp4_muxer_buffer_size(true);
clear_muxer_env_overrides();
assert_eq!(normal, 500);
assert_eq!(instant, 120);
});
}

#[test]
fn invalid_env_falls_back_to_defaults() {
unsafe {
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE", "not_a_number");
}
let normal = get_mp4_muxer_buffer_size(false);
let instant = get_mp4_muxer_buffer_size(true);
unsafe {
std::env::remove_var("CAP_MP4_MUXER_BUFFER_SIZE");
}
assert_eq!(normal, DEFAULT_MP4_MUXER_BUFFER_SIZE);
assert_eq!(instant, DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT);
with_muxer_env_lock(|| {
clear_muxer_env_overrides();
unsafe {
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE", "not_a_number");
}
let normal = get_mp4_muxer_buffer_size(false);
let instant = get_mp4_muxer_buffer_size(true);
clear_muxer_env_overrides();
assert_eq!(normal, DEFAULT_MP4_MUXER_BUFFER_SIZE);
assert_eq!(instant, DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT);
});
}

#[test]
fn invalid_instant_override_falls_back_to_global_override() {
with_muxer_env_lock(|| {
clear_muxer_env_overrides();
unsafe {
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE", "80");
std::env::set_var("CAP_MP4_MUXER_BUFFER_SIZE_INSTANT", "not_a_number");
}
let normal = get_mp4_muxer_buffer_size(false);
let instant = get_mp4_muxer_buffer_size(true);
clear_muxer_env_overrides();
assert_eq!(normal, 80);
assert_eq!(instant, 80);
});
}
Comment on lines 1254 to +1299
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Env-var tests are not safe to run in parallel. Both new tests (instant_env_override_takes_precedence_over_global_override and invalid_instant_override_falls_back_to_global_override) mutate process-wide env vars (CAP_MP4_MUXER_BUFFER_SIZE and CAP_MP4_MUXER_BUFFER_SIZE_INSTANT) without any locking. Because cargo test runs unit tests on multiple threads by default, these tests — and the pre-existing ones in this module — can observe each other's set_var/remove_var calls, producing spurious failures or incorrect passing results. The serial_test crate (or a shared Mutex in a lazy_static) is the idiomatic fix.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/recording/src/output_pipeline/macos.rs
Line: 1237-1281

Comment:
Env-var tests are not safe to run in parallel. Both new tests (`instant_env_override_takes_precedence_over_global_override` and `invalid_instant_override_falls_back_to_global_override`) mutate process-wide env vars (`CAP_MP4_MUXER_BUFFER_SIZE` and `CAP_MP4_MUXER_BUFFER_SIZE_INSTANT`) without any locking. Because `cargo test` runs unit tests on multiple threads by default, these tests — and the pre-existing ones in this module — can observe each other's `set_var`/`remove_var` calls, producing spurious failures or incorrect passing results. The `serial_test` crate (or a shared `Mutex` in a `lazy_static`) is the idiomatic fix.

How can I resolve this? If you propose a fix, please make it concise.

}

Expand Down