1717import requests
1818
1919from sentience .models import TraceStats
20+ from sentience .trace_file_manager import TraceFileManager
2021from sentience .tracing import TraceSink
2122
2223
@@ -98,7 +99,7 @@ def __init__(
9899 # Use persistent cache directory instead of temp file
99100 # This ensures traces survive process crashes
100101 cache_dir = Path .home () / ".sentience" / "traces" / "pending"
101- cache_dir . mkdir ( parents = True , exist_ok = True )
102+ TraceFileManager . ensure_directory ( cache_dir )
102103
103104 # Persistent file (survives process crash)
104105 self ._path = cache_dir / f"{ run_id } .jsonl"
@@ -124,9 +125,7 @@ def emit(self, event: dict[str, Any]) -> None:
124125 if self ._closed :
125126 raise RuntimeError ("CloudTraceSink is closed" )
126127
127- json_str = json .dumps (event , ensure_ascii = False )
128- self ._trace_file .write (json_str + "\n " )
129- self ._trace_file .flush () # Ensure written to disk
128+ TraceFileManager .write_event (self ._trace_file , event )
130129
131130 def close (
132131 self ,
@@ -385,7 +384,9 @@ def _upload_index(self) -> None:
385384 if self .logger :
386385 self .logger .warning (f"Error uploading trace index: { e } " )
387386
388- def _infer_final_status_from_trace (self ) -> str :
387+ def _infer_final_status_from_trace (
388+ self , events : list [dict [str , Any ]], run_end : dict [str , Any ] | None
389+ ) -> str :
389390 """
390391 Infer final status from trace events by reading the trace file.
391392
@@ -436,92 +437,20 @@ def _infer_final_status_from_trace(self) -> str:
436437 # If we can't read the trace, default to unknown
437438 return "unknown"
438439
439- def _extract_stats_from_trace (self ) -> dict [ str , Any ] :
440+ def _extract_stats_from_trace (self ) -> TraceStats :
440441 """
441442 Extract execution statistics from trace file.
442443
443444 Returns:
444- Dictionary with stats fields for /v1/traces/complete
445+ TraceStats with stats fields for /v1/traces/complete
445446 """
446447 try :
447448 # Read trace file to extract stats
448- with open (self ._path , encoding = "utf-8" ) as f :
449- events = []
450- for line in f :
451- line = line .strip ()
452- if not line :
453- continue
454- try :
455- event = json .loads (line )
456- events .append (event )
457- except json .JSONDecodeError :
458- continue
459-
460- if not events :
461- return TraceStats (
462- total_steps = 0 ,
463- total_events = 0 ,
464- duration_ms = None ,
465- final_status = "unknown" ,
466- started_at = None ,
467- ended_at = None ,
468- )
469-
470- # Find run_start and run_end events
471- run_start = next ((e for e in events if e .get ("type" ) == "run_start" ), None )
472- run_end = next ((e for e in events if e .get ("type" ) == "run_end" ), None )
473-
474- # Extract timestamps
475- started_at : str | None = None
476- ended_at : str | None = None
477- if run_start :
478- started_at = run_start .get ("ts" )
479- if run_end :
480- ended_at = run_end .get ("ts" )
481-
482- # Calculate duration
483- duration_ms : int | None = None
484- if started_at and ended_at :
485- try :
486- from datetime import datetime
487-
488- start_dt = datetime .fromisoformat (started_at .replace ("Z" , "+00:00" ))
489- end_dt = datetime .fromisoformat (ended_at .replace ("Z" , "+00:00" ))
490- delta = end_dt - start_dt
491- duration_ms = int (delta .total_seconds () * 1000 )
492- except Exception :
493- pass
494-
495- # Count steps (from step_start events, only first attempt)
496- step_indices = set ()
497- for event in events :
498- if event .get ("type" ) == "step_start" :
499- step_index = event .get ("data" , {}).get ("step_index" )
500- if step_index is not None :
501- step_indices .add (step_index )
502- total_steps = len (step_indices ) if step_indices else 0
503-
504- # If run_end has steps count, use that (more accurate)
505- if run_end :
506- steps_from_end = run_end .get ("data" , {}).get ("steps" )
507- if steps_from_end is not None :
508- total_steps = max (total_steps , steps_from_end )
509-
510- # Count total events
511- total_events = len (events )
512-
513- # Infer final status
514- final_status = self ._infer_final_status_from_trace ()
515-
516- return TraceStats (
517- total_steps = total_steps ,
518- total_events = total_events ,
519- duration_ms = duration_ms ,
520- final_status = final_status ,
521- started_at = started_at ,
522- ended_at = ended_at ,
449+ events = TraceFileManager .read_events (self ._path )
450+ # Use TraceFileManager to extract stats (with custom status inference)
451+ return TraceFileManager .extract_stats (
452+ events , infer_status_func = self ._infer_final_status_from_trace
523453 )
524-
525454 except Exception as e :
526455 if self .logger :
527456 self .logger .warning (f"Error extracting stats from trace: { e } " )
@@ -593,28 +522,20 @@ def _extract_screenshots_from_trace(self) -> dict[int, dict[str, Any]]:
593522 sequence = 0
594523
595524 try :
596- with open (self ._path , encoding = "utf-8" ) as f :
597- for line in f :
598- line = line .strip ()
599- if not line :
600- continue
601-
602- try :
603- event = json .loads (line )
604- # Check if this is a snapshot event with screenshot
605- if event .get ("type" ) == "snapshot" :
606- data = event .get ("data" , {})
607- screenshot_base64 = data .get ("screenshot_base64" )
608-
609- if screenshot_base64 :
610- sequence += 1
611- screenshots [sequence ] = {
612- "base64" : screenshot_base64 ,
613- "format" : data .get ("screenshot_format" , "jpeg" ),
614- "step_id" : event .get ("step_id" ),
615- }
616- except json .JSONDecodeError :
617- continue
525+ events = TraceFileManager .read_events (self ._path )
526+ for event in events :
527+ # Check if this is a snapshot event with screenshot
528+ if event .get ("type" ) == "snapshot" :
529+ data = event .get ("data" , {})
530+ screenshot_base64 = data .get ("screenshot_base64" )
531+
532+ if screenshot_base64 :
533+ sequence += 1
534+ screenshots [sequence ] = {
535+ "base64" : screenshot_base64 ,
536+ "format" : data .get ("screenshot_format" , "jpeg" ),
537+ "step_id" : event .get ("step_id" ),
538+ }
618539 except Exception as e :
619540 if self .logger :
620541 self .logger .error (f"Error extracting screenshots: { e } " )
@@ -629,34 +550,23 @@ def _create_cleaned_trace(self, output_path: Path) -> None:
629550 output_path: Path to write cleaned trace file
630551 """
631552 try :
632- with (
633- open (self ._path , encoding = "utf-8" ) as infile ,
634- open (output_path , "w" , encoding = "utf-8" ) as outfile ,
635- ):
636- for line in infile :
637- line = line .strip ()
638- if not line :
639- continue
640-
641- try :
642- event = json .loads (line )
643- # Remove screenshot_base64 from snapshot events
644- if event .get ("type" ) == "snapshot" :
645- data = event .get ("data" , {})
646- if "screenshot_base64" in data :
647- # Create copy without screenshot fields
648- cleaned_data = {
649- k : v
650- for k , v in data .items ()
651- if k not in ("screenshot_base64" , "screenshot_format" )
652- }
653- event ["data" ] = cleaned_data
654-
655- # Write cleaned event
656- outfile .write (json .dumps (event , ensure_ascii = False ) + "\n " )
657- except json .JSONDecodeError :
658- # Skip invalid lines
659- continue
553+ events = TraceFileManager .read_events (self ._path )
554+ with open (output_path , "w" , encoding = "utf-8" ) as outfile :
555+ for event in events :
556+ # Remove screenshot_base64 from snapshot events
557+ if event .get ("type" ) == "snapshot" :
558+ data = event .get ("data" , {})
559+ if "screenshot_base64" in data :
560+ # Create copy without screenshot fields
561+ cleaned_data = {
562+ k : v
563+ for k , v in data .items ()
564+ if k not in ("screenshot_base64" , "screenshot_format" )
565+ }
566+ event ["data" ] = cleaned_data
567+
568+ # Write cleaned event
569+ TraceFileManager .write_event (outfile , event )
660570 except Exception as e :
661571 if self .logger :
662572 self .logger .error (f"Error creating cleaned trace: { e } " )
0 commit comments