Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
429: ErrorResolution(
response_action=ResponseAction.RATE_LIMITED,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 429. Error: Too many requests.",
error_message="Rate limit exceeded (HTTP status code 429). Try decreasing the number of workers to stay within API rate limits.",
),
500: ErrorResolution(
response_action=ResponseAction.RETRY,
Expand Down
13 changes: 11 additions & 2 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,19 @@ def _send_with_retry(

return response
except BaseBackoffException as e:
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
if isinstance(e, RateLimitBackoffException):
self._logger.error("Rate limit retries exhausted.", exc_info=True)
raise AirbyteTracedException(
internal_message=f"Rate limit retries exhausted. Exception: {e}",
message="Rate limit exceeded and retries exhausted. Try decreasing the number of workers to stay within API rate limits.",
failure_type=e.failure_type or FailureType.transient_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
)
self._logger.error("Retries exhausted with backoff exception.", exc_info=True)
raise AirbyteTracedException(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
message=f"Exhausted available request attempts. Exception: {e}",
failure_type=e.failure_type or FailureType.system_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
Expand Down
4 changes: 3 additions & 1 deletion airbyte_cdk/sources/streams/http/rate_limiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ def log_retry_attempt(details: Mapping[str, Any]) -> None:
f"Status code: {exc.response.status_code!r}, Response Content: {exc.response.content!r}"
)
logger.info(
f"Caught retryable error '{str(exc)}' after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
f"Rate limit hit after {details['tries']} tries. Waiting {details['wait']} seconds then retrying. "
f"Try decreasing the number of workers to stay within API rate limits. "
f"Last error: {str(exc)}"
)

return backoff.on_exception( # type: ignore # Decorator function returns a function with a different signature than the input function, so mypy can't infer the type of the returned function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 429. Error: Too many requests.",
error_message="Rate limit exceeded (HTTP status code 429). Try decreasing the number of workers to stay within API rate limits.",
),
id="test_http_code_matches_retry_action",
),
Expand Down
5 changes: 3 additions & 2 deletions unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ def get_error_handler(self) -> Optional[ErrorHandler]:
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(
AirbyteTracedException, match="Exception: HTTP Status Code: 429. Error: Too many requests."
AirbyteTracedException,
match="Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.",
):
list(stream.read_records(SyncMode.full_refresh))
if retries <= 0:
Expand Down Expand Up @@ -316,7 +317,7 @@ def test_raise_on_http_errors_off_429(mocker):
mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(
AirbyteTracedException,
match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.",
match="Exhausted available request attempts. Exception: Rate limit exceeded \\(HTTP status code 429\\). Try decreasing the number of workers to stay within API rate limits.",
):
stream.exit_on_rate_limit = True
list(stream.read_records(SyncMode.full_refresh))
Expand Down
Loading