33try :
44 import aio_pika
55 import wrapt
6+ from typing import (
7+ TYPE_CHECKING ,
8+ Dict ,
9+ Any ,
10+ Callable ,
11+ Tuple ,
12+ Type ,
13+ Optional ,
14+ )
615
716 from instana .log import logger
817 from instana .propagators .format import Format
918 from instana .util .traceutils import get_tracer_tuple , tracing_is_off
1019 from instana .singletons import tracer
1120
12- def _extract_span_attributes (span , connection , sort , routing_key , exchange ) -> None :
21+ if TYPE_CHECKING :
22+ from instana .span .span import InstanaSpan
23+ from aio_pika .exchange import Exchange
24+ from aiormq .abc import ConfirmationFrameType
25+ from aio_pika .abc import ConsumerTag , AbstractMessage
26+ from aio_pika .queue import Queue , QueueIterator
27+
28+ def _extract_span_attributes (
29+ span : "InstanaSpan" , connection , sort : str , routing_key : str , exchange : str
30+ ) -> None :
1331 span .set_attribute ("address" , str (connection .url ))
1432
1533 span .set_attribute ("sort" , sort )
1634 span .set_attribute ("key" , routing_key )
1735 span .set_attribute ("exchange" , exchange )
1836
1937 @wrapt .patch_function_wrapper ("aio_pika" , "Exchange.publish" )
20- async def publish_with_instana (wrapped , instance , args , kwargs ):
38+ async def publish_with_instana (
39+ wrapped : Callable [..., Optional ["ConfirmationFrameType" ]],
40+ instance : "Exchange" ,
41+ args : Tuple [object ],
42+ kwargs : Dict [str , Any ],
43+ ) -> Optional ["ConfirmationFrameType" ]:
2144 if tracing_is_off ():
2245 return await wrapped (* args , ** kwargs )
2346
@@ -47,20 +70,32 @@ async def publish_with_instana(wrapped, instance, args, kwargs):
4770 return response
4871
4972 @wrapt .patch_function_wrapper ("aio_pika" , "Queue.consume" )
50- async def consume_with_instana (wrapped , instance , args , kwargs ):
73+ async def consume_with_instana (
74+ wrapped : Callable [..., "ConsumerTag" ],
75+ instance : Type ["Queue" ],
76+ args : Tuple [object ],
77+ kwargs : Dict [str , Any ],
78+ ) -> "ConsumerTag" :
5179 connection = instance .channel ._connection
5280 callback = kwargs ["callback" ] if kwargs .get ("callback" ) else args [0 ]
5381
5482 @wrapt .decorator
55- async def callback_wrapper (wrapped , instance , args , kwargs ):
83+ async def callback_wrapper (
84+ wrapped : Callable [[Type ["AbstractMessage" ]], Any ],
85+ instance : Type ["QueueIterator" ],
86+ args : Tuple [Type ["AbstractMessage" ], ...],
87+ kwargs : Dict [str , Any ],
88+ ) -> Callable [[Type ["AbstractMessage" ]], Any ]:
5689 message = args [0 ]
5790 parent_context = tracer .extract (
5891 Format .HTTP_HEADERS , message .headers , disable_w3c_trace_context = True
5992 )
6093 with tracer .start_as_current_span (
6194 "rabbitmq" , span_context = parent_context
6295 ) as span :
63- _extract_span_attributes (span , connection , "consume" , message .routing_key , message .exchange )
96+ _extract_span_attributes (
97+ span , connection , "consume" , message .routing_key , message .exchange
98+ )
6499 try :
65100 response = await wrapped (* args , ** kwargs )
66101 except Exception as exc :
@@ -75,7 +110,7 @@ async def callback_wrapper(wrapped, instance, args, kwargs):
75110 args = (wrapped_callback ,) + args [1 :]
76111
77112 return await wrapped (* args , ** kwargs )
78-
113+
79114 logger .debug ("Instrumenting aio-pika" )
80115
81116except ImportError :
0 commit comments