|
10 | 10 |
|
11 | 11 |
|
12 | 12 | @pytest.mark.engines(["control"]) |
13 | | -@pytest.mark.xfail( |
14 | | - reason="log file may be empty due to control client buffering", |
15 | | - strict=True, |
16 | | -) |
17 | 13 | def test_control_client_lists_clients( |
18 | | - control_client_logs: tuple[t.Any, t.Any, t.Any], |
| 14 | + control_client_logs: tuple[t.Any, ControlProtocol], |
19 | 15 | ) -> None: |
20 | 16 | """Raw control client should report itself with control-mode flag.""" |
21 | | - proc, stdout_path, _stderr_path = control_client_logs |
| 17 | + proc, proto = control_client_logs |
22 | 18 |
|
23 | 19 | assert proc.stdin is not None |
| 20 | + list_ctx = CommandContext( |
| 21 | + argv=[ |
| 22 | + "tmux", |
| 23 | + "list-clients", |
| 24 | + "-F", |
| 25 | + "#{client_pid} #{client_flags} #{session_name}", |
| 26 | + ], |
| 27 | + ) |
| 28 | + proto.register_command(list_ctx) |
| 29 | + detach_ctx = CommandContext(argv=["tmux", "detach-client"]) |
| 30 | + proto.register_command(detach_ctx) |
24 | 31 | proc.stdin.write('list-clients -F"#{client_pid} #{client_flags} #{session_name}"\n') |
| 32 | + proc.stdin.write("detach-client\n") |
25 | 33 | proc.stdin.flush() |
26 | 34 |
|
27 | | - lines = stdout_path.read_text().splitlines() |
28 | | - assert any(len(line.split()) >= 2 and "C" in line.split()[1] for line in lines) |
| 35 | + stdout_data, _ = proc.communicate(timeout=5) |
| 36 | + for line in stdout_data.splitlines(): |
| 37 | + proto.feed_line(line.rstrip("\n")) |
| 38 | + |
| 39 | + assert list_ctx.done.wait(timeout=0.5) |
| 40 | + result = proto.build_result(list_ctx) |
| 41 | + saw_control_flag = any( |
| 42 | + len(parts := line.split()) >= 2 |
| 43 | + and ("C" in parts[1] or "control-mode" in parts[1]) |
| 44 | + for line in result.stdout |
| 45 | + ) |
| 46 | + assert saw_control_flag |
29 | 47 |
|
30 | 48 |
|
31 | 49 | @pytest.mark.engines(["control"]) |
32 | 50 | def test_control_client_capture_stream_parses( |
33 | | - control_client_logs: tuple[t.Any, t.Any, t.Any], |
| 51 | + control_client_logs: tuple[t.Any, ControlProtocol], |
34 | 52 | ) -> None: |
35 | 53 | """Ensure ControlProtocol can parse raw stream from the logged control client.""" |
36 | | - proc, stdout_path, _stderr_path = control_client_logs |
| 54 | + proc, proto = control_client_logs |
37 | 55 | assert proc.stdin is not None |
38 | 56 |
|
| 57 | + display_ctx = CommandContext(argv=["tmux", "display-message", "-p", "hello"]) |
| 58 | + proto.register_command(display_ctx) |
| 59 | + detach_ctx = CommandContext(argv=["tmux", "detach-client"]) |
| 60 | + proto.register_command(detach_ctx) |
39 | 61 | proc.stdin.write("display-message -p hello\n") |
| 62 | + proc.stdin.write("detach-client\n") |
40 | 63 | proc.stdin.flush() |
41 | 64 |
|
42 | | - lines = stdout_path.read_text().splitlines() |
43 | | - |
44 | | - proto = ControlProtocol() |
45 | | - ctx = CommandContext(argv=["tmux", "display-message", "-p", "hello"]) |
46 | | - proto.register_command(ctx) |
| 65 | + stdout_data, _ = proc.communicate(timeout=5) |
47 | 66 |
|
48 | | - # Feed lines; expect begin/payload/end |
49 | | - for line in lines: |
50 | | - proto.feed_line(line) |
| 67 | + for line in stdout_data.splitlines(): |
| 68 | + proto.feed_line(line.rstrip("\n")) |
51 | 69 |
|
52 | | - # If tmux emitted begin/end, ctx should be done |
53 | | - if ctx.done.is_set(): |
54 | | - result = proto.build_result(ctx) |
55 | | - assert "hello" in result.stdout |
| 70 | + assert display_ctx.done.wait(timeout=0.5) |
| 71 | + result = proto.build_result(display_ctx) |
| 72 | + assert "hello" in result.stdout or "hello" in "".join(result.stdout) |
0 commit comments