Merge pull request #2 from Real-Life-IaC/use-eventbridge-instead-of-sns
Merge use-eventbridge-instead-of-sns
andresionek91 authored Feb 18, 2024
2 parents 7da237e + 1741f97 commit a85b184
Showing 5 changed files with 126 additions and 123 deletions.
49 changes: 49 additions & 0 deletions infra/constructs/b1/eventbus.py
@@ -0,0 +1,49 @@
import aws_cdk as cdk

from aws_cdk import aws_events as events
from aws_cdk import aws_eventschemas as schemas
from aws_cdk import aws_ssm as ssm
from constructs import Construct


class B1EventBus(Construct):
"""Event Bus for PubSub"""

def __init__(
self,
scope: Construct,
id: str,
) -> None:
super().__init__(scope, id)

# Create Event Bus
self.event_bus = events.EventBus(
scope=self,
id="EventBus",
)

# Create an archive for the event bus
self.event_bus.archive(
id="Archive",
description="PubSub Archive",
event_pattern=events.EventPattern(
account=[cdk.Aws.ACCOUNT_ID]
),
retention=cdk.Duration.days(90),
)

# Allows EventBridge to automatically discover schemas
schemas.CfnDiscoverer(
scope=self,
id="EventSchemaDiscoverer",
source_arn=self.event_bus.event_bus_arn,
description="PubSub Event Schema Discoverer",
)

# Export to SSM
ssm.StringParameter(
scope=self,
id="EventBusArn",
parameter_name="/pubsub/event-bus/arn",
string_value=self.event_bus.event_bus_arn,
)
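
For context (an illustrative sketch, not part of this commit): a producer could publish to this bus by resolving the ARN from the /pubsub/event-bus/arn parameter exported above. The service and event names below are hypothetical.

import json

import boto3

ssm = boto3.client("ssm")
events_client = boto3.client("events")

# Resolve the bus ARN from the parameter created by B1EventBus
bus_arn = ssm.get_parameter(Name="/pubsub/event-bus/arn")["Parameter"]["Value"]

# PutEvents accepts the bus name or ARN; 'source' and 'detail-type' are the
# fields that the Firehose partitioning later in this diff extracts
events_client.put_events(
    Entries=[
        {
            "EventBusName": bus_arn,
            "Source": "orders-service",   # hypothetical publisher
            "DetailType": "OrderPlaced",  # hypothetical event name
            "Detail": json.dumps({"order_id": "123"}),
        }
    ]
)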
148 changes: 53 additions & 95 deletions infra/constructs/b1/firehose.py
@@ -1,10 +1,14 @@
from typing import TypedDict

import aws_cdk as cdk

from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import aws_iam as iam
from aws_cdk import aws_kinesisfirehose as firehose
from aws_cdk import aws_logs as logs
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_ssm as ssm
from aws_cdk.aws_kinesisfirehose import CfnDeliveryStream
from constructs import Construct
from typing_extensions import NotRequired
from typing_extensions import Unpack
@@ -14,20 +18,25 @@ class Params(TypedDict):
"""Parameters for the B1PubSubFirehose class."""

bucket: s3.Bucket
event_bus: events.EventBus
buffer_interval_in_seconds: NotRequired[int]
buffer_size_in_m_bs: NotRequired[int]


class B1PubSubFirehose(Construct):
class B1Firehose(Construct):
"""Ingest events from SNS into S3 with Firehose."""

def __init__(
self, scope: Construct, id: str, **kwargs: Unpack[Params]
self,
scope: Construct,
id: str,
**kwargs: Unpack[Params],
) -> None:
super().__init__(scope, id)

# Read the kwargs
bucket = kwargs["bucket"]
event_bus = kwargs["event_bus"]
buffer_interval_in_seconds = kwargs.get(
"buffer_interval_in_seconds", 60
)
@@ -40,80 +49,46 @@ def __init__(
retention=logs.RetentionDays.ONE_MONTH,
)
log_stream = logs.LogStream(
scope=self, id="S3DeliveryStreamLogStream", log_group=log_group
scope=self,
id="S3DeliveryStreamLogStream",
log_group=log_group,
)

# Create role and policies for Kinesis Firehose
delivery_role = iam.Role(
scope=self,
id="DeliveryStreamRole",
id="DeliveryRole",
assumed_by=iam.ServicePrincipal(service="firehose.amazonaws.com"), # type: ignore
)

# Allow Firehose to write to S3
delivery_role.add_to_policy(
statement=iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=[
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*",
"s3:DeleteObject*",
"s3:PutObject",
"s3:PutObjectLegalHold",
"s3:PutObjectRetention",
"s3:PutObjectTagging",
"s3:PutObjectVersionTagging",
"s3:Abort*",
],
resources=[
bucket.bucket_arn,
bucket.arn_for_objects(key_pattern="*"),
],
)
)
bucket.grant_write(delivery_role)

# Allow Firehose to write to CloudWatch Logs
delivery_role.add_to_policy(
statement=iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["logs:CreateLogStream", "logs:PutLogEvents"],
resources=[log_group.log_group_arn],
)
)
log_group.grant_write(delivery_role)

# Create Firehose inline processing configuration
# This extracts information from the event to be used in the prefix/partitioning
        # All events received by firehose must have '_event_emmiter' and '_event_name' keys
processing_configuration = firehose.CfnDeliveryStream.ProcessingConfigurationProperty( # noqa: B950
# Extracts metadata from the event to be used in the prefix
processing_configuration = CfnDeliveryStream.ProcessingConfigurationProperty( # noqa: B950
enabled=True,
processors=[
firehose.CfnDeliveryStream.ProcessorProperty(
type="RecordDeAggregation",
parameters=[
firehose.CfnDeliveryStream.ProcessorParameterProperty(
parameter_name="SubRecordType",
parameter_value="JSON",
),
],
),
firehose.CfnDeliveryStream.ProcessorProperty(
CfnDeliveryStream.ProcessorProperty(
type="AppendDelimiterToRecord",
parameters=[
firehose.CfnDeliveryStream.ProcessorParameterProperty(
CfnDeliveryStream.ProcessorParameterProperty(
parameter_name="Delimiter",
parameter_value="\\n",
),
],
),
firehose.CfnDeliveryStream.ProcessorProperty(
CfnDeliveryStream.ProcessorProperty(
type="MetadataExtraction",
parameters=[
firehose.CfnDeliveryStream.ProcessorParameterProperty(
CfnDeliveryStream.ProcessorParameterProperty(
parameter_name="MetadataExtractionQuery",
parameter_value="{event_publisher:.event_publisher, event_name:.event_name}", # noqa: B950
parameter_value='{source:.source, detail_type:."detail-type"}', # noqa: B950
),
firehose.CfnDeliveryStream.ProcessorParameterProperty(
CfnDeliveryStream.ProcessorParameterProperty(
parameter_name="JsonParsingEngine",
parameter_value="JQ-1.6",
),
@@ -123,82 +98,65 @@ def __init__(
)

# Create S3 Destination configuration for Firehose
s3_destination = firehose.CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty( # noqa: B950
s3_destination = CfnDeliveryStream.ExtendedS3DestinationConfigurationProperty( # noqa: B950
bucket_arn=bucket.bucket_arn,
role_arn=delivery_role.role_arn,
buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
buffering_hints=CfnDeliveryStream.BufferingHintsProperty(
interval_in_seconds=buffer_interval_in_seconds,
size_in_m_bs=buffer_size_in_m_bs,
),
cloud_watch_logging_options=firehose.CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
cloud_watch_logging_options=CfnDeliveryStream.CloudWatchLoggingOptionsProperty(
enabled=True,
log_group_name=log_group.log_group_name,
log_stream_name=log_stream.log_stream_name,
),
compression_format="GZIP",
dynamic_partitioning_configuration=firehose.CfnDeliveryStream.DynamicPartitioningConfigurationProperty(
dynamic_partitioning_configuration=CfnDeliveryStream.DynamicPartitioningConfigurationProperty(
enabled=True,
retry_options=firehose.CfnDeliveryStream.RetryOptionsProperty(
retry_options=CfnDeliveryStream.RetryOptionsProperty(
duration_in_seconds=10
),
),
processing_configuration=processing_configuration,
error_output_prefix="errors/!{firehose:error-output-type}/date=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/",
prefix="!{partitionKeyFromQuery:event_publisher}/!{partitionKeyFromQuery:event_name}/date=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/",
prefix="!{partitionKeyFromQuery:source}/!{partitionKeyFromQuery:detail_type}/date=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}/",
)

# Create Firehose Delivery Stream
self.delivery_stream = firehose.CfnDeliveryStream(
delivery_stream = CfnDeliveryStream(
scope=self,
id="S3DeliveryStream",
delivery_stream_type="DirectPut",
delivery_stream_encryption_configuration_input=firehose.CfnDeliveryStream.DeliveryStreamEncryptionConfigurationInputProperty(
delivery_stream_encryption_configuration_input=CfnDeliveryStream.DeliveryStreamEncryptionConfigurationInputProperty(
key_type="AWS_OWNED_CMK"
),
extended_s3_destination_configuration=s3_destination,
)

self.delivery_stream.node.add_dependency(delivery_role)
self.delivery_stream.node.add_dependency(log_stream)
self.delivery_stream.node.add_dependency(log_group)
# Add dependencies to guarantee that the role, log stream,
# and group are created before firehose
delivery_stream.node.add_dependency(delivery_role)
delivery_stream.node.add_dependency(log_stream)
delivery_stream.node.add_dependency(log_group)

        # Create EventBridge rule to send all events to Firehose
events.Rule(
scope=self,
id="S3DeliveryStreamRule",
event_bus=event_bus,
event_pattern=events.EventPattern(
account=[cdk.Aws.ACCOUNT_ID]
),
targets=[
targets.KinesisFirehoseStream(stream=delivery_stream),
], # type: ignore
)

# Export Firehose ARN
ssm.StringParameter(
scope=self,
id="S3DeliveryStreamArn",
string_value=self.delivery_stream.attr_arn,
string_value=delivery_stream.attr_arn,
description="S3 Delivery Stream ARN",
parameter_name="/pubsub/s3-delivery-stream/arn",
)

# Create role to allow SNS to publish events to Firehose
self.firehose_subscription_role = iam.Role(
scope=self,
id="FirehoseSubscriptionRole",
description="Role assumed by SNS to publish events to Kinesis Firehose",
assumed_by=iam.ServicePrincipal(service="sns.amazonaws.com"), # type: ignore
)

# Add policies to role
self.firehose_subscription_role.add_to_policy(
statement=iam.PolicyStatement(
actions=[
"firehose:DescribeDeliveryStream",
"firehose:ListDeliveryStreams",
"firehose:ListTagsForDeliveryStream",
"firehose:PutRecord",
"firehose:PutRecordBatch",
],
effect=iam.Effect.ALLOW,
resources=[self.delivery_stream.attr_arn],
)
)

# Export SNS Subscription Role ARN
ssm.StringParameter(
scope=self,
id="FirehoseSubscriptionRoleArn",
string_value=self.firehose_subscription_role.role_arn,
description="Firehose Subscription Role ARN",
parameter_name="/pubsub/firehose-subscription-role/arn",
)
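
To make the partitioning concrete (an illustrative sketch, not part of this commit): the MetadataExtraction processor runs the JQ query above on each EventBridge envelope, and the extracted keys feed the prefix template. The event values below are made up.

from datetime import datetime, timezone

# A typical EventBridge envelope as Firehose receives it (values hypothetical)
event = {
    "version": "0",
    "source": "orders-service",
    "detail-type": "OrderPlaced",
    "account": "123456789012",
    "detail": {"order_id": "123"},
}

# Equivalent of the JQ query '{source:.source, detail_type:."detail-type"}'
partition_keys = {
    "source": event["source"],
    "detail_type": event["detail-type"],
}

# Mirrors the prefix template:
# !{partitionKeyFromQuery:source}/!{partitionKeyFromQuery:detail_type}/date=.../
now = datetime.now(timezone.utc)
prefix = (
    f"{partition_keys['source']}/{partition_keys['detail_type']}/"
    f"date={now:%Y}-{now:%m}-{now:%d}/"
)
print(prefix)  # e.g. orders-service/OrderPlaced/date=2024-02-18/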
17 changes: 3 additions & 14 deletions infra/constructs/b1/storage.py
@@ -1,13 +1,11 @@
import aws_cdk as cdk

from aws_cdk import aws_s3 as s3
from aws_cdk import aws_s3_notifications as s3n
from aws_cdk import aws_sns as sns
from aws_cdk import aws_ssm as ssm
from constructs import Construct


class B1PubSubStorage(Construct):
class B1Storage(Construct):
"""Store events in S3 and notify a topic when object is created"""

def __init__(
@@ -36,7 +34,8 @@ def __init__(
object_ownership=s3.ObjectOwnership.BUCKET_OWNER_ENFORCED,
server_access_logs_bucket=access_logs_bucket,
server_access_logs_prefix="S3Logs/",
transfer_acceleration=False,
# Sends events to the default event bus
event_bridge_enabled=True,
)

self.bucket.add_lifecycle_rule(
@@ -48,16 +47,6 @@ def __init__(
],
)

# Create topic to receive event from s3
self.topic = sns.Topic(
scope=self,
id="FileCreatedTopic",
display_name="pubsub-FileCreated",
)

# Notify topic when object is created
self.bucket.add_object_created_notification(dest=s3n.SnsDestination(topic=self.topic)) # type: ignore

# Add bucket ARN to SSM
ssm.StringParameter(
scope=self,
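
With event_bridge_enabled=True, S3 publishes object-level events (source "aws.s3", detail-type "Object Created") to the account's default event bus, replacing the SNS notification removed above. Below is an illustrative sketch (not part of this commit) of a rule that could consume those events; the SQS queue is a hypothetical stand-in for any consumer.

from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_sqs as sqs
from constructs import Construct


def add_object_created_rule(scope: Construct, bucket: s3.Bucket) -> events.Rule:
    """Route S3 "Object Created" events for this bucket to a queue."""
    queue = sqs.Queue(scope=scope, id="ObjectCreatedQueue")
    return events.Rule(
        scope=scope,
        id="ObjectCreatedRule",
        # No event_bus argument: S3 notifications go to the default bus
        event_pattern=events.EventPattern(
            source=["aws.s3"],
            detail_type=["Object Created"],
            detail={"bucket": {"name": [bucket.bucket_name]}},
        ),
        targets=[targets.SqsQueue(queue)],  # type: ignore
    )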
15 changes: 11 additions & 4 deletions infra/constructs/b2/pubsub.py
@@ -1,6 +1,7 @@
from constructs import Construct
from infra.constructs.b1.firehose import B1PubSubFirehose
from infra.constructs.b1.storage import B1PubSubStorage
from infra.constructs.b1.eventbus import B1EventBus
from infra.constructs.b1.firehose import B1Firehose
from infra.constructs.b1.storage import B1Storage


class B2PubSub(Construct):
@@ -13,6 +14,12 @@ def __init__(
) -> None:
super().__init__(scope, id)

storage = B1PubSubStorage(scope=self, id="Storage")
event_bus = B1EventBus(scope=self, id="EventBus")
storage = B1Storage(scope=self, id="Storage")

B1PubSubFirehose(scope=self, id="Firehose", bucket=storage.bucket)
B1Firehose(
scope=self,
id="Firehose",
bucket=storage.bucket,
event_bus=event_bus.event_bus,
)
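
Taken together (an illustrative sketch, not part of this commit): B2PubSub can be dropped into a stack as-is, since it wires the bus, bucket, and Firehose internally. The app and stack names below are hypothetical.

import aws_cdk as cdk
from constructs import Construct

from infra.constructs.b2.pubsub import B2PubSub


class PubSubStack(cdk.Stack):
    """Hypothetical stack hosting the pub/sub constructs."""

    def __init__(self, scope: Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)
        B2PubSub(scope=self, id="PubSub")


app = cdk.App()
PubSubStack(app, "PubSubStack")
app.synth()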
