Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 3 additions & 33 deletions lib/braintrust/api/internal/btql.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,26 @@ module Internal
# Internal BTQL client for querying spans.
# Not part of the public API — instantiated directly where needed.
class BTQL
# Maximum number of retries before returning partial results.
# Covers both freshness lag (partially indexed) and ingestion lag
# (spans not yet visible to BTQL after OTel flush).
MAX_FRESHNESS_RETRIES = 7

# Base delay (seconds) between retries (doubles each attempt, capped).
FRESHNESS_BASE_DELAY = 1.0

# Maximum delay (seconds) between retries. Caps exponential growth
# so we keep polling at a reasonable rate in the later window.
# Schedule: 1, 2, 4, 8, 8, 8, 8 = ~39s total worst-case.
MAX_FRESHNESS_DELAY = 8.0

def initialize(state)
@state = state
end

# Query spans belonging to a specific trace within an object.
#
# Builds a BTQL SQL query that matches the root_span_id and excludes scorer spans.
# Retries with exponential backoff if the response indicates data is not yet fresh.
# Returns a single-shot result; callers are responsible for retry and error handling.
#
# @param object_type [String] e.g. "experiment"
# @param object_id [String] Object UUID
# @param root_span_id [String] Hex trace ID of the root span
# @return [Array<Hash>] Parsed span data
# @return [Array(Array<Hash>, String)] [rows, freshness]
def trace_spans(object_type:, object_id:, root_span_id:)
query = build_trace_query(
object_type: object_type,
object_id: object_id,
root_span_id: root_span_id
)
payload = {query: query, fmt: "jsonl"}

retries = 0
loop do
rows, freshness = execute_query(payload)
# Return when data is fresh AND non-empty, or we've exhausted retries.
# We retry on empty even when "complete" because there is ingestion lag
# between OTel flush and BTQL indexing — the server may report "complete"
# before it knows about newly-flushed spans.
return rows if (freshness == "complete" && !rows.empty?) || retries >= MAX_FRESHNESS_RETRIES

retries += 1
delay = [FRESHNESS_BASE_DELAY * (2**(retries - 1)), MAX_FRESHNESS_DELAY].min
sleep(delay)
end
rescue => e
Braintrust::Log.warn("[BTQL] Query failed: #{e.message}")
[]
execute_query(query: query, fmt: "jsonl")
end

private
Expand Down
21 changes: 18 additions & 3 deletions lib/braintrust/eval/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require_relative "trace"
require_relative "../internal/thread_pool"
require_relative "../api/internal/btql"
require_relative "../internal/retry"

require "opentelemetry/sdk"
require "json"
Expand Down Expand Up @@ -223,9 +224,23 @@ def build_trace(eval_span)
object_id = eval_context.experiment_id
btql = API::Internal::BTQL.new(eval_context.state)

Eval::Trace.new(
spans: -> { btql.trace_spans(object_type: object_type, object_id: object_id, root_span_id: root_span_id) }
)
Eval::Trace.new(spans: -> { fetch_trace_spans(btql, object_type, object_id, root_span_id) })
end

# Fetch trace spans with retry to handle freshness and ingestion lag.
# @return [Array<Hash>] Parsed span data
def fetch_trace_spans(btql, object_type, object_id, root_span_id)
rows, _freshness = Internal::Retry.with_backoff(
max_retries: 7, base_delay: 1.0, max_delay: 8.0,
until: ->(result) {
r, f = result
f == "complete" && !r.empty?
}
) { btql.trace_spans(object_type: object_type, object_id: object_id, root_span_id: root_span_id) }
rows || []
rescue => e
Braintrust::Log.warn("[BTQL] Query failed: #{e.message}")
[]
end

# Build a CaseContext from a Case struct
Expand Down
41 changes: 41 additions & 0 deletions lib/braintrust/internal/retry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

module Braintrust
module Internal
module Retry
MAX_RETRIES = 7
BASE_DELAY = 1.0
MAX_DELAY = 8.0

# Retry a block with exponential backoff.
#
# The block is the task to attempt. Its return value is captured each attempt.
#
# @param max_retries [Integer] Maximum number of retries after the first attempt
# @param base_delay [Float] Initial delay in seconds (doubles each retry)
# @param max_delay [Float] Cap on delay between retries
# @param until [Proc, nil] Optional condition — receives block result, truthy stops retrying.
# When omitted, the block result's own truthiness decides.
# @return The last block result (whether retries were exhausted or condition was met)
#
# @example Simple: retry until truthy
# conn = Retry.with_backoff(max_retries: 5) { try_connect }
#
# @example With condition: retry until non-empty
# data = Retry.with_backoff(until: ->(r) { r.any? }) { api.fetch }
#
def self.with_backoff(max_retries: MAX_RETRIES, base_delay: BASE_DELAY, max_delay: MAX_DELAY, until: nil, &task)
check = binding.local_variable_get(:until)
result = task.call
retries = 0
while retries < max_retries && !(check ? check.call(result) : result)
retries += 1
delay = [base_delay * (2**(retries - 1)), max_delay].min
sleep(delay)
result = task.call
end
result
end
end
end
end
26 changes: 11 additions & 15 deletions test/braintrust/api/internal/btql_integration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,23 @@ def test_trace_spans_queries_experiment

# Query back via BTQL
btql = Braintrust::API::Internal::BTQL.new(state)
result = btql.trace_spans(
rows, freshness = btql.trace_spans(
object_type: "experiment",
object_id: experiment["id"],
root_span_id: root_span_id
)

refute_empty result, "BTQL should return spans for the trace"
refute_empty rows, "BTQL should return spans for the trace"
assert_equal "complete", freshness

result.each do |span|
rows.each do |span|
assert span.key?("span_id"), "span should have span_id"
assert span.key?("root_span_id"), "span should have root_span_id"
assert span.key?("span_attributes"), "span should have span_attributes"
end

# Verify score spans are excluded by the query filter
types = result.map { |s| s.dig("span_attributes", "type") }
types = rows.map { |s| s.dig("span_attributes", "type") }
refute_includes types, "score"
ensure
cleanup_experiment(experiments, experiment)
Expand All @@ -76,18 +77,13 @@ def test_trace_spans_returns_empty_for_nonexistent_trace
)

btql = Braintrust::API::Internal::BTQL.new(state)
rows, _freshness = btql.trace_spans(
object_type: "experiment",
object_id: experiment["id"],
root_span_id: "0000000000000000ffffffffffffffff"
)

Braintrust::API::Internal::BTQL.stub_const(:FRESHNESS_BASE_DELAY, 0.001) do
Braintrust::API::Internal::BTQL.stub_const(:MAX_FRESHNESS_DELAY, 0.001) do
result = btql.trace_spans(
object_type: "experiment",
object_id: experiment["id"],
root_span_id: "0000000000000000ffffffffffffffff"
)

assert_equal [], result
end
end
assert_equal [], rows
ensure
cleanup_experiment(experiments, experiment)
end
Expand Down
Loading
Loading