Skip to content

Commit 10cccbe

Browse files
committed
wip: added context propagation to the kafka spans
Signed-off-by: Cagri Yonca <cagri@ibm.com>
1 parent 5619ddb commit 10cccbe

5 files changed

Lines changed: 229 additions & 97 deletions

File tree

src/instana/agent/host.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,12 @@ def filter_spans(self, spans: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
360360
)
361361
if service == "kafka":
362362
endpoint = span.data[service][service_specifier_key]
363+
# comment these out only if you like to ignore internal events
364+
# if (
365+
# service == "urllib3"
366+
# and "com.instana" in span.data["http"]["url"]
367+
# ):
368+
# continue
363369
method = span.data[service][operation_specifier_key]
364370
if isinstance(method, str) and self.__is_endpoint_ignored(
365371
service, method, endpoint

src/instana/instrumentation/kafka/kafka_python.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,16 @@ def trace_kafka_send(
3737
span.set_attribute("kafka.access", "send")
3838

3939
# context propagation
40+
headers = kwargs.get("headers", [])
4041
tracer.inject(
4142
span.context,
4243
Format.KAFKA_HEADERS,
43-
kwargs.get("headers", {}),
44+
headers,
4445
disable_w3c_trace_context=True,
4546
)
4647

4748
try:
49+
kwargs["headers"] = headers
4850
res = wrapped(*args, **kwargs)
4951
except Exception as exc:
5052
span.record_exception(exc)
@@ -62,28 +64,30 @@ def trace_kafka_consume(
6264
return wrapped(*args, **kwargs)
6365

6466
tracer, parent_span, _ = get_tracer_tuple()
65-
66-
parent_context = (
67-
parent_span.get_span_context()
68-
if parent_span
69-
else tracer.extract(
70-
Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
67+
exception = ""
68+
try:
69+
res = wrapped(*args, **kwargs)
70+
except Exception as exc:
71+
exception = exc
72+
else:
73+
parent_context = (
74+
parent_span.get_span_context()
75+
if parent_span
76+
else tracer.extract(
77+
Format.KAFKA_HEADERS,
78+
res.headers,
79+
disable_w3c_trace_context=True,
80+
)
7181
)
72-
)
73-
74-
with tracer.start_as_current_span(
75-
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
76-
) as span:
77-
topic = list(instance.subscription())[0]
78-
span.set_attribute("kafka.service", topic)
79-
span.set_attribute("kafka.access", "consume")
80-
81-
try:
82-
res = wrapped(*args, **kwargs)
83-
except Exception as exc:
84-
span.record_exception(exc)
85-
else:
86-
return res
82+
with tracer.start_as_current_span(
83+
"kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
84+
) as span:
85+
span.set_attribute("kafka.service", res.topic)
86+
span.set_attribute("kafka.access", "consume")
87+
88+
if exception:
89+
span.record_exception(exception)
90+
return res
8791

8892
@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
8993
def trace_kafka_poll(

0 commit comments

Comments
 (0)