Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: apply ruff lint rules #246

Draft
wants to merge 6 commits into
base: aj/ci/add-ruff-lint-rules
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
10 changes: 3 additions & 7 deletions airbyte_cdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
InternalConfig,
ResourceSchemaLoader,
check_config_against_spec_or_exit,
expand_refs,
expand_refs, # noqa: F401
split_config,
)
from .sources.utils.transform import TransformConfig, TypeTransformer
Expand All @@ -187,6 +187,7 @@
from .utils.spec_schema_transformations import resolve_refs
from .utils.stream_status_utils import as_airbyte_message


__all__ = [
# Availability strategy
"AvailabilityStrategy",
Expand All @@ -200,7 +201,6 @@
"ConcurrentSourceAdapter",
"Cursor",
"CursorField",
"DEFAULT_CONCURRENCY",
"EpochValueConcurrentStreamStateConverter",
"FinalStateCursor",
"IsoMillisConcurrentStreamStateConverter",
Expand Down Expand Up @@ -258,7 +258,6 @@
"RequestOption",
"RequestOptionType",
"Requester",
"ResponseStatus",
"SimpleRetriever",
"SinglePartitionRouter",
"StopConditionPaginationStrategyDecorator",
Expand All @@ -276,13 +275,11 @@
"DefaultBackoffException",
"default_backoff_handler",
"HttpAPIBudget",
"HttpAuthenticator",
"HttpRequestMatcher",
"HttpStream",
"HttpSubStream",
"LimiterSession",
"MovingWindowCallRatePolicy",
"MultipleTokenAuthenticator",
"Oauth2Authenticator",
"Rate",
"SingleUseRefreshTokenOauth2Authenticator",
Expand Down Expand Up @@ -317,7 +314,6 @@
# Stream
"IncrementalMixin",
"Stream",
"StreamData",
"package_name_from_class",
# Utils
"AirbyteTracedException",
Expand Down Expand Up @@ -354,5 +350,5 @@
third_choice=_dunamai.Version.from_any_vcs,
fallback=_dunamai.Version("0.0.0+dev"),
).serialize()
except:
except: # noqa: E722
__version__ = "0.0.0+dev"
2 changes: 2 additions & 0 deletions airbyte_cdk/cli/source_declarative_manifest/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from airbyte_cdk.cli.source_declarative_manifest._run import run


__all__ = [
"run",
]
16 changes: 8 additions & 8 deletions airbyte_cdk/cli/source_declarative_manifest/_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
ConcurrentDeclarativeSource,
)
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.source import TState
from airbyte_cdk.sources.source import TState # noqa: TC001


class SourceLocalYaml(YamlDeclarativeSource):
Expand All @@ -56,7 +56,7 @@ def __init__(
catalog: ConfiguredAirbyteCatalog | None,
config: Mapping[str, Any] | None,
state: TState,
**kwargs: Any,
**kwargs: Any, # noqa: ANN401, ARG002
) -> None:
"""
HACK!
Expand All @@ -77,7 +77,7 @@ def __init__(
)


def _is_local_manifest_command(args: list[str]) -> bool:
def _is_local_manifest_command(args: list[str]) -> bool: # noqa: ARG001
# Check for a local manifest.yaml file
return Path("/airbyte/integration_code/source_declarative_manifest/manifest.yaml").exists()

Expand Down Expand Up @@ -111,7 +111,7 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
)
).decode()
)
raise error
raise error # noqa: TRY201


def handle_local_manifest_command(args: list[str]) -> None:
Expand Down Expand Up @@ -167,12 +167,12 @@ def create_declarative_source(
state: list[AirbyteStateMessage]
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
if config is None or "__injected_declarative_manifest" not in config:
raise ValueError(
raise ValueError( # noqa: TRY301
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
f"of the config but config only has keys: {list(config.keys() if config else [])}"
)
if not isinstance(config["__injected_declarative_manifest"], dict):
raise ValueError(
raise ValueError( # noqa: TRY004, TRY301
"Invalid config: `__injected_declarative_manifest` should be a dictionary, "
f"but got type: {type(config['__injected_declarative_manifest'])}"
)
Expand All @@ -181,7 +181,7 @@ def create_declarative_source(
config=config,
catalog=catalog,
state=state,
source_config=cast(dict[str, Any], config["__injected_declarative_manifest"]),
source_config=cast(dict[str, Any], config["__injected_declarative_manifest"]), # noqa: TC006
)
except Exception as error:
print(
Expand All @@ -201,7 +201,7 @@ def create_declarative_source(
)
).decode()
)
raise error
raise error # noqa: TRY201


def _parse_inputs_into_config_catalog_state(
Expand Down
15 changes: 8 additions & 7 deletions airbyte_cdk/config_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
)

import time
from collections.abc import MutableMapping
from copy import copy
from typing import Any, List, MutableMapping
from typing import Any

import orjson

Expand All @@ -27,7 +28,7 @@ def __init__(
self,
non_observed_mapping: MutableMapping[Any, Any],
observer: ConfigObserver,
update_on_unchanged_value: bool = True,
update_on_unchanged_value: bool = True, # noqa: FBT001, FBT002
) -> None:
non_observed_mapping = copy(non_observed_mapping)
self.observer = observer
Expand All @@ -38,25 +39,25 @@ def __init__(
non_observed_mapping[item] = ObservedDict(value, observer)

# Observe nested list of dicts
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, observer)
super().__init__(non_observed_mapping)

def __setitem__(self, item: Any, value: Any) -> None:
def __setitem__(self, item: Any, value: Any) -> None: # noqa: ANN401
"""Override dict.__setitem__ by:
1. Observing the new value if it is a dict
2. Call observer update if the new value is different from the previous one
"""
previous_value = self.get(item)
if isinstance(value, MutableMapping):
value = ObservedDict(value, self.observer)
if isinstance(value, List):
if isinstance(value, list):
for i, sub_value in enumerate(value):
if isinstance(sub_value, MutableMapping):
value[i] = ObservedDict(sub_value, self.observer)
super(ObservedDict, self).__setitem__(item, value)
super(ObservedDict, self).__setitem__(item, value) # noqa: UP008
if self.update_on_unchanged_value or value != previous_value:
self.observer.update()

Expand All @@ -77,7 +78,7 @@ def observe_connector_config(
non_observed_connector_config: MutableMapping[str, Any],
) -> ObservedDict:
if isinstance(non_observed_connector_config, ObservedDict):
raise ValueError("This connector configuration is already observed")
raise ValueError("This connector configuration is already observed") # noqa: TRY004
connector_config_observer = ConfigObserver()
observed_connector_config = ObservedDict(
non_observed_connector_config, connector_config_observer
Expand Down
26 changes: 13 additions & 13 deletions airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import os
import pkgutil
from abc import ABC, abstractmethod
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
from collections.abc import Mapping
from typing import Any, Generic, Protocol, TypeVar

import yaml

Expand All @@ -19,7 +20,7 @@
)


def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
def load_optional_package_file(package: str, filename: str) -> bytes | None:
"""Gets a resource from a package, returning None if it does not exist"""
try:
return pkgutil.get_data(package, filename)
Expand All @@ -45,29 +46,28 @@ def read_config(config_path: str) -> Mapping[str, Any]:
config = BaseConnector._read_json_file(config_path)
if isinstance(config, Mapping):
return config
else:
raise ValueError(
f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
)
raise ValueError(
f"The content of {config_path} is not an object and therefore is not a valid config. Please ensure the file represent a config."
)

@staticmethod
def _read_json_file(file_path: str) -> Any:
with open(file_path, "r") as file:
def _read_json_file(file_path: str) -> Any: # noqa: ANN401
with open(file_path) as file: # noqa: FURB101, PLW1514, PTH123
contents = file.read()

try:
return json.loads(contents)
except json.JSONDecodeError as error:
raise ValueError(
raise ValueError( # noqa: B904
f"Could not read json file {file_path}: {error}. Please ensure that it is a valid JSON."
)

@staticmethod
def write_config(config: TConfig, config_path: str) -> None:
with open(config_path, "w") as fh:
with open(config_path, "w") as fh: # noqa: FURB103, PLW1514, PTH123
fh.write(json.dumps(config))

def spec(self, logger: logging.Logger) -> ConnectorSpecification:
def spec(self, logger: logging.Logger) -> ConnectorSpecification: # noqa: ARG002
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
required to run this integration. By default, this will be loaded from a "spec.yaml" or a "spec.json" in the package root.
Expand All @@ -89,7 +89,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
try:
spec_obj = json.loads(json_spec)
except json.JSONDecodeError as error:
raise ValueError(
raise ValueError( # noqa: B904
f"Could not read json spec file: {error}. Please ensure that it is a valid JSON."
)
else:
Expand All @@ -115,7 +115,7 @@ class DefaultConnectorMixin:
def configure(
self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
) -> Mapping[str, Any]:
config_path = os.path.join(temp_dir, "config.json")
config_path = os.path.join(temp_dir, "config.json") # noqa: PTH118
self.write_config(config, config_path)
return config

Expand Down
10 changes: 6 additions & 4 deletions airbyte_cdk/connector_builder/connector_builder_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#

import dataclasses
from collections.abc import Mapping
from datetime import datetime
from typing import Any, List, Mapping
from typing import Any

from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
from airbyte_cdk.models import (
Expand All @@ -23,6 +24,7 @@
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
DEFAULT_MAXIMUM_RECORDS = 100
Expand Down Expand Up @@ -69,7 +71,7 @@ def read_stream(
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
state: list[AirbyteStateMessage],
limits: TestReadLimits,
) -> AirbyteMessage:
try:
Expand All @@ -90,7 +92,7 @@ def read_stream(
error = AirbyteTracedException.from_exception(
exc,
message=filter_secrets(
f"Error reading stream with config={config} and catalog={configured_catalog}: {str(exc)}"
f"Error reading stream with config={config} and catalog={configured_catalog}: {exc!s}"
),
)
return error.as_airbyte_message()
Expand All @@ -108,7 +110,7 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
)
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error resolving manifest: {str(exc)}"
exc, message=f"Error resolving manifest: {exc!s}"
)
return error.as_airbyte_message()

Expand Down
26 changes: 13 additions & 13 deletions airbyte_cdk/connector_builder/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@


import sys
from typing import Any, List, Mapping, Optional, Tuple
from collections.abc import Mapping
from typing import Any

import orjson

Expand All @@ -30,8 +31,8 @@


def get_config_and_catalog_from_args(
args: List[str],
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
args: list[str],
) -> tuple[str, Mapping[str, Any], ConfiguredAirbyteCatalog | None, Any]:
# TODO: Add functionality for the `debug` logger.
# Currently, no one `debug` level log will be displayed during `read` a stream for a connector created through `connector-builder`.
parsed_args = AirbyteEntrypoint.parse_args(args)
Expand Down Expand Up @@ -70,22 +71,21 @@ def handle_connector_builder_request(
source: ManifestDeclarativeSource,
command: str,
config: Mapping[str, Any],
catalog: Optional[ConfiguredAirbyteCatalog],
state: List[AirbyteStateMessage],
catalog: ConfiguredAirbyteCatalog | None,
state: list[AirbyteStateMessage],
limits: TestReadLimits,
) -> AirbyteMessage:
if command == "resolve_manifest":
return resolve_manifest(source)
elif command == "test_read":
assert (
catalog is not None
), "`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
if command == "test_read":
assert catalog is not None, (
"`test_read` requires a valid `ConfiguredAirbyteCatalog`, got None."
)
return read_stream(source, config, catalog, state, limits)
else:
raise ValueError(f"Unrecognized command {command}.")
raise ValueError(f"Unrecognized command {command}.")


def handle_request(args: List[str]) -> str:
def handle_request(args: list[str]) -> str:
command, config, catalog, state = get_config_and_catalog_from_args(args)
limits = get_limits(config)
source = create_source(config, limits)
Expand All @@ -101,7 +101,7 @@ def handle_request(args: List[str]) -> str:
print(handle_request(sys.argv[1:]))
except Exception as exc:
error = AirbyteTracedException.from_exception(
exc, message=f"Error handling request: {str(exc)}"
exc, message=f"Error handling request: {exc!s}"
)
m = error.as_airbyte_message()
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
Loading
Loading