Contracts4 (#2116)
Added Atlan plugin package that can push contracts to Atlan
tombaeyens authored Jun 28, 2024
1 parent ea74f4d commit d377e91
Showing 44 changed files with 1,257 additions and 798 deletions.
7 changes: 7 additions & 0 deletions .env.example
@@ -44,3 +44,10 @@ ATHENA_SCHEMA=***

# Create and test checks with views instead of tables
TEST_WITH_VIEWS=false

CONTRACTS_POSTGRES_HOST=***
CONTRACTS_POSTGRES_USERNAME=***
CONTRACTS_POSTGRES_PASSWORD=***
CONTRACTS_POSTGRES_DATABASE=***

ATLAN_API_KEY=***
6 changes: 5 additions & 1 deletion dev-requirements.txt
@@ -1,5 +1,5 @@
#
# This file is autogenerated by pip-compile with Python 3.10
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile dev-requirements.in
@@ -59,6 +59,8 @@ identify==2.5.36
# via pre-commit
idna==3.7
# via requests
importlib-metadata==7.1.0
# via build
iniconfig==2.0.0
# via pytest
mypy-extensions==1.0.0
@@ -168,6 +170,8 @@ wheel==0.43.0
# via
# -r dev-requirements.in
# pip-tools
zipp==3.19.2
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# pip
1 change: 1 addition & 0 deletions requirements.txt
@@ -21,3 +21,4 @@
./soda/vertica
./soda/teradata
./soda/contracts
./soda/atlan
16 changes: 16 additions & 0 deletions soda/atlan/setup.py
@@ -0,0 +1,16 @@
#!/usr/bin/env python

from setuptools import find_namespace_packages, setup

package_name = "soda-core-atlan"
package_version = "3.3.8"
description = "Soda Core Atlan Package"

requires = [f"soda-core=={package_version}", "pyatlan>=2.2.4, <3.0"]

setup(
name=package_name,
version=package_version,
install_requires=requires,
packages=find_namespace_packages(include=["soda*"]),
)
65 changes: 65 additions & 0 deletions soda/atlan/soda/atlan/atlan_plugin.py
@@ -0,0 +1,65 @@
from json import dumps

from pyatlan.errors import AtlanError

from soda.contracts.contract import ContractResult
from soda.contracts.impl.logs import Logs
from soda.contracts.impl.plugin import Plugin
from soda.contracts.impl.yaml_helper import YamlFile


class AtlanPlugin(Plugin):

def __init__(self, logs: Logs, plugin_name: str, plugin_yaml_files: list[YamlFile]):
super().__init__(logs, plugin_name, plugin_yaml_files)
atlan_configuration_dict: dict = self.plugin_yaml_files[0].dict
self.atlan_api_key: str = atlan_configuration_dict["atlan_api_key"]
self.atlan_base_url: str = atlan_configuration_dict["atlan_base_url"]

def process_contract_results(self, contract_result: ContractResult) -> None:
error_messages: list[str] = []
atlan_qualified_name: str = contract_result.data_source_yaml_dict.get("atlan_qualified_name")
if not isinstance(atlan_qualified_name, str):
error_messages.append("atlan_qualified_name is required in a data source configuration yaml")

database_name: str = contract_result.contract.database_name
if not isinstance(database_name, str):
error_messages.append("database is required in the contract yaml")

schema_name: str = contract_result.contract.schema_name
if not isinstance(schema_name, str):
error_messages.append("schema is required in the contract yaml")

dataset_name: str = contract_result.contract.dataset_name
dataset_atlan_qualified_name: str = f"{atlan_qualified_name}/{database_name}/{schema_name}/{dataset_name}"

if error_messages:
error_messages_text = ", ".join(error_messages)
self.logs.error(
f"Atlan integration cannot be activated as not all "
f"integration requirements are met: {error_messages_text}"
)
return None

contract_dict: dict = contract_result.contract.contract_file.dict.copy()
contract_dict.setdefault("type", "Table")
contract_dict.setdefault("status", "DRAFT")
contract_dict.setdefault("kind", "DataContract")

contract_json_str: str = dumps(contract_dict)

self.logs.info(f"Pushing contract to Atlan: {dataset_atlan_qualified_name}")

from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import DataContract

client = AtlanClient(base_url=self.atlan_base_url, api_key=self.atlan_api_key)
contract = DataContract.creator( #
asset_qualified_name=dataset_atlan_qualified_name,
contract_json=contract_json_str,
)
try:
response = client.asset.save(contract)
self.logs.info(str(response))
except AtlanError as e:
self.logs.error(f"Atlan integration error: {e}")
23 changes: 23 additions & 0 deletions soda/atlan/tests/atlan/setup/init_contracts_schema_in_postgres.sh
@@ -0,0 +1,23 @@
#!/usr/bin/env bash

SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
ENV_FILE_PATH=$( cd -- "$( dirname -- "${SCRIPT_DIR}/../.." )" &> /dev/null && pwd )
echo Loading environment vars from: "$ENV_FILE_PATH"

. "$ENV_FILE_PATH/.env"

if [ -z "$CONTRACTS_POSTGRES_HOST" ] || [ -z "$CONTRACTS_POSTGRES_USERNAME" ] || [ -z "$CONTRACTS_POSTGRES_PASSWORD" ] || [ -z "$CONTRACTS_POSTGRES_DATABASE" ]; then
echo CONTRACTS_POSTGRES_* variables not defined. Copy .env.example to .env and fill in the variables
exit 1
else
echo DB host $CONTRACTS_POSTGRES_HOST
echo DB user $CONTRACTS_POSTGRES_USERNAME
fi

INIT_SQL_FILE_PATH="$SCRIPT_DIR/init_contracts_schema_in_postgres.sql"

echo Initializing contracts DB with SQL file: "$INIT_SQL_FILE_PATH"

CONTRACTS_POSTGRES_URL="postgresql://$CONTRACTS_POSTGRES_USERNAME:$CONTRACTS_POSTGRES_PASSWORD@$CONTRACTS_POSTGRES_HOST/$CONTRACTS_POSTGRES_DATABASE"

psql "$CONTRACTS_POSTGRES_URL" -a -f "$INIT_SQL_FILE_PATH"
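One caveat with the URL built above: interpolating `$CONTRACTS_POSTGRES_PASSWORD` directly into the string breaks when the password contains reserved characters such as `@`, `/`, or `:`. A small Python sketch (a hypothetical helper, not part of the commit) that percent-encodes the credentials first:

```python
from urllib.parse import quote


def postgres_url(host: str, username: str, password: str, database: str) -> str:
    """Build a libpq-style connection URL, percent-encoding the credentials
    so passwords containing '@', '/' or ':' do not break URL parsing."""
    return (
        f"postgresql://{quote(username, safe='')}:{quote(password, safe='')}"
        f"@{host}/{database}"
    )


print(postgres_url("localhost", "soda", "p@ss/word", "contractsdb"))
# postgresql://soda:p%40ss%2Fword@localhost/contractsdb
```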
15 changes: 15 additions & 0 deletions soda/atlan/tests/atlan/setup/init_contracts_schema_in_postgres.sql
@@ -0,0 +1,15 @@
CREATE SCHEMA IF NOT EXISTS contracts AUTHORIZATION CURRENT_USER;

DROP TABLE IF EXISTS contracts.students;

CREATE TABLE contracts.students (
id VARCHAR(255) PRIMARY KEY,
name VARCHAR(255),
age INT
);

INSERT INTO contracts.students VALUES
('1', 'John Doe', 30),
('2', 'Jack Black', 40),
('3', NULL, 50)
;
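The seed data above includes a NULL `name` in row 3, presumably to exercise missing-value checks. The shape of the table can be mimicked in memory — here with `sqlite3` standing in for Postgres, so the column types and the `contracts` schema prefix are only approximated:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE students (id TEXT PRIMARY KEY, name TEXT, age INTEGER)"
)
conn.executemany(
    "INSERT INTO students VALUES (?, ?, ?)",
    [("1", "John Doe", 30), ("2", "Jack Black", 40), ("3", None, 50)],
)
# A missing-value check over this seed data would count NULL names:
null_names = conn.execute(
    "SELECT COUNT(*) FROM students WHERE name IS NULL"
).fetchone()[0]
print(null_names)  # 1
```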
73 changes: 73 additions & 0 deletions soda/atlan/tests/atlan/test_atlan_contract_push_plugin.py
@@ -0,0 +1,73 @@
from textwrap import dedent

import pytest
from dotenv import load_dotenv
from helpers.fixtures import project_root_dir

from soda.contracts.contract_verification import (
ContractVerification,
ContractVerificationResult,
)


@pytest.mark.skip(
"Takes too long to be part of the local development test suite & depends on Atlan & Soda Cloud services"
)
def test_atlan_contract_push_plugin():
load_dotenv(f"{project_root_dir}/.env", override=True)

data_source_yaml_str: str = dedent(
"""
name: postgres_ds
type: postgres
atlan_qualified_name: default/postgres/1718112025
connection:
host: ${CONTRACTS_POSTGRES_HOST}
database: ${CONTRACTS_POSTGRES_DATABASE}
username: ${CONTRACTS_POSTGRES_USERNAME}
password: ${CONTRACTS_POSTGRES_PASSWORD}
schema: contracts
"""
)

contract_yaml_str: str = dedent(
"""
data_source: postgres_ds
database: ${CONTRACTS_POSTGRES_DATABASE}
schema: contracts
dataset: students
columns:
- name: id
data_type: varchar
- name: name
data_type: varchar
- name: age
data_type: integer
"""
)

soda_cloud_yaml_str: str = dedent(
"""
api_key_id: ${DEV_SODADATA_IO_API_KEY_ID}
api_key_secret: ${DEV_SODADATA_IO_API_KEY_SECRET}
"""
)

atlan_yaml_str: str = dedent(
"""
plugin: atlan
atlan_api_key: ${ATLAN_API_KEY}
atlan_base_url: https://soda-partner.atlan.com
"""
)

contract_verification_result: ContractVerificationResult = (
ContractVerification.builder()
.with_contract_yaml_str(contract_yaml_str)
.with_data_source_yaml_str(data_source_yaml_str)
.with_soda_cloud_yaml_str(soda_cloud_yaml_str)
.with_plugin_yaml_str(atlan_yaml_str)
.execute()
)

contract_verification_result.assert_ok()
4 changes: 2 additions & 2 deletions soda/contracts/adr/02_contract_api.md
@@ -20,14 +20,14 @@ contract verification parameter or some other way.

A wrapper around the DBAPI connection is needed to handle the SQL differences.
It's anticipated that initially the implementation will be based on the existing Soda Core
Warehouse and Scan. But that later there will be direct connection implementations
DataSource and Scan. But that later there will be direct connection implementations
for each database.

The returned connection is immediately open.

```python
import logging
from soda.contracts.impl.warehouse import Connection, SodaException
from soda.contracts.impl.data_source import Connection, SodaException
from soda.contracts.contract import Contract, ContractResult
from soda.contracts.impl.soda_cloud import SodaCloud

2 changes: 1 addition & 1 deletion soda/contracts/adr/04_link_contract_schema.md
@@ -59,7 +59,7 @@ then we can add a simpler API like
```python
import logging
from soda.contracts.contract import Contracts
from soda.contracts.impl.warehouse import SodaException
from soda.contracts.impl.data_source import SodaException

try:
Contracts.execute(["postgres_localhost_db/schemas/CSTMR_DATA_PROD/datasets/*.sdc.yml"])
6 changes: 3 additions & 3 deletions soda/contracts/adr/09_contract_check_identities.md
@@ -20,7 +20,7 @@ checks are not unique, users must use the name property to ensure uniqueness.
Checks automatically generate a unique identity if you have max 1 check in each scope.
A scope is defined by
* warehouse
* data_source
* schema
* dataset
* column
@@ -36,13 +36,13 @@ there are multiple checks with the same check type. To keep those unique, a `na
The contract check identity will be a consistent hash (soda/contracts/soda/contracts/impl/consistent_hash_builder.py) based on:

For schema checks:
* warehouse
* data_source
* schema
* dataset
* check type (=schema)

For all other checks:
* warehouse
* data_source
* schema
* dataset
* column
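Such a scope-based identity can be sketched as a consistent hash over the scope parts. The snippet below is only illustrative — the ADR points at `soda/contracts/soda/contracts/impl/consistent_hash_builder.py` for the real implementation, which this does not reproduce:

```python
import hashlib


def check_identity(*parts):
    """Illustrative consistent hash over identity scope parts:
    identical parts always yield the same short identity."""
    md5 = hashlib.md5()
    for part in parts:
        md5.update(("" if part is None else str(part)).encode("utf-8"))
        md5.update(b"\x00")  # separator so ("ab", "c") != ("a", "bc")
    return md5.hexdigest()[:8]


# Schema check scope: data_source, schema, dataset, check type
schema_check = check_identity("postgres_ds", "contracts", "students", "schema")
# All other checks add the column to the scope
age_check = check_identity(
    "postgres_ds", "contracts", "students", "age", "missing_count"
)
assert schema_check != age_check
```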