Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

***Fixed:***

- Fix race condition in the telemetry daemon payload processing logic

## 0.33.2 - 2026-05-13

***Fixed:***
Expand Down
55 changes: 41 additions & 14 deletions src/dda/telemetry/daemon/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dda.utils.fs import Path

if TYPE_CHECKING:
from collections.abc import AsyncIterator
from collections.abc import AsyncIterator, Callable

from dda.telemetry.daemon.base import TelemetryClient

Expand All @@ -30,34 +30,57 @@
atexit.register(finalize_error)


def get_client(id: str, **kwargs: Any) -> TelemetryClient: # noqa: A002
if id == "trace":
from dda.telemetry.daemon.trace import TraceTelemetryClient
def create_trace_client(**kwargs: Any) -> TelemetryClient:
from dda.telemetry.daemon.trace import TraceTelemetryClient

return TraceTelemetryClient(**kwargs)


def create_log_client(**kwargs: Any) -> TelemetryClient:
from dda.telemetry.daemon.log import LogTelemetryClient

return TraceTelemetryClient(**kwargs)
return LogTelemetryClient(**kwargs)

if id == "log":
from dda.telemetry.daemon.log import LogTelemetryClient

return LogTelemetryClient(**kwargs)
CLIENT_FACTORIES: dict[str, Callable[..., TelemetryClient]] = {
"trace": create_trace_client,
"log": create_log_client,
}

message = f"Unknown client ID: {id}"
raise ValueError(message)

def get_client_id(filename: str) -> str | None:
client_id, separator, _ = filename.partition("_")
if separator and client_id in CLIENT_FACTORIES and filename.endswith(".json"):
return client_id

return None


def get_client(id: str, **kwargs: Any) -> TelemetryClient: # noqa: A002
try:
create_client = CLIENT_FACTORIES[id]
except KeyError:
message = f"Unknown client ID: {id}"
raise ValueError(message) from None

return create_client(**kwargs)


async def watch_events(stop_event: asyncio.Event) -> AsyncIterator[Path]:
# Use as an ordered set
existing_files = dict.fromkeys(os.listdir(WRITE_DIR))
existing_files = dict.fromkeys(
filename for filename in os.listdir(WRITE_DIR) if get_client_id(filename) is not None
)
try:
async for changes in watchfiles.awatch(
WRITE_DIR,
stop_event=stop_event,
recursive=False,
rust_timeout=0,
watch_filter=lambda c, p: (
# Only filter the final atomically written file
# Only process final atomically written telemetry payloads
c == watchfiles.Change.added
and not (fn := os.path.basename(p)).startswith("tmp")
and get_client_id(fn := os.path.basename(p)) is not None
# ... and ignore files that were created before watching
and fn not in existing_files
),
Expand All @@ -83,7 +106,11 @@ async def process_changes(stop_event: asyncio.Event, **kwargs: Any) -> None:
with ExitStack() as stack:
clients: dict[str, TelemetryClient] = {}
async for path in watch_events(stop_event):
client_id = path.name.split("_")[0]
client_id = get_client_id(path.name)
if client_id is None:
logging.debug("Ignoring unknown telemetry file: %s", path)
continue

if (client := clients.get(client_id)) is None:
try:
client = get_client(client_id, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion src/dda/utils/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def open_atomic(self, *args: Any, **kwargs: Any) -> Generator[IO[Any], None, Non

fd = -1
for _ in range(100):
path = self.parent / f".{self.name}.{token_hex(8)}.tmp"
path = self.parent / f".tmp.{token_hex(8)}.tmp"
try:
fd = os.open(
path,
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def test_open_atomic_uses_existing_permissions_for_temporary_file(self, tmp_path
try:
with path.open_atomic("w", encoding="utf-8") as f:
f.write("setting = true\n")
(temporary_path,) = tmp_path.glob(".config.toml.*.tmp")
(temporary_path,) = tmp_path.glob(".tmp.*.tmp")
assert temporary_path.stat().st_mode & 0o777 == 0o600
finally:
os.umask(old_umask)
Expand Down
Loading