Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions airbyte_cdk/test/standard_tests/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Status,
SyncMode,
)
from airbyte_cdk.models.connector_metadata import MetadataFile
Expand Down Expand Up @@ -210,15 +211,16 @@ def test_docker_image_build_and_check(
"""Run `docker_image` acceptance tests.

This test builds the connector image and runs the `check` command inside the container.
It validates that the connector emits exactly one `CONNECTION_STATUS` message whose
status matches the scenario's expected outcome (`SUCCEEDED` or `FAILED`). Scenarios
whose `acceptance-test-config.yml` entry has `status: "failed"` are exercised too,
so that `check` against bad configs is validated end-to-end rather than skipped.

Note:
- It is expected for docker image caches to be reused between test runs.
- In the rare case that image caches need to be cleared, please clear
the local docker image cache using `docker image prune -a` command.
"""
if scenario.expected_outcome.expect_exception():
pytest.skip("Skipping test_docker_image_build_and_check (expected to fail).")

tag = "dev-latest"
connector_root = self.get_connector_root_dir()
metadata = MetadataFile.from_file(connector_root / "metadata.yaml")
Expand All @@ -233,11 +235,13 @@ def test_docker_image_build_and_check(
no_verify=False,
)

expect_success = scenario.expected_outcome.expect_success()

container_config_path = "/secrets/config.json"
with scenario.with_temp_config_file(
connector_root=connector_root,
) as temp_config_file:
_ = run_docker_airbyte_command(
result = run_docker_airbyte_command(
[
"docker",
"run",
Expand All @@ -249,7 +253,50 @@ def test_docker_image_build_and_check(
"--config",
container_config_path,
],
raise_if_errors=True,
# Only raise on trace errors when we expect a successful check. When the
# scenario expects the check to fail, the connector is supposed to exit
# cleanly with a `CONNECTION_STATUS: FAILED` message rather than raising.
raise_if_errors=expect_success,
)

self._assert_check_result_matches_expected_outcome(result, scenario)

@staticmethod
def _assert_check_result_matches_expected_outcome(
result: EntrypointOutput,
scenario: ConnectorTestScenario,
) -> None:
"""Assert that the `check` output matches the scenario's expected outcome.

The connector must emit exactly one `CONNECTION_STATUS` message. When the scenario
expects success, the status must be `SUCCEEDED`; when it expects an exception (i.e.
`acceptance-test-config.yml` declares `status: "failed"` or `status: "exception"`),
the status must be `FAILED`. When the expected outcome is `ALLOW_ANY`, only the
presence of a single `CONNECTION_STATUS` message is validated.
"""
status_messages = result.connection_status_messages
assert len(status_messages) == 1, (
"Expected exactly one CONNECTION_STATUS message but got "
f"{len(status_messages)}:\n"
+ "\n".join(str(msg) for msg in status_messages)
+ result.get_formatted_error_message()
)

connection_status = status_messages[0].connectionStatus
assert connection_status is not None, (
"Expected CONNECTION_STATUS message to have a connectionStatus payload. Got: \n"
+ "\n".join(str(msg) for msg in status_messages)
)

if scenario.expected_outcome.expect_success():
assert connection_status.status == Status.SUCCEEDED, (
"Expected CONNECTION_STATUS to be SUCCEEDED but got "
f"{connection_status.status}. Message: {connection_status.message!r}"
)
elif scenario.expected_outcome.expect_exception():
assert connection_status.status == Status.FAILED, (
"Expected CONNECTION_STATUS to be FAILED but got "
f"{connection_status.status}. Message: {connection_status.message!r}"
)

@pytest.mark.skipif(
Expand Down
102 changes: 102 additions & 0 deletions unit_tests/test/test_docker_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Unit tests for `DockerConnectorTestSuite._assert_check_result_matches_expected_outcome`.

These tests cover the `CONNECTION_STATUS` validation added to
`test_docker_image_build_and_check` (see airbyte-internal-issues#16212). They
construct `EntrypointOutput` objects directly with synthetic messages rather
than invoking Docker, which lets us exercise the assertion logic in isolation
without requiring the Docker CLI.
"""

from __future__ import annotations

import json
from contextlib import AbstractContextManager, nullcontext

import pytest

from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
from airbyte_cdk.test.models import ConnectorTestScenario
from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite


def _connection_status_message(status: str, message: str | None = None) -> str:
payload: dict[str, object] = {"status": status}
if message is not None:
payload["message"] = message
return json.dumps({"type": "CONNECTION_STATUS", "connectionStatus": payload})


_SUCCEEDED = _connection_status_message("SUCCEEDED")
_FAILED = _connection_status_message("FAILED", "bad credentials")


@pytest.mark.parametrize(
("scenario_status", "messages", "expectation"),
[
pytest.param(
"succeed",
[_SUCCEEDED],
nullcontext(),
id="succeed_scenario_with_succeeded_status_passes",
),
pytest.param(
"succeed",
[_FAILED],
pytest.raises(AssertionError, match="SUCCEEDED"),
id="succeed_scenario_with_failed_status_raises__gap_1",
),
pytest.param(
"failed",
[_FAILED],
nullcontext(),
id="failed_scenario_with_failed_status_passes__gap_2",
),
pytest.param(
"failed",
[_SUCCEEDED],
pytest.raises(AssertionError, match="FAILED"),
id="failed_scenario_with_succeeded_status_raises",
),
pytest.param(
"exception",
[_FAILED],
nullcontext(),
id="exception_scenario_with_failed_status_passes",
),
pytest.param(
None,
[_SUCCEEDED],
nullcontext(),
id="no_expectation_accepts_succeeded",
),
pytest.param(
None,
[_FAILED],
nullcontext(),
id="no_expectation_accepts_failed",
),
pytest.param(
"succeed",
[],
pytest.raises(AssertionError, match="Expected exactly one CONNECTION_STATUS"),
id="missing_connection_status_message_raises",
),
pytest.param(
"succeed",
[_SUCCEEDED, _SUCCEEDED],
pytest.raises(AssertionError, match="Expected exactly one CONNECTION_STATUS"),
id="multiple_connection_status_messages_raises",
),
],
)
def test_assert_check_result_matches_expected_outcome(
scenario_status: str | None,
messages: list[str],
expectation: AbstractContextManager[object],
) -> None:
output = EntrypointOutput(messages=messages)
scenario = ConnectorTestScenario(status=scenario_status)

with expectation:
DockerConnectorTestSuite._assert_check_result_matches_expected_outcome(output, scenario)
Loading