Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/preset_cli/api/clients/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ class OwnershipType(TypedDict):


class SupersetClient: # pylint: disable=too-many-public-methods

"""
A client for running queries against Superset.
"""
Expand Down Expand Up @@ -360,9 +359,11 @@ def get_data( # pylint: disable=too-many-locals, too-many-arguments

# and order bys
processed_orderbys = [
(orderby, not order_desc)
if orderby in metric_names
else (convert_to_adhoc_metric(orderby), not order_desc)
(
(orderby, not order_desc)
if orderby in metric_names
else (convert_to_adhoc_metric(orderby), not order_desc)
)
for orderby in (order_by or [])
]

Expand Down
Empty file.
90 changes: 90 additions & 0 deletions src/preset_cli/cli/superset/sync/dj/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
A command to sync DJ cubes into a Superset instance.
"""

from __future__ import annotations

import logging
from uuid import UUID

import click
from datajunction import DJClient
from yarl import URL

from preset_cli.api.clients.superset import SupersetClient
from preset_cli.cli.superset.sync.dj.lib import sync_cube
from preset_cli.lib import split_comma

_logger = logging.getLogger(__name__)


@click.command()
@click.option(
"--database-uuid",
required=True,
help="Database UUID",
)
@click.option(
"--schema",
required=True,
help="Schema where virtual dataset will be created",
)
@click.option(
"--cubes",
callback=split_comma,
help="Comma-separated list of cubes to sync",
)
@click.option(
"dj_url",
"--dj-url",
required=True,
help="DJ URL",
default="http://localhost:8000",
)
@click.option(
"dj_username",
"--dj-username",
required=True,
help="DJ username",
default="dj",
)
@click.option(
"dj_password",
"--dj-password",
required=True,
help="DJ password",
default="dj",
)
@click.option("--external-url-prefix", default="", help="Base URL for resources")
@click.pass_context
def dj( # pylint: disable=invalid-name,too-many-arguments
ctx: click.core.Context,
database_uuid: str,
schema: str,
cubes: list[str],
dj_url: str,
dj_username: str,
dj_password: str,
external_url_prefix: str = "",
) -> None:
"""
Sync DJ cubes to Superset.
"""
superset_auth = ctx.obj["AUTH"]
superset_url = URL(ctx.obj["INSTANCE"])
superset_client = SupersetClient(superset_url, superset_auth)

dj_client = DJClient(dj_url)
dj_client.basic_login(dj_username, dj_password)

base_url = URL(external_url_prefix) if external_url_prefix else None

for cube in cubes:
sync_cube(
UUID(database_uuid),
schema,
dj_client,
superset_client,
cube,
base_url,
)
154 changes: 154 additions & 0 deletions src/preset_cli/cli/superset/sync/dj/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""
Helper functions for DJ sync.
"""

import json
from typing import Any
from uuid import UUID

from datajunction import DJClient
from yarl import URL

from preset_cli.api.clients.superset import SupersetClient
from preset_cli.api.operators import OneToMany


def sync_cube( # pylint: disable=too-many-arguments
database_uuid: UUID,
schema: str,
dj_client: DJClient,
superset_client: SupersetClient,
cube: str,
base_url: URL | None,
) -> None:
"""
Sync a DJ cube to a Superset virtual dataset.
"""
response = dj_client._session.post( # pylint: disable=protected-access
"/graphql",
json={
"query": """
query FindCubes($names:[String!], $tags: [String!]) {
findNodes(names: $names, tags: $tags, nodeTypes: [CUBE]) {
name
current {
description
displayName
cubeMetrics {
name
description
extractedMeasures {
derivedExpression
}
}
cubeDimensions {
name
}
}
}
}
""",
"variables": {"names": [cube]},
},
)
payload = response.json()
description = payload["data"]["findNodes"][0]["current"]["description"]
columns = [
dimension["name"]
for dimension in payload["data"]["findNodes"][0]["current"]["cubeDimensions"]
]
metrics = [
{
"metric_name": metric["name"],
"expression": metric["extractedMeasures"]["derivedExpression"],
"description": metric["description"],
}
for metric in payload["data"]["findNodes"][0]["current"]["cubeMetrics"]
]

response = dj_client._session.post( # pylint: disable=protected-access
"/graphql",
json={
"query": """
query MeasuresSql($metrics: [String!]!, $dimensions: [String!]!) {
measuresSql(
cube: {metrics: $metrics, dimensions: $dimensions, filters: []}
preaggregate: true
) {
sql
}
}
""",
"variables": {
"metrics": [metric["metric_name"] for metric in metrics],
"dimensions": columns,
},
},
)
payload = response.json()
sql = payload["data"]["measuresSql"][0]["sql"]

database = get_database(superset_client, database_uuid)
dataset = get_or_create_dataset(superset_client, database, schema, cube, sql)

superset_client.update_dataset(
dataset["id"],
override_columns=True,
metrics=[],
)

superset_client.update_dataset(
dataset["id"],
override_columns=False,
metrics=metrics,
description=description,
is_managed_externally=True,
external_url=base_url / "nodes" / cube if base_url else None,
extra=json.dumps(
{
"certification": {
"certified_by": "DJ",
"details": "This table is created by DJ.",
},
},
),
sql=sql,
)


def get_database(superset_client: SupersetClient, uuid: UUID) -> dict[str, Any]:
"""
Get database info given its UUID.
"""
databases = superset_client.get_databases(uuid=str(uuid))
if not databases:
raise ValueError(f"Database with UUID {uuid} not found in Superset.")

return databases[0]


def get_or_create_dataset(
superset_client: SupersetClient,
database: dict[str, Any],
schema: str,
cube: str,
sql: str,
) -> dict[str, Any]:
"""
Get or create a dataset in Superset.
"""
if existing := superset_client.get_datasets(
database=OneToMany(database["id"]), # type: ignore
schema=schema,
table_name=cube,
):
dataset = existing[0]
return superset_client.get_dataset(dataset["id"])

return superset_client.create_dataset(
database=database["id"],
catalog=None,
schema=schema,
table_name=cube,
sql=sql,
)
2 changes: 2 additions & 0 deletions src/preset_cli/cli/superset/sync/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import click

from preset_cli.cli.superset.sync.dbt.command import dbt_cloud, dbt_core
from preset_cli.cli.superset.sync.dj.command import dj
from preset_cli.cli.superset.sync.native.command import native


Expand All @@ -16,6 +17,7 @@ def sync() -> None:


sync.add_command(native)
sync.add_command(dj)
sync.add_command(dbt_cloud, name="dbt-cloud")
sync.add_command(dbt_core, name="dbt-core")
# for backwards compatibility
Expand Down
Empty file.
61 changes: 61 additions & 0 deletions tests/cli/superset/sync/dj/command_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Tests for the DJ sync command.
"""

# pylint: disable=invalid-name

from uuid import UUID

from click.testing import CliRunner
from pytest_mock import MockerFixture
from yarl import URL

from preset_cli.cli.superset.main import superset_cli


def test_dj_command(mocker: MockerFixture) -> None:
"""
Tests for the sync command.
"""
SupersetClient = mocker.patch(
"preset_cli.cli.superset.sync.dj.command.SupersetClient",
)
UsernamePasswordAuth = mocker.patch(
"preset_cli.cli.superset.main.UsernamePasswordAuth",
)
DJClient = mocker.patch("preset_cli.cli.superset.sync.dj.command.DJClient")
sync_cube = mocker.patch("preset_cli.cli.superset.sync.dj.command.sync_cube")

runner = CliRunner()
result = runner.invoke(
superset_cli,
[
"https://superset.example.org/",
"sync",
"dj",
"--cubes",
"default.repair_orders_cube",
"--database-uuid",
"a1ad7bd5-b1a3-4d64-afb1-a84c2f4d7715",
"--schema",
"schema",
],
catch_exceptions=False,
)
assert result.exit_code == 0

SupersetClient.assert_called_once_with(
URL("https://superset.example.org/"),
UsernamePasswordAuth(),
)
DJClient.assert_called_once_with("http://localhost:8000")
DJClient().basic_login.assert_called_once_with("dj", "dj")

sync_cube.assert_called_once_with(
UUID("a1ad7bd5-b1a3-4d64-afb1-a84c2f4d7715"),
"schema",
DJClient(),
SupersetClient(),
"default.repair_orders_cube",
None,
)
Loading
Loading