Skip to content

Commit 05fe577

Browse files
committed
added unit tests for interceptor
2 parents cb32296 + 01e6b36 commit 05fe577

File tree

4 files changed

+951
-40
lines changed

4 files changed

+951
-40
lines changed

google/cloud/bigtable/data/_async/metrics_interceptor.py

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def wrapper(self, continuation, client_call_details, request):
5454
operation: "ActiveOperationMetric" = self.operation_map.get(key)
5555
if operation:
5656
# start a new attempt if not started
57-
if operation.state != OperationState.ACTIVE_ATTEMPT:
57+
if operation.state == OperationState.CREATED or operation.state == OperationState.BETWEEN_ATTEMPTS:
5858
operation.start_attempt()
5959
# wrap continuation in logic to process the operation
6060
return func(self, operation, continuation, client_call_details, request)
@@ -65,6 +65,26 @@ def wrapper(self, continuation, client_call_details, request):
6565
return wrapper
6666

6767

68+
def _end_attempt(operation, exc, metadata):
69+
"""Helper to add metadata and exception to an operation"""
70+
if metadata is not None:
71+
operation.add_response_metadata(metadata)
72+
if exc is not None:
73+
# end attempt. If it succeeded, let higher levels decide when to end operation
74+
operation.end_attempt_with_status(exc)
75+
76+
77+
@CrossSync.convert
78+
async def _get_metadata(source):
79+
"""Helper to extract metadata from a call or RpcError"""
80+
try:
81+
return (await source.trailing_metadata() or []) + (
82+
await source.initial_metadata() or []
83+
)
84+
except Exception:
85+
# ignore errors while fetching metadata
86+
return None
87+
6888
@CrossSync.convert_class(sync_name="BigtableMetricsInterceptor")
6989
class AsyncBigtableMetricsInterceptor(
7090
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, MetricsHandler
@@ -105,29 +125,23 @@ async def intercept_unary_unary(
105125
self, operation, continuation, client_call_details, request
106126
):
107127
encountered_exc: Exception | None = None
108-
call = None
128+
metadata = None
109129
try:
110130
call = await continuation(client_call_details, request)
131+
metadata = await _get_metadata(call)
111132
return call
112-
except Exception as e:
113-
encountered_exc = e
114-
raise
133+
except Exception as rpc_error:
134+
metadata = await _get_metadata(rpc_error)
135+
encountered_exc = rpc_error
136+
raise rpc_error
115137
finally:
116-
if call is not None:
117-
metadata = (
118-
await call.trailing_metadata() + await call.initial_metadata()
119-
)
120-
operation.add_response_metadata(metadata)
121-
if encountered_exc is not None:
122-
# end attempt. If it succeeded, let higher levels decide when to end operation
123-
operation.end_attempt_with_status(encountered_exc)
138+
_end_attempt(operation, encountered_exc, metadata)
124139

125140
@CrossSync.convert
126141
@_with_operation_from_metadata
127142
async def intercept_unary_stream(
128143
self, operation, continuation, client_call_details, request
129144
):
130-
# TODO: benchmark
131145
async def response_wrapper(call):
132146
has_first_response = operation.first_response_latency is not None
133147
encountered_exc = None
@@ -140,17 +154,17 @@ async def response_wrapper(call):
140154
)
141155
has_first_response = True
142156
yield response
143-
144157
except Exception as e:
158+
# handle errors while processing stream
145159
encountered_exc = e
146160
raise
147161
finally:
148-
metadata = (
149-
await call.trailing_metadata() + await call.initial_metadata()
150-
)
151-
operation.add_response_metadata(metadata)
152-
if encountered_exc is not None:
153-
# end attempt. If it succeeded, let higher levels decide when to end operation
154-
operation.end_attempt_with_status(encountered_exc)
155-
156-
return response_wrapper(await continuation(client_call_details, request))
162+
if call is not None:
163+
_end_attempt(operation, encountered_exc, await _get_metadata(call))
164+
165+
try:
166+
return response_wrapper(await continuation(client_call_details, request))
167+
except Exception as rpc_error:
168+
# handle errors while intializing stream
169+
_end_attempt(operation, rpc_error, await _get_metadata(rpc_error))
170+
raise rpc_error

google/cloud/bigtable/data/_sync_autogen/metrics_interceptor.py

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# This file is automatically generated by CrossSync. Do not edit manually.
1616

1717
from __future__ import annotations
18+
import time
1819
from functools import wraps
1920
from google.cloud.bigtable.data._metrics.data_model import (
2021
OPERATION_INTERCEPTOR_METADATA_KEY,
@@ -42,7 +43,10 @@ def wrapper(self, continuation, client_call_details, request):
4243
)
4344
operation: "ActiveOperationMetric" = self.operation_map.get(key)
4445
if operation:
45-
if operation.state != OperationState.ACTIVE_ATTEMPT:
46+
if (
47+
operation.state == OperationState.CREATED
48+
or operation.state == OperationState.BETWEEN_ATTEMPTS
49+
):
4650
operation.start_attempt()
4751
return func(self, operation, continuation, client_call_details, request)
4852
else:
@@ -51,6 +55,22 @@ def wrapper(self, continuation, client_call_details, request):
5155
return wrapper
5256

5357

58+
def _end_attempt(operation, exc, metadata):
59+
"""Helper to add metadata and exception to an operation"""
60+
if metadata is not None:
61+
operation.add_response_metadata(metadata)
62+
if exc is not None:
63+
operation.end_attempt_with_status(exc)
64+
65+
66+
def _get_metadata(source):
67+
"""Helper to extract metadata from a call or RpcError"""
68+
try:
69+
return (source.trailing_metadata() or []) + (source.initial_metadata() or [])
70+
except Exception:
71+
return None
72+
73+
5474
class BigtableMetricsInterceptor(
5575
UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, MetricsHandler
5676
):
@@ -76,7 +96,8 @@ def register_operation(self, operation):
7696
operation.handlers.append(self)
7797

7898
def on_operation_complete(self, op):
79-
del self.operation_map[op.uuid]
99+
if op.uuid in self.operation_map:
100+
del self.operation_map[op.uuid]
80101

81102
def on_operation_cancelled(self, op):
82103
self.on_operation_complete(op)
@@ -86,36 +107,42 @@ def intercept_unary_unary(
86107
self, operation, continuation, client_call_details, request
87108
):
88109
encountered_exc: Exception | None = None
89-
call = None
110+
metadata = None
90111
try:
91112
call = continuation(client_call_details, request)
113+
metadata = _get_metadata(call)
92114
return call
93-
except Exception as e:
94-
encountered_exc = e
95-
raise
115+
except Exception as rpc_error:
116+
metadata = _get_metadata(rpc_error)
117+
encountered_exc = rpc_error
118+
raise rpc_error
96119
finally:
97-
if call is not None:
98-
metadata = call.trailing_metadata() + call.initial_metadata()
99-
operation.add_response_metadata(metadata)
100-
if encountered_exc is not None:
101-
operation.end_attempt_with_status(encountered_exc)
120+
_end_attempt(operation, encountered_exc, metadata)
102121

103122
@_with_operation_from_metadata
104123
def intercept_unary_stream(
105124
self, operation, continuation, client_call_details, request
106125
):
107126
def response_wrapper(call):
127+
has_first_response = operation.first_response_latency is not None
108128
encountered_exc = None
109129
try:
110130
for response in call:
131+
if not has_first_response:
132+
operation.first_response_latency_ns = (
133+
time.monotonic_ns() - operation.start_time_ns
134+
)
135+
has_first_response = True
111136
yield response
112137
except Exception as e:
113138
encountered_exc = e
114139
raise
115140
finally:
116-
metadata = call.trailing_metadata() + call.initial_metadata()
117-
operation.add_response_metadata(metadata)
118-
if encountered_exc is not None:
119-
operation.end_attempt_with_status(encountered_exc)
141+
if call is not None:
142+
_end_attempt(operation, encountered_exc, _get_metadata(call))
120143

121-
return response_wrapper(continuation(client_call_details, request))
144+
try:
145+
return response_wrapper(continuation(client_call_details, request))
146+
except Exception as rpc_error:
147+
_end_attempt(operation, rpc_error, _get_metadata(rpc_error))
148+
raise rpc_error

0 commit comments

Comments
 (0)