Skip to content

Feat: New CDK-Native FAST Standard tests, replaces CAT #349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 73 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
3de3edf
add skeleton implementation, imported from airbyte-pytest
aaronsteers Feb 10, 2025
3b88490
tidy imports
aaronsteers Feb 11, 2025
1250472
fix some imports
aaronsteers Feb 15, 2025
9dec6e3
Merge branch 'main' into aj/feat/mini-cat-test-suites
aaronsteers Feb 19, 2025
11d89f6
working test of tests
aaronsteers Feb 20, 2025
42d8a4e
remove old file
aaronsteers Feb 24, 2025
c8e7722
misc fixes
aaronsteers Feb 24, 2025
6248994
allow subclasses of abstractsource
aaronsteers Feb 24, 2025
a0f8ed5
Merge remote-tracking branch 'origin/main' into aj/feat/mini-cat-test…
aaronsteers Apr 7, 2025
fcfe697
fix test handling for expected errors
aaronsteers Apr 7, 2025
03f3360
Auto-fix lint and format issues
Apr 7, 2025
460c40a
remove unused code
aaronsteers Apr 7, 2025
8918bfe
fix tests
aaronsteers Apr 8, 2025
83344f3
use pep440 style dynamic versioning
aaronsteers Apr 9, 2025
76a92a9
update ci job (temporarily run on all pushes)
aaronsteers Apr 9, 2025
e6911e5
Merge remote-tracking branch 'origin/main' into aj/feat/mini-cat-test…
aaronsteers Apr 10, 2025
039eef0
Merge remote-tracking branch 'origin/main' into aj/feat/mini-cat-test…
aaronsteers Apr 10, 2025
5658441
Update .github/workflows/pypi_publish.yml
aaronsteers Apr 10, 2025
310ea68
fix fixture resource paths
aaronsteers Apr 10, 2025
1e74eef
lint fixes
aaronsteers Apr 10, 2025
c9ad3bd
fix imports
aaronsteers Apr 10, 2025
f8b581e
fix lint issues
aaronsteers Apr 12, 2025
4a05f46
fix lint and typing
aaronsteers Apr 12, 2025
e58dfad
[cherry-pick-me][chore]: resolve pytest warnings undeclared marks, an…
aaronsteers Apr 12, 2025
418dcb9
fix relative path logic
aaronsteers Apr 12, 2025
8d75d0d
ruff fix
aaronsteers Apr 12, 2025
c3a32ae
[cherry-pick-me][fix]: dataclasses constructor break with __test__ me…
aaronsteers Apr 12, 2025
0897b83
fix paths
aaronsteers Apr 12, 2025
b282e5d
[cherry-pick-me]: use kw args
aaronsteers Apr 12, 2025
9a8b5eb
Merge branch 'main' into aj/feat/mini-cat-test-suites
aaronsteers Apr 12, 2025
d5aaf3c
[cherry-pick-me]: kw args
aaronsteers Apr 12, 2025
7a00ff4
[cherry-pick-me]: use kw args for Test* dataclasses
aaronsteers Apr 12, 2025
5dff39e
Merge remote-tracking branch 'origin/main' into aj/feat/mini-cat-test…
aaronsteers Apr 12, 2025
3160691
fix relative resource paths
aaronsteers Apr 12, 2025
015ced9
fix imports
aaronsteers Apr 12, 2025
e2a9fb9
fix more tests
aaronsteers Apr 12, 2025
bfc1943
format fix
aaronsteers Apr 12, 2025
f82b76a
[cherry-pick-me][chore]: resolve pytest warnings undeclared marks, an…
aaronsteers Apr 12, 2025
f091b53
[cherry-pick-me][fix]: dataclasses constructor break with __test__ me…
aaronsteers Apr 12, 2025
12afc3d
[cherry-pick-me]: use kw args
aaronsteers Apr 12, 2025
cca55f7
[cherry-pick-me]: kw args
aaronsteers Apr 12, 2025
4f7a84e
[cherry-pick-me]: use kw args for Test* dataclasses
aaronsteers Apr 12, 2025
e24594b
Merge branch 'devin/1744436819-cherry-pick-test-fixes' into aj/feat/m…
aaronsteers Apr 12, 2025
9389cc5
reduce code needed for inheritance
aaronsteers Apr 12, 2025
e4cae20
fix type hint
aaronsteers Apr 12, 2025
18ff2e8
Use ClassVar[bool] for __test__ instead of kw_only=True
devin-ai-integration[bot] Apr 12, 2025
40fddcc
Use ClassVar[bool] for __test__ attributes instead of kw_only=True
devin-ai-integration[bot] Apr 12, 2025
e2c69df
Update more classes to use ClassVar[bool] for __test__ attributes
devin-ai-integration[bot] Apr 12, 2025
e79e901
Apply ruff formatting
devin-ai-integration[bot] Apr 12, 2025
c26345e
Fix remaining files with ruff formatting
devin-ai-integration[bot] Apr 12, 2025
eb0f643
addl cleanup
aaronsteers Apr 12, 2025
7a750d6
Merge branch 'devin/1744436819-cherry-pick-test-fixes' into aj/feat/m…
aaronsteers Apr 13, 2025
b8407e5
Merge branch 'main' into devin/1744436819-cherry-pick-test-fixes
aaronsteers Apr 14, 2025
88acba3
Merge branch 'main' into aj/feat/mini-cat-test-suites
aaronsteers Apr 14, 2025
4b6c53e
poe lock
aaronsteers Apr 14, 2025
f0bea0c
remove unrelated changes
aaronsteers Apr 14, 2025
8c91964
Merge branch 'devin/1744436819-cherry-pick-test-fixes' into aj/feat/m…
aaronsteers Apr 14, 2025
4f84c14
revert unnecessary changes
aaronsteers Apr 14, 2025
d0aba9f
Merge branch 'devin/1744436819-cherry-pick-test-fixes' into aj/feat/m…
aaronsteers Apr 14, 2025
1468a48
clean up naming, remove unused
aaronsteers Apr 14, 2025
2812e7d
tidy up pr
aaronsteers Apr 14, 2025
c4e6655
clean up IConnector interface
aaronsteers Apr 14, 2025
f6d6ccd
ruff fix
aaronsteers Apr 14, 2025
d20d91f
poe lock
aaronsteers Apr 14, 2025
b363b52
Merge branch 'main' into aj/feat/mini-cat-test-suites
aaronsteers Apr 15, 2025
b045532
clean up test module structure and pytest hooks
aaronsteers Apr 15, 2025
dc95a71
remove extra pytest files
aaronsteers Apr 15, 2025
a36d6a9
add usage docs
aaronsteers Apr 15, 2025
da442a9
clean up
aaronsteers Apr 15, 2025
c532a92
Merge branch 'main' into aj/feat/mini-cat-test-suites
aaronsteers Apr 16, 2025
594be48
Auto-fix lint and format issues
Apr 16, 2025
f39f480
cleaner error prints
aaronsteers Apr 16, 2025
9fac89f
finish rename module 'declarative' -> 'standard_tests'
aaronsteers Apr 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ def records(self) -> List[AirbyteMessage]:
def state_messages(self) -> List[AirbyteMessage]:
return self._get_message_by_types([Type.STATE])

@property
def connection_status_messages(self) -> List[AirbyteMessage]:
return self._get_message_by_types([Type.CONNECTION_STATUS])

@property
def most_recent_state(self) -> Any:
state_messages = self._get_message_by_types([Type.STATE])
Expand Down
46 changes: 46 additions & 0 deletions airbyte_cdk/test/standard_tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
'''FAST Airbyte Standard Tests

This module provides a set of base classes for declarative connector test suites.
The goal of this module is to provide a robust and extensible framework for testing Airbyte
connectors.

Example usage:

```python
# `test_airbyte_standards.py`
from airbyte_cdk.test import standard_tests

pytest_plugins = [
"airbyte_cdk.test.standard_tests.pytest_hooks",
]


class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite):
"""Test suite for the source."""
```

Available test suites base classes:
- `DeclarativeSourceTestSuite`: A test suite for declarative sources.
- `SourceTestSuiteBase`: A test suite for sources.
- `DestinationTestSuiteBase`: A test suite for destinations.

'''

from airbyte_cdk.test.standard_tests.connector_base import (
ConnectorTestScenario,
ConnectorTestSuiteBase,
)
from airbyte_cdk.test.standard_tests.declarative_sources import (
DeclarativeSourceTestSuite,
)
from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase
from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase

__all__ = [
"ConnectorTestScenario",
"ConnectorTestSuiteBase",
"DeclarativeSourceTestSuite",
"DestinationTestSuiteBase",
"SourceTestSuiteBase",
]
159 changes: 159 additions & 0 deletions airbyte_cdk/test/standard_tests/_job_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Job runner for Airbyte Standard Tests."""

import logging
import tempfile
import uuid
from dataclasses import asdict
from pathlib import Path
from typing import Any, Callable, Literal

import orjson
from typing_extensions import Protocol, runtime_checkable

from airbyte_cdk.models import (
ConfiguredAirbyteCatalog,
Status,
)
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.standard_tests.models import (
ConnectorTestScenario,
)


def _errors_to_str(
entrypoint_output: entrypoint_wrapper.EntrypointOutput,
) -> str:
"""Convert errors from entrypoint output to a string."""
if not entrypoint_output.errors:
# If there are no errors, return an empty string.
return ""

return "\n" + "\n".join(
[
str(error.trace.error).replace(
"\\n",
"\n",
)
for error in entrypoint_output.errors
if error.trace
],
)


@runtime_checkable
class IConnector(Protocol):
"""A connector that can be run in a test scenario.

Note: We currently use 'spec' to determine if we have a connector object.
In the future, it would be preferred to leverage a 'launch' method instead,
directly on the connector (which doesn't yet exist).
"""

def spec(self, logger: logging.Logger) -> Any:
"""Connectors should have a `spec` method."""


def run_test_job(
connector: IConnector | type[IConnector] | Callable[[], IConnector],
verb: Literal["read", "check", "discover"],
test_scenario: ConnectorTestScenario,
*,
catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None,
) -> entrypoint_wrapper.EntrypointOutput:
"""Run a test scenario from provided CLI args and return the result."""
if not connector:
raise ValueError("Connector is required")

if catalog and isinstance(catalog, ConfiguredAirbyteCatalog):
# Convert the catalog to a dict if it's already a ConfiguredAirbyteCatalog.
catalog = asdict(catalog)

connector_obj: IConnector
if isinstance(connector, type) or callable(connector):
# If the connector is a class or a factory lambda, instantiate it.
connector_obj = connector()
elif isinstance(connector, IConnector):
connector_obj = connector
else:
raise ValueError(
f"Invalid connector input: {type(connector)}",
)

args: list[str] = [verb]
if test_scenario.config_path:
args += ["--config", str(test_scenario.config_path)]
elif test_scenario.config_dict:
config_path = (
Path(tempfile.gettempdir()) / "airbyte-test" / f"temp_config_{uuid.uuid4().hex}.json"
)
config_path.parent.mkdir(parents=True, exist_ok=True)
config_path.write_text(orjson.dumps(test_scenario.config_dict).decode())
args += ["--config", str(config_path)]

catalog_path: Path | None = None
if verb not in ["discover", "check"]:
# We need a catalog for read.
if catalog:
# Write the catalog to a temp json file and pass the path to the file as an argument.
catalog_path = (
Path(tempfile.gettempdir())
/ "airbyte-test"
/ f"temp_catalog_{uuid.uuid4().hex}.json"
)
catalog_path.parent.mkdir(parents=True, exist_ok=True)
catalog_path.write_text(orjson.dumps(catalog).decode())
elif test_scenario.configured_catalog_path:
catalog_path = Path(test_scenario.configured_catalog_path)

if catalog_path:
args += ["--catalog", str(catalog_path)]

# This is a bit of a hack because the source needs the catalog early.
# Because it *also* can fail, we have to redundantly wrap it in a try/except block.

result: entrypoint_wrapper.EntrypointOutput = entrypoint_wrapper._run_command( # noqa: SLF001 # Non-public API
source=connector_obj, # type: ignore [arg-type]
args=args,
expecting_exception=test_scenario.expect_exception,
)
if result.errors and not test_scenario.expect_exception:
raise AssertionError(
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)

if verb == "check":
# Check is expected to fail gracefully without an exception.
# Instead, we assert that we have a CONNECTION_STATUS message with
# a failure status.
assert len(result.connection_status_messages) == 1, (
"Expected exactly one CONNECTION_STATUS message. Got "
f"{len(result.connection_status_messages)}:\n"
+ "\n".join([str(msg) for msg in result.connection_status_messages])
+ _errors_to_str(result)
)
if test_scenario.expect_exception:
conn_status = result.connection_status_messages[0].connectionStatus
assert conn_status, (
"Expected CONNECTION_STATUS message to be present. Got: \n"
+ "\n".join([str(msg) for msg in result.connection_status_messages])
)
assert conn_status.status == Status.FAILED, (
"Expected CONNECTION_STATUS message to be FAILED. Got: \n"
+ "\n".join([str(msg) for msg in result.connection_status_messages])
)

return result

# For all other verbs, we assert check that an exception is raised (or not).
if test_scenario.expect_exception:
if not result.errors:
raise AssertionError("Expected exception but got none.")

return result

assert not result.errors, (
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
)

return result
148 changes: 148 additions & 0 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Base class for connector test suites."""

from __future__ import annotations

import abc
import inspect
import sys
from collections.abc import Callable
from pathlib import Path
from typing import cast

import yaml
from boltons.typeutils import classproperty

from airbyte_cdk.models import (
AirbyteMessage,
Type,
)
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job
from airbyte_cdk.test.standard_tests.models import (
ConnectorTestScenario,
)

ACCEPTANCE_TEST_CONFIG = "acceptance-test-config.yml"
MANIFEST_YAML = "manifest.yaml"


class ConnectorTestSuiteBase(abc.ABC):
"""Base class for connector test suites."""

connector: type[IConnector] | Callable[[], IConnector] | None = None
"""The connector class or a factory function that returns an scenario of IConnector."""

@classmethod
def get_test_class_dir(cls) -> Path:
"""Get the file path that contains the class."""
module = sys.modules[cls.__module__]
# Get the directory containing the test file
return Path(inspect.getfile(module)).parent

@classmethod
def create_connector(
cls,
scenario: ConnectorTestScenario,
) -> IConnector:
"""Instantiate the connector class."""
connector = cls.connector # type: ignore
if connector:
if callable(connector) or isinstance(connector, type):
# If the connector is a class or factory function, instantiate it:
return cast(IConnector, connector()) # type: ignore [redundant-cast]

# Otherwise, we can't instantiate the connector. Fail with a clear error message.
raise NotImplementedError(
"No connector class or connector factory function provided. "
"Please provide a class or factory function in `cls.connector`, or "
"override `cls.create_connector()` to define a custom initialization process."
)

# Test Definitions

def test_check(
self,
scenario: ConnectorTestScenario,
) -> None:
"""Run `connection` acceptance tests."""
result: entrypoint_wrapper.EntrypointOutput = run_test_job(
self.create_connector(scenario),
"check",
test_scenario=scenario,
)
conn_status_messages: list[AirbyteMessage] = [
msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS
] # noqa: SLF001 # Non-public API
assert len(conn_status_messages) == 1, (
f"Expected exactly one CONNECTION_STATUS message. Got: {result._messages}"
)

@classmethod
def get_connector_root_dir(cls) -> Path:
"""Get the root directory of the connector."""
for parent in cls.get_test_class_dir().parents:
if (parent / MANIFEST_YAML).exists():
return parent
if (parent / ACCEPTANCE_TEST_CONFIG).exists():
return parent
if parent.name == "airbyte_cdk":
break
# If we reach here, we didn't find the manifest file in any parent directory
# Check if the manifest file exists in the current directory
for parent in Path.cwd().parents:
if (parent / MANIFEST_YAML).exists():
return parent
if (parent / ACCEPTANCE_TEST_CONFIG).exists():
return parent
if parent.name == "airbyte_cdk":
break

raise FileNotFoundError(
"Could not find connector root directory relative to "
f"'{str(cls.get_test_class_dir())}' or '{str(Path.cwd())}'."
)

@classproperty
def acceptance_test_config_path(cls) -> Path:
"""Get the path to the acceptance test config file."""
result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
if result.exists():
return result

raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")

@classmethod
def get_scenarios(
cls,
) -> list[ConnectorTestScenario]:
"""Get acceptance tests for a given category.

This has to be a separate function because pytest does not allow
parametrization of fixtures with arguments from the test class itself.
"""
category = "connection"
all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
if "acceptance_tests" not in all_tests_config:
raise ValueError(
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
f" Found only: {str(all_tests_config)}."
)
if category not in all_tests_config["acceptance_tests"]:
return []
if "tests" not in all_tests_config["acceptance_tests"][category]:
raise ValueError(f"No tests found for category {category}")

tests_scenarios = [
ConnectorTestScenario.model_validate(test)
for test in all_tests_config["acceptance_tests"][category]["tests"]
if "iam_role" not in test["config_path"]
]
connector_root = cls.get_connector_root_dir().absolute()
for test in tests_scenarios:
if test.config_path:
test.config_path = connector_root / test.config_path
if test.configured_catalog_path:
test.configured_catalog_path = connector_root / test.configured_catalog_path

return tests_scenarios
Loading
Loading