Skip to content

Commit b7e8c6f

Browse files
committed
Add Braintrust deep research example
1 parent a98d4ea commit b7e8c6f

14 files changed

Lines changed: 2185 additions & 0 deletions

braintrust/README.md

Lines changed: 532 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
from temporalio import activity
2+
from openai import AsyncOpenAI
3+
import braintrust
4+
from braintrust import wrap_openai
5+
from typing import Optional, List, cast, Any, TypeVar, Generic
6+
from typing_extensions import Annotated
7+
from pydantic import BaseModel
8+
from pydantic.functional_validators import BeforeValidator
9+
from pydantic.functional_serializers import PlainSerializer
10+
11+
import importlib
12+
import os
13+
14+
T = TypeVar("T", bound=BaseModel)
15+
16+
17+
def _coerce_class(v: Any) -> type[Any]:
18+
"""Pydantic validator: convert string path to class during deserialization."""
19+
if isinstance(v, str):
20+
mod_path, sep, qual = v.partition(":")
21+
if not sep: # support "package.module.Class"
22+
mod_path, _, qual = v.rpartition(".")
23+
module = importlib.import_module(mod_path)
24+
obj = module
25+
for attr in qual.split("."):
26+
obj = getattr(obj, attr)
27+
return cast(type[Any], obj)
28+
elif isinstance(v, type):
29+
return v
30+
else:
31+
raise ValueError(f"Cannot coerce {v} to class")
32+
33+
34+
def _dump_class(t: type[Any]) -> str:
35+
"""Pydantic serializer: convert class to string path during serialization."""
36+
return f"{t.__module__}:{t.__qualname__}"
37+
38+
39+
# Custom type that automatically handles class <-> string conversion in Pydantic serialization
40+
ClassReference = Annotated[
41+
type[T],
42+
BeforeValidator(_coerce_class),
43+
PlainSerializer(_dump_class, return_type=str),
44+
]
45+
46+
47+
class InvokeModelRequest(BaseModel, Generic[T]):
48+
model: str
49+
instructions: str # Fallback if Braintrust prompt unavailable
50+
input: str
51+
prompt_slug: Optional[str] = None # Braintrust prompt slug (e.g., "report-synthesis")
52+
response_format: Optional[ClassReference[T]] = None
53+
tools: Optional[List[dict]] = None
54+
55+
56+
class InvokeModelResponse(BaseModel, Generic[T]):
57+
# response_format records the type of the response model
58+
response_format: Optional[ClassReference[T]] = None
59+
response_model: Any
60+
61+
@property
62+
def response(self) -> T:
63+
"""Reconstruct the original response type if response_format was provided."""
64+
if self.response_format:
65+
model_cls = self.response_format
66+
return model_cls.model_validate(self.response_model)
67+
return self.response_model
68+
69+
70+
@activity.defn
71+
async def invoke_model(request: InvokeModelRequest[T]) -> InvokeModelResponse[T]:
72+
instructions = request.instructions
73+
74+
# Load prompt from Braintrust if slug provided
75+
if request.prompt_slug:
76+
try:
77+
prompt = braintrust.load_prompt(
78+
project=os.environ.get("BRAINTRUST_PROJECT", "deep-research"),
79+
slug=request.prompt_slug,
80+
)
81+
# Extract system message content only
82+
# NOTE: Other params (temperature, max_tokens, model) are NOT used
83+
built = prompt.build()
84+
for msg in built.get("messages", []):
85+
if msg.get("role") == "system":
86+
instructions = msg["content"]
87+
activity.logger.info(
88+
f"Loaded prompt '{request.prompt_slug}' from Braintrust"
89+
)
90+
break
91+
except Exception as e:
92+
# Log warning but continue with fallback
93+
activity.logger.warning(
94+
f"Failed to load prompt '{request.prompt_slug}': {e}. "
95+
"Using hardcoded fallback."
96+
)
97+
98+
client = wrap_openai(AsyncOpenAI(max_retries=0))
99+
100+
kwargs: dict[str, Any] = {
101+
"model": request.model,
102+
"instructions": instructions,
103+
"input": request.input,
104+
}
105+
106+
if request.response_format:
107+
kwargs["text_format"] = request.response_format
108+
109+
if request.tools:
110+
kwargs["tools"] = request.tools
111+
112+
# Use responses API consistently
113+
resp = await client.responses.parse(**kwargs)
114+
115+
if request.response_format:
116+
# Convert structured response to dict for managed serialization.
117+
# This allows us to reconstruct the original response type while maintaining type safety.
118+
parsed_model = cast(BaseModel, resp.output_parsed)
119+
return InvokeModelResponse(
120+
response_model=parsed_model.model_dump(),
121+
response_format=request.response_format,
122+
)
123+
else:
124+
return InvokeModelResponse(
125+
response_model=resp.output_text, response_format=None
126+
)

braintrust/agents/config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""Configuration constants for the deep research system."""
2+
3+
# Model configuration constants
4+
# Change these values to switch models globally
5+
COMPLEX_REASONING_MODEL = "gpt-4o" # For planning and synthesis tasks
6+
EFFICIENT_PROCESSING_MODEL = "gpt-4o-mini" # For query generation and search analysis
7+
8+
# Alternative model options (uncomment to use):
9+
# COMPLEX_REASONING_MODEL = "gpt-5"
10+
# EFFICIENT_PROCESSING_MODEL = "gpt-5-mini"
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from .shared import ResearchPlan, today_str
2+
from .config import COMPLEX_REASONING_MODEL
3+
from activities.invoke_model import invoke_model, InvokeModelRequest
4+
from temporalio import workflow
5+
from datetime import timedelta
6+
7+
RESEARCH_PLANNING_INSTRUCTIONS = f"""
8+
You are a research planning specialist who creates focused research strategies.
9+
10+
CORE RESPONSIBILITIES:
11+
1. Decompose the user's question into 3-7 key research aspects
12+
2. Identify required sources and evidence types
13+
3. Design a practical search strategy
14+
4. Set clear success criteria
15+
16+
OUTPUT REQUIREMENTS:
17+
- research_question: Clarified version of the original query
18+
- key_aspects: Specific areas requiring investigation, each with:
19+
- aspect: The research area name
20+
- priority: 1-5 ranking (5 highest priority)
21+
- description: What needs to be investigated
22+
- expected_sources: Types of sources likely to contain relevant information
23+
- search_strategy: High-level approach for information gathering
24+
- success_criteria: Specific indicators of research completeness
25+
26+
TODAY'S DATE: {today_str()}
27+
"""
28+
29+
30+
async def plan_research(query: str) -> ResearchPlan:
31+
result = await workflow.execute_activity(
32+
invoke_model,
33+
InvokeModelRequest(
34+
model=COMPLEX_REASONING_MODEL,
35+
instructions=RESEARCH_PLANNING_INSTRUCTIONS,
36+
input=f"Research query: {query}",
37+
response_format=ResearchPlan,
38+
),
39+
start_to_close_timeout=timedelta(seconds=300),
40+
summary="Planning research",
41+
)
42+
return result.response
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from .shared import QueryPlan, ResearchPlan, today_str
2+
from .config import EFFICIENT_PROCESSING_MODEL
3+
from activities.invoke_model import invoke_model, InvokeModelRequest
4+
from temporalio import workflow
5+
from datetime import timedelta
6+
7+
QUERY_GENERATION_INSTRUCTIONS = f"""
8+
You are a search query specialist who crafts effective web searches.
9+
10+
CORE RESPONSIBILITIES:
11+
1. Generate 3-5 diverse search queries based on the research plan
12+
2. Balance specificity with discoverability
13+
3. Target different information types (factual, analytical, recent, historical)
14+
15+
APPROACH:
16+
- Vary query styles: direct questions, topic + keywords, source-specific searches
17+
- Include temporal modifiers when relevant (recent, 2024, historical)
18+
- Use domain-specific terminology appropriately
19+
20+
OUTPUT REQUIREMENTS:
21+
- queries: Search queries, each with:
22+
- query: The actual search string
23+
- rationale: Why this query addresses research needs
24+
- expected_info_type: One of "factual_data", "expert_analysis", "case_studies", "recent_news"
25+
- priority: 1-5 (5 highest priority)
26+
27+
TODAY'S DATE: {today_str()}
28+
"""
29+
30+
31+
async def generate_queries(research_plan: ResearchPlan) -> QueryPlan:
32+
# Prepare input with research plan context
33+
plan_context = f"""
34+
Research Question: {research_plan.research_question}
35+
36+
Key Aspects to Research:
37+
{chr(10).join([f"- {aspect.aspect} (Priority: {aspect.priority}): {aspect.description}" for aspect in research_plan.key_aspects])}
38+
39+
Expected Sources: {", ".join(research_plan.expected_sources)}
40+
Search Strategy: {research_plan.search_strategy}
41+
Success Criteria: {", ".join(research_plan.success_criteria)}
42+
"""
43+
44+
result = await workflow.execute_activity(
45+
invoke_model,
46+
InvokeModelRequest(
47+
model=EFFICIENT_PROCESSING_MODEL,
48+
instructions=QUERY_GENERATION_INSTRUCTIONS,
49+
input=plan_context,
50+
response_format=QueryPlan,
51+
),
52+
start_to_close_timeout=timedelta(seconds=300),
53+
summary="Generating search queries",
54+
)
55+
56+
return result.response
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
from typing import List
2+
from temporalio import workflow
3+
from datetime import timedelta
4+
from .shared import ResearchReport, ResearchPlan, SearchResult, today_str
5+
from .config import COMPLEX_REASONING_MODEL
6+
from activities.invoke_model import invoke_model, InvokeModelRequest
7+
8+
REPORT_SYNTHESIS_INSTRUCTIONS = f"""
9+
You are a research synthesis expert who creates comprehensive research reports.
10+
11+
CORE RESPONSIBILITIES:
12+
1. Synthesize all research into a coherent narrative
13+
2. Structure information logically with evidence support
14+
3. Provide comprehensive citations
15+
4. Assess confidence levels and acknowledge limitations
16+
5. Generate follow-up questions for deeper research
17+
18+
REPORT STRUCTURE:
19+
1. **Executive Summary**: Core findings and conclusions (1-2 paragraphs)
20+
2. **Detailed Analysis**: Examination organized by themes with evidence
21+
3. **Key Findings**: Bullet-point list of important discoveries
22+
4. **Confidence Assessment**: Rate findings as High/Medium/Low/Uncertain
23+
5. **Citations**: Complete source list with URLs
24+
6. **Follow-up Questions**: Up to 5 areas for additional research, as warranted
25+
26+
APPROACH:
27+
- Address contradictory findings transparently
28+
- Weight authoritative sources more heavily
29+
- Distinguish facts from expert opinions
30+
- Be explicit about information limitations
31+
32+
OUTPUT REQUIREMENTS:
33+
- executive_summary: 1-2 paragraph summary of core findings
34+
- detailed_analysis: Multi-paragraph analysis organized by themes
35+
- key_findings: Bullet-point discoveries
36+
- confidence_assessment: Assessment of finding reliability
37+
- citations: All sources referenced
38+
- follow_up_questions: 3-5 specific questions for further research
39+
40+
TODAY'S DATE: {today_str()}
41+
"""
42+
43+
44+
async def generate_synthesis(
45+
original_query: str, research_plan: ResearchPlan, search_results: List[SearchResult]
46+
) -> ResearchReport:
47+
# Prepare comprehensive input with all research context
48+
synthesis_input = f"""
49+
ORIGINAL RESEARCH QUERY: {original_query}
50+
51+
RESEARCH PLAN:
52+
Research Question: {research_plan.research_question}
53+
Key Aspects Investigated: {
54+
", ".join([aspect.aspect for aspect in research_plan.key_aspects])
55+
}
56+
Search Strategy Used: {research_plan.search_strategy}
57+
Success Criteria: {", ".join(research_plan.success_criteria)}
58+
59+
SEARCH RESULTS TO SYNTHESIZE:
60+
{
61+
chr(10).join(
62+
[
63+
f"Query: {result.query}{chr(10)}Findings: {result.key_findings}{chr(10)}Relevance: {result.relevance_score}{chr(10)}Sources: {', '.join(result.sources)}{chr(10)}Citations: {', '.join(result.citations)}{chr(10)}"
64+
for result in search_results
65+
]
66+
)
67+
}
68+
69+
Please synthesize all this information into a comprehensive research report following the specified structure and quality standards.
70+
"""
71+
result = await workflow.execute_activity(
72+
invoke_model,
73+
InvokeModelRequest(
74+
model=COMPLEX_REASONING_MODEL,
75+
instructions=REPORT_SYNTHESIS_INSTRUCTIONS, # Fallback
76+
input=synthesis_input,
77+
prompt_slug="report-synthesis", # Load from Braintrust if available
78+
response_format=ResearchReport,
79+
),
80+
start_to_close_timeout=timedelta(seconds=300),
81+
summary="Generating research report synthesis",
82+
)
83+
84+
return result.response
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from .shared import SearchResult, SearchQuery, today_str
2+
from .config import EFFICIENT_PROCESSING_MODEL
3+
from activities.invoke_model import invoke_model, InvokeModelRequest
4+
from temporalio import workflow
5+
from datetime import timedelta
6+
7+
WEB_SEARCH_INSTRUCTIONS = f"""
8+
You are a web research specialist who finds and evaluates information from web sources.
9+
10+
CORE RESPONSIBILITIES:
11+
1. Execute web searches using the web search tool
12+
2. Prioritize authoritative sources: academic, government, established research organizations, prominent news outlets, primary sources
13+
3. Extract key information relevant to the research question
14+
4. Provide proper citations and assess reliability
15+
16+
APPROACH:
17+
- Focus on information directly relevant to the research question
18+
- Extract specific facts, data points, and evidence
19+
- Note conflicting information and limitations
20+
- Flag questionable or unverified claims
21+
22+
OUTPUT REQUIREMENTS:
23+
- query: The search query that was executed
24+
- sources: URLs and source descriptions consulted
25+
- key_findings: Synthesized information relevant to research question (2-4 paragraphs)
26+
- relevance_score: 0.0-1.0 assessment of how well results address the query
27+
- citations: Formatted sources with URLs
28+
29+
TODAY'S DATE: {today_str()}
30+
"""
31+
32+
33+
async def search_web(query: SearchQuery) -> SearchResult:
34+
search_input = f"""
35+
Search Query: {query.query}
36+
Query Rationale: {query.rationale}
37+
Expected Information Type: {query.expected_info_type}
38+
Priority Level: {query.priority}
39+
40+
Please search for information using the provided query and analyze the results according to the instructions.
41+
"""
42+
result = await workflow.execute_activity(
43+
invoke_model,
44+
InvokeModelRequest(
45+
model=EFFICIENT_PROCESSING_MODEL,
46+
instructions=WEB_SEARCH_INSTRUCTIONS,
47+
input=search_input,
48+
response_format=SearchResult,
49+
tools=[{"type": "web_search"}],
50+
),
51+
start_to_close_timeout=timedelta(seconds=300),
52+
summary="Searching web for information",
53+
)
54+
return result.response

0 commit comments

Comments
 (0)