Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class TraceExporter # rubocop:disable Metrics/ClassLength
# Default timeouts in seconds.
KEEP_ALIVE_TIMEOUT = 30
RETRY_COUNT = 5
private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT)
RESPONSE_BODY_LIMIT = 4_194_304 # 4 MB
private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :RESPONSE_BODY_LIMIT)

ERROR_MESSAGE_INVALID_HEADERS = 'headers must be a String with comma-separated URL Encoded UTF-8 k=v pairs or a Hash'
private_constant(:ERROR_MESSAGE_INVALID_HEADERS)
Expand Down Expand Up @@ -158,18 +159,19 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/MethodLength

case response
when Net::HTTPSuccess
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
SUCCESS
when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code)
FAILURE
when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
redo if backoff?(retry_count: retry_count += 1, reason: response.code)
FAILURE
when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError
log_status(response.body)
body = read_response_body(response)
log_status(body)
@metrics_reporter.add_to_counter('otel.otlp_exporter.failure', labels: { 'reason' => response.code })
FAILURE
when Net::HTTPRedirection
Expand Down Expand Up @@ -216,14 +218,42 @@ def handle_redirect(location)
end

def log_status(body)
truncation_note = @body_truncated ? ' (body truncated due to size limit)' : ''
status = Google::Rpc::Status.decode(body)
details = status.details.map do |detail|
klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass
detail.unpack(klass_or_nil) if klass_or_nil
end.compact
OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}}")
OpenTelemetry.handle_error(message: "OTLP exporter received rpc.Status{message=#{status.message}, details=#{details}}#{truncation_note}")
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status')
OpenTelemetry.handle_error(exception: e, message: "unexpected error decoding rpc.Status in OTLP::Exporter#log_status#{truncation_note}")
ensure
@body_truncated = false
end

def read_response_body(response)
return '' if response.nil?

body = +''
truncated = false

response.read_body do |chunk|
if body.bytesize + chunk.bytesize <= RESPONSE_BODY_LIMIT
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before starting to read the response body, should we not first read the content-length or transfer-encoding and short circuit if the body exceeds the limit?

IIUC the Go collector will switch to using chunked responses that exceed the 2KiB default buffer size. In cases where the response is less than 2KiB or the backend supports larger buffer sizes we may get the content-length responses and exit early.

https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Content-Length

https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Transfer-Encoding

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of using the headers to make this more performant!

Just to make sure I'm on the same page, are you proposing:

  • Use the transfer-encoding / content-length headers to determine body size
  • Skip chunking when the body is less than the limit and just return the body
  • Truncate the response using chunking for bodies that exceed the limit

Or are you suggesting something else?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that it is either or.

The response will either have a Content-Length or Transfer-Encoding Chunked

In the case the Content-Length is present and it exceeds the size then discard the response body and exit immediately.

If it's a chunked response use the code you've introduced here.

body << chunk
else
remaining = RESPONSE_BODY_LIMIT - body.bytesize
body << chunk.byteslice(0, remaining) if remaining > 0
truncated = true
break
end
end

body.force_encoding('UTF-8')
@body_truncated = truncated
body
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'error reading response body')
''
end

def measure_request_duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,4 +742,81 @@
end
end
end

describe 'response body reading' do
let(:exporter) { OpenTelemetry::Exporter::OTLP::HTTP::TraceExporter.new }
let(:span_data) { OpenTelemetry::TestHelpers.create_span_data }

it 'discards body for successful responses without reading into memory' do
stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 200, body: 'success body')

result = exporter.export([span_data])

_(result).must_equal(success)
end

it 'discards body for retryable responses without reading into memory' do
stub_request(:post, 'http://localhost:4318/v1/traces')
.to_return(status: 503, body: 'service unavailable', headers: { 'Retry-After' => '0' })
.then.to_return(status: 200)

result = exporter.export([span_data])

_(result).must_equal(success)
end

it 'reads and parses error response body smaller than limit' do
log_stream = StringIO.new
logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)

details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: 'error details'))]
status = ::Google::Rpc::Status.encode(::Google::Rpc::Status.new(code: 3, message: 'invalid argument', details: details))
stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: status)

result = exporter.export([span_data])

_(result).must_equal(export_failure)
_(log_stream.string).must_match(/invalid argument/)
_(log_stream.string).wont_match(/truncated/)
ensure
OpenTelemetry.logger = logger
end

it 'truncates error response body larger than 4 MB limit' do
log_stream = StringIO.new
logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)

# Create a body larger than 4 MB
large_message = 'x' * 5_000_000 # 5 MB
details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))]
large_status = ::Google::Rpc::Status.new(code: 3, message: 'large error', details: details)
large_body = ::Google::Rpc::Status.encode(large_status)

stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: large_body)

result = exporter.export([span_data])

_(result).must_equal(export_failure)
_(log_stream.string).must_match(/body truncated due to size limit/)
ensure
OpenTelemetry.logger = logger
end

it 'handles malformed error response body gracefully' do
log_stream = StringIO.new
logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)

stub_request(:post, 'http://localhost:4318/v1/traces').to_return(status: 400, body: 'not valid protobuf')

result = exporter.export([span_data])

_(result).must_equal(export_failure)
_(log_stream.string).must_match(/unexpected error decoding rpc.Status/)
ensure
OpenTelemetry.logger = logger
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class LogsExporter # rubocop:disable Metrics/ClassLength
# Default timeouts in seconds.
KEEP_ALIVE_TIMEOUT = 30
RETRY_COUNT = 5
private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT)
RESPONSE_BODY_LIMIT = 4_194_304 # 4 MB
private_constant(:KEEP_ALIVE_TIMEOUT, :RETRY_COUNT, :RESPONSE_BODY_LIMIT)

ERROR_MESSAGE_INVALID_HEADERS = 'headers must be a String with comma-separated URL Encoded UTF-8 k=v pairs or a Hash'
private_constant(:ERROR_MESSAGE_INVALID_HEADERS)
Expand Down Expand Up @@ -167,23 +168,24 @@ def send_bytes(bytes, timeout:) # rubocop:disable Metrics/CyclomaticComplexity,

case response
when Net::HTTPSuccess
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
SUCCESS
when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
handle_http_error(response)
redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1)
FAILURE
when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
handle_http_error(response)
redo if backoff?(retry_count: retry_count += 1)
FAILURE
when Net::HTTPNotFound
handle_http_error(response)
FAILURE
when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError
log_status(response.body)
body = read_response_body(response)
log_status(body)
FAILURE
when Net::HTTPRedirection
@http.finish
Expand Down Expand Up @@ -234,14 +236,42 @@ def handle_redirect(location)
end

def log_status(body)
truncation_note = @body_truncated ? ' (body truncated due to size limit)' : ''
status = Google::Rpc::Status.decode(body)
details = status.details.map do |detail|
klass_or_nil = ::Google::Protobuf::DescriptorPool.generated_pool.lookup(detail.type_name).msgclass
detail.unpack(klass_or_nil) if klass_or_nil
end.compact
OpenTelemetry.handle_error(message: "OTLP logs exporter received rpc.Status{message=#{status.message}, details=#{details}}")
OpenTelemetry.handle_error(message: "OTLP logs exporter received rpc.Status{message=#{status.message}, details=#{details}}#{truncation_note}")
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'unexpected error decoding rpc.Status in OTLP::Exporter#log_status')
OpenTelemetry.handle_error(exception: e, message: "unexpected error decoding rpc.Status in OTLP::Exporter#log_status#{truncation_note}")
ensure
@body_truncated = false
end

def read_response_body(response)
return '' if response.nil?

body = +''
truncated = false

response.read_body do |chunk|
if body.bytesize + chunk.bytesize <= RESPONSE_BODY_LIMIT
body << chunk
else
remaining = RESPONSE_BODY_LIMIT - body.bytesize
body << chunk.byteslice(0, remaining) if remaining > 0
truncated = true
break
end
end

body.force_encoding('UTF-8')
@body_truncated = truncated
body
rescue StandardError => e
OpenTelemetry.handle_error(exception: e, message: 'error reading response body')
''
end

def backoff?(retry_count:, retry_after: nil) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,4 +955,81 @@
end
end
end

describe 'response body reading' do
let(:exporter) { OpenTelemetry::Exporter::OTLP::Logs::LogsExporter.new }
let(:log_record_data) { OpenTelemetry::TestHelpers.create_log_record_data }

it 'discards body for successful responses without reading into memory' do
stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 200, body: 'success body')

result = exporter.export([log_record_data])

_(result).must_equal(SUCCESS)
end

it 'discards body for retryable responses without reading into memory' do
stub_request(:post, 'http://localhost:4318/v1/logs')
.to_return(status: 503, body: 'service unavailable', headers: { 'Retry-After' => '0' })
.then.to_return(status: 200)

result = exporter.export([log_record_data])

_(result).must_equal(SUCCESS)
end

it 'reads and parses error response body smaller than limit' do
log_stream = StringIO.new
logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)

details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: 'error details'))]
status = ::Google::Rpc::Status.encode(::Google::Rpc::Status.new(code: 3, message: 'invalid argument', details: details))
stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 400, body: status)

result = exporter.export([log_record_data])

_(result).must_equal(FAILURE)
_(log_stream.string).must_match(/invalid argument/)
_(log_stream.string).wont_match(/truncated/)
ensure
OpenTelemetry.logger = logger
end

it 'truncates error response body larger than 4 MB limit' do
log_stream = StringIO.new
logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)

# Create a body larger than 4 MB
large_message = 'x' * 5_000_000 # 5 MB
details = [::Google::Protobuf::Any.pack(::Google::Protobuf::StringValue.new(value: large_message))]
large_status = ::Google::Rpc::Status.new(code: 3, message: 'large error', details: details)
large_body = ::Google::Rpc::Status.encode(large_status)

stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 400, body: large_body)

result = exporter.export([log_record_data])

_(result).must_equal(FAILURE)
_(log_stream.string).must_match(/body truncated due to size limit/)
ensure
OpenTelemetry.logger = logger
end

it 'handles malformed error response body gracefully' do
log_stream = StringIO.new
logger = OpenTelemetry.logger
OpenTelemetry.logger = ::Logger.new(log_stream)

stub_request(:post, 'http://localhost:4318/v1/logs').to_return(status: 400, body: 'not valid protobuf')

result = exporter.export([log_record_data])

_(result).must_equal(FAILURE)
_(log_stream.string).must_match(/unexpected error decoding rpc.Status/)
ensure
OpenTelemetry.logger = logger
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,24 @@ def send_bytes(bytes, timeout:)
response = @http.request(request)
case response
when Net::HTTPSuccess
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
SUCCESS
when Net::HTTPServiceUnavailable, Net::HTTPTooManyRequests
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
redo if backoff?(retry_after: response['Retry-After'], retry_count: retry_count += 1, reason: response.code)
OpenTelemetry.logger.warn('Net::HTTPServiceUnavailable/Net::HTTPTooManyRequests in MetricsExporter#send_bytes')
FAILURE
when Net::HTTPRequestTimeOut, Net::HTTPGatewayTimeOut, Net::HTTPBadGateway
response.body # Read and discard body
response.read_body(nil) # Discard without reading into memory
redo if backoff?(retry_count: retry_count += 1, reason: response.code)
OpenTelemetry.logger.warn('Net::HTTPRequestTimeOut/Net::HTTPGatewayTimeOut/Net::HTTPBadGateway in MetricsExporter#send_bytes')
FAILURE
when Net::HTTPNotFound
OpenTelemetry.handle_error(message: "OTLP metrics_exporter received http.code=404 for uri: '#{@path}'")
FAILURE
when Net::HTTPBadRequest, Net::HTTPClientError, Net::HTTPServerError
log_status(response.body)
body = read_response_body(response)
log_status(body)
OpenTelemetry.logger.warn('Net::HTTPBadRequest/Net::HTTPClientError/Net::HTTPServerError in MetricsExporter#send_bytes')
FAILURE
when Net::HTTPRedirection
Expand Down
Loading
Loading