From 146c05438be0daa27bb29c00da244bcbd764f772 Mon Sep 17 00:00:00 2001 From: culonculon Date: Thu, 13 Jun 2024 14:10:25 +0900 Subject: [PATCH 1/2] [initial] learn-airflow --- dags/test.py | 38 ++++++ dags/youngjun.py | 105 +++++++++++++++ data/test.csv | 1 + docker-compose.yaml | 310 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 454 insertions(+) create mode 100644 dags/test.py create mode 100644 dags/youngjun.py create mode 100644 data/test.csv create mode 100644 docker-compose.yaml diff --git a/dags/test.py b/dags/test.py new file mode 100644 index 0000000..755449a --- /dev/null +++ b/dags/test.py @@ -0,0 +1,38 @@ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + +import pdb + +def upload_to_s3(filename: str, key: str, bucket_name: str) -> None: + pdb.set_trace() # + hook = S3Hook('aws_conn_id') + + files = hook.list_keys(bucket_name) + if files is not None: + print(f"Bucket {bucket_name} contains: {files}") + else: + print(f"Bucket {bucket_name} is empty or not accessible.") + + hook.load_file(filename=filename, + key=key, + bucket_name=bucket_name, + replace=True + ) + + +with DAG('upload_to_s3', + schedule_interval=None, + start_date=datetime(2022, 1, 1), + catchup=False + ) as dag: + upload = PythonOperator(task_id='upload', + python_callable=upload_to_s3, + op_kwargs={ + 'filename': '/opt/airflow/data/test.csv', + 'key': 'dataSource/test.csv', + 'bucket_name': 'dust-dag' + }) + upload \ No newline at end of file diff --git a/dags/youngjun.py b/dags/youngjun.py new file mode 100644 index 0000000..e5123ca --- /dev/null +++ b/dags/youngjun.py @@ -0,0 +1,105 @@ +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator +from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator +from airflow.utils.dates import days_ago +from airflow.models import Variable +import pandas as pd +import requests +import datetime +import os + +import pdb + +# DAG ⺻ +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, +} + +# DAG +with DAG( + 'api_to_s3_to_redshift', + default_args=default_args, + description='Fetch data from API, upload to S3, and load into Redshift', + schedule_interval=None, + start_date=days_ago(1), + catchup=False, +) as dag: + + # 1. API ȣϿ · ȯϴ Լ + def fetch_api_data(): + # ¥ YYYYMMDDHH (: 1ð û) + current_datetime = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime("%Y%m%d%H00") + + # API URL (json û) + api_url = f"http://openapi.seoul.go.kr:8088/4b746a725579756e35386955445a73/json/TimeAverageCityAir/1/100/{current_datetime}" + response = requests.get(api_url) + try: + response.raise_for_status() # HTTP ڵ Ȯ + data = response.json() + except requests.exceptions.HTTPError as http_err: + raise ValueError(f"HTTP error occurred: {http_err}") + except requests.exceptions.RequestException as req_err: + raise ValueError(f"Request exception occurred: {req_err}") + except ValueError as json_err: + raise ValueError(f"JSON decode error: {json_err}") + + if "TimeAverageCityAir" not in data or "row" not in data["TimeAverageCityAir"]: + raise ValueError("No data returned from API") + + items = data["TimeAverageCityAir"]["row"] + if not items: + raise ValueError("No data available for the requested date and time.") + + df = pd.DataFrame(items) + + # ÷ ERD ̸ + df.columns = [ + 'date', 'region_code', 'region_name', 'office_code', 'office_name', + 'dust_1h', 'dust_24h', 'ultradust', 'O3', 'NO2', 'CO', 'SO2' + ] + # UTF-8 ڵ CSV ڿ ȯ + csv_data = df.to_csv(index=False, encoding='utf-8-sig') + + # pdb.set_trace() + + # ۾ 丮 Ͽ + file_path = os.path.join(os.getcwd(), 'api_data.csv') + with open(file_path, 'w', encoding='utf-8-sig') as f: + f.write(csv_data) + + return file_path + + fetch_data = PythonOperator( + task_id='fetch_api_data', + python_callable=fetch_api_data, + ) + + # 2. · ȯ ͸ S3 ε + upload_to_s3 = LocalFilesystemToS3Operator( + task_id='upload_to_s3', + filename="{{ task_instance.xcom_pull(task_ids='fetch_api_data') }}", + dest_bucket='dust-dag', + dest_key='dataSource/api_data.csv', + aws_conn_id='aws_s3', + replace=True # ̹ ϴ  + ) + + # 3. S3 ͸ Redshift ̺ + load_to_redshift = S3ToRedshiftOperator( + task_id='load_to_redshift', + schema='yusuyeon678', + table='raw_data_test_youngjun', + s3_bucket='dust-dag', + s3_key='dataSource/api_data.csv', + copy_options=['csv', 'IGNOREHEADER 1'], + redshift_conn_id='redshift_test_dev', + aws_conn_id='aws_s3', + ) + + # Task + fetch_data >> upload_to_s3 >> load_to_redshift \ No newline at end of file diff --git a/data/test.csv b/data/test.csv new file mode 100644 index 0000000..3f867a9 --- /dev/null +++ b/data/test.csv @@ -0,0 +1 @@ +as,12,zx \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..fee8710 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,310 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. +# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:2.6.3 +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +version: '3.8' +x-airflow-common: + &airflow-common + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3} + # build: . + environment: + &airflow-common-env + AIRFLOW_VAR_DATA_DIR: /opt/airflow/data + + + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + # For backward compatibility, with Airflow <2.3 + AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'true' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-pandas requests finance-datareader snowflake-connector-python boto3 plotly pendulum apache-airflow-providers-snowflake} + + AIRFLOW_CONN_AWS_S3: ${AIRFLOW_CONN_S3_DEFAULT} + AIRFLOW_CONN_REDSHIFT_TEST_DEV: ${AIRFLOW_CONN_REDSHIFT_DEFAULT} + + + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data + + + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + image: redis:latest + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + test: + - "CMD-SHELL" + - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + - -c + - | + function ver() { + printf "%04d%04d%04d%04d" $${1//./ } + } + airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version) + airflow_version_comparable=$$(ver $${airflow_version}) + min_airflow_version=2.2.0 + min_airflow_version_comparable=$$(ver $${min_airflow_version}) + if (( airflow_version_comparable < min_airflow_version_comparable )); then + echo + echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m" + echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!" + echo + exit 1 + fi + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." + echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + mkdir -p /sources/logs /sources/dags /sources/plugins /sources/data + chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins,data} + exec /entrypoint airflow version + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_UPGRADE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. + # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + +volumes: + postgres-db-volume: \ No newline at end of file From ab9a23de738479bb8d3f96a7ba20666f39ef7b16 Mon Sep 17 00:00:00 2001 From: culonculon Date: Thu, 13 Jun 2024 14:42:13 +0900 Subject: [PATCH 2/2] [fix] error file fix --- dags/test.py | 2 +- dags/youngjun.py | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/dags/test.py b/dags/test.py index 755449a..35e1802 100644 --- a/dags/test.py +++ b/dags/test.py @@ -7,7 +7,7 @@ import pdb def upload_to_s3(filename: str, key: str, bucket_name: str) -> None: - pdb.set_trace() # + pdb.set_trace() # 디버깅 모드 진입 hook = S3Hook('aws_conn_id') files = hook.list_keys(bucket_name) diff --git a/dags/youngjun.py b/dags/youngjun.py index e5123ca..3f7a55d 100644 --- a/dags/youngjun.py +++ b/dags/youngjun.py @@ -11,7 +11,7 @@ import pdb -# DAG ⺻ +# DAG 기본 설정 default_args = { 'owner': 'airflow', 'depends_on_past': False, @@ -20,7 +20,7 @@ 'retries': 1, } -# DAG +# DAG 정의 with DAG( 'api_to_s3_to_redshift', default_args=default_args, @@ -30,16 +30,16 @@ catchup=False, ) as dag: - # 1. API ȣϿ · ȯϴ Լ + # 1. API 호출하여 데이터프레임 형태로 변환하는 함수 def fetch_api_data(): - # ¥ YYYYMMDDHH (: 1ð û) + # 현재 날짜를 YYYYMMDDHH 형식으로 얻기 (예: 1시간 전 데이터 요청) current_datetime = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime("%Y%m%d%H00") - # API URL (json û) + # API URL (json 형식으로 요청) api_url = f"http://openapi.seoul.go.kr:8088/4b746a725579756e35386955445a73/json/TimeAverageCityAir/1/100/{current_datetime}" response = requests.get(api_url) try: - response.raise_for_status() # HTTP ڵ Ȯ + response.raise_for_status() # HTTP 응답 상태 코드 확인 data = response.json() except requests.exceptions.HTTPError as http_err: raise ValueError(f"HTTP error occurred: {http_err}") @@ -57,17 +57,17 @@ def fetch_api_data(): df = pd.DataFrame(items) - # ÷ ERD ̸ + # 컬럼명을 ERD의 영어 이름으로 변경 df.columns = [ 'date', 'region_code', 'region_name', 'office_code', 'office_name', 'dust_1h', 'dust_24h', 'ultradust', 'O3', 'NO2', 'CO', 'SO2' ] - # UTF-8 ڵ CSV ڿ ȯ + # 데이터프레임을 UTF-8 인코딩으로 CSV 형식의 문자열로 변환 csv_data = df.to_csv(index=False, encoding='utf-8-sig') # pdb.set_trace() - # ۾ 丮 Ͽ + # 현재 작업 디렉토리를 사용하여 파일 저장 file_path = os.path.join(os.getcwd(), 'api_data.csv') with open(file_path, 'w', encoding='utf-8-sig') as f: f.write(csv_data) @@ -79,17 +79,17 @@ def fetch_api_data(): python_callable=fetch_api_data, ) - # 2. · ȯ ͸ S3 ε + # 2. 데이터프레임 형태로 변환한 데이터를 S3에 업로드 upload_to_s3 = LocalFilesystemToS3Operator( task_id='upload_to_s3', filename="{{ task_instance.xcom_pull(task_ids='fetch_api_data') }}", dest_bucket='dust-dag', dest_key='dataSource/api_data.csv', aws_conn_id='aws_s3', - replace=True # ̹ ϴ  + replace=True # 파일이 이미 존재하는 경우 덮어쓰기 ) - # 3. S3 ͸ Redshift ̺ + # 3. S3 데이터를 Redshift 테이블에 적재 load_to_redshift = S3ToRedshiftOperator( task_id='load_to_redshift', schema='yusuyeon678', @@ -101,5 +101,5 @@ def fetch_api_data(): aws_conn_id='aws_s3', ) - # Task + # Task 순서 정의 fetch_data >> upload_to_s3 >> load_to_redshift \ No newline at end of file