Skip to content

Commit

Permalink
resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
canbekley committed Jan 6, 2025
2 parents 4ba7554 + 82a405e commit 95cb496
Show file tree
Hide file tree
Showing 9 changed files with 661 additions and 123 deletions.
25 changes: 23 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,29 @@
### Unreleased
### Next Version

#### New Features
* [ClickHouse indexes](https://clickhouse.com/docs/en/optimize/sparse-primary-indexes) are now fully supported for `table` materialization.
The index config should be added to the model config. For instance:
```python
{{ config(
materialized='table',
indexes=[{
'name': 'your_index_name',
'definition': 'your_column TYPE minmax GRANULARITY 2'
}]
) }}
```

### Release [1.8.7], 2025-01-05

### New Features
* Added support for [refreshable materialized view](https://clickhouse.com/docs/en/materialized-view/refreshable-materialized-view) ([#401](https://github.com/ClickHouse/dbt-clickhouse/pull/401))

### Improvement
* Avoid potential data loss by using `CREATE OR REPLACE DICTIONARY` to atomically update a dictionary ([#393](https://github.com/ClickHouse/dbt-clickhouse/pull/393))
* Removed support for Python 3.8, as it is no longer supported by dbt ([#402](https://github.com/ClickHouse/dbt-clickhouse/pull/402))

Avoid potential data loss by using `CREATE OR REPLACE DICTIONARY` to atomically update a dictionary (#393)
### Bug Fixes
* Fix a minor bug related to validating the existence of an old hanging MV ([#396](https://github.com/ClickHouse/dbt-clickhouse/pull/396))

### Release [1.8.6], 2024-12-05

Expand Down
378 changes: 273 additions & 105 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
@dataclass
class ClickHouseConfig(AdapterConfig):
engine: str = 'MergeTree()'
force_on_cluster: Optional[bool] = False
order_by: Optional[Union[List[str], str]] = 'tuple()'
partition_by: Optional[Union[List[str], str]] = None
sharding_key: Optional[Union[List[str], str]] = 'rand()'
Expand Down
14 changes: 7 additions & 7 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ def create_from(
# If the database is set, and the source schema is "defaulted" to the source.name, override the
# schema with the database instead, since that's presumably what's intended for clickhouse
schema = relation_config.schema

cluster = quoting.credentials.cluster or ''
can_on_cluster = None
# We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages
if relation_config.resource_type == NODE_TYPE_SOURCE:
if schema == relation_config.source_name and relation_config.database:
schema = relation_config.database

if cluster and str(relation_config.config.get("force_on_cluster")).lower() == "true":
can_on_cluster = True

else:
cluster = quoting.credentials.cluster if quoting.credentials.cluster else ''
materialized = (
relation_config.config.materialized if relation_config.config.materialized else ''
)
engine = (
relation_config.config.get('engine') if relation_config.config.get('engine') else ''
)
materialized = relation_config.config.get('materialized') or ''
engine = relation_config.config.get('engine') or ''
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)

return cls.create(
Expand Down
122 changes: 116 additions & 6 deletions dbt/include/clickhouse/macros/materializations/materialized_view.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

{%- set target_relation = this.incorporate(type='table') -%}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set refreshable_clause = refreshable_mv_clause() -%}


{# look for an existing relation for the target table and create backup relations if necessary #}
{%- set existing_relation = load_cached_relation(this) -%}
Expand Down Expand Up @@ -97,7 +99,7 @@
select 1
{%- endcall %}
{% endif %}
{{ clickhouse__create_mvs(existing_relation, cluster_clause, views) }}
{{ clickhouse__create_mvs(existing_relation, cluster_clause, refreshable_clause, views) }}
{% else %}
{{ log('Replacing existing materialized view ' + target_relation.name) }}
{{ clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) }}
Expand Down Expand Up @@ -144,16 +146,18 @@
{% endif %}
{% endcall %}
{%- set cluster_clause = on_cluster_clause(relation) -%}
{%- set refreshable_clause = refreshable_mv_clause() -%}
{%- set mv_relation = relation.derivative('_mv', 'materialized_view') -%}
{{ clickhouse__create_mvs(relation, cluster_clause, views) }}
{{ clickhouse__create_mvs(relation, cluster_clause, refreshable_clause, views) }}
{%- endmacro %}

{# Render a `drop view if exists` statement for `mv_relation`; `cluster_clause` is appended verbatim after the relation. #}
{% macro clickhouse__drop_mv(mv_relation, cluster_clause) -%}
drop view if exists {{ mv_relation }} {{ cluster_clause }}
{%- endmacro %}

{% macro clickhouse__create_mv(mv_relation, target_table, cluster_clause, sql) -%}
{% macro clickhouse__create_mv(mv_relation, target_table, cluster_clause, refreshable_clause, sql) -%}
create materialized view if not exists {{ mv_relation }} {{ cluster_clause }}
{{ refreshable_clause }}
to {{ target_table }}
as {{ sql }}
{%- endmacro %}
Expand All @@ -167,18 +171,19 @@
{% endfor %}
{%- endmacro %}

{% macro clickhouse__create_mvs(target_relation, cluster_clause, views) -%}
{% macro clickhouse__create_mvs(target_relation, cluster_clause, refreshable_clause, views) -%}
{% for view, view_sql in views.items() %}
{%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%}
{% call statement('create existing mv: ' + view) -%}
{{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, view_sql) }};
{{ clickhouse__create_mv(mv_relation, target_relation, cluster_clause, refreshable_clause, view_sql) }};
{% endcall %}
{% endfor %}
{%- endmacro %}

{% macro clickhouse__replace_mv(target_relation, existing_relation, intermediate_relation, backup_relation, sql, views) %}
{# drop existing materialized view while we recreate the target table #}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set refreshable_clause = refreshable_mv_clause() -%}
{{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}

{# recreate the target table #}
Expand All @@ -189,6 +194,111 @@
{{ adapter.rename_relation(intermediate_relation, target_relation) }}

{# now that the target table is recreated, we can finally create our new view #}
{{ clickhouse__create_mvs(target_relation, cluster_clause, views) }}
{{ clickhouse__create_mvs(target_relation, cluster_clause, refreshable_clause, views) }}
{% endmacro %}

{#
  Render the refresh-related clause of a materialized view from the model's `refreshable` config.

  Renders nothing when `refreshable` is not set. Otherwise `refreshable` must be a mapping; keys read:
    - interval              (required) rendered as `REFRESH <interval>`
    - randomize             (optional) rendered as `RANDOMIZE FOR <randomize>`
    - depends_on            (optional) a string or an iterable of strings, rendered as `DEPENDS ON a, b, ...`
    - depends_on_validation (default false) when truthy, each dependency is passed to
                            validate_refreshable_mv_existence before the clause is rendered
    - append                (default false) when truthy, renders `APPEND`

  Raises a compiler error when `refreshable` is not a mapping, when `interval` is missing or falsy,
  or when `depends_on` is neither a string nor an iterable of strings.
#}
{% macro refreshable_mv_clause() %}
{%- if config.get('refreshable') is not none -%}

{% set refreshable_config = config.get('refreshable') %}
{% if refreshable_config is not mapping %}
{% do exceptions.raise_compiler_error(
"The 'refreshable' configuration must be defined as a dictionary. Please review the docs for more details."
) %}
{% endif %}

{# Read the individual settings; only 'interval' is mandatory (checked right below). #}
{% set refresh_interval = refreshable_config.get('interval', none) %}
{% set refresh_randomize = refreshable_config.get('randomize', none) %}
{% set depends_on = refreshable_config.get('depends_on', none) %}
{% set depends_on_validation = refreshable_config.get('depends_on_validation', false) %}
{% set append = refreshable_config.get('append', false) %}

{% if not refresh_interval %}
{% do exceptions.raise_compiler_error(
"The 'refreshable' configuration is defined, but 'interval' is missing. "
~ "This is required to create a refreshable materialized view."
) %}
{% endif %}

{% if refresh_interval %}
REFRESH {{ refresh_interval }}
{# This is a comment to force a new line between REFRESH and RANDOMIZE clauses #}
{%- if refresh_randomize -%}
RANDOMIZE FOR {{ refresh_randomize }}
{%- endif -%}
{% endif %}

{# Normalize 'depends_on' (a single string or an iterable of strings) into depends_on_list. #}
{% if depends_on %}
{% set depends_on_list = [] %}

{% if depends_on is string %}
{% set depends_on_list = [depends_on] %}
{% elif depends_on is iterable %}
{# NOTE(review): presumably collected via in-place append on an alias because a `set` inside a Jinja for loop does not persist after the loop - confirm. #}
{% set temp_list = depends_on_list %}
{%- for dep in depends_on %}
{% if dep is string %}
{% do temp_list.append(dep) %}
{% else %}
{% do exceptions.raise_compiler_error(
"The 'depends_on' configuration must be either a string or a list of strings."
) %}
{% endif %}
{% endfor %}
{% set depends_on_list = temp_list %}
{% else %}
{% do exceptions.raise_compiler_error(
"The 'depends_on' configuration must be either a string or a list of strings."
) %}
{% endif %}

{# Opt-in check: validate_refreshable_mv_existence raises a compiler error for a dependency it cannot find. #}
{% if depends_on_validation and depends_on_list | length > 0 %}
{%- for dep in depends_on_list %}
{% do validate_refreshable_mv_existence(dep) %}
{%- endfor %}
{% endif %}

DEPENDS ON {{ depends_on_list | join(', ') }}
{% endif %}

{%- if append -%}
APPEND
{%- endif -%}

{%- endif -%}
{% endmacro %}


{% macro validate_refreshable_mv_existence(mv) %}
    {#
      Check that `mv` has at least one row in system.view_refreshes; raise a compiler error otherwise.

      `mv` is split on '.'. Exactly two parts are read as "<database>.<view>"; any other shape is
      looked up by its first part in the "default" database.
    #}
    {{ log(mv + ' was recognized as a refreshable mv dependency, checking its existence') }}
    {% set default_database = "default" %}

    {%- set name_parts = mv.split('.') %}
    {%- set is_qualified = (name_parts | length) == 2 %}
    {%- set database = name_parts[0] if is_qualified else default_database %}
    {%- set table = name_parts[1] if is_qualified else name_parts[0] %}

    {% set query %}
        select count(*)
        from system.view_refreshes
        where database='{{ database }}' and view='{{ table }}'
    {% endset %}

    {# First value of the first result column is the match count. #}
    {% set mv_count = run_query(query).columns[0].values()[0] %}
    {{ log(mv_count) }}
    {% if not (mv_count > 0) %}
        {% do exceptions.raise_compiler_error(
            'No existing MV found matching MV: ' + mv
        ) %}
    {% else %}
        {{ log('MV ' + mv + ' exists.') }}
    {% endif %}
{% endmacro %}



14 changes: 13 additions & 1 deletion dbt/include/clickhouse/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@
{% if config.get('projections')%}
{{ projection_statement(relation) }}
{% endif %}

{% if config.get('indexes') %}
{{ indexes_statement(relation) }}
{% endif %}

{{ clickhouse__insert_into(relation, sql, has_contract) }}
{%- endif %}
Expand All @@ -169,6 +171,16 @@
{%- endfor %}
{%- endmacro %}

{% macro indexes_statement(relation) %}
    {#- Run one `ALTER TABLE ... ADD INDEX` statement per entry of the model's `indexes` config;
        each entry supplies a `name` and a `definition`. Does nothing when `indexes` is unset. -#}
    {%- for idx in config.get('indexes', default=[]) %}
        {% call statement('add_indexes') %}
            ALTER TABLE {{ relation }} ADD INDEX {{ idx.get('name') }} {{ idx.get('definition') }}
        {% endcall %}
    {%- endfor %}
{%- endmacro %}

{% macro create_table_or_empty(temporary, relation, sql, has_contract) -%}
{%- set sql_header = config.get('sql_header', none) -%}

Expand Down
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ def _dbt_clickhouse_version():
'clickhouse-driver>=0.2.6',
'setuptools>=0.69',
],
python_requires=">=3.8",
python_requires=">=3.9",
platforms='any',
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: Apache Software License',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS :: MacOS X',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,105 @@ def test_base(self, project):
assert len(results) == 1

self.assert_total_count_correct(project)


class TestMergeTreeForceClusterMaterialization(BaseSimpleMaterializations):
    """Check that a MergeTree materialized view is created across a cluster
    when the model sets the `force_on_cluster` config argument.
    """

    @pytest.fixture(scope="class")
    def models(self):
        """Return the `force_on_cluster` model and the shared schema file."""
        config_force_on_cluster = """
{{ config(
engine='MergeTree',
materialized='materialized_view',
force_on_cluster='true'
)
}}
"""

        return {
            "force_on_cluster.sql": config_force_on_cluster + model_base,
            "schema.yml": schema_base_yml,
        }

    @pytest.fixture(scope="class")
    def seeds(self):
        """Return the base seed CSV and its schema file."""
        return {
            "schema.yml": base_seeds_schema_yml,
            "base.csv": seeds_base_csv,
        }

    def assert_total_count_correct(self, project):
        """Assert that the target table and its `_mv` view exist on the cluster.

        Both objects are counted through `clusterAllReplicas` and each must be
        found exactly three times.
        """
        cluster = project.test_config['cluster']

        # check if data is properly distributed/replicated
        table_relation = relation_from_name(project.adapter, "force_on_cluster")
        # ClickHouse cluster in the docker-compose file
        # under tests/integration is configured with 3 nodes
        host_count = project.run_sql(
            f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'",
            fetch="one",
        )
        assert host_count[0] > 1

        table_count = project.run_sql(
            f"select count() From clusterAllReplicas('{cluster}', system.tables) "
            f"where database='{table_relation.schema}' and name='{table_relation.identifier}'",
            fetch="one",
        )

        assert table_count[0] == 3

        mv_count = project.run_sql(
            f"select count() From clusterAllReplicas('{cluster}', system.tables) "
            f"where database='{table_relation.schema}' and name='{table_relation.identifier}_mv'",
            fetch="one",
        )

        assert mv_count[0] == 3

    @pytest.mark.skipif(
        os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
    )
    def test_base(self, project):
        """Seed, run and full-refresh the model, checking cluster placement after each run."""
        # cluster setting must exist
        cluster = project.test_config['cluster']
        assert cluster

        # seed command
        results = run_dbt(["seed"])
        # seed result length
        assert len(results) == 1

        # run command
        results = run_dbt()
        # run result length
        assert len(results) == 1

        # names exist in result nodes
        check_result_nodes_by_name(results, ["force_on_cluster"])

        # check relation types; the model under test is `force_on_cluster`
        # (there is no `replicated` model in this test class)
        expected = {
            "base": "table",
            "force_on_cluster": "table",
        }
        check_relation_types(project.adapter, expected)

        relation = relation_from_name(project.adapter, "base")
        # table rowcount
        result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
        assert result[0] == 10

        # target table and materialized view exist on the cluster
        self.assert_total_count_correct(project)

        # run full refresh
        results = run_dbt(['--debug', 'run', '--full-refresh'])
        # run result length
        assert len(results) == 1

        self.assert_total_count_correct(project)
Loading

0 comments on commit 95cb496

Please sign in to comment.