diff --git a/src/SQLBasedCDCDataIngestion/.gitignore b/src/SQLBasedCDCDataIngestion/.gitignore new file mode 100644 index 00000000..383cdd50 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/.gitignore @@ -0,0 +1,10 @@ +*.swp +package-lock.json +__pycache__ +.pytest_cache +.env +*.egg-info + +# CDK asset staging directory +.cdk.staging +cdk.out diff --git a/src/SQLBasedCDCDataIngestion/README.md b/src/SQLBasedCDCDataIngestion/README.md new file mode 100644 index 00000000..660847f6 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/README.md @@ -0,0 +1,292 @@ +# SQLBasedCDCDataIngestion + +Traditionally populating a data warehouse involved running expensive queries overnight, using sophisticated tools and learning domain-specific programming languages to extract, transform and process information at scale. Often, the complexity of this task required to bring in-demand data engineering skills and/or expensive proprietary tools before even starting the data warehousing journey. + +[Amazon Redshift](https://aws.amazon.com/redshift/) allows you to bring structured and unstructured data from multiple sources like relational databases and [Data Lakes based on Amazon S3](https://aws.amazon.com/products/storage/data-lake-storage/) to gain new insights, create reports and dashboards. Furthermore, the rich [SQL interface of Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/dg/c_redshift-sql.html) enables you to perform a wide variety of transactions and complex series of tasks on very large datasets. All of this happening within your data warehouse. + +In this project we show you how, using only the SQL interface of Amazon Redshift, you can build a pipeline that loads and transforms data from a relational database that uses the [Change Data Capture](https://en.wikipedia.org/wiki/Change_data_capture) (CDC) technique to enable data extraction at scale. Therefore, you no longer need to spend time and effort learning how to use domain-specific programming languages or tools. You can accelerate the journey to generate valuable business insights while leveraging the skillset in your existing database team. + +See the AWS architecture diagram below for an overview of a sample solution that you can implement in your own AWS account. + +![Architecture diagram](redshiftcdc_architecture.png) + +The implementation of this solution consists of five steps: + +1. Setup the data source with a sample application. +2. Load the data to S3 and replicate on-going changes (CDC). +3. Discover the Data Lake and populate the catalog with [AWS Glue](https://aws.amazon.com/glue/). +4. Schedule and run the [ELT](https://en.wikipedia.org/wiki/ELT) process in Amazon Redshift. +5. Unload the resulting data to the Optimized zone of the Data Lake. + +Before going over each step in detail, let’s deploy the resources from the sample architecture. + +## Deployment + +1. [AWS Cloud9](https://aws.amazon.com/cloud9/) is a cloud-based integrated development environment (IDE) that provides terminal access with just a browser. Sign in to the [AWS Cloud 9 web console](https://console.aws.amazon.com/cloud9/) and [create an environment](https://docs.aws.amazon.com/cloud9/latest/user-guide/tutorial-create-environment.html) using Amazon Linux 2 as its base platform. +2. [Open your AWS Cloud9 environment](https://docs.aws.amazon.com/cloud9/latest/user-guide/open-environment.html). On the menu bar (at the top of the AWS Cloud9 IDE), choose **Window**, **New Terminal** (or use an existing terminal window). +3. 
In the terminal window, clone the GitHub repository and change into the project directory.

```
git clone https://github.com/awslabs/amazon-redshift-utils.git
cd amazon-redshift-utils/src/SQLBasedCDCDataIngestion
```

4. Open the file `app.py` and adjust the `user_name` parameter to your current IAM user name and `email_address` to your email address. Save your changes.

```
# your user name
user_name = "administrator"
# your email address for notifications
email_address="youremail@yourdomain.com"
```

5. Run the deployment script. This triggers an application built with the [AWS Cloud Development Kit (CDK)](https://aws.amazon.com/cdk/) that provisions the required resources of the solution. This should complete in about 10 minutes.

```
./deploy.sh
```

## Walkthrough

### Setup the data source with a sample application

Now let's see what happened in the previous step. The first element that you deployed is a relational database built with [Amazon Relational Database Service (RDS) for PostgreSQL](https://aws.amazon.com/rds/) that serves as the main data source of the solution. The database is configured with the [prerequisites](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.Prerequisites) to use it as a source for [AWS Database Migration Service](https://aws.amazon.com/dms/) (DMS).

Then, to generate a flow of database transactions that simulates the regular workload of a running application, you deployed an [Amazon EC2](https://aws.amazon.com/ec2) instance hosting the [pgbench](https://www.postgresql.org/docs/11/pgbench.html) performance testing utility. It runs every two minutes for periods of 30 seconds (`-T 30`) and simulates transactions from 10 clients (`-c 10`). This is performed with a sequence of SQL statements running against 4 tables:

* pgbench_branches
* pgbench_tellers
* pgbench_accounts
* pgbench_history

If you want to customize the parameters that control the workload generated by pgbench, you can use [AWS Systems Manager Session Manager](https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-sessions-start.html) to quickly and securely [connect to the EC2 instance](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/session-manager.html). The utility is configured in the operating system scheduler as shown below.

```
sudo cat /etc/cron.d/pgbench_cron
*/2 * * * * root pgbench -c 10 -T 30 -h your_rds.rds.amazonaws.com -p 5432 -U postgres test
```

### Load the data to S3 and replicate on-going changes (CDC)

To store the data from the application in the Data Lake, first you deployed S3 buckets corresponding to the Raw zone and the Optimized zone. Next, you set up a replication instance with [AWS Database Migration Service (DMS)](https://aws.amazon.com/dms/) that is linked to a set of source and target endpoints and hosts two migration tasks:

1. **fullload**: Migrates the data from all the database tables, at a particular point in time, to the Raw S3 bucket under the `full/` prefix. It adds an additional column that contains the migration timestamp of each row.
2. **cdcload**: Captures on-going record changes (inserts, updates and deletes) from the database tables and replicates them to the Raw S3 bucket under the `CDC/` prefix. It adds two additional columns: one containing the timestamp of the row change and a second one containing the type of operation applied, such as `I` (insert), `U` (update), or `D` (delete).
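
You can also check the state of the two tasks from a terminal. The following is a minimal boto3 sketch; it assumes your credentials point to the account and Region where the stacks were deployed, and it uses the task identifiers `fullload` and `cdcload` assigned by the DMS stack.

```python
import boto3

dms = boto3.client("dms")

# Look up the two replication tasks created by the DMS stack by their identifiers.
response = dms.describe_replication_tasks(
    Filters=[{"Name": "replication-task-id", "Values": ["fullload", "cdcload"]}],
    WithoutSettings=True,
)

for task in response["ReplicationTasks"]:
    stats = task.get("ReplicationTaskStats", {})
    print(f"{task['ReplicationTaskIdentifier']}: {task['Status']} "
          f"(tables loaded: {stats.get('TablesLoaded', 0)})")
```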

This AWS DMS setup is highly resilient and self-healing. In case of interruption, it automatically restarts the process and continues replicating changes to the target S3 endpoint from its latest checkpoint.

You can see the status of the migration tasks by opening the [AWS DMS console](https://aws.amazon.com/dms/) and choosing the **Tasks** option in the navigation pane. A table showing the summary of the existing migration tasks will appear.

### Discover the Data Lake and populate the catalog with AWS Glue

All the changes happening in the source database are captured by DMS and replicated into the Raw S3 bucket using the [Apache Parquet](https://parquet.apache.org/) file format. Parquet facilitates metadata discovery through its schema-on-read feature and improves data compression efficiency.

The files corresponding to the full-load tables can be found under the prefix:

`bucket-raw-<account-id>-<region>/full/<schema>/<table>/`

The files corresponding to the CDC updates can be found under the prefix:

`bucket-raw-<account-id>-<region>/CDC/<schema>/<table>/YYYYMMDD`

Note that a new YYYYMMDD prefix is generated on a daily basis. It contains files with changes committed during a given day.

To discover the data structure, two [AWS Glue](https://aws.amazon.com/glue) crawlers (FullLoad and CDC) are set up to create and update tables in the Glue Data Catalog. The Glue crawlers connect to the raw data bucket and progress through a prioritized list of classifiers to determine the schema of the data. After this, they update the corresponding table metadata.

You can manually start both crawlers using the [AWS Glue console](https://aws.amazon.com/glue/). Go to the **Crawlers** section in the left navigation pane, select the desired crawler and click on the **Run crawler** button. Once the process concludes, you will see a change in the **Tables added** metric if the tables were discovered in this run, or in **Tables updated** if the tables were already present but new metadata, such as new date partitions, had to be recorded.

On the first run, the FullLoad Glue crawler will discover 4 tables under its corresponding S3 prefix and store them in `cdc_database` with the table name prefix `raw_full_`. The CDC Glue crawler, on the other hand, will discover 4 tables under its S3 prefix and store them in `cdc_database` with the table name prefix `raw_cdc_` and a partitioning strategy based on the YYYYMMDD prefix (commit date).

At this point, all the tables containing data from the full load and the CDC updates are registered in the AWS Glue Data Catalog and available to be queried with services like [Amazon Athena](https://aws.amazon.com/athena/), [Amazon EMR](https://aws.amazon.com/emr/), and [Amazon Redshift Spectrum](https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html).

### Schedule and run the ELT process in Amazon Redshift

In this phase you need to ingest the data into the Amazon Redshift cluster. To do this, first [retrieve the credentials stored as a secret](https://docs.aws.amazon.com/secretsmanager/latest/userguide/manage_retrieve-secret.html) named `RedshiftSecret` in [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/). Then, open the [Amazon Redshift web console](https://console.aws.amazon.com/redshift/), select **Editor** from the left navigation pane, and select **Query Editor**.
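
If you prefer to fetch these credentials from a terminal rather than from the Secrets Manager console, a minimal boto3 sketch is shown below. It assumes the secret can be looked up by the name `RedshiftSecret`; the exact JSON keys depend on the secret generated for the cluster.

```python
import json
import boto3

# Minimal sketch: fetch the Redshift credentials used in the console steps above.
# Assumes the secret is reachable under the name "RedshiftSecret"; typical keys
# include username, password, host and port.
secrets_client = boto3.client("secretsmanager")
secret = json.loads(
    secrets_client.get_secret_value(SecretId="RedshiftSecret")["SecretString"]
)
print({key: value for key, value in secret.items() if key != "password"})
```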

Now, you can use the retrieved secret values to [create a new connection](https://docs.aws.amazon.com/redshift/latest/mgmt/query-editor.html#query-editor-connecting) to the `redshifteltdemo` cluster.

Once connected to the `demo` database, you will find 4 tables pre-created for you in the `public` schema.

To query your S3 tables using Amazon Redshift Spectrum, you need an external schema pointing to the Glue database that contains the tables of interest. You can create this external schema by running the following SQL statement in the query editor.

```
create external schema cdc_database from data catalog database 'cdc_database'
iam_role '<RedshiftSpectrumRole ARN>' create external database if not exists;
```

You can use the Amazon Redshift console to get the `RedshiftSpectrumRole` ARN. First, click on the **CLUSTERS** option in the navigation pane. Then, choose the `redshifteltdemo` cluster and click on the **Properties** tab. You will see the ARN in the **Cluster permissions** section.

Now you are ready to start the ingestion queries. Run a full ingestion from S3 for each table using the following statements.

```
insert into public.pgbench_accounts
(select * from cdc_database.raw_full_pgbench_accounts);
insert into public.pgbench_branches
(select * from cdc_database.raw_full_pgbench_branches);
insert into public.pgbench_tellers
(select * from cdc_database.raw_full_pgbench_tellers);
insert into public.pgbench_history
(select * from cdc_database.raw_full_pgbench_history);
```

Finally, you need to [create a stored procedure](https://docs.aws.amazon.com/redshift/latest/dg/stored-procedure-overview.html) to load and merge the CDC data on a daily basis. If your business intelligence or reporting tools often need to perform complex analytical queries over large amounts of data, using materialized views is a good option to increase query performance, since you can precompute the desired datasets. Once your tables are updated, it is a perfect opportunity to refresh your existing materialized views so that they reflect the latest version of the data.

The stored procedure in the example below illustrates a typical process that updates the `accounts` table with the latest available changes and then refreshes an existing [materialized view](https://docs.aws.amazon.com/redshift/latest/dg/materialized-view-overview.html) called `weekly_account_change_summary_mv`.

```
CREATE OR REPLACE PROCEDURE refresh_account_change_summary()
AS $$
  BEGIN
  -- Create staging table with a column for type of operation
  CREATE TABLE IF NOT EXISTS "public"."stage_pgbench_accounts" (LIKE "public"."pgbench_accounts");
  ALTER TABLE "public"."stage_pgbench_accounts" ADD COLUMN op VARCHAR(8);
  -- Save the last version of the previous day's records into the staging table
  INSERT INTO "public"."stage_pgbench_accounts" (changets, aid, bid, abalance, filler, op)
  SELECT changets, aid, bid, abalance, filler, op
  FROM (
    SELECT *, row_number() over (partition by aid order by changets desc) AS lastchange
    FROM "cdc_database"."raw_cdc_pgbench_accounts"
    WHERE partition_0>=to_char(DATE(DATEADD(DAY, -1, GETDATE())), 'YYYYMMDD')
  )
  WHERE lastchange=1;
  -- Clean changed records from the original table
  DELETE FROM "public"."pgbench_accounts" USING "public"."stage_pgbench_accounts"
  WHERE "public"."pgbench_accounts"."aid" = "public"."stage_pgbench_accounts"."aid"
  AND "public"."pgbench_accounts"."changets" < "public"."stage_pgbench_accounts"."changets";
  -- Clean pre-existing records from the staging table
  DELETE FROM "public"."stage_pgbench_accounts" USING "public"."pgbench_accounts"
  WHERE "public"."pgbench_accounts"."aid" = "public"."stage_pgbench_accounts"."aid"
  AND "public"."pgbench_accounts"."changets" >= "public"."stage_pgbench_accounts"."changets";
  -- Update the destination table with records from the staging table
  INSERT INTO "public"."pgbench_accounts"
  SELECT changets, aid, bid, abalance, filler
  FROM (
    SELECT * FROM "public"."stage_pgbench_accounts" WHERE op='I' OR op='U'
  );
  -- Refresh the materialized view
  REFRESH MATERIALIZED VIEW "public"."weekly_account_change_summary_mv";
  -- Delete the staging table
  DROP TABLE "public"."stage_pgbench_accounts";
  END;
$$ LANGUAGE plpgsql;
```

You can [schedule a SQL query using the Amazon Redshift web console](https://docs.aws.amazon.com/redshift/latest/mgmt/query-editor-schedule-query.html). This feature enables you to call the stored procedure described in the previous example on a daily basis using the SQL statement below.

```
CALL refresh_account_change_summary();
```

Follow the prompts to configure the schedule according to your needs:

* In the **Scheduler permissions** section, select `redshift_query_schedule_role` as the IAM role and pick AWS Secrets Manager with `RedshiftSecret` as your secret to access the `demo` database.
* In the **Query information** section, introduce a name for your query.
* For **Scheduling options**, select the desired schedule frequency for your query (time is based on UTC).
* Lastly, in the **Monitoring** section, enable SNS notifications and select `RedshiftDataFailureTopic` as your SNS topic.

You can have CDC updates merged for additional tables and their corresponding materialized views refreshed by simply appending the corresponding SQL statements to the procedure.

### Unload the resulting data to the Optimized zone of the Data Lake

The AWS DMS CDC streaming task generates multiple small files (under 1 MB) in the CDC S3 prefix of the Raw S3 bucket. A large number of small files (generally less than 128 MB each) represents a challenge for many analytical tools due to the overhead of listing the files, getting each object's metadata, opening files individually, and so on.

The solution contains an Unload Python function deployed in [AWS Lambda](https://aws.amazon.com/lambda/) to address this performance issue.
This function iterates through all the CDC tables available in the Glue Catalog and initiates an [UNLOAD](https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) SQL statement on Redshift to export a lower number of larger files per table into the Optimized S3 bucket. The SQL statements are sent using [Amazon Redshift Data API](https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html), the API doesn't require a persistent connection to the cluster and all the calls are asynchronous. + +``` +import boto3 +from os import environ +from datetime import date +from datetime import timedelta + +# get all environment variables +glue_database = environ['glue_database'] +redshift_database = environ['redshift_database'] +redshift_role_arn = environ['redshift_role_arn'] +cluster_identifier= environ['cluster_identifier'] +secret_arn= environ['secret_arn'] + +# create all clients +glue_client=boto3.client("glue") +redshift_data_client = boto3.client('redshift-data') + +def get_yeterday_partition(): + """ + get partition in YYYYMMDD format - the same as DMS + """ + return str(date.today() - timedelta(days = 1)).replace("-", "") + +def execute_statement(sql_statement, table): + """ + Executes sql statement using Redshift Data API + """ + statement_name = f'unload table - {table} partition - {get_yeterday_partition()}' + response = redshift_data_client.execute_statement( + ClusterIdentifier=cluster_identifier, + Database=redshift_database, + SecretArn=secret_arn, + Sql=sql_statement, + StatementName = statement_name , + WithEvent=True) + +def create_unload_statement(table_name, table_location): + """ + Creates unload statement for yesterdays date. Note: change in bucket name for output + from -raw- to -optimized- + """ + optimized_location = table_location.replace("-raw-", "-optimized-") + yesterday_partition = get_yeterday_partition() + sql_statement = f""" + UNLOAD ('select * from {glue_database}.{table_name} where partition_0=\\'{yesterday_partition}\\'') + TO '{optimized_location}changedate={yesterday_partition}/part_' + iam_role '{redshift_role_arn}' + FORMAT PARQUET + PARALLEL OFF + MAXFILESIZE 200 MB + ALLOWOVERWRITE""" + + return sql_statement + + +def lambda_handler(event, context): + + # get paginator just in case of many tables + paginator = glue_client.get_paginator('get_tables') + + # filter for tables starting with raw_cdc_ as per convention + response_iterator = paginator.paginate( + DatabaseName=glue_database, + Expression='raw_cdc_*', + PaginationConfig={ + 'PageSize': 10 + }) + + for page in response_iterator: + for table in page["TableList"]: + table_name=table['Name'] + + # for each table create and execute unload statement + sql_statement = create_unload_statement(table['Name'], table['StorageDescriptor']['Location']) + execute_statement(sql_statement, table_name) + + return "Success" +``` + +This task is scheduled to be performed daily on the files that were generated a day before. You need to run this demo for at least a day to see the output created in the optimized bucket. + + +## Cleaning up + +To remove all the resources deployed during this demo run the following command from your AWS Cloud9 instance. + +``` +cd redshiftcdcelt +cdk destroy "*" +``` + +Clean up your AWS Cloud9 by following the instructions in [Clean up documentation](https://docs.aws.amazon.com/cloud9/latest/user-guide/tutorial-clean-up.html). 
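
Note that CloudFormation cannot delete S3 buckets that still contain objects, so `cdk destroy` may fail on the S3 stack once the demo has produced data. The following is a minimal boto3 sketch that empties the two demo buckets first; it assumes the bucket names follow the pattern used in `s3_stack.py`.

```python
import boto3

# Empty the demo buckets so CloudFormation can delete them during "cdk destroy".
# Bucket names are assumed to follow the pattern defined in s3_stack.py.
account = boto3.client("sts").get_caller_identity()["Account"]
region = boto3.session.Session().region_name
s3 = boto3.resource("s3")
for prefix in ("bucket-raw", "bucket-optimized"):
    s3.Bucket(f"{prefix}-{account}-{region}").objects.all().delete()
```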
+ +## Summary + +In this project, we presented a detailed approach to hydrate a data warehouse in a scalable way with streams of RDBMS data using the skillset of a typical database administration team. We showed how the rich SQL interface of Amazon Redshift can help to avoid purchasing expensive proprietary tools and/or acquiring in-demand data engineering skills. Moreover, how it can help your end users to accelerate their journey to get valuable business insights from your data. + +You can follow these guidelines to create and schedule scalable end-to-end data processing pipelines that help you to move your transactional data to Amazon Redshift using the change data capture approach and integrate with your existing Data Lake architecture. diff --git a/src/SQLBasedCDCDataIngestion/app.py b/src/SQLBasedCDCDataIngestion/app.py new file mode 100644 index 00000000..6891999c --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/app.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 + +from aws_cdk import core + +from dlpoc.vpc_stack import VpcStack +from dlpoc.rds_stack import RdsStack +from dlpoc.bastion_stack import BastionStack +from dlpoc.s3_stack import S3Stack +from dlpoc.dms_stack import DMSStack +from dlpoc.redshift_stack import RedshiftStack +from dlpoc.glue_stack import GlueStack +from dlpoc.unload_stack import UnloadStack +from dlpoc.query_schedule_stack import QueryScheduleStack + +email_address="shveda@amazon.com" + +app = core.App() +dl_vpc = VpcStack(app, "DLVPC") +rds_stack = RdsStack(app, "RDS", vpc=dl_vpc.vpc) +bastion_stack = BastionStack(app, "Bastion", vpc=dl_vpc.vpc, dms_sg=rds_stack.dms_sg, secret=rds_stack.secret) +s3_stack = S3Stack(app, "S3") +dms_stack = DMSStack(app, "DMS", vpc=dl_vpc.vpc, rds_secret=rds_stack.secret, raw_bucket=s3_stack.s3_raw, dms_sg=rds_stack.dms_sg) +redshift_stack = RedshiftStack(app, "Redshift", vpc=dl_vpc.vpc, raw_bucket=s3_stack.s3_raw, optimized_bucket=s3_stack.s3_optimized) +glue_stack = GlueStack(scope=app, id="Glue", raw_bucket=s3_stack.s3_raw, optimized_bucket=s3_stack.s3_optimized) +unload_stack = UnloadStack( + app, "Unload", email_address=email_address, + glue_database = glue_stack.glue_database, redshift_database=redshift_stack.default_database_name, + redshift_cluster=redshift_stack.redshift_cluster, redshift_role=redshift_stack.redshift_spectrum_role) +query_schedule_stack = QueryScheduleStack( + scope=app, id="QuerySchedule", + glue_database=glue_stack.glue_database, redshift_database=redshift_stack.default_database_name, + redshift_cluster=redshift_stack.redshift_cluster, redshift_role=redshift_stack.redshift_spectrum_role) +app.synth() diff --git a/src/SQLBasedCDCDataIngestion/cdk.json b/src/SQLBasedCDCDataIngestion/cdk.json new file mode 100644 index 00000000..cd2273a5 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/cdk.json @@ -0,0 +1,8 @@ +{ + "app": "python3 app.py", + "context": { + "@aws-cdk/core:enableStackNameDuplicates": "true", + "aws-cdk:enableDiffNoFail": "true", + "@aws-cdk/core:stackRelativeExports": "true" + } +} diff --git a/src/SQLBasedCDCDataIngestion/console_setup/create_external_schema.sql b/src/SQLBasedCDCDataIngestion/console_setup/create_external_schema.sql new file mode 100644 index 00000000..311c8585 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/console_setup/create_external_schema.sql @@ -0,0 +1 @@ +create external schema cdc_database from data catalog database 'cdc_database' iam_role '' create external database if not exists; \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/deploy.sh 
b/src/SQLBasedCDCDataIngestion/deploy.sh new file mode 100644 index 00000000..2e17075b --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/deploy.sh @@ -0,0 +1,34 @@ +#!/bin/bash +# This script deploys components of demo Redshift ELT blog +# from Cloud9 environment + + +# The following steps need to be performed first: +# clone code repo +# git clone https://github.com/AndrewShvedRepo/redshiftcdcelt +# cd redshiftcdcelt/ +# ./deploy.sh + + + +# upgrade cdk +sudo npm install -g aws-cdk@1.72.0 + +# install dependencies +sudo pip3 install -r requirements.txt + +# remove unnecessary docker images to free up space +docker rmi -f $(docker image ls -a -q) + +# bootstrap ckd +cdk bootstrap + +# deploy all stacks +cdk deploy --require-approval never "*" + + +# install boto3 +sudo pip3 install boto3 + +#start dms tasks +python dlpoc/start_dms_tasks.py diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/__init__.py b/src/SQLBasedCDCDataIngestion/dlpoc/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/bastion_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/bastion_stack.py new file mode 100644 index 00000000..fb7a62bf --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/bastion_stack.py @@ -0,0 +1,56 @@ +import os + +from aws_cdk import core +from aws_cdk.aws_s3_assets import Asset +import aws_cdk.aws_ec2 as ec2 +import aws_cdk.aws_iam as iam +import aws_cdk.aws_secretsmanager as secretsmanager + +DIRNAME = os.path.dirname(__file__) + + +class BastionStack(core.Stack): + def __init__( + self, scope: core.Construct, id: str, + vpc: ec2.IVpc, dms_sg: ec2.ISecurityGroup, + secret: secretsmanager.ISecret, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # Allow SSH Access + dms_sg.add_ingress_rule( + peer=ec2.Peer.ipv4("52.95.4.3/32"), connection=ec2.Port.tcp(22), + description="SSH access" + ) + dms_sg.add_ingress_rule( + peer=ec2.Peer.ipv4("3.231.255.131/32"), connection=ec2.Port.tcp(22), + description="SSH access" + ) + + # Create Bastion + self.bastion_host = ec2.BastionHostLinux( + scope=self, id="BastionHost", + vpc=vpc, subnet_selection=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC), + instance_name="BastionHost", instance_type=ec2.InstanceType(instance_type_identifier="t3.micro"), + security_group=dms_sg + ) + + # Permission for using Secrets Manager + self.bastion_host.role.add_managed_policy( + iam.ManagedPolicy.from_aws_managed_policy_name("SecretsManagerReadWrite") + ) + + # Script asset + asset_script = Asset(scope=self, id="pgbench_script", path=os.path.join(DIRNAME, "bootstrap.sh")) + local_bootstrap_path = self.bastion_host.instance.user_data.add_s3_download_command( + bucket=asset_script.bucket, + bucket_key=asset_script.s3_object_key + ) + + # Execute script asset + self.bastion_host.instance.user_data.add_execute_file_command( + file_path=local_bootstrap_path, + arguments=f"-s {secret.secret_full_arn}" + ) + + # Read permissions to assets + asset_script.grant_read(self.bastion_host.role) diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/bootstrap.sh b/src/SQLBasedCDCDataIngestion/dlpoc/bootstrap.sh new file mode 100644 index 00000000..5b376ae8 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/bootstrap.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# This script bootstraps the installation of pgbench +# initiates it and sets it on the cron for continuous execution +set -e + +POSITIONAL=() +while [[ $# -gt 0 ]]; do + key="$1" + case $key in + -s|--secret-id) SECRET_ID="$2"; shift;; + --debug) set -x;; + *) + POSITIONAL+=("$1"); 
shift;; + esac +done +set -- "${POSITIONAL[@]}" + +function install_system_packages() { + /usr/bin/yum -y install postgresql-contrib jq +} + +function initiate_pgbench() { + echo "${PG_HOST}:${PG_PORT}:*:${PG_USER}:${PG_PASSWORD}" | tee ~/.pgpass + chmod 0600 ~/.pgpass + pgbench -i -h "${PG_HOST}" -p "${PG_PORT}" -U "${PG_USER}" "${PG_DBNAME}" +} + +function schedule_pgbench() { + BENCHMARKING_SCRIPT="*/2 * * * * root pgbench -c 10 -T 30 -h ${PG_HOST} -p ${PG_PORT} -U ${PG_USER} ${PG_DBNAME}" + echo "$BENCHMARKING_SCRIPT" | tee /etc/cron.d/pgbench_cron +} + +install_system_packages +REGION=$(curl -s http://169.254.169.254/latest/dynamic/instance-identity/document | jq .region -r) +SECRET=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --region "$REGION" | jq .SecretString -r) +PG_PASSWORD=$(echo "$SECRET" | jq .password -r) +PG_HOST=$(echo "$SECRET" | jq .host -r) +PG_PORT=$(echo "$SECRET" | jq .port -r) +PG_USER=$(echo "$SECRET" | jq .username -r) +PG_DBNAME=$(echo "$SECRET" | jq .dbname -r) +initiate_pgbench +schedule_pgbench +echo "Bootstrapping complete." diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/dms_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/dms_stack.py new file mode 100644 index 00000000..80d772e4 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/dms_stack.py @@ -0,0 +1,118 @@ +from aws_cdk import core +import aws_cdk.aws_dms as dms +import aws_cdk.aws_ec2 as ec2 +import aws_cdk.aws_s3 as s3 +import aws_cdk.aws_iam as iam +import aws_cdk.aws_secretsmanager as secretsmanager +import json + + + +class DMSStack(core.Stack): + def __init__(self, scope: core.Construct, id: str, vpc: ec2.IVpc, rds_secret: secretsmanager.ISecret, dms_sg: ec2.ISecurityGroup ,raw_bucket: s3.IBucket, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + replication_subnet_group = dms.CfnReplicationSubnetGroup( + self, "ReplicationSubnetGroup", + replication_subnet_group_description="Replication Subnet Group for DL", + replication_subnet_group_identifier="replicationsubnetgroupdl", + subnet_ids=[subnet.subnet_id for subnet in vpc.private_subnets]) + + replication_subnet_group.to_string + + dms_s3_role = iam.Role(self, "DMS S3 Role", assumed_by=iam.ServicePrincipal("dms.amazonaws.com")) + raw_bucket.grant_read_write(dms_s3_role) + dms_replication_instance=dms.CfnReplicationInstance( + self, "DMS Replication Instance", + replication_instance_class="dms.t2.large", + allocated_storage=50, + auto_minor_version_upgrade=True, + engine_version="3.4.2", + replication_subnet_group_identifier="replicationsubnetgroupdl", + vpc_security_group_ids=[dms_sg.security_group_id] + ) + dms_replication_instance.add_depends_on(replication_subnet_group) + + + source_endpoint = dms.CfnEndpoint( + self, "Source Endpoint", + endpoint_identifier="sourcerdsdatabase", + endpoint_type="source", + username=rds_secret.secret_value_from_json("username").to_string(), + password=rds_secret.secret_value_from_json("password").to_string(), + database_name=rds_secret.secret_value_from_json("dbname").to_string(), + server_name=rds_secret.secret_value_from_json("host").to_string(), + port=5432, + engine_name=rds_secret.secret_value_from_json("engine").to_string() + ) + + target_endpoint_cdc = dms.CfnEndpoint( + self, "Target Endpoint CDC", + endpoint_identifier="targetendpointcdc", + endpoint_type="target", + engine_name="s3", + 
extra_connection_attributes="AddColumnName=true;timestampColumnName=changets;dataFormat=parquet;DatePartitionEnabled=true;DatePartitionSequence=YYYYMMDD;DatePartitionDelimiter=NONE;bucketFolder=CDC", + s3_settings=dms.CfnEndpoint.S3SettingsProperty( + bucket_name=raw_bucket.bucket_name, + service_access_role_arn=dms_s3_role.role_arn) + ) + table_mappings = { + "rules": [ + { + "rule-type": "selection", + "rule-id": "1", + "rule-name": "1", + "object-locator": { + "schema-name": "%", + "table-name": "%" + }, + "rule-action": "include", + "filters": [] + }] + } + target_endpoint_full=dms.CfnEndpoint( + self, "Target Endpoint Full", + endpoint_identifier="targetendpointfull", + endpoint_type="target", + engine_name="s3", + extra_connection_attributes=f"AddColumnName=true;timestampColumnName=changets;dataFormat=parquet;bucketFolder=full", + s3_settings=dms.CfnEndpoint.S3SettingsProperty( + bucket_name=raw_bucket.bucket_name, + service_access_role_arn=dms_s3_role.role_arn) + ) + table_mappings = { + "rules": [ + { + "rule-type": "selection", + "rule-id": "1", + "rule-name": "1", + "object-locator": { + "schema-name": "%", + "table-name": "%" + }, + "rule-action": "include", + "filters": [] + }] + } + + replication_task_cdc = dms.CfnReplicationTask( + self, "Replication Task CDC", + migration_type="cdc", + replication_task_identifier="cdcload", + replication_instance_arn=dms_replication_instance.ref, + source_endpoint_arn=source_endpoint.ref, + target_endpoint_arn=target_endpoint_cdc.ref, + table_mappings=json.dumps(table_mappings) + ) + replication_task_full = dms.CfnReplicationTask( + self, "Replication Task Full", + migration_type="full-load", + replication_task_identifier="fullload", + replication_instance_arn=dms_replication_instance.ref, + source_endpoint_arn=source_endpoint.ref, + target_endpoint_arn=target_endpoint_full.ref, + table_mappings=json.dumps(table_mappings) + ) + + core.CfnOutput(self, "Full Load Task Arn", value=replication_task_full.ref) + core.CfnOutput(self, "CDC Load Task Arn", value=replication_task_cdc.ref) diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/glue_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/glue_stack.py new file mode 100644 index 00000000..27261d62 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/glue_stack.py @@ -0,0 +1,107 @@ +import json + +from aws_cdk import core +import aws_cdk.aws_s3 as s3 +import aws_cdk.aws_glue as glue +import aws_cdk.aws_iam as iam + +TABLES = [ + "pgbench_accounts", + "pgbench_branches", + "pgbench_history", + "pgbench_tellers" +] + + +class GlueStack(core.Stack): + def __init__(self, scope: core.Construct, id: str, + raw_bucket: s3.IBucket, optimized_bucket: s3.IBucket, + **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # Glue Database + self.glue_database = glue.Database(scope=self, id="CDC Database", database_name="cdc_database") + + # IAM Role for Glue Crawler + glue_crawler_role = iam.Role( + scope=self, id="Glue Crawler Role", + assumed_by=iam.ServicePrincipal("glue.amazonaws.com") + ) + glue_crawler_role.add_managed_policy( + iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSGlueServiceRole") + ) + raw_bucket.grant_read_write(glue_crawler_role) + optimized_bucket.grant_read_write(glue_crawler_role) + + # S3 Targets for Glue Crawlers + full_s3_targets = glue.CfnCrawler.TargetsProperty( + s3_targets=[glue.CfnCrawler.S3TargetProperty(path=f"s3://{raw_bucket.bucket_name}/full/public/{table}/") for table in TABLES] + ) + + cdc_s3_targets = glue.CfnCrawler.TargetsProperty( + 
s3_targets=[glue.CfnCrawler.S3TargetProperty(path=f"s3://{raw_bucket.bucket_name}/CDC/public/{table}/") for table in TABLES] + ) + + optimized_s3_targets = glue.CfnCrawler.TargetsProperty( + s3_targets=[glue.CfnCrawler.S3TargetProperty(path=f"s3://{optimized_bucket.bucket_name}/CDC/public/{table}/") for table in TABLES] + ) + + # Schedule for Glue Crawlers + raw_crawler_schedule = glue.CfnCrawler.ScheduleProperty(schedule_expression="cron(0 0 * * ? *)") + optimized_crawler_schedule = glue.CfnCrawler.ScheduleProperty(schedule_expression="cron(0 3 * * ? *)") + + # Schema Change Policy for Glue Crawlers + crawler_schema_policy = glue.CfnCrawler.SchemaChangePolicyProperty( + delete_behavior="DELETE_FROM_DATABASE", + update_behavior="LOG" + ) + + # Custom Crawler Configuration + crawler_config = { + "Version": 1.0, + "CrawlerOutput": { + "Partitions": { + "AddOrUpdateBehavior": "InheritFromTable" + } + } + } + + + # Raw Full load crawler + glue.CfnCrawler( + scope=self, + id=f"Full Load Crawler", + role=glue_crawler_role.role_arn, + targets=full_s3_targets, + configuration=json.dumps(crawler_config), + database_name=self.glue_database.database_name, + schedule=None, + schema_change_policy=crawler_schema_policy, + table_prefix="raw_full_" + ) + + # Raw CDC crawler + glue.CfnCrawler( + scope=self, + id=f"CDC Crawler", + role=glue_crawler_role.role_arn, + targets=cdc_s3_targets, + configuration=json.dumps(crawler_config), + database_name=self.glue_database.database_name, + schedule=raw_crawler_schedule, + schema_change_policy=crawler_schema_policy, + table_prefix="raw_cdc_" + ) + + # Optimized crawler + glue.CfnCrawler( + scope=self, + id="Optimized Crawler", + role=glue_crawler_role.role_arn, + targets=optimized_s3_targets, + configuration=json.dumps(crawler_config), + database_name=self.glue_database.database_name, + schedule=optimized_crawler_schedule, + schema_change_policy=crawler_schema_policy, + table_prefix="optimized_" + ) diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/query_schedule_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/query_schedule_stack.py new file mode 100644 index 00000000..cd61e798 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/query_schedule_stack.py @@ -0,0 +1,55 @@ +from typing import Dict + +from aws_cdk import core +import aws_cdk.aws_redshift as redshift +import aws_cdk.aws_events as events +import aws_cdk.aws_events_targets as targets +import aws_cdk.aws_glue as glue +import aws_cdk.aws_iam as iam + + +class QueryScheduleStack(core.Stack): + def __init__( + self, scope: core.Construct, id: str, + glue_database: glue.IDatabase, redshift_database: str, + redshift_cluster: redshift.ICluster, redshift_role: iam.IRole, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # AwsApi target to call the Redshift Data API + def create_event_target(query: str) -> targets.AwsApi: + return targets.AwsApi( + action="executeStatement", + service="RedshiftData", + parameters={ + "ClusterIdentifier": f"{redshift_cluster.cluster_name}", + "Sql": query, + "Database": f"{redshift_database}", + "SecretArn": f"{redshift_cluster.secret.secret_arn}", + "WithEvent": True + }, + policy_statement=redshift_data_policy_statement + ) + + # IAM policy for events + redshift_data_policy_statement = iam.PolicyStatement( + effect=iam.Effect.ALLOW, + actions=["redshift-data:ExecuteStatement", "secretsmanager:GetSecretValue"], + resources=[ + f"arn:aws:redshift:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:cluster:{redshift_cluster.cluster_name}", + f"{redshift_cluster.secret.secret_arn}" + 
] + ) + + # Redshift queries + queries = dict() + queries['call_sp'] = """ CALL refresh_account_change_summary(); """ + + # Event rules for query scheduling + def create_event_rule(query_id: str, schedule: events.Schedule): + events_rule = events.Rule( + scope=self, id=f"{query_id}_rule", + enabled=True, schedule=schedule, + ) + events_rule.add_target(create_event_target(queries[query_id])) + + create_event_rule(query_id='call_sp', schedule=events.Schedule.cron(hour="0", minute="0")) diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/rds_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/rds_stack.py new file mode 100644 index 00000000..0136f6af --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/rds_stack.py @@ -0,0 +1,48 @@ +from aws_cdk import core +import aws_cdk.aws_ec2 as ec2 +import aws_cdk.aws_rds as rds + + +class RdsStack(core.Stack): + def __init__(self, scope: core.Construct, id: str, vpc: ec2.IVpc, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # Parameters to enable PostgreSQL logical replication + parameter_group_replication = rds.ParameterGroup( + scope=self, id="Parameter Group Replication", + engine=rds.DatabaseInstanceEngine.postgres(version=rds.PostgresEngineVersion.VER_11_8), + parameters={"rds.logical_replication": "1"}) + + # PostgreSQL RDS instance + self.postgresql_rds = rds.DatabaseInstance( + scope=self, id="Postgresql RDS", + database_name="test", + engine=rds.DatabaseInstanceEngine.postgres(version=rds.PostgresEngineVersion.VER_11_8), + instance_type=ec2.InstanceType.of(ec2.InstanceClass.BURSTABLE2, ec2.InstanceSize.SMALL), + vpc=vpc, multi_az=False, + allocated_storage=100, storage_type=rds.StorageType.GP2, + deletion_protection=False, + delete_automated_backups=True, backup_retention=core.Duration.days(1), + parameter_group=parameter_group_replication + ) + + # Secret + self.secret = self.postgresql_rds.secret + + # CFN Outputs + core.CfnOutput( + scope=self, id="Database Secret Arn", + value=self.postgresql_rds.secret.secret_arn + ) + core.CfnOutput( + scope=self, id="Database Endpoint Address", + value=self.postgresql_rds.db_instance_endpoint_address + ) + + # PostgreSQL RDS Connections + self.dms_sg = ec2.SecurityGroup( + scope=self, id="DMS SG", + vpc=vpc, allow_all_outbound=True, + description="Security Group for DMS" + ) + self.postgresql_rds.connections.allow_default_port_from(self.dms_sg) diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/redshift_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/redshift_stack.py new file mode 100644 index 00000000..6f4dcaf7 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/redshift_stack.py @@ -0,0 +1,132 @@ +from aws_cdk import core +import aws_cdk.aws_ec2 as ec2 +import aws_cdk.aws_redshift as redshift +from aws_cdk import aws_iam as iam +from aws_cdk import aws_lambda as _lambda +from aws_cdk import aws_lambda_python as lambda_python +from aws_cdk import custom_resources as custom_resources +from aws_cdk import aws_s3 as s3 +from aws_cdk import aws_iam as iam + +class RedshiftStack(core.Stack): + def __init__(self, scope: core.Construct, id: str, vpc: ec2.IVpc, raw_bucket: s3.IBucket, optimized_bucket: s3.IBucket, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # Redshift default database + self.default_database_name="demo" + + # role for redshift to use spectrum + self.redshift_spectrum_role = iam.Role( + self,"RedshiftSpectrumRole", + assumed_by=iam.ServicePrincipal("redshift.amazonaws.com")) + + # policy to access glue catalog + self.redshift_spectrum_role.add_to_policy( + 
iam.PolicyStatement( + actions = ["glue:BatchCreatePartition","glue:UpdateDatabase","glue:CreateTable", + "glue:DeleteDatabase","glue:GetTables","glue:GetPartitions","glue:BatchDeletePartition", + "glue:UpdateTable","glue:BatchGetPartition","glue:DeleteTable","glue:GetDatabases", + "glue:GetTable","glue:GetDatabase","glue:GetPartition","glue:CreateDatabase", + "glue:BatchDeleteTable","glue:CreatePartition","glue:DeletePartition","glue:UpdatePartition"], + effect=iam.Effect.ALLOW, + resources=["*"] + ) + ) + # policy to read data files from raw and optimized buckets + self.redshift_spectrum_role.add_to_policy( + iam.PolicyStatement( + actions = ["s3:GetObject","s3:ListBucketMultipartUploads","s3:ListBucket","s3:GetBucketLocation","s3:ListMultipartUploadParts"], + effect=iam.Effect.ALLOW, + resources=[raw_bucket.bucket_arn, optimized_bucket.bucket_arn, f"{raw_bucket.bucket_arn}/*",f"{optimized_bucket.bucket_arn}/*" ] + ) + ) + + # policy to write and overwriter data files in optimized bucket + self.redshift_spectrum_role.add_to_policy( + iam.PolicyStatement( + actions = ["s3:PutObject","s3:DeleteObjectVersion","s3:DeleteObject"], + effect=iam.Effect.ALLOW, + resources=[f"{optimized_bucket.bucket_arn}/*"] + ) + ) + # create SG for other hosts to access Redshift + self.access_redshift_sg= ec2.SecurityGroup( + self, "Access Redshift SG", vpc=vpc, + allow_all_outbound=True, + description="Access Redshift SG") + + # create SG for Redshift + redshift_sg = ec2.SecurityGroup( + self, "Redshift SG", vpc=vpc, + allow_all_outbound=True, + description="Redshift SG") + + # spin up redshift in isolated subnets without access to IGW or NAT + private_sng = redshift.ClusterSubnetGroup( + self, "Subnet Group", + vpc=vpc, + removal_policy=core.RemovalPolicy.DESTROY, + description="Private Subnet Group", + vpc_subnets=ec2.SubnetSelection(subnets=vpc.private_subnets)) + + # redshir cluster for demo database + self.redshift_cluster = redshift.Cluster( + self, "Redshift", + master_user=redshift.Login(master_username="root"), + vpc=vpc, + cluster_name="redshifteltdemo", + cluster_type=redshift.ClusterType.MULTI_NODE, + default_database_name=self.default_database_name, + number_of_nodes=2, + node_type=redshift.NodeType.DC2_LARGE, + removal_policy=core.RemovalPolicy.DESTROY, + security_groups=[redshift_sg], + subnet_group=private_sng, + roles=[self.redshift_spectrum_role]) + + core.Tags.of(self.redshift_cluster.secret).add(key="RedshiftDataFullAccess", value="dataAPI") + # allow access_redshift_sg to access redshift + self.redshift_cluster.connections.allow_default_port_from(self.access_redshift_sg) + + + # python lambda to deploy initial schema + redshift_deploy_schema_lambda = lambda_python.PythonFunction( + self, "RedshiftDeploySchema", + entry = "./lambdas/redshift_deploy_schema", + index = "lambda.py", + handler = "lambda_handler", + runtime = _lambda.Runtime.PYTHON_3_8, + timeout = core.Duration.seconds(900), + tracing = _lambda.Tracing.ACTIVE + ) + redshift_deploy_schema_lambda.add_environment(key="cluster_identifier", value=self.redshift_cluster.cluster_name) + redshift_deploy_schema_lambda.add_environment(key="secret_arn", value=self.redshift_cluster.secret.secret_arn) + redshift_deploy_schema_lambda.add_environment(key="database", value=self.default_database_name) + + self.redshift_cluster.secret.grant_read(redshift_deploy_schema_lambda) + + # allow lambda to execute sql statements against the cluster + redshift_deploy_schema_lambda.add_to_role_policy( + statement=iam.PolicyStatement( + 
effect=iam.Effect.ALLOW, + actions=["redshift-data:ExecuteStatement"], + resources=[f"arn:aws:redshift:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:cluster:{self.redshift_cluster.cluster_name}"] + ) + ) + # allow lambda to check status of executed statements + redshift_deploy_schema_lambda.add_to_role_policy( + statement=iam.PolicyStatement( + effect=iam.Effect.ALLOW, + actions=["redshift-data:ListStatements"], + resources=["*"] + ) + ) + + core.CfnOutput(self, "Redshift Cluster ID ", value=self.redshift_cluster.cluster_name) + core.CfnOutput(self, "Redshift Secret Arn ", value=self.redshift_cluster.secret.secret_arn) + + # custom resource to deploy schema + provider = custom_resources.Provider(self, "Provider", on_event_handler= redshift_deploy_schema_lambda) + custom_resource = core.CustomResource( + self, "CustomResource", + service_token= provider.service_token) \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/s3_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/s3_stack.py new file mode 100644 index 00000000..116da070 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/s3_stack.py @@ -0,0 +1,18 @@ +from aws_cdk import core +import aws_cdk.aws_s3 as s3 + +class S3Stack(core.Stack): + def __init__(self, scope: core.Construct, id: str, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + self.s3_raw = s3.Bucket( + self, "S3Raw", + bucket_name=f"bucket-raw-{core.Aws.ACCOUNT_ID}-{core.Aws.REGION}", + removal_policy=core.RemovalPolicy.DESTROY) + + self.s3_optimized = s3.Bucket( + self, "S3Optimized", + bucket_name=f"bucket-optimized-{core.Aws.ACCOUNT_ID}-{core.Aws.REGION}", + removal_policy=core.RemovalPolicy.DESTROY) + + diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/start_dms_tasks.py b/src/SQLBasedCDCDataIngestion/dlpoc/start_dms_tasks.py new file mode 100644 index 00000000..d27fb80d --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/start_dms_tasks.py @@ -0,0 +1,34 @@ +import boto3 +cf_client = boto3.client("cloudformation") +dms_client = boto3.client("dms") + +def get_dms_task_arn(): + """ + get arns of dms tasks created by cdk + """ + response = cf_client.describe_stacks( + StackName='DMS' + ) + dms_outputs=response['Stacks'][0]['Outputs'] + cdc_task_arn=list(filter(lambda output: output['OutputKey'] == 'CDCLoadTaskArn', dms_outputs))[0]['OutputValue'] + full_task_arn=list(filter(lambda output: output['OutputKey'] == 'FullLoadTaskArn', dms_outputs))[0]['OutputValue'] + return {"cdc": cdc_task_arn, "full": full_task_arn} +def start_dms_tasks(tasks): + """ + Start CDC and Full loads in that order + """ + + + response_cdc = dms_client.start_replication_task( + ReplicationTaskArn=tasks['cdc'], + StartReplicationTaskType= 'start-replication') + + print(f"CDC Load task is {response_cdc['ReplicationTask']['Status']}") + + response_full = dms_client.start_replication_task( + ReplicationTaskArn=tasks['full'], + StartReplicationTaskType= 'resume-processing') + + print(f"Full Load task is {response_full['ReplicationTask']['Status']}") + +start_dms_tasks(get_dms_task_arn()) \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/unload_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/unload_stack.py new file mode 100644 index 00000000..630ed2ff --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/unload_stack.py @@ -0,0 +1,82 @@ +from aws_cdk import core +from aws_cdk import aws_events as events +from aws_cdk import aws_events_targets as events_targets +from aws_cdk import aws_lambda_python as lambda_python +from aws_cdk import aws_lambda as 
_lambda +from aws_cdk import aws_sns as sns +from aws_cdk import aws_glue as glue +from aws_cdk import aws_iam as iam +from aws_cdk import aws_sns_subscriptions as sns_subscriptions +from aws_cdk import aws_redshift as redshift +class UnloadStack(core.Stack): + def __init__(self, scope: core.Construct, id: str, email_address: str , glue_database: glue.IDatabase, redshift_database: str, redshift_cluster: redshift.ICluster, redshift_role: iam.IRole, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # lambda to reduce number of cdc files and improve performance + unload_cdc_optimized_lambda = lambda_python.PythonFunction( + self, "RedshiftDeploySchema", + entry = "./lambdas/unload_cdc_optimized", + index = "lambda.py", + handler = "lambda_handler", + runtime = _lambda.Runtime.PYTHON_3_8, + timeout = core.Duration.seconds(900), + tracing = _lambda.Tracing.ACTIVE + ) + unload_cdc_optimized_lambda.add_environment(key="glue_database", value=glue_database.database_name) + unload_cdc_optimized_lambda.add_environment(key="redshift_database", value=redshift_database) + unload_cdc_optimized_lambda.add_environment(key="cluster_identifier", value=redshift_cluster.cluster_name) + unload_cdc_optimized_lambda.add_environment(key="secret_arn", value=redshift_cluster.secret.secret_arn) + unload_cdc_optimized_lambda.add_environment(key="redshift_role_arn", value=redshift_role.role_arn) + + # priv to list all tables in glue database + unload_cdc_optimized_lambda.add_to_role_policy( + statement= iam.PolicyStatement( + actions=["glue:GetTables"], + effect=iam.Effect.ALLOW, + resources=[f"arn:aws:glue:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:catalog", + f"arn:aws:glue:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:database/{glue_database.database_name}", + f"arn:aws:glue:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:table/{glue_database.database_name}/*"] + ) + ) + # priv to execute statement against Redshift cluster + unload_cdc_optimized_lambda.add_to_role_policy( + statement=iam.PolicyStatement( + effect=iam.Effect.ALLOW, + actions=["redshift-data:ExecuteStatement"], + resources=[f"arn:aws:redshift:{core.Aws.REGION}:{core.Aws.ACCOUNT_ID}:cluster:{redshift_cluster.cluster_name}"] + ) + ) + # priv to read a secret to connect to the Redshift cluster + redshift_cluster.secret.grant_read(unload_cdc_optimized_lambda) + # rule to trigger optimization once day + unload_cdc_optimized_rule = events.Rule( + self, "UnloadCDCOptimizedRule", + schedule=events.Schedule.expression('cron(0 1 * * ? 
*)'), + rule_name="UnloadCDCOptimizedRule", + enabled=True) + unload_cdc_optimized_rule.add_target( + target=events_targets.LambdaFunction( + handler=unload_cdc_optimized_lambda) + ) + + # SNS topic for all Redsift Data API failures + redshift_data_failure_topic = sns.Topic( + self, "RedshiftDataFailureTopic", + display_name="RedshiftDataFailureTopic", + topic_name="RedshiftDataFailureTopic") + redshift_data_failure_topic.add_subscription( + subscription=sns_subscriptions.EmailSubscription( + email_address=email_address) + ) + + # Rule to identify Redshift Data API failures and send them to SNS topic + events.Rule( + self, "FailedRedshiftDataStatementRule", + event_pattern= events.EventPattern( + source=["aws.redshift-data"], + detail= {"state": ["FAILED"]} + ), + enabled=True, + rule_name="FailedRedshiftDataStatementRule", + targets=[events_targets.SnsTopic(topic=redshift_data_failure_topic)] + ) diff --git a/src/SQLBasedCDCDataIngestion/dlpoc/vpc_stack.py b/src/SQLBasedCDCDataIngestion/dlpoc/vpc_stack.py new file mode 100644 index 00000000..efa6e97c --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/dlpoc/vpc_stack.py @@ -0,0 +1,30 @@ +from aws_cdk import core +import aws_cdk.aws_ec2 as ec2 + + +class VpcStack(core.Stack): + + def __init__(self, scope: core.Construct, id: str, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + # The code that defines your stack goes here + + self.vpc = ec2.Vpc( + self, "VPC", + max_azs=2, + cidr="10.0.0.0/16", + # configuration will create 3 groups in 2 AZs = 6 subnets. + subnet_configuration=[ec2.SubnetConfiguration( + subnet_type=ec2.SubnetType.PUBLIC, + name="Public", + cidr_mask=26), + ec2.SubnetConfiguration( + subnet_type=ec2.SubnetType.PRIVATE, + name="Private", + cidr_mask=26), + ec2.SubnetConfiguration( + subnet_type=ec2.SubnetType.ISOLATED, + name="DB", + cidr_mask=26)], + # nat_gateway_provider=ec2.NatProvider.gateway(), + nat_gateways=2) diff --git a/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/cfnresponse.py b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/cfnresponse.py new file mode 100644 index 00000000..ed1cc520 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/cfnresponse.py @@ -0,0 +1,44 @@ +# Copyright 2016 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. +# This file is licensed to you under the AWS Customer Agreement (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at http://aws.amazon.com/agreement/ . +# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
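+# Helper bundled with the schema-deployment Lambda: builds the CloudFormation custom
+# resource response body and PUTs it to the pre-signed ResponseURL passed in the event.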
+ +import requests +import json + +SUCCESS = "SUCCESS" +FAILED = "FAILED" + +def send(event, context, responseStatus, responseData, physicalResourceId=None, noEcho=False): + responseUrl = event['ResponseURL'] + + print(responseUrl) + + responseBody = {} + responseBody['Status'] = responseStatus + responseBody['Reason'] = 'See the details in CloudWatch Log Stream: ' + context.log_stream_name + responseBody['PhysicalResourceId'] = physicalResourceId or context.log_stream_name + responseBody['StackId'] = event['StackId'] + responseBody['RequestId'] = event['RequestId'] + responseBody['LogicalResourceId'] = event['LogicalResourceId'] + responseBody['NoEcho'] = noEcho + responseBody['Data'] = responseData + + json_responseBody = json.dumps(responseBody) + + print("Response body:\n" + json_responseBody) + + headers = { + 'content-type' : '', + 'content-length' : str(len(json_responseBody)) + } + + try: + response = requests.put(responseUrl, + data=json_responseBody, + headers=headers) + print("Status code: " + response.reason) + except Exception as e: + print("send(..) failed executing requests.put(..): " + str(e)) \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/lambda.py b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/lambda.py new file mode 100644 index 00000000..13887893 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/lambda.py @@ -0,0 +1,96 @@ +import traceback +import boto3 +import json +import time +import os +import cfnresponse + +# create redshift data client +redshift_data_client = boto3.client('redshift-data') + +# get cluster specific parameters +cluster_identifier = os.environ['cluster_identifier'] +secret_arn = os.environ['secret_arn'] +database = os.environ['database'] + +# error class for failed redshift SQL +class FailedRedshiftSQL(Exception): + pass + +def execute_sqls(): + """ + Execute schema.sql statements one by one on redshift cluster + using redshift data API + """ + # read schema file + file = open('schema.sql', 'r') + script_file = file.read() + file.close + try: + # split file in commands based on ; + commands=script_file.split(';') + for order, command in enumerate(commands, 1): + statement_name = f'schema statement #{order}' + response = redshift_data_client.execute_statement( + ClusterIdentifier=cluster_identifier, + Database=database, + SecretArn=secret_arn, + Sql=command, + StatementName = statement_name , + WithEvent=False + ) + # generate list of statements ran + check_execution(statement_name) + except Exception as e: + print("Cannot Execute." + str(e) + traceback.format_exc()) + raise e + return "Success" + +def check_execution(statement_name): + """ + Check if all statement names executed successfuly on newly created cluster + """ + try: + status='CHECKING' + # wait for statement to finish + while status!='FINISHED': + response = redshift_data_client.list_statements( + StatementName=statement_name, + Status='ALL') + status=response['Statements'][0]['Status'] + # if statement fails raise an error + if status in ['ABORTED','FAILED']: + raise FailedRedshiftSQL(f"failed {statement_name} statement with status {status}") + print(f"{statement_name} - {status}") + # wait for a second before rechecking + time.sleep(1) + except Exception as e: + print("Cannot Execute." 
+ str(e) + traceback.format_exc()) + raise e + return "Success" +def get_cfn_response_data(message): + """ + Format message before passing to AWS CloudFormation + """ + response_data = {} + data = {'Message': message} + response_data['Data'] = data + return response_data + +def lambda_handler(event, context): + try: + if event['RequestType'] == 'Create': + try: + return execute_sqls() + # if statements failed rollback cluster creation + except Exception as e: + cfnresponse.send(event, context, cfnresponse.FAILED, get_cfn_response_data('failed: '+str(e))) + raise Exception(e) + else: + # delete or update of cluster do not involve schema changes + print('Delete/Update CF initiated') + cfnresponse.send(event, context, cfnresponse.SUCCESS, get_cfn_response_data('delete')) + except Exception as e: + print(e) + cfnresponse.send(event, context, cfnresponse.FAILED, get_cfn_response_data('failed: '+str(e))) + raise Exception(e) \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/requirements.txt b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/requirements.txt new file mode 100644 index 00000000..663bd1f6 --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/requirements.txt @@ -0,0 +1 @@ +requests \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/schema.sql b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/schema.sql new file mode 100644 index 00000000..2bf74eae --- /dev/null +++ b/src/SQLBasedCDCDataIngestion/lambdas/redshift_deploy_schema/schema.sql @@ -0,0 +1,54 @@ +CREATE TABLE IF NOT EXISTS public.pgbench_branches( + changets VARCHAR(26) ENCODE zstd, + bid INTEGER ENCODE az64, + bbalance INTEGER ENCODE az64, + filler VARCHAR(100) ENCODE zstd, + primary key(bid) +) DISTSTYLE ALL +sortkey(bid); +CREATE TABLE IF NOT EXISTS public.pgbench_tellers( + changets VARCHAR(26) ENCODE zstd, + tid INTEGER ENCODE az64, + bid INTEGER ENCODE az64, + tbalance INTEGER ENCODE az64, + filler VARCHAR(100) ENCODE zstd, + primary key(tid), + foreign key(bid) references public.pgbench_branches(bid) +) DISTSTYLE ALL +sortkey(bid); +CREATE TABLE IF NOT EXISTS public.pgbench_accounts( + changets VARCHAR(26) ENCODE zstd, + aid INTEGER ENCODE az64, + bid INTEGER ENCODE az64, + abalance INTEGER ENCODE az64, + filler VARCHAR(100) ENCODE zstd, + primary key(aid), + foreign key(bid) references public.pgbench_branches(bid) + ) +distkey(aid) +sortkey(bid); +CREATE TABLE IF NOT EXISTS public.pgbench_history( + changets VARCHAR(26) ENCODE zstd, + tid INTEGER ENCODE az64, + bid INTEGER ENCODE az64, + aid INTEGER ENCODE az64, + "delta" INTEGER ENCODE az64, + mtime TIMESTAMP WITHOUT TIME ZONE ENCODE zstd, + filler VARCHAR(100) ENCODE zstd, + foreign key(bid) references public.pgbench_branches(bid), + foreign key(aid) references public.pgbench_accounts(aid), + foreign key(tid) references public.pgbench_tellers(tid) + ) +distkey(aid) +sortkey(mtime); +CREATE MATERIALIZED VIEW weekly_account_change_summary_mv +distkey(aid) +sortkey(reporting_week) as +select + h.aid, + sum("h"."delta") as "weekly_change", + DATE_TRUNC('week',h.mtime) as "reporting_week", + a.filler +from public.pgbench_history h +join public.pgbench_accounts a on h.aid=a.aid +group by h.aid,DATE_TRUNC('week',h.mtime ),a.filler \ No newline at end of file diff --git a/src/SQLBasedCDCDataIngestion/lambdas/unload_cdc_optimized/lambda.py b/src/SQLBasedCDCDataIngestion/lambdas/unload_cdc_optimized/lambda.py new file mode 
index 00000000..e657c984
--- /dev/null
+++ b/src/SQLBasedCDCDataIngestion/lambdas/unload_cdc_optimized/lambda.py
@@ -0,0 +1,76 @@
+import boto3
+from os import environ
+from datetime import date
+from datetime import timedelta
+
+# get all environment variables
+glue_database = environ['glue_database']
+redshift_database = environ['redshift_database']
+redshift_role_arn = environ['redshift_role_arn']
+cluster_identifier = environ['cluster_identifier']
+secret_arn = environ['secret_arn']
+
+# create all clients
+glue_client = boto3.client("glue")
+redshift_data_client = boto3.client('redshift-data')
+
+def get_yesterday_partition():
+    """
+    get yesterday's partition in YYYYMMDD format - the same format DMS uses
+    """
+    return str(date.today() - timedelta(days = 1)).replace("-", "")
+
+def execute_statement(sql_statement, table):
+    """
+    Executes the sql statement using the Redshift Data API
+    """
+    statement_name = f'unload table - {table} partition - {get_yesterday_partition()}'
+    response = redshift_data_client.execute_statement(
+        ClusterIdentifier=cluster_identifier,
+        Database=redshift_database,
+        SecretArn=secret_arn,
+        Sql=sql_statement,
+        StatementName=statement_name,
+        WithEvent=True)
+
+def create_unload_statement(table_name, table_location):
+    """
+    Creates the unload statement for yesterday's date. Note: the bucket name for the output
+    changes from -raw- to -optimized-
+    """
+    optimized_location = table_location.replace("-raw-", "-optimized-")
+    yesterday_partition = get_yesterday_partition()
+    sql_statement = f"""
+        UNLOAD ('select * from {glue_database}.{table_name} where partition_0=\\'{yesterday_partition}\\'')
+        TO '{optimized_location}changedate={yesterday_partition}/part_'
+        iam_role '{redshift_role_arn}'
+        FORMAT PARQUET
+        PARALLEL OFF
+        MAXFILESIZE 200 MB
+        ALLOWOVERWRITE"""
+
+    return sql_statement
+
+
+def lambda_handler(event, context):
+
+    # get a paginator just in case there are many tables
+    paginator = glue_client.get_paginator('get_tables')
+
+    # filter for tables starting with raw_cdc_ as per the naming convention
+    response_iterator = paginator.paginate(
+        DatabaseName=glue_database,
+        Expression='raw_cdc_*',
+        PaginationConfig={
+            'PageSize': 10
+        })
+
+    for page in response_iterator:
+        for table in page["TableList"]:
+            table_name = table['Name']
+
+            # for each table create and execute the unload statement
+            sql_statement = create_unload_statement(table['Name'], table['StorageDescriptor']['Location'])
+            execute_statement(sql_statement, table_name)
+
+    return "Success"
\ No newline at end of file
diff --git a/src/SQLBasedCDCDataIngestion/redshiftcdc_architecture.png b/src/SQLBasedCDCDataIngestion/redshiftcdc_architecture.png
new file mode 100644
index 00000000..36a4248d
Binary files /dev/null and b/src/SQLBasedCDCDataIngestion/redshiftcdc_architecture.png differ
diff --git a/src/SQLBasedCDCDataIngestion/requirements.txt b/src/SQLBasedCDCDataIngestion/requirements.txt
new file mode 100644
index 00000000..d6e1198b
--- /dev/null
+++ b/src/SQLBasedCDCDataIngestion/requirements.txt
@@ -0,0 +1 @@
+-e .
diff --git a/src/SQLBasedCDCDataIngestion/setup.py b/src/SQLBasedCDCDataIngestion/setup.py
new file mode 100644
index 00000000..e33a424c
--- /dev/null
+++ b/src/SQLBasedCDCDataIngestion/setup.py
@@ -0,0 +1,57 @@
+import setuptools
+
+
+with open("README.md") as fp:
+    long_description = fp.read()
+
+
+setuptools.setup(
+    name="dlpoc",
+    version="0.0.1",
+
+    description="CDK app that deploys the SQL-based CDC data ingestion sample for Amazon Redshift",
+    long_description=long_description,
+    long_description_content_type="text/markdown",
+
+    author="author",
+
+    package_dir={"": "dlpoc"},
+    packages=setuptools.find_packages(where="dlpoc"),
+
+    install_requires=[
+        "aws-cdk.core==1.72.0",
+        "aws_cdk.aws_redshift==1.72.0",
+        "aws_cdk.aws_rds==1.72.0",
+        "aws_cdk.aws_dms==1.72.0",
+        "aws_cdk.aws_glue==1.72.0",
+        "aws_cdk.aws_ec2==1.72.0",
+        "aws_cdk.aws_secretsmanager==1.72.0",
+        "aws_cdk.aws_lambda==1.72.0",
+        "aws_cdk.custom_resources==1.72.0",
+        "aws_cdk.aws_lambda_python==1.72.0",
+        "aws_cdk.aws_events==1.72.0",
+        "aws_cdk.aws_sns==1.72.0",
+        "aws_cdk.aws_events_targets==1.72.0"
+    ],
+
+    python_requires=">=3.6",
+
+    classifiers=[
+        "Development Status :: 4 - Beta",
+
+        "Intended Audience :: Developers",
+
+        "License :: OSI Approved :: Apache Software License",
+
+        "Programming Language :: JavaScript",
+        "Programming Language :: Python :: 3 :: Only",
+        "Programming Language :: Python :: 3.6",
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+
+        "Topic :: Software Development :: Code Generators",
+        "Topic :: Utilities",
+
+        "Typing :: Typed",
+    ],
+)
diff --git a/src/SQLBasedCDCDataIngestion/source.bat b/src/SQLBasedCDCDataIngestion/source.bat
new file mode 100644
index 00000000..8f574429
--- /dev/null
+++ b/src/SQLBasedCDCDataIngestion/source.bat
@@ -0,0 +1,13 @@
+@echo off
+
+rem The sole purpose of this script is to make the command
+rem
+rem     source .env/bin/activate
+rem
+rem (which activates a Python virtualenv on Linux or Mac OS X) work on Windows.
+rem On Windows, this command just runs this batch file (the argument is ignored).
+rem
+rem Now we don't need to document a Windows command for activating a virtualenv.
+
+echo Executing .env\Scripts\activate.bat for you
+.env\Scripts\activate.bat
diff --git a/src/SQLBasedCDCDataIngestion/sql/create_sp.sql b/src/SQLBasedCDCDataIngestion/sql/create_sp.sql
new file mode 100644
index 00000000..9bed3bae
--- /dev/null
+++ b/src/SQLBasedCDCDataIngestion/sql/create_sp.sql
@@ -0,0 +1,35 @@
+CREATE OR REPLACE PROCEDURE refresh_account_change_summary()
+AS $$
+BEGIN
+    -- Create staging table with a column for type of operation
+    CREATE TABLE IF NOT EXISTS "public"."stage_pgbench_accounts" (LIKE "public"."pgbench_accounts");
+    ALTER TABLE "public"."stage_pgbench_accounts" ADD COLUMN op VARCHAR(8);
+    -- Save the last version of the previous day records into the staging table
+    INSERT INTO "public"."stage_pgbench_accounts" (changets, aid, bid, abalance, filler, op)
+    SELECT changets, aid, bid, abalance, filler, op
+    FROM (
+        SELECT *, row_number() over (partition by aid order by changets desc) AS lastchange
+        FROM "cdc_database"."raw_cdc_pgbench_accounts"
+        WHERE partition_0>=to_char(DATE(DATEADD(DAY, -1, GETDATE())), 'YYYYMMDD')
+    )
+    WHERE lastchange=1;
+    -- Clean changed records from the original table
+    DELETE FROM "public"."pgbench_accounts" USING "public"."stage_pgbench_accounts"
+    WHERE "public"."pgbench_accounts"."aid" = "public"."stage_pgbench_accounts"."aid"
+    AND "public"."pgbench_accounts"."changets" < "public"."stage_pgbench_accounts"."changets";
+    -- Clean pre-existing records from the staging table
+    DELETE FROM "public"."stage_pgbench_accounts" USING "public"."pgbench_accounts"
+    WHERE "public"."pgbench_accounts"."aid" = "public"."stage_pgbench_accounts"."aid"
+    AND "public"."pgbench_accounts"."changets" >= "public"."stage_pgbench_accounts"."changets";
+    -- Update the destination table with records from the staging table
+    INSERT INTO "public"."pgbench_accounts"
+    SELECT changets, aid, bid, abalance, filler
+    FROM (
+        SELECT * FROM "public"."stage_pgbench_accounts" WHERE op='I' OR op='U'
+    );
+    -- Refresh the materialized view
+    REFRESH MATERIALIZED VIEW "public"."weekly_account_change_summary_mv";
+    -- Drop the staging table
+    DROP TABLE "public"."stage_pgbench_accounts";
+END;
+$$ LANGUAGE plpgsql;
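+
+-- Illustrative usage (comment only, not executed as part of the deployment): once the procedure
+-- exists, it can be invoked manually from the Redshift query editor, or on a schedule through the
+-- Redshift Data API, for example:
+--
+--   CALL refresh_account_change_summary();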