Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,63 @@ def create_custom_component(self, model: Any, config: Config, **kwargs: Any) ->
for class_field in component_fields.keys()
if class_field in model_args
}
return custom_component_class(**kwargs)

# Propagate the top-level api_budget to custom components that are subclasses of
# HttpRequester (and therefore accept an `api_budget` field), unless the manifest
# or an explicit kwarg has already provided one. Without this, custom requesters
# silently lose the connector-level HTTPAPIBudget and any configured rate-limit
# policies have no effect at runtime.
injected_api_budget = False
if (
self._api_budget is not None
and "api_budget" in component_fields
and kwargs.get("api_budget") is None
and isinstance(custom_component_class, type)
and issubclass(custom_component_class, HttpRequester)
):
kwargs["api_budget"] = self._api_budget
injected_api_budget = True

custom_component = custom_component_class(**kwargs)
if injected_api_budget and isinstance(custom_component, HttpRequester):
self._sync_injected_api_budget_with_http_client(custom_component)

return custom_component

@staticmethod
def _sync_injected_api_budget_with_http_client(custom_requester: HttpRequester) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure we need this logic here in the CDK. The CDK cannot know whether an api budget override was done on purpose or not. The CDK should be responsible for passing this param to the custom component, but not for overriding it. If we currently have connectors whose custom requesters end up with an empty api budget where a non-empty budget is expected because it's defined in the manifest, then the custom requester should be updated in the source.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Daryna Ishchenko (@darynaishchenko) — that's a fair architectural point and worth surfacing to Alfredo García (@alfredogarcia) (who authored commit 0787f79, adding this sync block).

To ground the discussion in what the original oncall actually needs:

  • source-youtube-analytics' ContentOwnerRequester / JobRequester simply subclass HttpRequester and override get_request_params / send_request — they do not override __post_init__ or replace _http_client. So the reported bug (airbytehq/oncall#12011) is fully fixed by the first commit (f5377a8) alone, which just forwards self._api_budget into the dataclass kwargs.
  • The second commit (0787f79), which adds _sync_injected_api_budget_with_http_client, handles a different, speculative pattern: connectors whose custom requester replaces _http_client in __post_init__ without forwarding api_budget. As you note, the CDK can't really distinguish "accidentally dropped the budget" from "intentionally installed an empty one" on such a replacement client — the isinstance(APIBudget) and len(_policies) == 0 heuristic is a best-effort guess.

Given that, I agree the cleaner separation of concerns is the one you describe: CDK forwards api_budget to the custom component (commit 1), and any connector whose custom requester replaces _http_client is responsible for forwarding self.api_budget into that replacement. If we want to revert the sync block and keep only the kwarg injection, I'm happy to do that — but I'd like Alfredo García (@alfredogarcia) to weigh in first since they added it.

Session URL: https://app.devin.ai/sessions/36dd4bce63bb414d8e4cbb61935d6f1e

"""Align an injected `api_budget` with the active `HttpClient` on custom requesters.

Custom requesters can replace `_http_client` in `__post_init__` without forwarding
`api_budget`. If the factory injected a manifest-level budget and the replacement
client kept the default empty `APIBudget`, point both the client and its underlying
`LimiterSession`/`CachedLimiterSession` at the injected budget so rate-limiting is
actually enforced at request time. Non-`APIBudget` implementations (custom
`AbstractAPIBudget` subclasses) are left untouched.
"""
http_client = getattr(custom_requester, "_http_client", None)
http_client_api_budget = getattr(http_client, "_api_budget", None)
injected_api_budget = custom_requester.api_budget

if (
http_client is None
or http_client_api_budget is None
or injected_api_budget is None
or http_client_api_budget is injected_api_budget
):
return

if (
isinstance(http_client_api_budget, APIBudget)
and len(http_client_api_budget._policies) == 0
):
http_client._api_budget = injected_api_budget
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_sync_injected_api_budget_with_http_client reassigns http_client._api_budget, but HttpClient passes the budget into LimiterSession/CachedLimiterSession at construction time and the session keeps its own _api_budget reference. Updating only HttpClient._api_budget won’t affect the active session’s limiter, so requests can still run with the original empty budget. Consider also updating http_client._session._api_budget (when present) or mutating the existing budget object instead of swapping the reference so the session sees the injected policies.

Suggested change
http_client._api_budget = injected_api_budget
http_client._api_budget = injected_api_budget
http_client_session = getattr(http_client, "_session", None)
if getattr(http_client_session, "_api_budget", None) is not None:
http_client_session._api_budget = injected_api_budget

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — verified this against HttpClient.__init__ (sets self._api_budget then passes self._api_budget into LimiterSession/CachedLimiterSession at line 176/180) and LimiterMixin.__init__ which stores it as self._api_budget = api_budget and calls it directly in send(). Without syncing the session, requests would keep going through the original (empty) budget.

Addressed in 1b448ba. _sync_injected_api_budget_with_http_client now also updates http_client._session._api_budget when the session is still holding the same reference as the client's replaced budget:

http_client._api_budget = injected_api_budget
http_client_session = getattr(http_client, "_session", None)
if (
    http_client_session is not None
    and getattr(http_client_session, "_api_budget", None) is http_client_api_budget
):
    http_client_session._api_budget = injected_api_budget

Kept the existing ref-swap approach rather than mutating the budget in place, to stay consistent with how HttpClient itself wires the reference at construction time.


Devin session

http_client_session = getattr(http_client, "_session", None)
if (
http_client_session is not None
and getattr(http_client_session, "_api_budget", None) is http_client_api_budget
):
http_client_session._api_budget = injected_api_budget

@staticmethod
def _get_class_from_fully_qualified_class_name(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
from airbyte_cdk.sources.declarative.models import (
CustomRecordExtractor as CustomRecordExtractorModel,
)
from airbyte_cdk.sources.declarative.models import CustomRequester as CustomRequesterModel
from airbyte_cdk.sources.declarative.models import CustomSchemaLoader as CustomSchemaLoaderModel
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor as DatetimeBasedCursorModel
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
Expand Down Expand Up @@ -4312,6 +4313,202 @@ def test_api_budget_fixed_window_policy():
assert matcher._url_path_pattern.pattern == "/v2/data"


def test_api_budget_propagated_to_custom_requester_subclass_of_http_requester():
    """A `CustomRequester` that subclasses `HttpRequester` receives the top-level `api_budget`.

    The factory is handed a manifest-level `HTTPAPIBudget` through `set_api_budget` and is
    then asked to build a `CustomRequester`. The built requester, its `_http_client`, and
    that client's `_session` must all reference the very same budget object.
    Reproducer context: airbytehq/oncall#12011.
    """
    rate_definition = {"type": "Rate", "limit": 60, "interval": "PT1M"}
    policy_definition = {
        "type": "MovingWindowCallRatePolicy",
        "rates": [rate_definition],
        "matchers": [],
    }
    budget_definition = {"type": "HTTPAPIBudget", "policies": [policy_definition]}

    requester_definition = {
        "type": "CustomRequester",
        "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequester",
        "url_base": "https://example.org",
        "path": "/v1/data",
        "http_method": "GET",
    }

    empty_config: Mapping[str, Any] = {}
    factory = ModelToComponentFactory()
    factory.set_api_budget(budget_definition, empty_config)

    requester = factory.create_component(
        model_type=CustomRequesterModel,
        component_definition=requester_definition,
        config=empty_config,
        name="custom_stream",
    )

    assert isinstance(requester, HttpRequester)

    budget = requester.api_budget
    assert budget is not None, (
        "Manifest-level api_budget was not propagated to the CustomRequester instance"
    )

    policies = budget._policies
    assert len(policies) == 1
    assert isinstance(policies[0], MovingWindowCallRatePolicy)

    # Checking the client field alone is not enough: the session keeps its own reference
    # to the budget, so both the client and its session must point at the injected one.
    client = requester._http_client
    assert client._api_budget is budget
    assert client._session._api_budget is budget


def test_api_budget_propagated_to_custom_requester_that_replaces_http_client():
"""Injected api budgets must survive requesters that replace `_http_client` in `__post_init__`.

Some connector requesters call `super().__post_init__()` and then swap in a custom `HttpClient`
implementation without forwarding `api_budget`. The budget should still be applied to the active
client so manifest-level rate limits continue to work.
"""
manifest_api_budget = {
"type": "HTTPAPIBudget",
"policies": [
{
"type": "MovingWindowCallRatePolicy",
"rates": [
{
"type": "Rate",
"limit": 60,
"interval": "PT1M",
}
],
"matchers": [],
}
],
}

custom_requester_definition = {
"type": "CustomRequester",
"class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClient",
"url_base": "https://example.org",
"path": "/v1/data",
"http_method": "GET",
}

config: Mapping[str, Any] = {}
local_factory = ModelToComponentFactory()
local_factory.set_api_budget(manifest_api_budget, config)

custom_requester = local_factory.create_component(
model_type=CustomRequesterModel,
component_definition=custom_requester_definition,
config=config,
name="custom_stream",
)

assert isinstance(custom_requester, HttpRequester)
assert custom_requester.api_budget is not None
assert custom_requester._http_client._api_budget is custom_requester.api_budget
Comment on lines +4417 to +4419
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests assert custom_requester._http_client._api_budget is updated, but HttpClient enforces rate limiting via its internal session (LimiterSession/CachedLimiterSession) which stores its own _api_budget reference at construction time. To catch regressions where the session is still using the default empty budget, consider asserting on custom_requester._http_client._session._api_budget (or exercising a request path) rather than only the client field.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — asserting only on _http_client._api_budget would have kept passing even if the session still held the original empty budget (which is what actually enforces rate limiting on send()). Tightened all three tests in 1b448ba to additionally assert on custom_requester._http_client._session._api_budget:

  • test_api_budget_propagated_to_custom_requester_subclass_of_http_requester: asserts the session's budget is the injected one.
  • test_api_budget_propagated_to_custom_requester_that_replaces_http_client: asserts the session's budget was synced to the injected one (this test would now fail without the session-sync change in the _sync_injected_api_budget_with_http_client fix).
  • test_api_budget_not_overwriting_non_empty_budget_on_replaced_http_client: asserts the session's budget remains the requester's own intentionally-installed budget (regression guard that we never silently swap a live session's budget).

Pytest: 120 passed locally.


Devin session

# The LimiterSession holds its own reference to the budget (captured at client
# construction time) and is what actually enforces rate limits on send(). Assert
# it was synced too, otherwise the injected budget is effectively inert.
assert custom_requester._http_client._session._api_budget is custom_requester.api_budget


def test_api_budget_not_overwriting_non_empty_budget_on_replaced_http_client():
    """A replacement `HttpClient` that carries its own non-empty budget is left alone.

    The requester under test installs a client with its own single-policy budget. After the
    factory builds it with a manifest-level budget set, the client must still hold a budget
    distinct from `requester.api_budget`, and the client's session must reference that same
    client budget.
    """
    rate_definition = {"type": "Rate", "limit": 60, "interval": "PT1M"}
    policy_definition = {
        "type": "MovingWindowCallRatePolicy",
        "rates": [rate_definition],
        "matchers": [],
    }
    budget_definition = {"type": "HTTPAPIBudget", "policies": [policy_definition]}

    requester_definition = {
        "type": "CustomRequester",
        "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingRequesterWithReplacedHttpClientAndOwnBudget",
        "url_base": "https://example.org",
        "path": "/v1/data",
        "http_method": "GET",
    }

    empty_config: Mapping[str, Any] = {}
    factory = ModelToComponentFactory()
    factory.set_api_budget(budget_definition, empty_config)

    requester = factory.create_component(
        model_type=CustomRequesterModel,
        component_definition=requester_definition,
        config=empty_config,
        name="custom_stream",
    )

    assert isinstance(requester, HttpRequester)
    assert requester.api_budget is not None

    client_budget = requester._http_client._api_budget
    assert client_budget is not requester.api_budget
    assert len(client_budget._policies) == 1

    # The session must still be wired to the client's own budget, i.e. nothing swapped
    # the intentionally-installed budget out from under the active session.
    session_budget = requester._http_client._session._api_budget
    assert session_budget is requester._http_client._api_budget


def test_api_budget_not_propagated_to_non_http_requester_custom_components():
    """Custom components outside the `HttpRequester` hierarchy are built without `api_budget`.

    With a manifest-level budget set on the factory, creating a `CustomErrorHandler` backed
    by `TestingSomeComponent` must succeed and keep its declared `basic_field`; an injected
    `api_budget` keyword would be rejected by such a constructor.
    """
    rate_definition = {"type": "Rate", "limit": 1, "interval": "PT60S"}
    budget_definition = {
        "type": "HTTPAPIBudget",
        "policies": [
            {
                "type": "MovingWindowCallRatePolicy",
                "rates": [rate_definition],
                "matchers": [],
            }
        ],
    }

    error_handler_definition = {
        "type": "CustomErrorHandler",
        "class_name": "unit_tests.sources.declarative.parsers.testing_components.TestingSomeComponent",
        "basic_field": "expected",
    }

    empty_config: Mapping[str, Any] = {}
    factory = ModelToComponentFactory()
    factory.set_api_budget(budget_definition, empty_config)

    # Creation itself is part of the check: it must not raise a TypeError about an
    # unexpected "api_budget" keyword.
    component = factory.create_component(
        CustomErrorHandlerModel, error_handler_definition, empty_config
    )

    assert component.basic_field == "expected"


def test_create_grouping_partition_router_with_underlying_router():
content = """
schema_loader:
Expand Down
43 changes: 43 additions & 0 deletions unit_tests/sources/declarative/parsers/testing_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any, ClassVar, List, Mapping, Optional

from airbyte_cdk.sources.declarative.extractors import DpathExtractor
Expand All @@ -21,6 +22,8 @@
RequestInput,
)
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.streams.call_rate import APIBudget, MovingWindowCallRatePolicy, Rate
from airbyte_cdk.sources.streams.http import HttpClient


@dataclass
Expand Down Expand Up @@ -114,3 +117,43 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
parameters=parameters or {},
)
super().__post_init__(parameters)


@dataclass
class TestingRequesterWithReplacedHttpClient(HttpRequester):
    """Test requester that swaps in a fresh `HttpClient` built without an `api_budget` argument."""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Let the parent finish its own setup first, then replace the client it created.
        super().__post_init__(parameters)

        # Note: no `api_budget` is forwarded to the replacement client.
        client_kwargs = {
            "name": self.name,
            "logger": self.logger,
            "error_handler": self.error_handler,
            "authenticator": self._authenticator,
            "use_cache": self.use_cache,
            "backoff_strategy": None,
            "disable_retries": self.disable_retries,
            "message_repository": self.message_repository,
        }
        self._http_client = HttpClient(**client_kwargs)


@dataclass
class TestingRequesterWithReplacedHttpClientAndOwnBudget(HttpRequester):
    """Test requester that swaps in an `HttpClient` carrying its own one-policy `APIBudget`."""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Let the parent finish its own setup first, then replace the client it created.
        super().__post_init__(parameters)

        # Budget owned by this requester: a single moving-window policy allowing
        # one call per 30 seconds, with no matchers.
        thirty_second_rate = Rate(limit=1, interval=timedelta(seconds=30))
        own_policy = MovingWindowCallRatePolicy(rates=[thirty_second_rate], matchers=[])
        own_budget = APIBudget(policies=[own_policy])

        self._http_client = HttpClient(
            name=self.name,
            logger=self.logger,
            error_handler=self.error_handler,
            api_budget=own_budget,
            authenticator=self._authenticator,
            use_cache=self.use_cache,
            backoff_strategy=None,
            disable_retries=self.disable_retries,
            message_repository=self.message_repository,
        )
Loading