Commit aa2bcb6

implement daily export and load dags
1 parent 1d88201 commit aa2bcb6

37 files changed: +1741 −0 lines

airflow/cloudbuild.yaml (+7, new file)

steps:
  - name: gcr.io/cloud-builders/gsutil
    dir: 'airflow'
    args: ["-m", "rsync", "-r", "-c", "-d", "./dags", "gs://${_BUCKET}/dags"]

substitutions:
  _BUCKET: your-bucket
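The build step mirrors the local dags/ directory into the Composer DAGs bucket: gsutil rsync with -c (compare by checksum), -d (propagate deletions), and -m (parallel transfers). Assuming a standard Cloud Build setup, the same sync can be kicked off by hand from the repo root (the bucket name below is a placeholder):

gcloud builds submit --config airflow/cloudbuild.yaml --substitutions=_BUCKET=<your-dags-bucket>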

airflow/dags/__init__.py

Whitespace-only changes.

airflow/dags/mainnet_export_dag.py (+16, new file)

from __future__ import print_function

from maticetl_airflow.build_export_dag import build_export_dag
from maticetl_airflow.variables import read_export_dag_vars

# airflow DAG
DAG = build_export_dag(
    dag_id='mainnet_export_dag',
    **read_export_dag_vars(
        var_prefix='mainnet_',
        export_schedule_interval='0 1 * * *',
        export_start_date='2019-04-22',
        export_max_active_runs=3,
        export_max_workers=10,
    )
)
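read_export_dag_vars itself is not part of this hunk, but the call shape suggests it merges these keyword defaults with Airflow Variables sharing the 'mainnet_' prefix. A minimal sketch of that pattern, assuming the Airflow 1.x Variable API (the lookup scheme here is an assumption, not the commit's actual implementation):

from airflow.models import Variable

def read_export_dag_vars(var_prefix, **defaults):
    # Prefer an Airflow Variable named '<prefix><key>', e.g.
    # 'mainnet_export_max_workers'; fall back to the given default.
    # Note: values read from Variables come back as strings.
    return {
        key: Variable.get(var_prefix + key, default_var=default)
        for key, default in defaults.items()
    }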

airflow/dags/mainnet_load_dag.py (+19, new file)

from __future__ import print_function

import logging

from maticetl_airflow.build_load_dag import build_load_dag
from maticetl_airflow.variables import read_load_dag_vars

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

# airflow DAG
DAG = build_load_dag(
    dag_id='mainnet_load_dag',
    chain='matic',
    **read_load_dag_vars(
        var_prefix='mainnet_',
        load_schedule_interval='0 2 * * *'
    )
)
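The load DAG is scheduled at 02:00, one hour after the 01:00 export, so the pairing implicitly assumes each export run finishes within the hour. Nothing in this commit enforces that ordering; one conventional way to make it explicit would be an ExternalTaskSensor (Airflow 1.x API; the task below is illustrative, not part of this commit):

from datetime import timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Hypothetical guard: block the 02:00 load run until the same day's
# 01:00 export DAG run has completed.
wait_for_export = ExternalTaskSensor(
    task_id='wait_for_export',
    external_dag_id='mainnet_export_dag',
    external_task_id=None,               # wait for the whole DAG run
    execution_delta=timedelta(hours=1),  # 02:00 load minus 01:00 export
    dag=DAG,
)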

airflow/dags/maticetl_airflow/__init__.py

Whitespace-only changes.
airflow/dags/maticetl_airflow/bigquery_utils.py (+98, new file)
import json
import logging

from google.cloud import bigquery
from google.api_core.exceptions import Conflict, NotFound, Forbidden

from maticetl_airflow.file_utils import read_file


def create_dataset(client, dataset_name, project=None):
    dataset = client.dataset(dataset_name, project=project)
    try:
        logging.info('Creating new dataset ...')
        dataset = client.create_dataset(dataset)
        logging.info('New dataset created: ' + dataset_name)
    except Conflict:
        logging.info('Dataset already exists')
    except Forbidden:
        logging.info('User does not have bigquery.datasets.create permission in project')

    return dataset


def submit_bigquery_job(job, configuration):
    try:
        logging.info('Creating a job: ' + json.dumps(configuration.to_api_repr()))
        result = job.result()
        logging.info(result)
        assert job.errors is None or len(job.errors) == 0
        return result
    except Exception:
        logging.info(job.errors)
        raise


def read_bigquery_schema_from_file(filepath):
    file_content = read_file(filepath)
    json_content = json.loads(file_content)
    return read_bigquery_schema_from_json_recursive(json_content)


def read_bigquery_schema_from_json_recursive(json_schema):
    """
    CAUTION: Recursive function
    This method can generate BQ schemas for nested records
    """
    result = []
    for field in json_schema:
        if field.get('type').lower() == 'record' and field.get('fields'):
            schema = bigquery.SchemaField(
                name=field.get('name'),
                field_type=field.get('type', 'STRING'),
                mode=field.get('mode', 'NULLABLE'),
                description=field.get('description'),
                fields=read_bigquery_schema_from_json_recursive(field.get('fields'))
            )
        else:
            schema = bigquery.SchemaField(
                name=field.get('name'),
                field_type=field.get('type', 'STRING'),
                mode=field.get('mode', 'NULLABLE'),
                description=field.get('description')
            )
        result.append(schema)
    return result


def query(bigquery_client, sql, destination=None, priority=bigquery.QueryPriority.INTERACTIVE):
    job_config = bigquery.QueryJobConfig()
    job_config.destination = destination
    job_config.priority = priority
    logging.info('Executing query: ' + sql)
    query_job = bigquery_client.query(sql, location='US', job_config=job_config)
    submit_bigquery_job(query_job, job_config)
    assert query_job.state == 'DONE'


def create_view(bigquery_client, sql, table_ref):
    table = bigquery.Table(table_ref)
    table.view_query = sql

    logging.info('Creating view: ' + json.dumps(table.to_api_repr()))

    try:
        table = bigquery_client.create_table(table)
    except Conflict:
        # https://cloud.google.com/bigquery/docs/managing-views
        table = bigquery_client.update_table(table, ['view_query'])
    assert table.table_id == table_ref.table_id
    return table


def does_table_exist(bigquery_client, table_ref):
    try:
        bigquery_client.get_table(table_ref)
    except NotFound:
        return False
    return True
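Together these helpers give the load DAG idempotent BigQuery primitives: dataset creation tolerates Conflict and Forbidden, view creation falls back to an in-place update, and job submission logs errors before re-raising. A minimal usage sketch (dataset and table names here are placeholders; the real wiring lives in build_load_dag):

from google.cloud import bigquery

client = bigquery.Client()
create_dataset(client, 'matic_mainnet')  # no-op if the dataset exists
dest = client.dataset('matic_mainnet').table('block_counts')
if not does_table_exist(client, dest):
    # Materialize a query result into the destination table.
    query(client, 'SELECT 1 AS n', destination=dest)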
