-
Notifications
You must be signed in to change notification settings - Fork 44
fix(cdk): propagate api_budget to CustomRequester subclasses of HttpRequester #993
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
f5377a8
0787f79
1b448ba
fb8bff3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1859,7 +1859,63 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) -> | |||||||||||
| for class_field in component_fields.keys() | ||||||||||||
| if class_field in model_args | ||||||||||||
| } | ||||||||||||
| return custom_component_class(**kwargs) | ||||||||||||
|
|
||||||||||||
| # Propagate the top-level api_budget to custom components that are subclasses of | ||||||||||||
| # HttpRequester (and therefore accept an `api_budget` field), unless the manifest | ||||||||||||
| # or an explicit kwarg has already provided one. Without this, custom requesters | ||||||||||||
| # silently lose the connector-level HTTPAPIBudget and any configured rate-limit | ||||||||||||
| # policies have no effect at runtime. | ||||||||||||
| injected_api_budget = False | ||||||||||||
| if ( | ||||||||||||
| self._api_budget is not None | ||||||||||||
| and "api_budget" in component_fields | ||||||||||||
| and kwargs.get("api_budget") is None | ||||||||||||
| and isinstance(custom_component_class, type) | ||||||||||||
| and issubclass(custom_component_class, HttpRequester) | ||||||||||||
| ): | ||||||||||||
| kwargs["api_budget"] = self._api_budget | ||||||||||||
| injected_api_budget = True | ||||||||||||
|
|
||||||||||||
| custom_component = custom_component_class(**kwargs) | ||||||||||||
| if injected_api_budget and isinstance(custom_component, HttpRequester): | ||||||||||||
| self._sync_injected_api_budget_with_http_client(custom_component) | ||||||||||||
|
|
||||||||||||
| return custom_component | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def _sync_injected_api_budget_with_http_client(custom_requester: HttpRequester) -> None: | ||||||||||||
| """Align an injected `api_budget` with the active `HttpClient` on custom requesters. | ||||||||||||
|
|
||||||||||||
| Custom requesters can replace `_http_client` in `__post_init__` without forwarding | ||||||||||||
| `api_budget`. If the factory injected a manifest-level budget and the replacement | ||||||||||||
| client kept the default empty `APIBudget`, point both the client and its underlying | ||||||||||||
| `LimiterSession`/`CachedLimiterSession` at the injected budget so rate-limiting is | ||||||||||||
| actually enforced at request time. Non-`APIBudget` implementations (custom | ||||||||||||
| `AbstractAPIBudget` subclasses) are left untouched. | ||||||||||||
| """ | ||||||||||||
| http_client = getattr(custom_requester, "_http_client", None) | ||||||||||||
| http_client_api_budget = getattr(http_client, "_api_budget", None) | ||||||||||||
| injected_api_budget = custom_requester.api_budget | ||||||||||||
|
|
||||||||||||
| if ( | ||||||||||||
| http_client is None | ||||||||||||
| or http_client_api_budget is None | ||||||||||||
| or injected_api_budget is None | ||||||||||||
| or http_client_api_budget is injected_api_budget | ||||||||||||
| ): | ||||||||||||
| return | ||||||||||||
|
|
||||||||||||
| if ( | ||||||||||||
| isinstance(http_client_api_budget, APIBudget) | ||||||||||||
| and len(http_client_api_budget._policies) == 0 | ||||||||||||
| ): | ||||||||||||
| http_client._api_budget = injected_api_budget | ||||||||||||
|
||||||||||||
| http_client._api_budget = injected_api_budget | |
| http_client._api_budget = injected_api_budget | |
| http_client_session = getattr(http_client, "_session", None) | |
| if getattr(http_client_session, "_api_budget", None) is not None: | |
| http_client_session._api_budget = injected_api_budget |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch — verified this against HttpClient.__init__ (sets self._api_budget then passes self._api_budget into LimiterSession/CachedLimiterSession at line 176/180) and LimiterMixin.__init__ which stores it as self._api_budget = api_budget and calls it directly in send(). Without syncing the session, requests would keep going through the original (empty) budget.
Addressed in 1b448ba. _sync_injected_api_budget_with_http_client now also updates http_client._session._api_budget when the session is still holding the same reference as the client's replaced budget:
http_client._api_budget = injected_api_budget
http_client_session = getattr(http_client, "_session", None)
if (
http_client_session is not None
and getattr(http_client_session, "_api_budget", None) is http_client_api_budget
):
http_client_session._api_budget = injected_api_budgetKept the existing ref-swap approach rather than mutating the budget in place, to stay consistent with how HttpClient itself wires the reference at construction time.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,7 @@ | |
| from airbyte_cdk.sources.declarative.models import ( | ||
| CustomRecordExtractor as CustomRecordExtractorModel, | ||
| ) | ||
| from airbyte_cdk.sources.declarative.models import CustomRequester as CustomRequesterModel | ||
| from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel | ||
| from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel | ||
| from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel | ||
|
|
@@ -4312,6 +4313,202 @@ def test_api_budget_fixed_window_policy(): | |
| assert matcher._url_path_pattern.pattern == "/v2/data" | ||
|
|
||
|
|
||
| def test_api_budget_propagated_to_custom_requester_subclass_of_http_requester(): | ||
| """Top-level `api_budget` must be forwarded to custom components that subclass `HttpRequester`. | ||
|
|
||
| Without this propagation, connectors using a `CustomRequester` (i.e., a Python subclass of | ||
| `HttpRequester`) silently lose the manifest-level rate-limit policies because | ||
| `create_custom_component` does not forward `self._api_budget` the way | ||
| `create_http_requester` does. See airbytehq/oncall#12011 for the reproducer. | ||
| """ | ||
| manifest_api_budget = { | ||
| "type": "HTTPAPIBudget", | ||
| "policies": [ | ||
| { | ||
| "type": "MovingWindowCallRatePolicy", | ||
| "rates": [ | ||
| { | ||
| "type": "Rate", | ||
| "limit": 60, | ||
| "interval": "PT1M", | ||
| } | ||
| ], | ||
| "matchers": [], | ||
| } | ||
| ], | ||
| } | ||
|
|
||
| custom_requester_definition = { | ||
| "type": "CustomRequester", | ||
| "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequester", | ||
| "url_base": "https://example.org", | ||
| "path": "/v1/data", | ||
| "http_method": "GET", | ||
| } | ||
|
|
||
| config: Mapping[str, Any] = {} | ||
| local_factory = ModelToComponentFactory() | ||
| local_factory.set_api_budget(manifest_api_budget, config) | ||
|
|
||
| custom_requester = local_factory.create_component( | ||
| model_type=CustomRequesterModel, | ||
| component_definition=custom_requester_definition, | ||
| config=config, | ||
| name="custom_stream", | ||
| ) | ||
|
|
||
| assert isinstance(custom_requester, HttpRequester) | ||
| assert custom_requester.api_budget is not None, ( | ||
| "Manifest-level api_budget was not propagated to the CustomRequester instance" | ||
| ) | ||
| assert len(custom_requester.api_budget._policies) == 1 | ||
| policy = custom_requester.api_budget._policies[0] | ||
| assert isinstance(policy, MovingWindowCallRatePolicy) | ||
| # Verify the underlying HttpClient AND its LimiterSession both received the same | ||
| # budget: rate-limiting is enforced on the session at send() time, so asserting only | ||
| # on the client field is insufficient to prove the policies are actually active. | ||
| assert custom_requester._http_client._api_budget is custom_requester.api_budget | ||
| assert custom_requester._http_client._session._api_budget is custom_requester.api_budget | ||
|
|
||
|
|
||
| def test_api_budget_propagated_to_custom_requester_that_replaces_http_client(): | ||
| """Injected api budgets must survive requesters that replace `_http_client` in `__post_init__`. | ||
|
|
||
| Some connector requesters call `super().__post_init__()` and then swap in a custom `HttpClient` | ||
| implementation without forwarding `api_budget`. The budget should still be applied to the active | ||
| client so manifest-level rate limits continue to work. | ||
| """ | ||
| manifest_api_budget = { | ||
| "type": "HTTPAPIBudget", | ||
| "policies": [ | ||
| { | ||
| "type": "MovingWindowCallRatePolicy", | ||
| "rates": [ | ||
| { | ||
| "type": "Rate", | ||
| "limit": 60, | ||
| "interval": "PT1M", | ||
| } | ||
| ], | ||
| "matchers": [], | ||
| } | ||
| ], | ||
| } | ||
|
|
||
| custom_requester_definition = { | ||
| "type": "CustomRequester", | ||
| "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClient", | ||
| "url_base": "https://example.org", | ||
| "path": "/v1/data", | ||
| "http_method": "GET", | ||
| } | ||
|
|
||
| config: Mapping[str, Any] = {} | ||
| local_factory = ModelToComponentFactory() | ||
| local_factory.set_api_budget(manifest_api_budget, config) | ||
|
|
||
| custom_requester = local_factory.create_component( | ||
| model_type=CustomRequesterModel, | ||
| component_definition=custom_requester_definition, | ||
| config=config, | ||
| name="custom_stream", | ||
| ) | ||
|
|
||
| assert isinstance(custom_requester, HttpRequester) | ||
| assert custom_requester.api_budget is not None | ||
| assert custom_requester._http_client._api_budget is custom_requester.api_budget | ||
|
Comment on lines
+4417
to
+4419
|
||
| # The LimiterSession holds its own reference to the budget (captured at client | ||
| # construction time) and is what actually enforces rate limits on send(). Assert | ||
| # it was synced too, otherwise the injected budget is effectively inert. | ||
| assert custom_requester._http_client._session._api_budget is custom_requester.api_budget | ||
|
|
||
|
|
||
| def test_api_budget_not_overwriting_non_empty_budget_on_replaced_http_client(): | ||
| """A requester that intentionally installs its own budget should keep it.""" | ||
| manifest_api_budget = { | ||
| "type": "HTTPAPIBudget", | ||
| "policies": [ | ||
| { | ||
| "type": "MovingWindowCallRatePolicy", | ||
| "rates": [ | ||
| { | ||
| "type": "Rate", | ||
| "limit": 60, | ||
| "interval": "PT1M", | ||
| } | ||
| ], | ||
| "matchers": [], | ||
| } | ||
| ], | ||
| } | ||
|
|
||
| custom_requester_definition = { | ||
| "type": "CustomRequester", | ||
| "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClientAndOwnBudget", | ||
| "url_base": "https://example.org", | ||
| "path": "/v1/data", | ||
| "http_method": "GET", | ||
| } | ||
|
|
||
| config: Mapping[str, Any] = {} | ||
| local_factory = ModelToComponentFactory() | ||
| local_factory.set_api_budget(manifest_api_budget, config) | ||
|
|
||
| custom_requester = local_factory.create_component( | ||
| model_type=CustomRequesterModel, | ||
| component_definition=custom_requester_definition, | ||
| config=config, | ||
| name="custom_stream", | ||
| ) | ||
|
|
||
| assert isinstance(custom_requester, HttpRequester) | ||
| assert custom_requester.api_budget is not None | ||
| assert custom_requester._http_client._api_budget is not custom_requester.api_budget | ||
| assert len(custom_requester._http_client._api_budget._policies) == 1 | ||
| # The client's own budget must remain wired into its LimiterSession as well, so | ||
| # the sync step never silently swaps an intentionally-installed budget out from | ||
| # under the active session. | ||
| assert ( | ||
| custom_requester._http_client._session._api_budget | ||
| is custom_requester._http_client._api_budget | ||
| ) | ||
|
|
||
|
|
||
| def test_api_budget_not_propagated_to_non_http_requester_custom_components(): | ||
| """Custom components that do NOT subclass `HttpRequester` must not receive `api_budget`. | ||
|
|
||
| This guards against accidentally injecting an `api_budget` kwarg into arbitrary custom | ||
| components (e.g., custom error handlers, partition routers) whose constructors would | ||
| reject the unexpected keyword. | ||
| """ | ||
| manifest_api_budget = { | ||
| "type": "HTTPAPIBudget", | ||
| "policies": [ | ||
| { | ||
| "type": "MovingWindowCallRatePolicy", | ||
| "rates": [{"type": "Rate", "limit": 1, "interval": "PT60S"}], | ||
| "matchers": [], | ||
| } | ||
| ], | ||
| } | ||
|
|
||
| custom_error_handler_definition = { | ||
| "type": "CustomErrorHandler", | ||
| "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingSomeComponent", | ||
| "basic_field": "expected", | ||
| } | ||
|
|
||
| config: Mapping[str, Any] = {} | ||
| local_factory = ModelToComponentFactory() | ||
| local_factory.set_api_budget(manifest_api_budget, config) | ||
|
|
||
| # Must not raise TypeError about an unexpected "api_budget" kwarg. | ||
| custom_component = local_factory.create_component( | ||
| CustomErrorHandlerModel, custom_error_handler_definition, config | ||
| ) | ||
| assert custom_component.basic_field == "expected" | ||
|
|
||
|
|
||
| def test_create_grouping_partition_router_with_underlying_router(): | ||
| content = """ | ||
| schema_loader: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure we need this logic here in the cdk. CDK can not know about whether api budget override was done on purpose or not. CDK should be responsible on passing this param to custom component, but not overriding it. If we currently have connectors with empty api budget in their custom requesters but it's expected to be non-empty budget since its defined in manifest, custom requester should be updated then in the source.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Daryna Ishchenko (@darynaishchenko) — that's a fair architectural point and worth surfacing to Alfredo García (@alfredogarcia) (who authored commit 0787f79, adding this sync block).
To ground the discussion in what the original oncall actually needs:
source-youtube-analytics'ContentOwnerRequester/JobRequestersimply subclassHttpRequesterand overrideget_request_params/send_request— they do not override__post_init__or replace_http_client. So the reported bug (airbytehq/oncall#12011) is fully fixed by the first commit (f5377a8) alone, which just forwardsself._api_budgetinto the dataclass kwargs._sync_injected_api_budget_with_http_client, handles a different, speculative pattern: connectors whose custom requester replaces_http_clientin__post_init__without forwardingapi_budget. As you note, the CDK can't really distinguish "accidentally dropped the budget" from "intentionally installed an empty one" on such a replacement client — theisinstance(APIBudget) and len(_policies) == 0heuristic is a best-effort guess.Given that, I agree the cleaner separation of concerns is the one you describe: CDK forwards
api_budgetto the custom component (commit 1), and any connector whose custom requester replaces_http_clientis responsible for forwardingself.api_budgetinto that replacement. If we want to revert the sync block and keep only the kwarg injection, I'm happy to do that — but I'd like Alfredo García (@alfredogarcia) to weigh in first since they added it.Session URL: https://app.devin.ai/sessions/36dd4bce63bb414d8e4cbb61935d6f1e