Initial commit - CDC ingestion based on SQL #565

Open · wants to merge 2 commits into base: master
10 changes: 10 additions & 0 deletions src/SQLBasedCDCDataIngestion/.gitignore
@@ -0,0 +1,10 @@
*.swp
package-lock.json
__pycache__
.pytest_cache
.env
*.egg-info

# CDK asset staging directory
.cdk.staging
cdk.out
292 changes: 292 additions & 0 deletions src/SQLBasedCDCDataIngestion/README.md

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions src/SQLBasedCDCDataIngestion/app.py
@@ -0,0 +1,33 @@
#!/usr/bin/env python3

from aws_cdk import core

from dlpoc.vpc_stack import VpcStack
from dlpoc.rds_stack import RdsStack
from dlpoc.bastion_stack import BastionStack
from dlpoc.s3_stack import S3Stack
from dlpoc.dms_stack import DMSStack
from dlpoc.redshift_stack import RedshiftStack
from dlpoc.glue_stack import GlueStack
from dlpoc.unload_stack import UnloadStack
from dlpoc.query_schedule_stack import QueryScheduleStack

email_address = "[email protected]"

app = core.App()
dl_vpc = VpcStack(app, "DLVPC")
rds_stack = RdsStack(app, "RDS", vpc=dl_vpc.vpc)
bastion_stack = BastionStack(app, "Bastion", vpc=dl_vpc.vpc, dms_sg=rds_stack.dms_sg, secret=rds_stack.secret)
s3_stack = S3Stack(app, "S3")
dms_stack = DMSStack(app, "DMS", vpc=dl_vpc.vpc, rds_secret=rds_stack.secret, raw_bucket=s3_stack.s3_raw, dms_sg=rds_stack.dms_sg)
redshift_stack = RedshiftStack(app, "Redshift", vpc=dl_vpc.vpc, raw_bucket=s3_stack.s3_raw, optimized_bucket=s3_stack.s3_optimized)
glue_stack = GlueStack(scope=app, id="Glue", raw_bucket=s3_stack.s3_raw, optimized_bucket=s3_stack.s3_optimized)
unload_stack = UnloadStack(
    app, "Unload", email_address=email_address,
    glue_database=glue_stack.glue_database, redshift_database=redshift_stack.default_database_name,
    redshift_cluster=redshift_stack.redshift_cluster, redshift_role=redshift_stack.redshift_spectrum_role)
query_schedule_stack = QueryScheduleStack(
    scope=app, id="QuerySchedule",
    glue_database=glue_stack.glue_database, redshift_database=redshift_stack.default_database_name,
    redshift_cluster=redshift_stack.redshift_cluster, redshift_role=redshift_stack.redshift_spectrum_role)
app.synth()
8 changes: 8 additions & 0 deletions src/SQLBasedCDCDataIngestion/cdk.json
@@ -0,0 +1,8 @@
{
  "app": "python3 app.py",
  "context": {
    "@aws-cdk/core:enableStackNameDuplicates": "true",
    "aws-cdk:enableDiffNoFail": "true",
    "@aws-cdk/core:stackRelativeExports": "true"
  }
}
@@ -0,0 +1 @@
create external schema cdc_database from data catalog database 'cdc_database' iam_role '<your-RedshiftSpectrumRole-arn>' create external database if not exists;
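This statement registers the Glue Data Catalog database cdc_database as an external schema for Redshift Spectrum. A minimal sketch of running it through the Redshift Data API with boto3; the cluster identifier, database, user, and role ARN below are illustrative assumptions, not values from this PR:

import boto3

# Sketch: execute the external-schema DDL via the Redshift Data API.
# All identifiers here are assumptions for illustration only.
redshift_data = boto3.client("redshift-data")
sql = (
    "create external schema cdc_database from data catalog "
    "database 'cdc_database' "
    "iam_role 'arn:aws:iam::123456789012:role/RedshiftSpectrumRole' "  # placeholder ARN
    "create external database if not exists;"
)
redshift_data.execute_statement(
    ClusterIdentifier="redshift-cluster",  # assumed cluster id
    Database="dev",                        # assumed database
    DbUser="awsuser",                      # assumed user
    Sql=sql,
)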
34 changes: 34 additions & 0 deletions src/SQLBasedCDCDataIngestion/deploy.sh
@@ -0,0 +1,34 @@
#!/bin/bash
# This script deploys the components of the Redshift ELT blog demo
# from a Cloud9 environment


# Usage (from a fresh Cloud9 environment):
#   git clone https://github.com/AndrewShvedRepo/redshiftcdcelt
#   cd redshiftcdcelt/
#   ./deploy.sh



# upgrade cdk
sudo npm install -g aws-cdk

# install dependencies
sudo pip3 install -r requirements.txt

# remove unnecessary docker images to free up space
docker rmi -f $(docker image ls -a -q)

# bootstrap cdk
cdk bootstrap

# deploy all stacks
cdk deploy --require-approval never "*"


# install boto3
sudo pip3 install boto3

# start DMS tasks
python dlpoc/start_dms_tasks.py
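dlpoc/start_dms_tasks.py is not included in this diff; a minimal sketch of what it could look like, assuming it resolves the task ARNs from the CloudFormation outputs of the DMS stack defined in dlpoc/dms_stack.py (the stack name and output keys below are inferred, not confirmed by this PR):

import boto3

# Sketch: look up the replication task ARNs exported by the "DMS" stack and
# start the full-load task before the ongoing CDC task.
cloudformation = boto3.client("cloudformation")
dms = boto3.client("dms")

outputs = cloudformation.describe_stacks(StackName="DMS")["Stacks"][0]["Outputs"]
task_arns = {o["OutputKey"]: o["OutputValue"] for o in outputs}

# Output keys assumed to be the CDK-normalized ids of the stack's CfnOutputs.
for key in ("FullLoadTaskArn", "CDCLoadTaskArn"):
    dms.start_replication_task(
        ReplicationTaskArn=task_arns[key],
        StartReplicationTaskType="start-replication",
    )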
Empty file.
56 changes: 56 additions & 0 deletions src/SQLBasedCDCDataIngestion/dlpoc/bastion_stack.py
@@ -0,0 +1,56 @@
import os

from aws_cdk import core
from aws_cdk.aws_s3_assets import Asset
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_iam as iam
import aws_cdk.aws_secretsmanager as secretsmanager

DIRNAME = os.path.dirname(__file__)


class BastionStack(core.Stack):
    def __init__(
            self, scope: core.Construct, id: str,
            vpc: ec2.IVpc, dms_sg: ec2.ISecurityGroup,
            secret: secretsmanager.ISecret, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # Allow SSH access
        dms_sg.add_ingress_rule(
            peer=ec2.Peer.ipv4("52.95.4.3/32"), connection=ec2.Port.tcp(22),
            description="SSH access"
        )
        dms_sg.add_ingress_rule(
            peer=ec2.Peer.ipv4("3.231.255.131/32"), connection=ec2.Port.tcp(22),
            description="SSH access"
        )

        # Create bastion host
        self.bastion_host = ec2.BastionHostLinux(
            scope=self, id="BastionHost",
            vpc=vpc, subnet_selection=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
            instance_name="BastionHost", instance_type=ec2.InstanceType(instance_type_identifier="t3.micro"),
            security_group=dms_sg
        )

        # Permission for using Secrets Manager
        self.bastion_host.role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("SecretsManagerReadWrite")
        )

        # Script asset
        asset_script = Asset(scope=self, id="pgbench_script", path=os.path.join(DIRNAME, "bootstrap.sh"))
        local_bootstrap_path = self.bastion_host.instance.user_data.add_s3_download_command(
            bucket=asset_script.bucket,
            bucket_key=asset_script.s3_object_key
        )

        # Execute script asset
        self.bastion_host.instance.user_data.add_execute_file_command(
            file_path=local_bootstrap_path,
            arguments=f"-s {secret.secret_full_arn}"
        )

        # Read permissions to assets
        asset_script.grant_read(self.bastion_host.role)
43 changes: 43 additions & 0 deletions src/SQLBasedCDCDataIngestion/dlpoc/bootstrap.sh
@@ -0,0 +1,43 @@
#!/bin/bash
# This script bootstraps the installation of pgbench, initializes the
# benchmark database, and schedules pgbench via cron for continuous execution
set -e

POSITIONAL=()
while [[ $# -gt 0 ]]; do
    key="$1"
    case $key in
        -s|--secret-id) SECRET_ID="$2"; shift; shift;;
        --debug) set -x; shift;;
        *)
            POSITIONAL+=("$1"); shift;;
    esac
done
set -- "${POSITIONAL[@]}"

function install_system_packages() {
    /usr/bin/yum -y install postgresql-contrib jq
}

function initiate_pgbench() {
    echo "${PG_HOST}:${PG_PORT}:*:${PG_USER}:${PG_PASSWORD}" | tee ~/.pgpass
    chmod 0600 ~/.pgpass
    pgbench -i -h "${PG_HOST}" -p "${PG_PORT}" -U "${PG_USER}" "${PG_DBNAME}"
}

function schedule_pgbench() {
    BENCHMARKING_SCRIPT="*/2 * * * * root pgbench -c 10 -T 30 -h ${PG_HOST} -p ${PG_PORT} -U ${PG_USER} ${PG_DBNAME}"
    echo "$BENCHMARKING_SCRIPT" | tee /etc/cron.d/pgbench_cron
}

install_system_packages
REGION=$(curl -s http://169.254.169.254/latest/dynamic/instance-identity/document | jq .region -r)
SECRET=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --region "$REGION" | jq .SecretString -r)
PG_PASSWORD=$(echo "$SECRET" | jq .password -r)
PG_HOST=$(echo "$SECRET" | jq .host -r)
PG_PORT=$(echo "$SECRET" | jq .port -r)
PG_USER=$(echo "$SECRET" | jq .username -r)
PG_DBNAME=$(echo "$SECRET" | jq .dbname -r)
initiate_pgbench
schedule_pgbench
echo "Bootstrapping complete."
118 changes: 118 additions & 0 deletions src/SQLBasedCDCDataIngestion/dlpoc/dms_stack.py
@@ -0,0 +1,118 @@
from aws_cdk import core
import aws_cdk.aws_dms as dms
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_s3 as s3
import aws_cdk.aws_iam as iam
import aws_cdk.aws_secretsmanager as secretsmanager
import json



class DMSStack(core.Stack):
    def __init__(
            self, scope: core.Construct, id: str, vpc: ec2.IVpc,
            rds_secret: secretsmanager.ISecret, dms_sg: ec2.ISecurityGroup,
            raw_bucket: s3.IBucket, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        replication_subnet_group = dms.CfnReplicationSubnetGroup(
            self, "ReplicationSubnetGroup",
            replication_subnet_group_description="Replication Subnet Group for DL",
            replication_subnet_group_identifier="replicationsubnetgroupdl",
            subnet_ids=[subnet.subnet_id for subnet in vpc.private_subnets])

        # Role that lets DMS write to the raw bucket
        dms_s3_role = iam.Role(self, "DMS S3 Role", assumed_by=iam.ServicePrincipal("dms.amazonaws.com"))
        raw_bucket.grant_read_write(dms_s3_role)

        dms_replication_instance = dms.CfnReplicationInstance(
            self, "DMS Replication Instance",
            replication_instance_class="dms.t2.large",
            allocated_storage=50,
            auto_minor_version_upgrade=True,
            engine_version="3.4.2",
            replication_subnet_group_identifier="replicationsubnetgroupdl",
            vpc_security_group_ids=[dms_sg.security_group_id]
        )
        dms_replication_instance.add_depends_on(replication_subnet_group)

        source_endpoint = dms.CfnEndpoint(
            self, "Source Endpoint",
            endpoint_identifier="sourcerdsdatabase",
            endpoint_type="source",
            username=rds_secret.secret_value_from_json("username").to_string(),
            password=rds_secret.secret_value_from_json("password").to_string(),
            database_name=rds_secret.secret_value_from_json("dbname").to_string(),
            server_name=rds_secret.secret_value_from_json("host").to_string(),
            port=5432,
            engine_name=rds_secret.secret_value_from_json("engine").to_string()
        )

        target_endpoint_cdc = dms.CfnEndpoint(
            self, "Target Endpoint CDC",
            endpoint_identifier="targetendpointcdc",
            endpoint_type="target",
            engine_name="s3",
            extra_connection_attributes="AddColumnName=true;timestampColumnName=changets;dataFormat=parquet;DatePartitionEnabled=true;DatePartitionSequence=YYYYMMDD;DatePartitionDelimiter=NONE;bucketFolder=CDC",
            s3_settings=dms.CfnEndpoint.S3SettingsProperty(
                bucket_name=raw_bucket.bucket_name,
                service_access_role_arn=dms_s3_role.role_arn)
        )

        target_endpoint_full = dms.CfnEndpoint(
            self, "Target Endpoint Full",
            endpoint_identifier="targetendpointfull",
            endpoint_type="target",
            engine_name="s3",
            extra_connection_attributes="AddColumnName=true;timestampColumnName=changets;dataFormat=parquet;bucketFolder=full",
            s3_settings=dms.CfnEndpoint.S3SettingsProperty(
                bucket_name=raw_bucket.bucket_name,
                service_access_role_arn=dms_s3_role.role_arn)
        )

        # Replicate every table in every schema of the source database
        table_mappings = {
            "rules": [
                {
                    "rule-type": "selection",
                    "rule-id": "1",
                    "rule-name": "1",
                    "object-locator": {
                        "schema-name": "%",
                        "table-name": "%"
                    },
                    "rule-action": "include",
                    "filters": []
                }]
        }

        replication_task_cdc = dms.CfnReplicationTask(
            self, "Replication Task CDC",
            migration_type="cdc",
            replication_task_identifier="cdcload",
            replication_instance_arn=dms_replication_instance.ref,
            source_endpoint_arn=source_endpoint.ref,
            target_endpoint_arn=target_endpoint_cdc.ref,
            table_mappings=json.dumps(table_mappings)
        )
        replication_task_full = dms.CfnReplicationTask(
            self, "Replication Task Full",
            migration_type="full-load",
            replication_task_identifier="fullload",
            replication_instance_arn=dms_replication_instance.ref,
            source_endpoint_arn=source_endpoint.ref,
            target_endpoint_arn=target_endpoint_full.ref,
            table_mappings=json.dumps(table_mappings)
        )

        core.CfnOutput(self, "Full Load Task Arn", value=replication_task_full.ref)
        core.CfnOutput(self, "CDC Load Task Arn", value=replication_task_cdc.ref)
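The selection rule above uses % wildcards to replicate every table in every schema. If the intent were to capture only the pgbench tables generated by dlpoc/bootstrap.sh, the object-locator could be narrowed; a sketch of such a rule (not part of this PR):

# Sketch: a narrower DMS selection rule limited to the pgbench tables.
table_mappings = {
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "pgbench-only",
            "object-locator": {
                "schema-name": "public",
                "table-name": "pgbench_%"
            },
            "rule-action": "include",
            "filters": []
        }]
}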