Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ muzzle {
pass {
group = 'com.openai'
module = 'openai-java'
versions = '[2.8.0,)'
versions = '[2.15.0,)'
}
}

Expand All @@ -17,7 +17,7 @@ dependencies {
compileOnly 'net.bytebuddy:byte-buddy:1.17.5'

// Target library — compileOnly because it will be on the app classpath at runtime
compileOnly 'com.openai:openai-java:2.8.0'
compileOnly 'com.openai:openai-java:2.15.0'

// Test dependencies
testImplementation(testFixtures(project(":test-harness")))
Expand All @@ -26,7 +26,7 @@ dependencies {
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testImplementation 'net.bytebuddy:byte-buddy-agent:1.17.5'
testRuntimeOnly "org.slf4j:slf4j-simple:${slf4jVersion}"
testImplementation 'com.openai:openai-java:2.8.0'
testImplementation 'com.openai:openai-java:2.15.0'
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
public class BraintrustOpenAI {
/** Instrument openai client with braintrust traces */
public static OpenAIClient wrapOpenAI(OpenTelemetry openTelemetry, OpenAIClient openAIClient) {
return dev.braintrust.instrumentation.openai.v2_8_0.BraintrustOpenAI.wrapOpenAI(
return dev.braintrust.instrumentation.openai.v2_15_0.BraintrustOpenAI.wrapOpenAI(
openTelemetry, openAIClient);
}

public static ChatCompletionCreateParams buildChatCompletionsPrompt(
BraintrustPrompt prompt, Map<String, Object> parameters) {
return dev.braintrust.instrumentation.openai.v2_8_0.BraintrustOpenAI
return dev.braintrust.instrumentation.openai.v2_15_0.BraintrustOpenAI
.buildChatCompletionsPrompt(prompt, parameters);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dev.braintrust.instrumentation.openai.v2_8_0;
package dev.braintrust.instrumentation.openai.v2_15_0;

import com.openai.client.OpenAIClient;
import com.openai.core.ClientOptions;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package dev.braintrust.instrumentation.openai.v2_8_0;
package dev.braintrust.instrumentation.openai.v2_15_0;

import com.fasterxml.jackson.databind.json.JsonMapper;
import com.openai.core.ObjectMappers;
import com.openai.core.RequestOptions;
import com.openai.core.http.HttpClient;
import com.openai.core.http.HttpRequest;
import com.openai.core.http.HttpRequestBody;
import com.openai.core.http.HttpResponse;
import com.openai.core.http.*;
import com.openai.helpers.ChatCompletionAccumulator;
import com.openai.helpers.ResponseAccumulator;
import com.openai.models.chat.completions.ChatCompletionChunk;
import com.openai.models.responses.ResponseStreamEvent;
import dev.braintrust.bootstrap.BraintrustBridge;
import dev.braintrust.instrumentation.InstrumentationSemConv;
import dev.braintrust.json.BraintrustJsonMapper;
Expand All @@ -23,6 +24,7 @@

@Slf4j
class TracingHttpClient implements HttpClient {
private static final JsonMapper JSON_MAPPER = ObjectMappers.jsonMapper();
private final Tracer tracer;
private final HttpClient underlying;

Expand Down Expand Up @@ -173,7 +175,8 @@ private static void tagSpanFromBuffer(Span span, byte[] bytes, Long timeToFirstT
if (bytes.length == 0) return;
try {
String firstLine = firstNonEmptyLine(bytes);
if (firstLine != null && firstLine.startsWith("data:")) {
if (firstLine != null
&& (firstLine.startsWith("data:") || firstLine.startsWith("event:"))) {
tagSpanFromSseBytes(span, bytes, timeToFirstTokenNanos);
} else {
InstrumentationSemConv.tagLLMSpanResponse(
Expand Down Expand Up @@ -205,26 +208,60 @@ private static String firstNonEmptyLine(byte[] bytes) {
private static void tagSpanFromSseBytes(
Span span, byte[] sseBytes, Long timeToFirstTokenNanos) {
try {
var accumulator = ChatCompletionAccumulator.create();
var reader =
new BufferedReader(
new InputStreamReader(
new ByteArrayInputStream(sseBytes), StandardCharsets.UTF_8));
String line;
String responseJson = null;
while ((line = reader.readLine()) != null) {
if (!line.startsWith("data:")) continue;
String data = line.substring("data:".length()).strip();
if (data.isEmpty() || data.equals("[DONE]")) continue;
ChatCompletionChunk chunk =
BraintrustJsonMapper.get().readValue(data, ChatCompletionChunk.class);
accumulator.accumulate(chunk);
var firstEventJson = line.substring("data:".length()).strip();
// after the first data chunk is found, read the rest of the stream with the proper
// accumulator type
var jsonTree = JSON_MAPPER.readTree(firstEventJson);
if (jsonTree.has("type") && jsonTree.get("type").asText().startsWith("response")) {
// response API SSEvents
ResponseAccumulator accumulator = ResponseAccumulator.create();
accumulator.accumulate(
JSON_MAPPER.readValue(firstEventJson, ResponseStreamEvent.class));
while ((line = reader.readLine()) != null) {
if (!line.startsWith("data:")) continue;
String data = line.substring("data:".length()).strip();
if (data.isEmpty() || data.equals("[DONE]")) continue;
ResponseStreamEvent rse =
JSON_MAPPER.readValue(data, ResponseStreamEvent.class);
accumulator.accumulate(rse);
}
responseJson = JSON_MAPPER.writeValueAsString(accumulator.response());
} else if (jsonTree.has("object")
&& jsonTree.get("object").asText().equals("chat.completion.chunk")) {
// completions API SSEvents
var accumulator = ChatCompletionAccumulator.create();
accumulator.accumulate(
JSON_MAPPER.readValue(firstEventJson, ChatCompletionChunk.class));
while ((line = reader.readLine()) != null) {
if (!line.startsWith("data:")) continue;
String data = line.substring("data:".length()).strip();
if (data.isEmpty() || data.equals("[DONE]")) continue;
ChatCompletionChunk chunk =
BraintrustJsonMapper.get()
.readValue(data, ChatCompletionChunk.class);
accumulator.accumulate(chunk);
}
responseJson = JSON_MAPPER.writeValueAsString(accumulator.chatCompletion());
} else {
log.warn("unknown SSE object {}", firstEventJson);
}
break;
}
if (null != responseJson) {
InstrumentationSemConv.tagLLMSpanResponse(
span,
InstrumentationSemConv.PROVIDER_NAME_OPENAI,
responseJson,
timeToFirstTokenNanos);
}
var chatCompletion = accumulator.chatCompletion();
InstrumentationSemConv.tagLLMSpanResponse(
span,
InstrumentationSemConv.PROVIDER_NAME_OPENAI,
BraintrustJsonMapper.toJson(chatCompletion),
timeToFirstTokenNanos);
} catch (Exception e) {
log.error("Could not parse SSE buffer to tag streaming span output", e);
}
Expand All @@ -233,8 +270,7 @@ private static void tagSpanFromSseBytes(
/**
* {@link HttpResponse} wrapper for streaming (SSE) responses. Its {@link #body()} returns a tee
* {@link InputStream} that copies every byte the caller reads into an in-memory buffer. When
* the stream is fully consumed and {@link #close()} is called, the accumulated bytes are
* available via {@link #collectedBytes()} for span tagging.
* the stream is fully consumed and {@link #close()} is called.
*/
private static final class TeeingStreamHttpResponse implements HttpResponse {
private final HttpResponse delegate;
Expand Down Expand Up @@ -271,17 +307,13 @@ private void onStreamClosed() {
}
}

byte[] collectedBytes() {
return teeBuffer.toByteArray();
}

@Override
public int statusCode() {
return delegate.statusCode();
}

@Override
public com.openai.core.http.Headers headers() {
public Headers headers() {
return delegate.headers();
}

Expand All @@ -295,7 +327,7 @@ public void close() {
try {
teeStream.close(); // triggers onStreamClosed if not already fired (e.g. abandoned
// stream)
} catch (java.io.IOException ignored) {
} catch (IOException ignored) {
}
delegate.close();
}
Expand All @@ -322,7 +354,7 @@ private static final class TeeInputStream extends InputStream {
}

@Override
public int read() throws java.io.IOException {
public int read() throws IOException {
int b = source.read();
if (b == -1) {
notifyClosed();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package dev.braintrust.instrumentation.openai.v2_8_0.auto;
package dev.braintrust.instrumentation.openai.v2_15_0.auto;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
Expand All @@ -8,7 +8,7 @@
import dev.braintrust.instrumentation.InstrumentationModule;
import dev.braintrust.instrumentation.TypeInstrumentation;
import dev.braintrust.instrumentation.TypeTransformer;
import dev.braintrust.instrumentation.openai.v2_8_0.BraintrustOpenAI;
import dev.braintrust.instrumentation.openai.v2_15_0.BraintrustOpenAI;
import io.opentelemetry.api.GlobalOpenTelemetry;
import java.util.List;
import java.util.Set;
Expand All @@ -20,10 +20,10 @@
@AutoService(InstrumentationModule.class)
public class OpenAIInstrumentationModule extends InstrumentationModule {
private static final String MANUAL_INSTRUMENTATION_PACKAGE =
"dev.braintrust.instrumentation.openai.v2_8_0.";
"dev.braintrust.instrumentation.openai.v2_15_0.";

public OpenAIInstrumentationModule() {
super("openai_2_8_0");
super("openai_2_15_0");
}

@Override
Expand Down
Loading
Loading