77import gzip
88import json
99import os
10- import tempfile
10+ import threading
11+ from collections .abc import Callable
12+ from pathlib import Path
1113from typing import Any
1214
1315import requests
1416
15- from sentience .tracing import TraceEvent , TraceSink
17+ from sentience .tracing import TraceSink
1618
1719
1820class CloudTraceSink (TraceSink ):
1921 """
2022 Enterprise Cloud Sink: "Local Write, Batch Upload" pattern.
2123
2224 Architecture:
23- 1. **Local Buffer**: Writes to temp file (zero latency, non-blocking)
25+ 1. **Local Buffer**: Writes to persistent cache directory (zero latency, non-blocking)
2426 2. **Pre-signed URL**: Uses secure pre-signed PUT URL from backend API
2527 3. **Batch Upload**: Uploads complete file on close() or at intervals
2628 4. **Zero Credential Exposure**: Never embeds DigitalOcean credentials in SDK
29+ 5. **Crash Recovery**: Traces survive process crashes (stored in ~/.sentience/traces/pending/)
2730
2831 This design ensures:
2932 - Fast agent performance (microseconds per emit, not milliseconds)
3033 - Security (credentials stay on backend)
3134 - Reliability (network issues don't crash the agent)
35+ - Data durability (traces survive crashes and can be recovered)
3236
3337 Tiered Access:
3438 - Free Tier: Falls back to JsonlTraceSink (local-only)
@@ -39,36 +43,40 @@ class CloudTraceSink(TraceSink):
3943 >>> from sentience.tracing import Tracer
4044 >>> # Get upload URL from API
4145 >>> upload_url = "https://sentience.nyc3.digitaloceanspaces.com/..."
42- >>> sink = CloudTraceSink(upload_url)
46+ >>> sink = CloudTraceSink(upload_url, run_id="demo" )
4347 >>> tracer = Tracer(run_id="demo", sink=sink)
4448 >>> tracer.emit_run_start("SentienceAgent")
4549 >>> tracer.close() # Uploads to cloud
50+ >>> # Or non-blocking:
51+ >>> tracer.close(blocking=False) # Returns immediately
4652 """
4753
def __init__(self, upload_url: str, run_id: str):
    """
    Initialize cloud trace sink.

    Creates ``~/.sentience/traces/pending/`` if it does not exist and opens
    ``<run_id>.jsonl`` inside it for writing. The file is opened in ``"w"``
    mode, so an existing file with the same name is truncated.

    Args:
        upload_url: Pre-signed PUT URL from Sentience API
                    (e.g., "https://sentience.nyc3.digitaloceanspaces.com/...")
        run_id: Unique identifier for this agent run (used for persistent cache).
                It becomes the cache file name, so it must be a single path
                component (no directory separators, drive, or root).

    Raises:
        ValueError: If ``run_id`` is not a single path component.
        OSError: If the cache directory or the trace file cannot be created.
    """
    # run_id is used verbatim as a file name. Anything with a separator
    # (e.g. "../../x") would resolve outside the cache directory, so reject
    # it before touching the filesystem.
    if Path(run_id).name != run_id:
        raise ValueError(
            f"run_id must be a plain file name without path separators, got {run_id!r}"
        )

    self.upload_url = upload_url
    self.run_id = run_id

    # Set state flags before any I/O so the attributes exist even if
    # creating the directory or opening the file raises.
    self._closed = False
    self._upload_successful = False

    # Use persistent cache directory instead of temp file
    # This ensures traces survive process crashes
    cache_dir = Path.home() / ".sentience" / "traces" / "pending"
    cache_dir.mkdir(parents=True, exist_ok=True)

    # Persistent file (survives process crash)
    self._path = cache_dir / f"{run_id}.jsonl"
    self._trace_file = open(self._path, "w", encoding="utf-8")
6876
6977 def emit (self , event : dict [str , Any ]) -> None :
7078 """
71- Write event to local temp file (Fast, non-blocking).
79+ Write event to local persistent file (Fast, non-blocking).
7280
7381 Performance: ~10 microseconds per write vs ~50ms for HTTP request
7482
@@ -79,31 +87,68 @@ def emit(self, event: dict[str, Any]) -> None:
7987 raise RuntimeError ("CloudTraceSink is closed" )
8088
8189 json_str = json .dumps (event , ensure_ascii = False )
82- self ._temp_file .write (json_str + "\n " )
83- self ._temp_file .flush () # Ensure written to disk
84-
85- def close (self ) -> None :
90+ self ._trace_file .write (json_str + "\n " )
91+ self ._trace_file .flush () # Ensure written to disk
92+
93+ def close (
94+ self ,
95+ blocking : bool = True ,
96+ on_progress : Callable [[int , int ], None ] | None = None ,
97+ ) -> None :
8698 """
8799 Upload buffered trace to cloud via pre-signed URL.
88100
101+ Args:
102+ blocking: If False, returns immediately and uploads in background thread
103+ on_progress: Optional callback(uploaded_bytes, total_bytes) for progress updates
104+
89105 This is the only network call - happens once at the end.
90106 """
91107 if self ._closed :
92108 return
93109
94110 self ._closed = True
95111
112+ # Close file first
113+ self ._trace_file .close ()
114+
115+ if not blocking :
116+ # Fire-and-forget background upload
117+ thread = threading .Thread (
118+ target = self ._do_upload ,
119+ args = (on_progress ,),
120+ daemon = True ,
121+ )
122+ thread .start ()
123+ return # Return immediately
124+
125+ # Blocking mode
126+ self ._do_upload (on_progress )
127+
128+ def _do_upload (self , on_progress : Callable [[int , int ], None ] | None = None ) -> None :
129+ """
130+ Internal upload method with progress tracking.
131+
132+ Args:
133+ on_progress: Optional callback(uploaded_bytes, total_bytes) for progress updates
134+ """
96135 try :
97- # 1. Close temp file
98- self ._temp_file .close ()
136+ # Read file size for progress
137+ file_size = os .path .getsize (self ._path )
138+
139+ if on_progress :
140+ on_progress (0 , file_size )
99141
100- # 2. Compress for upload
142+ # Read and compress
101143 with open (self ._path , "rb" ) as f :
102144 trace_data = f .read ()
103145
104146 compressed_data = gzip .compress (trace_data )
105147
106- # 3. Upload to DigitalOcean Spaces via pre-signed URL
148+ if on_progress :
149+ on_progress (len (compressed_data ), file_size )
150+
151+ # Upload to DigitalOcean Spaces via pre-signed URL
107152 print (f"📤 [Sentience] Uploading trace to cloud ({ len (compressed_data )} bytes)..." )
108153
109154 response = requests .put (
@@ -117,27 +162,26 @@ def close(self) -> None:
117162 )
118163
119164 if response .status_code == 200 :
165+ self ._upload_successful = True
120166 print ("✅ [Sentience] Trace uploaded successfully" )
167+ # Delete file only on successful upload
168+ if os .path .exists (self ._path ):
169+ try :
170+ os .remove (self ._path )
171+ except Exception :
172+ pass # Ignore cleanup errors
121173 else :
174+ self ._upload_successful = False
122175 print (f"❌ [Sentience] Upload failed: HTTP { response .status_code } " )
123176 print (f" Response: { response .text } " )
124177 print (f" Local trace preserved at: { self ._path } " )
125178
126179 except Exception as e :
180+ self ._upload_successful = False
127181 print (f"❌ [Sentience] Error uploading trace: { e } " )
128182 print (f" Local trace preserved at: { self ._path } " )
129183 # Don't raise - preserve trace locally even if upload fails
130184
131- finally :
132- # 4. Cleanup temp file (only if upload succeeded)
133- if os .path .exists (self ._path ):
134- try :
135- # Only delete if upload was successful
136- if hasattr (self , "_upload_successful" ) and self ._upload_successful :
137- os .remove (self ._path )
138- except Exception :
139- pass # Ignore cleanup errors
140-
def __enter__(self) -> "CloudTraceSink":
    """Context manager support: return this sink so ``with ... as sink`` binds it."""
    return self
0 commit comments