Demo wkf_airflow_metadata #1

Open · wants to merge 3 commits into base: main
18 changes: 13 additions & 5 deletions __main__.py
@@ -9,18 +9,26 @@
if __name__ == "__main__":
result = defs.get_job_def(job_name).execute_in_process(
run_config=RunConfig(
ops={
"catfacts": ApiConfig(max_length=70, limit=20),
},
#ops={
# "catfacts": ApiConfig(max_length=70, limit=20),
#},
resources={
"io_manager": S3Config(
s3_bucket="konpyutaika-product-catfacts-staging",
s3_prefix="catfacts",
),
"dataframe_io_manager": S3Config(
"stream_string_io_manager": S3Config(
s3_bucket="konpyutaika-product-catfacts-staging",
s3_prefix="catfacts/parquet",
s3_prefix="local",
),
"s3_path_io_manager": S3Config(
s3_bucket="konpyutaika-product-catfacts-staging",
s3_prefix="local",
)
#"dataframe_io_manager": S3Config(
# s3_bucket="konpyutaika-product-catfacts-staging",
# s3_prefix="catfacts/parquet",
#)
}
)
)
10 changes: 6 additions & 4 deletions catfacts/__init__.py
@@ -2,11 +2,12 @@

from dagster import Definitions

from .assets import cats_assets
from .jobs import catfacts_job
from .assets import cats_assets, airflow_metadata_assets, tableau_assets
from .sensors.tableau_sensors import my_asset_sensor
from .jobs import catfacts_job, tableau_job
from .resources import RESOURCES_LOCAL, RESOURCES_STAGING, RESOURCES_PROD

all_assets = [*cats_assets]
all_assets = [*cats_assets, *airflow_metadata_assets, *tableau_assets]

resources_by_deployment_name = {
"prod": RESOURCES_PROD,
@@ -17,5 +18,6 @@
defs = Definitions(
assets=all_assets,
resources=resources_by_deployment_name[os.getenv("ENV", "dev")],
jobs=[catfacts_job],
jobs=[catfacts_job, tableau_job],
sensors=[my_asset_sensor]
)
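Note: `my_asset_sensor` is imported from `catfacts/sensors/tableau_sensors.py`, which is not shown in this diff. A minimal sketch of what such an asset sensor typically looks like in Dagster, assuming it watches `redshift_table` and kicks off `tableau_job` (module path and body are assumptions, not the PR's actual code):

```python
# Hypothetical catfacts/sensors/tableau_sensors.py (not part of this diff).
from dagster import AssetKey, RunRequest, asset_sensor

from ..jobs import tableau_job


@asset_sensor(asset_key=AssetKey("redshift_table"), job=tableau_job)
def my_asset_sensor(context, asset_event):
    # Launch one tableau_job run per new materialization of redshift_table.
    yield RunRequest(run_key=context.cursor)
```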
6 changes: 6 additions & 0 deletions catfacts/assets/__init__.py
@@ -1,7 +1,13 @@
from dagster import load_assets_from_package_module

from . import cats
from . import airflow_metadata
from . import tableau

CATS = "cats"
TABLEAU = "tableau"
AIRFLOW_METADATA = "airflow_metadata"

cats_assets = load_assets_from_package_module(package_module=cats, group_name=CATS)
airflow_metadata_assets = load_assets_from_package_module(package_module=airflow_metadata, group_name=AIRFLOW_METADATA)
tableau_assets = load_assets_from_package_module(package_module=tableau, group_name=TABLEAU)
Empty file: catfacts/assets/airflow_metadata/__init__.py
41 changes: 41 additions & 0 deletions catfacts/assets/airflow_metadata/dagrun.py
@@ -0,0 +1,41 @@
from dagster import asset, AssetIn
import psycopg2
import json
import pandas.io.sql as psql
from typing import List, Tuple



@asset(io_manager_key="stream_string_io_manager")
def dagrun():
"""
requeter dagrun airflow from metadata db & store on S3
:return:
"""
def _myiterator():
query = """
SELECT dag_id,execution_date,state,run_id,external_trigger,end_date,start_date,data_interval_end,run_type,last_scheduling_decision,queued_at
from airflow.dag_run
limit 5
"""
with psycopg2.connect("xxxxxx") as conn:
for record in psql.read_sql_query(query, conn, chunksize=10000):
dataframe = record.fillna("").astype(str).replace(r"\.0$", "", regex=True).replace({"NaT": None})
list_records = dataframe.to_dict(orient="records")
stringify = map(lambda r: json.dumps(r, ensure_ascii=False), list_records)
yield from stringify
return _myiterator


@asset(ins={"dagrun":AssetIn(key="dagrun",input_manager_key="s3_path_io_manager")},
required_resource_keys={"redshift"})
def redshift_table(context,dagrun):
query = f"""
COPY admin.dag_run
FROM '{dagrun}'
CREDENTIALS 'aws_iam_role=arn:aws:iam::301420533736:role/RedshiftDatafoundation'
JSON 'auto' DATEFORMAT 'auto' TIMEFORMAT 'auto' TRUNCATECOLUMNS
REGION 'eu-central-1';
"""
context.resources.redshift.execute_query(query=query)
return {"schema": "admin", "table": "dag_run"}
Empty file: catfacts/assets/tableau/__init__.py
5 changes: 5 additions & 0 deletions catfacts/assets/tableau/tableau.py
@@ -0,0 +1,5 @@
from dagster import asset, AssetIn

@asset(ins={"redshift_table":AssetIn(key="redshift_table")})
def refresh_tableau(redshift_table):
print(f'refresh after {redshift_table} received')
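`refresh_tableau` is currently a stub that only prints. If the goal is a real extract refresh, a hedged sketch using tableauserverclient could look like the following (server URL, site, token names, and the datasource id are placeholders; a recent tableauserverclient version is assumed):

```python
import tableauserverclient as TSC


def trigger_tableau_refresh(datasource_id: str) -> None:
    # Placeholder credentials; in practice these would come from a Dagster resource or env vars.
    auth = TSC.PersonalAccessTokenAuth("token-name", "token-value", site_id="my-site")
    server = TSC.Server("https://tableau.example.com", use_server_version=True)
    with server.auth.sign_in(auth):
        datasource = server.datasources.get_by_id(datasource_id)
        server.datasources.refresh(datasource)  # starts an asynchronous extract refresh job
```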
8 changes: 6 additions & 2 deletions catfacts/jobs.py
@@ -1,13 +1,17 @@
from dagster import AssetSelection, define_asset_job

from .assets import CATS
from .assets import CATS, AIRFLOW_METADATA, TABLEAU


job_name = "catfacts_job"
catfacts_job = define_asset_job(
name=job_name,
selection=AssetSelection.groups(CATS),
selection=AssetSelection.groups(AIRFLOW_METADATA),
tags={
"job": job_name
},
)
tableau_job = define_asset_job(
name="tableau_job",
selection=AssetSelection.groups(TABLEAU)
)
17 changes: 17 additions & 0 deletions catfacts/resources/__init__.py
@@ -2,11 +2,28 @@

from .api_resource import ApiResource
from .s3_pd_to_parquet_io_manager import s3_pd_to_parquet_io_manager
from .s3_string_io_manager import s3_string_io_manager
from .s3_stream_string_io_manager import s3_stream_string_io_manager, s3_path_io_manager
from dagster_aws.redshift.resources import redshift_resource




RESOURCES_LOCAL = {
"io_manager": s3_pickle_io_manager,
"stream_string_io_manager": s3_stream_string_io_manager,
"s3_path_io_manager": s3_path_io_manager,
"dataframe_io_manager": s3_pd_to_parquet_io_manager,
"s3": s3_resource,
"redshift": redshift_resource.configured(
{
"host": "xxxxxxx",
"port": 5439,
"user": "xxxxxx",
"password": "xxxxxxx",
"database": "xxxxxxx",
}
),
"catfacts_client": ApiResource(host="catfact.ninja", protocol="https", endpoint="facts")
}

Expand Down
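The Redshift credentials above are hard-coded placeholders. A possible variant (not part of this diff) reads them from environment variables through Dagster's StringSource-style config; the variable names below are assumptions:

```python
from dagster_aws.redshift.resources import redshift_resource

redshift_from_env = redshift_resource.configured(
    {
        "host": {"env": "REDSHIFT_HOST"},
        "port": 5439,
        "user": {"env": "REDSHIFT_USER"},
        "password": {"env": "REDSHIFT_PASSWORD"},
        "database": {"env": "REDSHIFT_DATABASE"},
    }
)
```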
141 changes: 141 additions & 0 deletions catfacts/resources/s3_stream_string_io_manager.py
@@ -0,0 +1,141 @@
from io import BytesIO, StringIO
from typing import Optional, Union, Sequence, Callable, Iterable as _Iterable
import pandas as pd
from dagster import (
InputContext,
OutputContext,
MetadataValue,
IOManager,
StringSource,
Field,
_check as check,
io_manager
)
import pyarrow.parquet as pq
import pyarrow as pa
import os

def to_sized_file_stream(line_stream: _Iterable[str], file_mb_size: int = 100) -> _Iterable[str]:
"""
Assembles a stream of lines into fixed file stream
"""
file = StringIO()
file_bytes_size = 10**6 * file_mb_size
has_returned_file = False
for line in line_stream:
if file.tell():
file.write("\n")
file.write(line)
if file.tell() >= file_bytes_size:
has_returned_file = True
yield file.getvalue()
file = StringIO()
if file.tell() > 0 or not has_returned_file:
yield file.getvalue()


class S3StreamStringIOManager(IOManager):
def __init__(
self,
s3_bucket,
s3_session,
s3_prefix=None,
):
self.s3_bucket = check.str_param(s3_bucket, "s3_bucket")
self.s3_prefix = check.opt_str_param(s3_prefix, "s3_prefix")
self.s3 = s3_session
self.s3.list_objects(Bucket=self.s3_bucket, Prefix=self.s3_prefix, MaxKeys=1)

def _get_path(self, context: Union[InputContext, OutputContext]) -> str:
path: Sequence[str]
if context.has_asset_key:
path = context.get_asset_identifier()
else:
path = ["storage", *context.get_identifier]

return "/".join([self.s3_prefix, *path])

def _rm_object(self, key):
self.s3.delete_object(Bucket=self.s3_bucket, Key=key)

def _has_object(self, key):
try:
self.s3.get_object(Bucket=self.s3_bucket, Key=key)
found_object = True
except self.s3.exceptions.NoSuchKey:
found_object = False

return found_object

def _uri_for_key(self, key):
return "s3://" + self.s3_bucket + "/" + "{key}".format(key=key)

def load_input(self, context: "InputContext") -> Optional[str]:
if context.dagster_type.typing_type == type(None):
return None

key = self._get_path(context)
context.log.debug(f"Loading S3 object from: {self._uri_for_key(key)}")
return self.s3.get_object(Bucket=self.s3_bucket, Key=key)["Body"].read().decode('utf-8')

def handle_output(self, context: "OutputContext", obj: Callable) -> None:
if context.dagster_type.typing_type == type(None):
return None

key = self._get_path(context)
path = self._uri_for_key(key)

context.log.debug(f"Writing S3 object at: {path}")

if self._has_object(key):
context.log.warning(f"Removing existing S3 key: {key}")
self._rm_object(key)

response = self.s3.create_multipart_upload(Bucket=self.s3_bucket, Key=key)
lines = to_sized_file_stream(obj(), 6)  # ~6 MB chunks keep each part above S3's 5 MB multipart minimum
upload_id = response['UploadId']
etags = []
for part_number, line in enumerate(lines, start=1):
response = self.s3.upload_part(Bucket=self.s3_bucket, Key=key, UploadId=upload_id, PartNumber=part_number, Body=line)
etags.append({'PartNumber': part_number, 'ETag': response['ETag']})

self.s3.complete_multipart_upload(Bucket=self.s3_bucket, Key=key, UploadId=upload_id, MultipartUpload={'Parts': etags})
context.add_output_metadata({"uri": MetadataValue.path(path)})

class S3PathIOManager(S3StreamStringIOManager):

def load_input(self, context: "InputContext") -> Optional[str]:
asset_key = context.asset_key
event_log_entry = context.step_context.instance.get_latest_materialization_event(asset_key)
metadata = event_log_entry.dagster_event.event_specific_data.materialization.metadata
return metadata.get("uri").value




@io_manager(
config_schema={
"s3_bucket": Field(StringSource),
"s3_prefix": Field(StringSource, is_required=False, default_value="dagster"),
},
required_resource_keys={"s3"},
)
def s3_stream_string_io_manager(init_context):
s3_session = init_context.resources.s3
s3_bucket = init_context.resource_config["s3_bucket"]
s3_prefix = init_context.resource_config.get("s3_prefix") # s3_prefix is optional
return S3StreamStringIOManager(s3_bucket, s3_session, s3_prefix=s3_prefix)


@io_manager(
config_schema={
"s3_bucket": Field(StringSource),
"s3_prefix": Field(StringSource, is_required=False, default_value="dagster"),
},
required_resource_keys={"s3"},
)
def s3_path_io_manager(init_context):
s3_session = init_context.resources.s3
s3_bucket = init_context.resource_config["s3_bucket"]
s3_prefix = init_context.resource_config.get("s3_prefix") # s3_prefix is optional
return S3PathIOManager(s3_bucket, s3_session, s3_prefix=s3_prefix)
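A quick illustration (not part of the PR) of how `to_sized_file_stream` behaves: with a 1 MB target, roughly 2 MB of input lines comes back as two newline-joined chunks.

```python
from catfacts.resources.s3_stream_string_io_manager import to_sized_file_stream

lines = ("x" * 1000 for _ in range(2000))        # ~2 MB of data, one line at a time
chunks = list(to_sized_file_stream(lines, file_mb_size=1))
print(len(chunks))                               # 2
print(all(len(c) <= 1_100_000 for c in chunks))  # True: each chunk is roughly 1 MB
```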