
Commit 4a11742

Merge pull request #204 from databrickslabs/Issue_22
Issue 22: DAB's DEMO
2 parents 31daa0b + 96a922e commit 4a11742

13 files changed: +1732 −6 lines

demo/README.md

Lines changed: 81 additions & 6 deletions
@@ -4,10 +4,9 @@
3. [Append FLOW Autoloader Demo](#append-flow-autoloader-file-metadata-demo): Write to same target from multiple sources using [dlt.append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#append-flows) and adding [File metadata column](https://docs.databricks.com/en/ingestion/file-metadata-column.html)
4. [Append FLOW Eventhub Demo](#append-flow-eventhub-demo): Write to same target from multiple sources using [dlt.append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#append-flows) and adding [File metadata column](https://docs.databricks.com/en/ingestion/file-metadata-column.html)
5. [Silver Fanout Demo](#silver-fanout-demo): This demo showcases the implementation of fanout architecture in the silver layer.
-6. [Apply Changes From Snapshot Demo](#Apply-changes-from-snapshot-demo): This demo showcases the implementation of ingesting from snapshots in bronze layer
-7. [Lakeflow Declarative Pipelines Sink Demo](#dlt-sink-demo): This demo showcases the implementation of write to external sinks like delta and kafka
-The source argument is optional for the demos.
+6. [Apply Changes From Snapshot Demo](#apply-changes-from-snapshot-demo): This demo showcases the implementation of ingesting from snapshots in the bronze layer
+7. [Lakeflow Declarative Pipelines Sink Demo](#lakeflow-declarative-pipelines-sink-demo): This demo showcases the implementation of writing to external sinks like Delta and Kafka
+8. [DAB Demo](#dab-demo): This demo showcases how to use Databricks Asset Bundles with dlt-meta


# DAIS 2023 DEMO
@@ -224,7 +223,6 @@ This demo will perform following tasks:

![silver_fanout_dlt.png](../docs/static/images/silver_fanout_dlt.png)

# Apply Changes From Snapshot Demo
- This demo will perform following steps
  - Showcase onboarding process for apply changes from snapshot pattern([snapshot-onboarding.template](https://github.com/databrickslabs/dlt-meta/blob/main/demo/conf/snapshot-onboarding.template))
@@ -311,4 +309,81 @@ This demo will perform following tasks:
![dlt_demo_sink.png](../docs/static/images/dlt_demo_sink.png)
![dlt_delta_sink.png](../docs/static/images/dlt_delta_sink.png)
![dlt_kafka_sink.png](../docs/static/images/dlt_kafka_sink.png)


# DAB Demo

## Overview
This demo showcases how to use Databricks Asset Bundles (DABs) with DLT-Meta:
* This demo will perform the following steps:
  * Create dlt-meta schemas for dataflowspec and the bronze/silver layers
  * Upload necessary resources to a Unity Catalog volume
  * Create DAB files with catalog, schema, and file locations populated
  * Deploy the DAB to a Databricks workspace
  * Run onboarding using DAB commands
  * Run Bronze/Silver pipelines using DAB commands
  * Demo examples will showcase the fan-out pattern in the silver layer
  * Demo examples will showcase custom transformations for the bronze/silver layers
  * Adding custom columns and metadata to Bronze tables
  * Implementing SCD Type 1 on Silver tables
  * Applying expectations to filter data in Silver tables

## Prerequisites
1. Launch a command prompt / terminal

2. Install the [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
   git clone https://github.com/databrickslabs/dlt-meta.git
   ```

4. ```commandline
   cd dlt-meta
   ```

5. Set the Python path environment variable in your terminal:
   ```commandline
   dlt_meta_home=$(pwd)
   ```
   ```commandline
   export PYTHONPATH=$dlt_meta_home
   ```

6. Generate DAB resources and set up schemas. This command will:
   - Generate DAB configuration files
   - Create DLT-Meta schemas
   - Upload necessary files to volumes
   ```commandline
   python demo/generate_dabs_resources.py --source=cloudfiles --uc_catalog_name=<your_catalog_name> --profile=<your_profile>
   ```
   > Note: If you don't specify `--profile`, you'll be prompted for your Databricks workspace URL and access token (see the authentication sketch after this section).

7. Deploy and run the DAB bundle:
   - Navigate to the DAB directory
     ```commandline
     cd demo/dabs
     ```

   - Validate the bundle configuration
     ```commandline
     databricks bundle validate --profile=<your_profile>
     ```

   - Deploy the bundle to the dev environment
     ```commandline
     databricks bundle deploy --target dev --profile=<your_profile>
     ```

   - Run the onboarding job
     ```commandline
     databricks bundle run onboard_people -t dev --profile=<your_profile>
     ```

   - Execute the pipelines
     ```commandline
     databricks bundle run execute_pipelines_people -t dev --profile=<your_profile>
     ```

![dab_onboarding_job.png](../docs/static/images/dab_onboarding_job.png)
![dab_dlt_pipelines.png](../docs/static/images/dab_dlt_pipelines.png)
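As noted in step 6, omitting `--profile` makes the generator prompt for a workspace URL and access token. The two authentication paths can be pictured with a small sketch using the Databricks Python SDK; this is illustrative only and is not the code inside `demo/generate_dabs_resources.py`:

```python
# Illustrative sketch only -- not the logic in demo/generate_dabs_resources.py.
# It shows the two authentication paths from step 6: a named CLI profile from
# ~/.databrickscfg, or a prompted workspace URL + personal access token.
from typing import Optional

from databricks.sdk import WorkspaceClient


def get_workspace_client(profile: Optional[str] = None) -> WorkspaceClient:
    if profile:
        # What --profile=<your_profile> resolves to.
        return WorkspaceClient(profile=profile)
    # What the prompt fallback amounts to.
    host = input("Databricks workspace URL: ")
    token = input("Personal access token: ")
    return WorkspaceClient(host=host, token=token)


if __name__ == "__main__":
    w = get_workspace_client(profile=None)
    print(w.current_user.me().user_name)  # quick sanity check that authentication works
```

Configuring a named profile once (for example via `databricks configure`) keeps the remaining steps non-interactive, since the same `--profile` value is reused by the `databricks bundle` commands.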
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
{
  "expect_or_drop": {
    "valid_id": "id IS NOT NULL"
  }
}
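`expect_or_drop` entries like this are what dlt-meta feeds into Delta Live Tables expectations for the silver table. A minimal hand-written equivalent looks roughly like the sketch below; dlt-meta generates this wiring from the JSON, and the table name here is hypothetical, for illustration only:

```python
# Minimal sketch of the expect_or_drop rule above, assuming it runs inside a
# DLT/Lakeflow pipeline (where the `dlt` module and `spark` session are provided).
import dlt
from pyspark.sql import DataFrame

expectations = {"valid_id": "id IS NOT NULL"}  # same rule as in the JSON

@dlt.table(name="people_silver_example")       # hypothetical table name
@dlt.expect_all_or_drop(expectations)          # rows failing any expectation are dropped
def people_silver_example() -> DataFrame:
    # dlt-meta would read the bronze source defined in the onboarding spec;
    # a placeholder read keeps this sketch self-contained.
    return spark.read.table("people_bronze")
```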
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
[
  {
    "data_flow_id": "007",
    "data_flow_group": "my_people",

    "source_system": "Manual_Download",
    "source_format": "cloudFiles",

    "source_details": {
      "source_path_dev": "{uc_volume_path}/demo/dabs/resources/data/people",
      "source_path_prod": "{uc_volume_path}/demo/dabs/resources/data/people",
      "source_metadata": {
        "include_autoloader_metadata_column": "True",
        "autoloader_metadata_col_name": "source_metadata",
        "select_metadata_cols": {
          "input_file_name": "_metadata.file_name",
          "input_file_path": "_metadata.file_path"
        }
      }
    },
    "bronze_catalog_dev": "{uc_catalog_name}",
    "bronze_database_dev": "{bronze_schema}",
    "bronze_catalog_prod": "{uc_catalog_name}",
    "bronze_database_prod": "{bronze_schema}",
    "bronze_table": "people_bronze",
    "bronze_quarantine_table": "people_bronze_quarantine",
    "bronze_cluster_by": ["country"],

    "bronze_table_properties": {
      "pipelines.autoOptimize.managed": "true",
      "pipelines.reset.allowed": "true",
      "delta.autoOptimize.optimizeWrite": "true",
      "delta.autoOptimize.autoCompact": "true",
      "delta.tuneFileSizesForRewrites": "true",
      "delta.columnMapping.mode": "name",
      "delta.checkpointRetentionDuration": "30 days",
      "delta.deletedFileRetentionDuration": "30 days",
      "delta.logRetentionDuration": "30 days"
    },

    "bronze_reader_options": {
      "cloudFiles.format": "csv",
      "cloudFiles.inferColumnTypes": "true",
      "cloudFiles.rescuedDataColumn": "_rescued_data"
    },
    "silver_catalog_dev": "{uc_catalog_name}",
    "silver_database_dev": "{silver_schema}",
    "silver_catalog_prod": "{uc_catalog_name}",
    "silver_database_prod": "{silver_schema}",
    "silver_table": "people_silver",
    "silver_cluster_by": ["country"],

    "silver_table_properties": {
      "pipelines.autoOptimize.managed": "true",
      "pipelines.reset.allowed": "true",
      "delta.autoOptimize.optimizeWrite": "true",
      "delta.autoOptimize.autoCompact": "true",
      "delta.tuneFileSizesForRewrites": "true",
      "delta.columnMapping.mode": "name",
      "delta.checkpointRetentionDuration": "30 days",
      "delta.deletedFileRetentionDuration": "30 days",
      "delta.logRetentionDuration": "30 days"
    },

    "silver_transformation_json_dev": "{uc_volume_path}/demo/dabs/conf/silver_queries_people.json",
    "silver_data_quality_expectations_json_dev": "{uc_volume_path}/demo/dabs/conf/dqe/dqe_silver_people.json",

    "silver_transformation_json_prod": "{uc_volume_path}/demo/dabs/conf/silver_queries_people.json",
    "silver_data_quality_expectations_json_prod": "{uc_volume_path}/demo/dabs/conf/dqe/dqe_silver_people.json",

    "silver_cdc_apply_changes": {
      "keys": [
        "id"
      ],
      "sequence_by": "id",
      "scd_type": "1"
    }
  }
]
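The `{uc_catalog_name}`, `{bronze_schema}`, `{silver_schema}`, and `{uc_volume_path}` tokens above are placeholders that the resource-generation step (step 6) fills in before uploading the files. A rough sketch of that substitution is shown below; the real logic lives in `demo/generate_dabs_resources.py` and may differ, and the file names and schema names used here are assumptions for illustration only:

```python
# Illustrative placeholder substitution for the onboarding template above.
import json
from pathlib import Path

# Example values -- in the demo these come from the CLI arguments in step 6.
params = {
    "uc_catalog_name": "my_catalog",                      # assumed
    "bronze_schema": "dltmeta_bronze",                    # assumed
    "silver_schema": "dltmeta_silver",                    # assumed
    "uc_volume_path": "/Volumes/my_catalog/dlt_meta/files",  # assumed
}

def render_template(template_text: str, params: dict) -> str:
    """Replace {placeholder} tokens without disturbing the surrounding JSON braces."""
    for key, value in params.items():
        template_text = template_text.replace("{" + key + "}", value)
    json.loads(template_text)  # sanity check: the rendered result must still be valid JSON
    return template_text

# Hypothetical file names, for illustration:
rendered = render_template(Path("onboarding_people.template").read_text(), params)
Path("onboarding_people.json").write_text(rendered)
```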
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
[
  {
    "data_flow_id": "0070",
    "data_flow_group": "my_people",
    "bronze_catalog_dev": "{uc_catalog_name}",
    "bronze_database_dev": "{bronze_schema}",
    "bronze_catalog_prod": "{uc_catalog_name}",
    "bronze_database_prod": "{bronze_schema}",
    "bronze_table": "people_bronze",

    "silver_catalog_dev": "{uc_catalog_name}",
    "silver_database_dev": "{silver_schema}",
    "silver_catalog_prod": "{uc_catalog_name}",
    "silver_database_prod": "{silver_schema}",
    "silver_table": "people_silver_sal_below_50K",

    "silver_transformation_json_dev": "{uc_volume_path}/demo/dabs/conf/silver_queries_people.json",
    "silver_transformation_json_prod": "{uc_volume_path}/demo/dabs/conf/silver_queries_people.json"
  },
  {
    "data_flow_id": "0071",
    "data_flow_group": "my_people",
    "bronze_catalog_dev": "{uc_catalog_name}",
    "bronze_database_dev": "{bronze_schema}",
    "bronze_catalog_prod": "{uc_catalog_name}",
    "bronze_database_prod": "{bronze_schema}",
    "bronze_table": "people_bronze",
    "silver_catalog_dev": "{uc_catalog_name}",
    "silver_database_dev": "{silver_schema}",
    "silver_catalog_prod": "{uc_catalog_name}",
    "silver_database_prod": "{silver_schema}",
    "silver_table": "people_silver_sal_above_50K",

    "silver_transformation_json_dev": "{uc_volume_path}/demo/dabs/conf/silver_queries_people.json",
    "silver_transformation_json_prod": "{uc_volume_path}/demo/dabs/conf/silver_queries_people.json"
  }
]
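These two dataflows read the same `people_bronze` table and fan it out into `people_silver_sal_below_50K` and `people_silver_sal_above_50K`. Conceptually the generated silver layer behaves like the hand-written sketch below (the filters come from `silver_queries_people.json`); this is a simplification of what dlt-meta generates, not its actual code:

```python
# Conceptual fan-out sketch: one bronze table feeding two filtered silver tables,
# assuming it runs inside a DLT/Lakeflow pipeline that also defines people_bronze.
import dlt
from pyspark.sql import DataFrame

@dlt.table(name="people_silver_sal_below_50K")
def people_silver_sal_below_50K() -> DataFrame:
    # Filter taken from the where_clause for this target table.
    return dlt.read("people_bronze").where("salary <= 50000")

@dlt.table(name="people_silver_sal_above_50K")
def people_silver_sal_above_50K() -> DataFrame:
    return dlt.read("people_bronze").where("salary > 50000")
```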
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
[
  {
    "target_table": "people_silver",
    "select_exp": [
      "id",
      "md5(concat_ws('-',firstName, middleName, lastName,gender,birthDate,ssn)) as row_id",
      "firstName as first_name",
      "middleName as middle_name",
      "lastName as last_name",
      "gender as gender",
      "birthDate as birth_date",
      "ssn as ssn",
      "salary as salary",
      "country as country",
      "_rescued_data"
    ]
  },
  {
    "target_table": "people_silver_sal_above_50K",
    "select_exp": [
      "id",
      "md5(concat_ws('-',firstName, middleName, lastName,gender,birthDate,ssn)) as row_id",
      "concat(firstName,' ',middleName,' ',lastName) as full_name",
      "gender as gender",
      "birthDate as birth_date",
      "ssn as ssn",
      "salary as salary",
      "country as country",
      "_rescued_data"
    ],
    "where_clause": ["salary > 50000"]
  },
  {
    "target_table": "people_silver_sal_below_50K",
    "select_exp": [
      "id",
      "md5(concat_ws('-',firstName, middleName, lastName,gender,birthDate,ssn)) as row_id",
      "concat(firstName,' ',middleName,' ',lastName) as full_name",
      "gender as gender",
      "birthDate as birth_date",
      "ssn as ssn",
      "salary as salary",
      "country as country",
      "_rescued_data"
    ],
    "where_clause": ["salary <= 50000"]
  }
]
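Each entry maps the bronze data to one silver target: the `select_exp` strings are Spark SQL expressions and the optional `where_clause` strings are row filters. Roughly, under the assumption that the bronze data is already loaded into a DataFrame, the transformation amounts to:

```python
# Sketch of how one silver_queries_people.json entry is applied to a bronze DataFrame.
from pyspark.sql import DataFrame

def apply_silver_query(bronze_df: DataFrame, query: dict) -> DataFrame:
    """Apply select_exp and where_clause from a single query entry (sketch)."""
    df = bronze_df.selectExpr(*query["select_exp"])       # SQL expressions, e.g. "salary as salary"
    for clause in query.get("where_clause", []):          # optional filters, e.g. "salary > 50000"
        df = df.where(clause)
    return df

# Example: a trimmed version of the people_silver_sal_above_50K entry above.
query = {
    "target_table": "people_silver_sal_above_50K",
    "select_exp": [
        "id",
        "concat(firstName,' ',middleName,' ',lastName) as full_name",
        "salary as salary",
    ],
    "where_clause": ["salary > 50000"],
}
# silver_df = apply_silver_query(bronze_df, query)   # bronze_df: the people_bronze DataFrame
```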
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a67ad808",
   "metadata": {
    "vscode": {
     "languageId": "plaintext"
    }
   },
   "outputs": [],
   "source": [
    "%pip install dlt-meta"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0d59a07d",
   "metadata": {
    "vscode": {
     "languageId": "plaintext"
    }
   },
   "outputs": [],
   "source": [
    "\n",
    "from pyspark.sql import DataFrame\n",
    "from pyspark.sql.functions import lit\n",
    "from pyspark.sql.functions import current_date\n",
    "\n",
    "def custom_transform_func_test(input_df, _) -> DataFrame:\n",
    "\n",
    "    if layer == \"bronze\":\n",
    "        dummy_param = spark.conf.get(\"dummy_param\", None)\n",
    "    else:\n",
    "        dummy_param = \"Test NA\"\n",
    "\n",
    "    return (input_df\n",
    "            .withColumn('last_updated_on', current_date())\n",
    "            .withColumn('some_dummy_from_task_param', lit(dummy_param))\n",
    "            )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b3a86db7",
   "metadata": {
    "vscode": {
     "languageId": "plaintext"
    }
   },
   "outputs": [],
   "source": [
    "\n",
    "layer = spark.conf.get(\"layer\", None)\n",
    "\n",
    "from src.dataflow_pipeline import DataflowPipeline\n",
    "DataflowPipeline.invoke_dlt_pipeline(spark, layer, bronze_custom_transform_func=custom_transform_func_test,\n",
    "                                     silver_custom_transform_func=custom_transform_func_test\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1d0734f4",
   "metadata": {
    "vscode": {
     "languageId": "plaintext"
    }
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
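Flattened out of the notebook JSON above, the init notebook's logic is roughly the following. Note that `layer` and the optional `dummy_param` are supplied through the pipeline's Spark configuration, `spark` is provided by the pipeline runtime, and `custom_transform_func_test` only reads `layer` when the pipeline actually invokes it, after the later cell has set it:

```python
# Readable sketch of the DAB demo init notebook, written as a plain script.
# Assumes it runs inside a Lakeflow/DLT pipeline where `spark` exists and
# dlt-meta's src.dataflow_pipeline module is importable.
from pyspark.sql import DataFrame
from pyspark.sql.functions import current_date, lit

# Set by the pipeline configuration ("layer": "bronze" or "silver").
layer = spark.conf.get("layer", None)

def custom_transform_func_test(input_df: DataFrame, _) -> DataFrame:
    """Custom transformation applied to both bronze and silver dataflows."""
    if layer == "bronze":
        dummy_param = spark.conf.get("dummy_param", None)  # optional task parameter
    else:
        dummy_param = "Test NA"
    return (input_df
            .withColumn("last_updated_on", current_date())
            .withColumn("some_dummy_from_task_param", lit(dummy_param)))

from src.dataflow_pipeline import DataflowPipeline

DataflowPipeline.invoke_dlt_pipeline(
    spark,
    layer,
    bronze_custom_transform_func=custom_transform_func_test,
    silver_custom_transform_func=custom_transform_func_test,
)
```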
