Skip to content

Commit 143144b

Browse files
committed
trace index upload
1 parent 1c5f069 commit 143144b

File tree

2 files changed

+259
-1
lines changed

2 files changed

+259
-1
lines changed

sentience/cloud_tracing.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,10 @@ def _do_upload(self, on_progress: Callable[[int, int], None] | None = None) -> N
213213
if on_progress:
214214
on_progress(compressed_size, compressed_size)
215215

216-
# Call /v1/traces/complete to report file sizes (NEW)
216+
# Upload trace index file
217+
self._upload_index()
218+
219+
# Call /v1/traces/complete to report file sizes
217220
self._complete_trace()
218221

219222
# Delete file only on successful upload
@@ -244,6 +247,95 @@ def _generate_index(self) -> None:
244247
# Non-fatal: log but don't crash
245248
print(f"⚠️ Failed to generate trace index: {e}")
246249

250+
def _upload_index(self) -> None:
    """
    Upload the trace index file to cloud storage.

    Called after successful trace upload to provide fast timeline rendering.
    The index file enables O(1) step lookups without parsing the entire trace.

    Steps:
        1. Locate the index file next to the trace file.
        2. POST to ``/v1/traces/index_upload`` to obtain an upload URL.
        3. Gzip the index file and PUT it to that URL.
        4. Remove the local index file once the PUT returns HTTP 200.

    Every failure is non-fatal: a missing index file, a missing API key,
    a non-200 response, or any raised exception is logged (and, for upload
    errors, printed) and the method returns normally.
    """
    # Index file path: same as the trace file with ".jsonl" -> ".index.json".
    # NOTE(review): str.replace rewrites every ".jsonl" occurrence in the path,
    # not only the suffix. Kept unchanged because it presumably has to match
    # the name used when the index is generated - confirm before changing.
    index_path = Path(str(self._path).replace(".jsonl", ".index.json"))

    if not index_path.exists():
        if self.logger:
            self.logger.warning("Index file not found, skipping index upload")
        return

    try:
        # The upload URL endpoint requires a bearer token; without one there
        # is nothing to do.
        if not self.api_key:
            if self.logger:
                self.logger.info("No API key provided, skipping index upload")
            return

        # Request index upload URL from API
        response = requests.post(
            f"{self.api_url}/v1/traces/index_upload",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"run_id": self.run_id},
            timeout=10,
        )

        if response.status_code != 200:
            if self.logger:
                self.logger.warning(
                    f"Failed to get index upload URL: HTTP {response.status_code}"
                )
            return

        upload_data = response.json()
        index_upload_url = upload_data.get("upload_url")

        if not index_upload_url:
            if self.logger:
                self.logger.warning("No upload URL in index upload response")
            return

        # Read and compress index file
        with open(index_path, "rb") as f:
            index_data = f.read()

        compressed_index = gzip.compress(index_data)
        index_size = len(compressed_index)

        if self.logger:
            self.logger.info(f"Index file size: {index_size / 1024:.2f} KB")

        print(f"📤 [Sentience] Uploading trace index ({index_size} bytes)...")

        # Upload index to cloud storage
        index_response = requests.put(
            index_upload_url,
            data=compressed_index,
            headers={
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
            },
            timeout=30,
        )

        if index_response.status_code != 200:
            if self.logger:
                self.logger.warning(
                    f"Index upload failed: HTTP {index_response.status_code}"
                )
            print(f"⚠️ [Sentience] Index upload failed: HTTP {index_response.status_code}")
            return

        print("✅ [Sentience] Trace index uploaded successfully")

        # Delete local index file after successful upload. This is best-effort:
        # a leftover file must not turn a successful upload into an error, but
        # the failure is logged instead of being swallowed silently.
        try:
            os.remove(index_path)
        except OSError as cleanup_error:
            if self.logger:
                self.logger.warning(
                    f"Could not remove local index file {index_path}: {cleanup_error}"
                )

    except Exception as e:
        # Non-fatal: log but don't crash
        if self.logger:
            self.logger.warning(f"Error uploading trace index: {e}")
        print(f"⚠️ [Sentience] Error uploading trace index: {e}")
338+
247339
def _complete_trace(self) -> None:
248340
"""
249341
Call /v1/traces/complete to report file sizes to gateway.

tests/test_cloud_tracing.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,3 +538,169 @@ def test_tracer_api_unchanged(self):
538538
# Verify all events written
539539
lines = trace_path.read_text().strip().split("\n")
540540
assert len(lines) == 5
541+
542+
def test_cloud_trace_sink_index_upload_success(self):
    """Test CloudTraceSink uploads index file after trace upload."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    index_upload_url = (
        "https://sentience.nyc3.digitaloceanspaces.com/traces/test.index.json.gz"
    )
    run_id = "test-index-upload"
    # Directory this test cleans up afterwards (local trace/index files).
    cache_dir = Path.home() / ".sentience" / "traces" / "pending"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
                patch("sentience.cloud_tracing.requests.post") as mock_post:
            # First PUT is the trace upload, second PUT is the index upload.
            trace_response = Mock()
            trace_response.status_code = 200
            index_upload_response = Mock()
            index_upload_response.status_code = 200
            mock_put.side_effect = [trace_response, index_upload_response]

            # Mock successful index upload URL request
            index_url_response = Mock()
            index_url_response.status_code = 200
            index_url_response.json.return_value = {"upload_url": index_upload_url}
            mock_post.return_value = index_url_response

            # Create sink and emit events
            sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
            sink.emit({"v": 1, "type": "run_start", "seq": 1, "data": {"agent": "TestAgent"}})
            sink.emit({"v": 1, "type": "step_start", "seq": 2, "data": {"step": 1}})
            sink.emit({"v": 1, "type": "snapshot", "seq": 3, "data": {"url": "https://example.com"}})
            sink.emit({"v": 1, "type": "run_end", "seq": 4, "data": {"steps": 1}})

            # Close triggers upload
            sink.close()

            # Verify trace upload
            assert mock_put.call_count == 2  # Once for trace, once for index

            # Verify index upload URL request. Search every recorded POST
            # instead of relying on call_args (the *last* call), so the check
            # does not depend on whether other POSTs follow the index request.
            assert mock_post.called
            index_posts = [
                post_call
                for post_call in mock_post.call_args_list
                if post_call[0] and "/v1/traces/index_upload" in post_call[0][0]
            ]
            assert index_posts, "expected a POST to /v1/traces/index_upload"
            assert index_posts[0][1]["json"] == {"run_id": run_id}

            # Verify index file upload
            index_call = mock_put.call_args_list[1]
            assert "index.json.gz" in index_call[0][0]
            assert index_call[1]["headers"]["Content-Type"] == "application/json"
            assert index_call[1]["headers"]["Content-Encoding"] == "gzip"
    finally:
        # Cleanup runs even when an assertion fails, so no files are left
        # behind in the user's home directory.
        for leftover in (
            cache_dir / f"{run_id}.jsonl",
            cache_dir / f"{run_id}.index.json",
        ):
            if leftover.exists():
                os.remove(leftover)
596+
597+
def test_cloud_trace_sink_index_upload_no_api_key(self):
    """Test CloudTraceSink skips index upload when no API key provided."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-no-api-key"
    # Directory this test cleans up afterwards (local trace/index files).
    cache_dir = Path.home() / ".sentience" / "traces" / "pending"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
                patch("sentience.cloud_tracing.requests.post") as mock_post:
            # Mock successful trace upload
            mock_put.return_value = Mock(status_code=200)

            # Create sink WITHOUT api_key
            sink = CloudTraceSink(upload_url, run_id=run_id)
            sink.emit({"v": 1, "type": "run_start", "seq": 1})

            sink.close()

            # Verify trace upload happened
            assert mock_put.called

            # Verify index upload was NOT attempted (no API key)
            assert not mock_post.called
    finally:
        # Cleanup runs even when an assertion fails, so no files are left
        # behind in the user's home directory.
        for leftover in (
            cache_dir / f"{run_id}.jsonl",
            cache_dir / f"{run_id}.index.json",
        ):
            if leftover.exists():
                os.remove(leftover)
627+
628+
def test_cloud_trace_sink_index_upload_failure_non_fatal(self, capsys):
    """Test CloudTraceSink continues gracefully if index upload fails."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-index-fail"
    # Directory this test cleans up afterwards (local trace/index files).
    cache_dir = Path.home() / ".sentience" / "traces" / "pending"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
                patch("sentience.cloud_tracing.requests.post") as mock_post:
            # Mock successful trace upload
            trace_response = Mock()
            trace_response.status_code = 200

            # Mock failed index upload URL request
            index_url_response = Mock()
            index_url_response.status_code = 500

            mock_put.return_value = trace_response
            mock_post.return_value = index_url_response

            # Create sink
            sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
            sink.emit({"v": 1, "type": "run_start", "seq": 1})

            # Close should succeed even if index upload fails
            sink.close()

            # Verify trace upload succeeded
            assert mock_put.called

            # Index upload failure is non-fatal, so the success marker for the
            # main trace upload must still appear on stdout.
            captured = capsys.readouterr()
            assert "✅" in captured.out  # Trace upload success
    finally:
        # Cleanup runs even when an assertion fails, so no files are left
        # behind in the user's home directory.
        for leftover in (
            cache_dir / f"{run_id}.jsonl",
            cache_dir / f"{run_id}.index.json",
        ):
            if leftover.exists():
                os.remove(leftover)
669+
670+
def test_cloud_trace_sink_index_file_missing(self, capsys):
    """Test CloudTraceSink handles missing index file gracefully."""
    upload_url = "https://sentience.nyc3.digitaloceanspaces.com/traces/test.jsonl.gz"
    run_id = "test-missing-index"
    # Directory this test cleans up afterwards (local trace file).
    cache_dir = Path.home() / ".sentience" / "traces" / "pending"

    try:
        with patch("sentience.cloud_tracing.requests.put") as mock_put, \
                patch("sentience.cloud_tracing.requests.post") as mock_post, \
                patch("sentience.cloud_tracing.write_trace_index") as mock_write_index:
            # Mock index generation to fail (simulating missing index)
            mock_write_index.side_effect = Exception("Index generation failed")

            # Mock successful trace upload
            mock_put.return_value = Mock(status_code=200)

            # Create sink
            sink = CloudTraceSink(upload_url, run_id=run_id, api_key="sk_test_123")
            sink.emit({"v": 1, "type": "run_start", "seq": 1})

            # Close should succeed even if index generation fails
            sink.close()

            # Verify trace upload succeeded
            assert mock_put.called

            # Verify index upload was not attempted (index file missing).
            # Only POSTs to the index endpoint count, so the check does not
            # depend on unrelated POSTs made while the sink closes.
            index_posts = [
                post_call
                for post_call in mock_post.call_args_list
                if post_call[0] and "/v1/traces/index_upload" in post_call[0][0]
            ]
            assert not index_posts

            # Verify warning was printed
            captured = capsys.readouterr()
            assert "⚠️" in captured.out
            assert "Failed to generate trace index" in captured.out
    finally:
        # Cleanup runs even when an assertion fails, so no trace file is left
        # behind in the user's home directory.
        trace_path = cache_dir / f"{run_id}.jsonl"
        if trace_path.exists():
            os.remove(trace_path)

0 commit comments

Comments
 (0)