diff --git a/msk-lambda-schema-avro-python-sam/.gitignore b/msk-lambda-schema-avro-python-sam/.gitignore new file mode 100644 index 000000000..2fb6152b1 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/.gitignore @@ -0,0 +1,149 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# AWS SAM +.aws-sam/ +samconfig.toml + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db diff --git a/msk-lambda-schema-avro-python-sam/MSKAndKafkaClientEC2.yaml b/msk-lambda-schema-avro-python-sam/MSKAndKafkaClientEC2.yaml new file mode 100644 index 000000000..a5ca0de24 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/MSKAndKafkaClientEC2.yaml @@ -0,0 +1,1368 @@ +AWSTemplateFormatVersion: '2010-09-09' +Parameters: + EnvType: + Description: MSK Cluster Type. + Default: Provisioned + Type: String + AllowedValues: + - Serverless + - Provisioned + ConstraintDescription: Must specify Serverless or Provisioned. + LatestAmiId: + Type: 'AWS::SSM::Parameter::Value' + Default: '/aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64' + MSKKafkaVersion: + Type: String + Default: 3.9.x + PythonVersion: + Type: String + Description: Choose the version of Python. 
Lambda currently supports Python 3.9, 3.10, 3.11, and 3.12 + AllowedValues: + - python39 + - python310 + - python311 + - python312 + Default: python312 + ApacheKafkaInstallerLocation: + Type: String + Default: https://dlcdn.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz + KafkaTopicForLambda: + Type: String + Default: MskLambdaTopic + ServerlessLandGithubLocation: + Type: String + Default: https://github.com/aws-samples/serverless-patterns/ + ContactSchemaName: + Type: String + Default: ContactSchema + GlueSchemaRegistryForMSKName: + Type: String + Default: GlueSchemaRegistryForMSK + +Conditions: + CreateProvisionedCluster: !Equals + - !Ref EnvType + - Provisioned + CreateServerlessCluster: !Equals + - !Ref EnvType + - Serverless +Mappings: + SubnetConfig: + VPC: + CIDR: '10.0.0.0/16' + PublicOne: + CIDR: '10.0.0.0/24' + PrivateSubnetMSKOne: + CIDR: '10.0.1.0/24' + PrivateSubnetMSKTwo: + CIDR: '10.0.2.0/24' + PrivateSubnetMSKThree: + CIDR: '10.0.3.0/24' +Resources: + VPC: + Type: AWS::EC2::VPC + Properties: + EnableDnsSupport: true + EnableDnsHostnames: true + CidrBlock: !FindInMap ['SubnetConfig', 'VPC', 'CIDR'] + Tags: + - Key: 'Name' + Value: 'MSKVPC' + + PublicSubnetOne: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PublicOne', 'CIDR'] + MapPublicIpOnLaunch: true + Tags: + - Key: 'Name' + Value: 'PublicSubnet' + PrivateSubnetMSKOne: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetMSKOne', 'CIDR'] + MapPublicIpOnLaunch: false + Tags: + - Key: 'Name' + Value: 'PrivateSubnetMSKOne' + PrivateSubnetMSKTwo: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 1 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetMSKTwo', 'CIDR'] + MapPublicIpOnLaunch: false + Tags: + - Key: 'Name' + Value: 'PrivateSubnetMSKTwo' + PrivateSubnetMSKThree: + Type: AWS::EC2::Subnet + Properties: + AvailabilityZone: + Fn::Select: + - 2 + - Fn::GetAZs: {Ref: 'AWS::Region'} + VpcId: !Ref 'VPC' + CidrBlock: !FindInMap ['SubnetConfig', 'PrivateSubnetMSKThree', 'CIDR'] + MapPublicIpOnLaunch: false + Tags: + - Key: 'Name' + Value: 'PrivateSubnetMSKThree' + + InternetGateway: + Type: AWS::EC2::InternetGateway + GatewayAttachement: + Type: AWS::EC2::VPCGatewayAttachment + Properties: + VpcId: !Ref 'VPC' + InternetGatewayId: !Ref 'InternetGateway' + + NATEIP: + Type: AWS::EC2::EIP + DependsOn: GatewayAttachement + Properties: + Domain: vpc + + NATGateway: + Type: AWS::EC2::NatGateway + Properties: + AllocationId: !GetAtt NATEIP.AllocationId + SubnetId: !Ref 'PublicSubnetOne' + Tags: + - Key: 'Name' + Value: 'ConfluentKafkaNATGateway' + + PublicRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref 'VPC' + PublicRoute: + Type: AWS::EC2::Route + DependsOn: GatewayAttachement + Properties: + RouteTableId: !Ref 'PublicRouteTable' + DestinationCidrBlock: '0.0.0.0/0' + GatewayId: !Ref 'InternetGateway' + PublicSubnetOneRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + SubnetId: !Ref PublicSubnetOne + RouteTableId: !Ref PublicRouteTable + + PrivateRouteTable: + Type: AWS::EC2::RouteTable + Properties: + VpcId: !Ref 'VPC' + + PrivateRoute: + Type: AWS::EC2::Route + DependsOn: NATGateway + Properties: + RouteTableId: !Ref 
'PrivateRouteTable'
+      DestinationCidrBlock: '0.0.0.0/0'
+      NatGatewayId: !Ref 'NATGateway'
+
+  PrivateSubnetMSKOneRouteTableAssociation:
+    Type: AWS::EC2::SubnetRouteTableAssociation
+    Properties:
+      RouteTableId: !Ref PrivateRouteTable
+      SubnetId: !Ref PrivateSubnetMSKOne
+  PrivateSubnetMSKTwoRouteTableAssociation:
+    Type: AWS::EC2::SubnetRouteTableAssociation
+    Properties:
+      RouteTableId: !Ref PrivateRouteTable
+      SubnetId: !Ref PrivateSubnetMSKTwo
+  PrivateSubnetMSKThreeRouteTableAssociation:
+    Type: AWS::EC2::SubnetRouteTableAssociation
+    Properties:
+      RouteTableId: !Ref PrivateRouteTable
+      SubnetId: !Ref PrivateSubnetMSKThree
+
+  KafkaClientInstanceSecurityGroup:
+    Type: AWS::EC2::SecurityGroup
+    Properties:
+      GroupDescription: Enable SSH access via port 22 from the public subnet and EC2 Instance Connect
+      GroupName: !Sub "${AWS::StackName} Security group attached to the kafka client producer"
+      VpcId: !Ref VPC
+      SecurityGroupIngress:
+        - IpProtocol: tcp
+          FromPort: 22
+          ToPort: 22
+          CidrIp: 10.0.0.0/24
+        - IpProtocol: tcp
+          FromPort: 3500
+          ToPort: 3500
+          CidrIp: 10.0.0.0/24
+        - IpProtocol: tcp
+          FromPort: 3600
+          ToPort: 3600
+          CidrIp: 10.0.0.0/24
+        - IpProtocol: tcp
+          FromPort: 3800
+          ToPort: 3800
+          CidrIp: 10.0.0.0/24
+        - IpProtocol: tcp
+          FromPort: 3900
+          ToPort: 3900
+          CidrIp: 10.0.0.0/24
+
+  MSKSecurityGroup:
+    Type: AWS::EC2::SecurityGroup
+    DependsOn: [VPC,KafkaClientInstanceSecurityGroup]
+    Properties:
+      GroupDescription: MSK Security Group
+      GroupName: !Sub "${AWS::StackName} Security group for the MSK cluster"
+      VpcId: !Ref 'VPC'
+      SecurityGroupIngress:
+        - IpProtocol: tcp
+          FromPort: 2181
+          ToPort: 2181
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+        - IpProtocol: tcp
+          FromPort: 9094
+          ToPort: 9094
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+        - IpProtocol: tcp
+          FromPort: 9096
+          ToPort: 9096
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+        - IpProtocol: tcp
+          FromPort: 9092
+          ToPort: 9092
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+        - IpProtocol: tcp
+          FromPort: 9098
+          ToPort: 9098
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+        - IpProtocol: tcp
+          FromPort: 8083
+          ToPort: 8083
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+        - IpProtocol: tcp
+          FromPort: 8081
+          ToPort: 8081
+          SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+
+  MSKSelfIngressAllowRule:
+    Type: AWS::EC2::SecurityGroupIngress
+    DependsOn: MSKSecurityGroup
+    Properties:
+      GroupId: !GetAtt MSKSecurityGroup.GroupId
+      Description: Enable self-referencing ingress so brokers can reach each other on the bootstrap ports
+      IpProtocol: tcp
+      FromPort: 9092
+      ToPort: 9098
+      SourceSecurityGroupId: !GetAtt MSKSecurityGroup.GroupId
+
+  KafkaClientSelfIngressAllowRule:
+    Type: AWS::EC2::SecurityGroupIngress
+    DependsOn: KafkaClientInstanceSecurityGroup
+    Properties:
+      GroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+      IpProtocol: tcp
+      FromPort: 22
+      ToPort: 22
+      SourceSecurityGroupId: !GetAtt KafkaClientInstanceSecurityGroup.GroupId
+
+  MSKGlueRegistry:
+    Type: AWS::Glue::Registry
+    Properties:
+      Name: !Ref GlueSchemaRegistryForMSKName
+      Description: "Registry for storing schemas related to MSK"
+
+  ContactSchema:
+    Type: AWS::Glue::Schema
+    Properties:
+      Name: !Ref ContactSchemaName
+      Compatibility: BACKWARD
+      DataFormat: AVRO
+      Registry:
+        Arn: !GetAtt MSKGlueRegistry.Arn
+      SchemaDefinition: |
+        {
+          "type": "record",
+          "name": "Contact",
+          "fields": [
+            {"name": "firstname", "type": 
"string"}, + {"name": "lastname", "type": "string"}, + {"name": "company", "type": "string"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "county", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "homePhone", "type": "string"}, + {"name": "cellPhone", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "website", "type": "string"} + ] + } + + KafkaClientEC2InstanceProvisioned: + Condition: CreateProvisionedCluster + DependsOn: MSKCluster + Type: AWS::EC2::Instance + Properties: + InstanceType: m5.large + IamInstanceProfile: !Ref EC2InstanceProfile + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + SubnetId: !Ref PublicSubnetOne + SecurityGroupIds: [!GetAtt KafkaClientInstanceSecurityGroup.GroupId] + ImageId: !Ref LatestAmiId + Tags: + - Key: 'Name' + Value: 'KafkaClientInstance' + BlockDeviceMappings: + - DeviceName: /dev/xvda + Ebs: + VolumeSize: 50 + VolumeType: gp2 + DeleteOnTermination: true + UserData: + Fn::Base64: + !Sub + - | + #!/bin/bash + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python install" + yum update -y + # Install Python 3.12 and development tools + + # install Python + PYTHON_VERSION=${python_version} + echo "PYTHON_VERSION=$PYTHON_VERSION" >> /home/ec2-user/.bash_profile + + # Install Python 3.12 and development tools + sudo yum install -y python3.12 python3.12-pip python3.12-devel + + # Create symlinks for easier access + sudo ln -sf /usr/bin/python3.12 /usr/local/bin/python3 + sudo ln -sf /usr/bin/pip3.12 /usr/local/bin/pip3 + + # Install virtualenv + sudo pip3.12 install virtualenv + + # Install Java (required for Kafka tools) + sudo yum install -y java-21-amazon-corretto-headless + + # Set JAVA_HOME environment variable + echo "export JAVA_HOME=/usr/lib/jvm/java-21-amazon-corretto" >> /home/ec2-user/.bash_profile + export JAVA_HOME=/usr/lib/jvm/java-21-amazon-corretto + + # Verify installations + python3 --version + pip3 --version + java -version + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Python install succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum install nmap-ncat -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of nmap succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum install git -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of git succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum erase awscli -y + # Check the exit code of the command + if [ $? 
-eq 0 ]; then + echo "Yum erase of awscli succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum install jq -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of jq succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + sudo yum install -y docker + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of docker succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + service docker start + usermod -a -G docker ec2-user + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + sudo pip3.12 install boto3 kafka-python avro-python3 confluent-kafka + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Python packages install succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + cd /home/ec2-user + su -c "ln -s /usr/bin/python3.12 /usr/bin/python3" -s /bin/sh ec2-user + su -c "pip3 install boto3 --user" -s /bin/sh ec2-user + su -c "pip3 install kafka-python --user" -s /bin/sh ec2-user + + # install AWS CLI 2 - access with aws2 + cd /home/ec2-user + mkdir -p awscli + cd awscli + curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" + unzip awscliv2.zip + sudo ./aws/install + + # Create dirs, get Apache Kafka and unpack it + cd /home/ec2-user + KAFKA_VERSION=${msk_kafka_version} + KAFKA_FOLDER_VERSION=$(echo "$KAFKA_VERSION" | tr -d '.') + KAFKA_FOLDER='Kafka'$KAFKA_FOLDER_VERSION + mkdir -p $KAFKA_FOLDER + mkdir -p /tmp/kafka + ln -s /home/ec2-user/$KAFKA_FOLDER /home/ec2-user/kafka + cd $KAFKA_FOLDER + APACHE_KAFKA_INSTALLER_LOCATION=${apache_kafka_installer_location} + wget $APACHE_KAFKA_INSTALLER_LOCATION + APACHE_KAFKA_INSTALLER_FILE=$(echo "$APACHE_KAFKA_INSTALLER_LOCATION" | awk -F "/" '{print $NF}') + tar -xzf $APACHE_KAFKA_INSTALLER_FILE --strip 1 + cd libs + wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar + cd ../bin + echo "security.protocol=SASL_SSL" > client.properties + echo "sasl.mechanism=AWS_MSK_IAM" >> client.properties + echo "sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> client.properties + echo "sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> client.properties + + # Install AWS SAM CLI + cd /home/ec2-user + mkdir -p awssam + cd awssam + wget https://github.com/aws/aws-sam-cli/releases/latest/download/aws-sam-cli-linux-x86_64.zip + unzip aws-sam-cli-linux-x86_64.zip -d sam-installation + sudo ./sam-installation/install + + # Create command files for creating Kafka Topic and Kafka Producer + cd /home/ec2-user + MSK_CLUSTER_ARN=${msk_cluster_arn} + KAFKA_TOPIC=${kafka_topic_for_lambda} + echo "#!/bin/bash" > 
kafka_topic_creator.sh + sudo chmod +x kafka_topic_creator.sh + echo "MSK_CLUSTER_ARN=$MSK_CLUSTER_ARN" >> kafka_topic_creator.sh + AWS_REGION=${aws_region} + echo "AWS_REGION=$AWS_REGION" >> kafka_topic_creator.sh + echo "BOOTSTRAP_BROKERS_IAM=\$(aws kafka get-bootstrap-brokers --region \$AWS_REGION --cluster-arn \$MSK_CLUSTER_ARN --query 'BootstrapBrokerStringSaslIam' --output text)" >> kafka_topic_creator.sh + echo "sleep 5" >> kafka_topic_creator.sh + echo "KAFKA_TOPIC=$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "/home/ec2-user/kafka/bin/kafka-topics.sh --create --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --command-config /home/ec2-user/kafka/bin/client.properties --replication-factor 3 --partitions 3 --topic \$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "echo \"export MSK_CLUSTER_ARN=\$MSK_CLUSTER_ARN\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export AWS_REGION=\$AWS_REGION\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export BOOTSTRAP_BROKERS_IAM=\$BOOTSTRAP_BROKERS_IAM\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export KAFKA_TOPIC=\$KAFKA_TOPIC\" >> .bash_profile" >> kafka_topic_creator.sh + echo "#!/bin/bash" > kafka_message_sender.sh + echo "source /home/ec2-user/.bash_profile" >> kafka_message_sender.sh + echo "/home/ec2-user/kafka/bin/kafka-console-producer.sh --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --producer.config /home/ec2-user/kafka/bin/client.properties --topic $KAFKA_TOPIC" >> kafka_message_sender.sh + sudo chmod +x kafka_message_sender.sh + CLUSTER_NAME="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f2)" + CLUSTER_ID="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f3)" + echo "export CLUSTER_NAME=$CLUSTER_NAME" >> /home/ec2-user/.bash_profile + echo "export CLUSTER_ID=$CLUSTER_ID" >> /home/ec2-user/.bash_profile + ./kafka_topic_creator.sh > kafka_topic_creator_output.txt + + #Checkout Serverless Patterns from Github + cd /home/ec2-user + SERVERLESS_LAND_GITHUB_LOCATION=${serverless_land_github_location} + git clone $SERVERLESS_LAND_GITHUB_LOCATION + cd ./serverless-patterns/msk-lambda-schema-avro-python-sam + cp template_original.yaml template.yaml + sudo chown -R ec2-user . 
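+
+                # The sed commands below pre-fill the checked-out SAM template:
+                # placeholders such as CLUSTER_NAME, CLUSTER_ID, KAFKA_TOPIC,
+                # GLUE_SCHEMA_REGISTRY_NAME, VPC_ID and SUBNET_IDS in template.yaml
+                # are replaced with this stack's values, so the instance can run
+                # "sam build" and "sam deploy" without manual edits.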
+ + GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name} + CONTACT_SCHEMA=${contact_schema_name} + VPC_ID=${vpcid} + LAMBDA_SECURITY_GROUP_ID=${securitygroup} + PRIVATE_SUBNET_1=${privatesubnetone} + PRIVATE_SUBNET_2=${privatesubnettwo} + PRIVATE_SUBNET_3=${privatesubnetthree} + SUBNET_IDS="$PRIVATE_SUBNET_1,$PRIVATE_SUBNET_2,$PRIVATE_SUBNET_3" + + source /home/ec2-user/.bash_profile + sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml + sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml + sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml + sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml + sed -i "s/AVRO_SCHEMA/$CONTACT_SCHEMA/g" template.yaml + sed -i "s/VPC_ID/$VPC_ID/g" template.yaml + sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml + sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml + + # Get IP CIDR range for EC2 Instance Connect + cd /home/ec2-user + mkdir -p ip_prefix + cd ip_prefix + git clone https://github.com/joetek/aws-ip-ranges-json.git + cd aws-ip-ranges-json + AWS_REGION=${aws_region} + EC2_CONNECT_IP=$(cat ip-ranges-ec2-instance-connect.json | jq -r --arg AWS_REGION "$AWS_REGION" '.prefixes[] | select(.region==$AWS_REGION).ip_prefix') + echo "export EC2_CONNECT_IP=$EC2_CONNECT_IP" >> /home/ec2-user/.bash_profile + SECURITY_GROUP=${security_group_id} + echo "export SECURITY_GROUP=$SECURITY_GROUP" >> /home/ec2-user/.bash_profile + aws ec2 authorize-security-group-ingress --region $AWS_REGION --group-id $SECURITY_GROUP --protocol tcp --port 22 --cidr $EC2_CONNECT_IP + + - security_group_id : !GetAtt KafkaClientInstanceSecurityGroup.GroupId + msk_cluster_arn : !GetAtt MSKCluster.Arn + kafka_topic_for_lambda : !Ref KafkaTopicForLambda + msk_kafka_version: !Ref MSKKafkaVersion + apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation + serverless_land_github_location: !Ref ServerlessLandGithubLocation + aws_region: !Ref 'AWS::Region' + python_version: !Ref PythonVersion + vpcid: !Ref VPC + privatesubnetone: !Ref PrivateSubnetMSKOne + privatesubnettwo: !Ref PrivateSubnetMSKTwo + privatesubnetthree: !Ref PrivateSubnetMSKThree + securitygroup: !GetAtt MSKSecurityGroup.GroupId + glue_registry_name: !Ref GlueSchemaRegistryForMSKName + contact_schema_name: !Ref ContactSchemaName + + + KafkaClientEC2InstanceServerless: + Condition: CreateServerlessCluster + DependsOn: ServerlessMSKCluster + Type: AWS::EC2::Instance + Properties: + InstanceType: m5.large + IamInstanceProfile: !Ref EC2InstanceProfile + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: {Ref: 'AWS::Region'} + SubnetId: !Ref PublicSubnetOne + SecurityGroupIds: [!GetAtt KafkaClientInstanceSecurityGroup.GroupId] + ImageId: !Ref LatestAmiId + Tags: + - Key: 'Name' + Value: 'KafkaClientInstance' + BlockDeviceMappings: + - DeviceName: /dev/xvda + Ebs: + VolumeSize: 50 + VolumeType: gp2 + DeleteOnTermination: true + UserData: + Fn::Base64: + !Sub + - | + #!/bin/bash + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python install" + yum update -y + # Install Python 3.12 and development tools + + # install Python + PYTHON_VERSION=${python_version} + echo "PYTHON_VERSION=$PYTHON_VERSION" >> /home/ec2-user/.bash_profile + + # Install Python 3.12 and development tools + sudo yum install -y python3.12 python3.12-pip python3.12-devel + + # Create symlinks for easier access + sudo ln -sf /usr/bin/python3.12 /usr/local/bin/python3 + sudo ln -sf /usr/bin/pip3.12 
/usr/local/bin/pip3 + + # Install virtualenv + sudo pip3.12 install virtualenv + + # Install Java (required for Kafka tools) + sudo yum install -y java-21-amazon-corretto-headless + + # Set JAVA_HOME environment variable + echo "export JAVA_HOME=/usr/lib/jvm/java-21-amazon-corretto" >> /home/ec2-user/.bash_profile + export JAVA_HOME=/usr/lib/jvm/java-21-amazon-corretto + + # Verify installations + python3 --version + pip3 --version + java -version + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Python install succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum install nmap-ncat -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of nmap succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum install git -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of git succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum erase awscli -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum erase of awscli succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + yum install jq -y + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of jq succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + sudo yum install -y docker + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Yum install of docker succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." + sleep 3 + ((attempt_num++)) + fi + done + + service docker start + usermod -a -G docker ec2-user + + max_attempts=5 + attempt_num=1 + success=false + while [ $success = false ] && [ $attempt_num -le $max_attempts ]; do + echo "Trying Python packages install" + sudo pip3.12 install boto3 kafka-python avro-python3 confluent-kafka + # Check the exit code of the command + if [ $? -eq 0 ]; then + echo "Python packages install succeeded" + success=true + else + echo "Attempt $attempt_num failed. Sleeping for 3 seconds and trying again..." 
+ sleep 3 + ((attempt_num++)) + fi + done + + cd /home/ec2-user + su -c "ln -s /usr/bin/python3.12 /usr/bin/python3" -s /bin/sh ec2-user + su -c "pip3 install boto3 --user" -s /bin/sh ec2-user + su -c "pip3 install kafka-python --user" -s /bin/sh ec2-user + + # install AWS CLI 2 - access with aws2 + cd /home/ec2-user + mkdir -p awscli + cd awscli + curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" + unzip awscliv2.zip + sudo ./aws/install + + # Create dirs, get Apache Kafka and unpack it + cd /home/ec2-user + KAFKA_VERSION=${msk_kafka_version} + KAFKA_FOLDER_VERSION=$(echo "$KAFKA_VERSION" | tr -d '.') + KAFKA_FOLDER='Kafka'$KAFKA_FOLDER_VERSION + mkdir -p $KAFKA_FOLDER + mkdir -p /tmp/kafka + ln -s /home/ec2-user/$KAFKA_FOLDER /home/ec2-user/kafka + cd $KAFKA_FOLDER + APACHE_KAFKA_INSTALLER_LOCATION=${apache_kafka_installer_location} + wget $APACHE_KAFKA_INSTALLER_LOCATION + APACHE_KAFKA_INSTALLER_FILE=$(echo "$APACHE_KAFKA_INSTALLER_LOCATION" | awk -F "/" '{print $NF}') + tar -xzf $APACHE_KAFKA_INSTALLER_FILE --strip 1 + cd libs + wget https://github.com/aws/aws-msk-iam-auth/releases/download/v2.2.0/aws-msk-iam-auth-2.2.0-all.jar + cd ../bin + echo "security.protocol=SASL_SSL" > client.properties + echo "sasl.mechanism=AWS_MSK_IAM" >> client.properties + echo "sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;" >> client.properties + echo "sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler" >> client.properties + + # Install AWS SAM CLI + cd /home/ec2-user + mkdir -p awssam + cd awssam + wget https://github.com/aws/aws-sam-cli/releases/latest/download/aws-sam-cli-linux-x86_64.zip + unzip aws-sam-cli-linux-x86_64.zip -d sam-installation + sudo ./sam-installation/install + + # Create command files for creating Kafka Topic and Kafka Producer + cd /home/ec2-user + MSK_CLUSTER_ARN=${msk_cluster_arn} + KAFKA_TOPIC=${kafka_topic_for_lambda} + echo "#!/bin/bash" > kafka_topic_creator.sh + sudo chmod +x kafka_topic_creator.sh + echo "MSK_CLUSTER_ARN=$MSK_CLUSTER_ARN" >> kafka_topic_creator.sh + AWS_REGION=${aws_region} + echo "AWS_REGION=$AWS_REGION" >> kafka_topic_creator.sh + echo "BOOTSTRAP_BROKERS_IAM=\$(aws kafka get-bootstrap-brokers --region \$AWS_REGION --cluster-arn \$MSK_CLUSTER_ARN --query 'BootstrapBrokerStringSaslIam' --output text)" >> kafka_topic_creator.sh + echo "sleep 5" >> kafka_topic_creator.sh + echo "KAFKA_TOPIC=$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "/home/ec2-user/kafka/bin/kafka-topics.sh --create --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --command-config /home/ec2-user/kafka/bin/client.properties --replication-factor 3 --partitions 3 --topic \$KAFKA_TOPIC" >> kafka_topic_creator.sh + echo "echo \"export MSK_CLUSTER_ARN=\$MSK_CLUSTER_ARN\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export AWS_REGION=\$AWS_REGION\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export BOOTSTRAP_BROKERS_IAM=\$BOOTSTRAP_BROKERS_IAM\" >> .bash_profile" >> kafka_topic_creator.sh + echo "echo \"export KAFKA_TOPIC=\$KAFKA_TOPIC\" >> .bash_profile" >> kafka_topic_creator.sh + echo "#!/bin/bash" > kafka_message_sender.sh + echo "source /home/ec2-user/.bash_profile" >> kafka_message_sender.sh + echo "/home/ec2-user/kafka/bin/kafka-console-producer.sh --bootstrap-server \$BOOTSTRAP_BROKERS_IAM --producer.config /home/ec2-user/kafka/bin/client.properties --topic $KAFKA_TOPIC" >> kafka_message_sender.sh + sudo chmod +x kafka_message_sender.sh + CLUSTER_NAME="$(echo 
$MSK_CLUSTER_ARN | cut -d '/' -f2)" + CLUSTER_ID="$(echo $MSK_CLUSTER_ARN | cut -d '/' -f3)" + echo "export CLUSTER_NAME=$CLUSTER_NAME" >> /home/ec2-user/.bash_profile + echo "export CLUSTER_ID=$CLUSTER_ID" >> /home/ec2-user/.bash_profile + ./kafka_topic_creator.sh > kafka_topic_creator_output.txt + + #Checkout Serverless Patterns from Github + cd /home/ec2-user + SERVERLESS_LAND_GITHUB_LOCATION=${serverless_land_github_location} + git clone $SERVERLESS_LAND_GITHUB_LOCATION + cd ./serverless-patterns/msk-lambda-schema-avro-python-sam + cp template_original.yaml template.yaml + sudo chown -R ec2-user . + + GLUE_SCHEMA_REGISTRY_NAME=${glue_registry_name} + CONTACT_SCHEMA=${contact_schema_name} + VPC_ID=${vpcid} + LAMBDA_SECURITY_GROUP_ID=${securitygroup} + PRIVATE_SUBNET_1=${privatesubnetone} + PRIVATE_SUBNET_2=${privatesubnettwo} + PRIVATE_SUBNET_3=${privatesubnetthree} + SUBNET_IDS="$PRIVATE_SUBNET_1,$PRIVATE_SUBNET_2,$PRIVATE_SUBNET_3" + + + source /home/ec2-user/.bash_profile + sed -i "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml + sed -i "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml + sed -i "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml + sed -i "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml + sed -i "s/AVRO_SCHEMA/$CONTACT_SCHEMA/g" template.yaml + sed -i "s/VPC_ID/$VPC_ID/g" template.yaml + sed -i "s/LAMBDA_SECURITY_GROUP_ID/$LAMBDA_SECURITY_GROUP_ID/g" template.yaml + sed -i "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml + + # Get IP CIDR range for EC2 Instance Connect + cd /home/ec2-user + mkdir -p ip_prefix + cd ip_prefix + git clone https://github.com/joetek/aws-ip-ranges-json.git + cd aws-ip-ranges-json + AWS_REGION=${aws_region} + EC2_CONNECT_IP=$(cat ip-ranges-ec2-instance-connect.json | jq -r --arg AWS_REGION "$AWS_REGION" '.prefixes[] | select(.region==$AWS_REGION).ip_prefix') + echo "export EC2_CONNECT_IP=$EC2_CONNECT_IP" >> /home/ec2-user/.bash_profile + SECURITY_GROUP=${security_group_id} + echo "export SECURITY_GROUP=$SECURITY_GROUP" >> /home/ec2-user/.bash_profile + aws ec2 authorize-security-group-ingress --region $AWS_REGION --group-id $SECURITY_GROUP --protocol tcp --port 22 --cidr $EC2_CONNECT_IP + + - security_group_id : !GetAtt KafkaClientInstanceSecurityGroup.GroupId + msk_cluster_arn : !GetAtt ServerlessMSKCluster.Arn + kafka_topic_for_lambda : !Ref KafkaTopicForLambda + msk_kafka_version: !Ref MSKKafkaVersion + apache_kafka_installer_location: !Ref ApacheKafkaInstallerLocation + serverless_land_github_location: !Ref ServerlessLandGithubLocation + aws_region: !Ref 'AWS::Region' + python_version: !Ref PythonVersion + vpcid: !Ref VPC + privatesubnetone: !Ref PrivateSubnetMSKOne + privatesubnettwo: !Ref PrivateSubnetMSKTwo + privatesubnetthree: !Ref PrivateSubnetMSKThree + securitygroup: !GetAtt MSKSecurityGroup.GroupId + glue_registry_name: !Ref GlueSchemaRegistryForMSKName + contact_schema_name: !Ref ContactSchemaName + + + + EC2InstanceEndpoint: + Type: AWS::EC2::InstanceConnectEndpoint + Properties: + PreserveClientIp: true + SecurityGroupIds: + - !GetAtt KafkaClientInstanceSecurityGroup.GroupId + SubnetId: !Ref PublicSubnetOne + + EC2Role: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Sid: '' + Effect: Allow + Principal: + Service: ec2.amazonaws.com + Action: 'sts:AssumeRole' + Path: "/" + ManagedPolicyArns: + - arn:aws:iam::aws:policy/AmazonMSKFullAccess + - arn:aws:iam::aws:policy/AWSCloudFormationFullAccess + - arn:aws:iam::aws:policy/CloudWatchFullAccess + - 
arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore + - arn:aws:iam::aws:policy/AmazonS3FullAccess + - arn:aws:iam::aws:policy/AWSCertificateManagerPrivateCAFullAccess + - arn:aws:iam::aws:policy/IAMFullAccess + - arn:aws:iam::aws:policy/AWSLambda_FullAccess + - arn:aws:iam::aws:policy/AmazonSQSFullAccess + Policies: + - PolicyName: MSKConfigurationAccess + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": "kafka:CreateConfiguration", + "Resource": "*" + } + ] + }' + - PolicyName: CloudformationDeploy + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "iam:*" + ], + "Resource": "*" + } + ] + }' + - PolicyName: MSKProducerPermissions + PolicyDocument: + Version: 2012-10-17 + Statement: + - Sid: SecretsAccess + Effect: Allow + Action: + - 'secretsmanager:*' + - 'kms:*' + Resource: '*' + - Sid: SQSPermissions + Effect: Allow + Action: + - 'sqs:CreateQueue' + - 'sqs:DeleteQueue' + - 'sqs:SetQueueAttributes' + - 'sqs:GetQueueAttributes' + - 'sqs:GetQueueUrl' + - 'sqs:TagQueue' + - 'sqs:ListQueues' + - 'sqs:ListQueueTags' + Resource: '*' + - Sid: GlueAndIAMPermissions + Effect: Allow + Action: + - 'glue:*Schema*' + - 'iam:CreatePolicy' + - 'iam:Tag*' + - 'iam:AttachRolePolicy' + Resource: '*' + - PolicyName: MSKConnectAuthentication + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:*Topic*", + "kafka-cluster:Connect", + "kafka-cluster:AlterCluster", + "kafka-cluster:DescribeCluster", + "kafka-cluster:DescribeClusterDynamicConfiguration" + ], + "Resource": [ + "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:cluster/${AWS::StackName}-cluster/*" + ] + }, + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:*Topic*", + "kafka-cluster:WriteData", + "kafka-cluster:ReadData" + ], + "Resource": [ + "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:topic/${AWS::StackName}-cluster/*" + ] + }, + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:AlterGroup", + "kafka-cluster:DescribeGroup" + ], + "Resource": [ + "arn:aws:kafka:${AWS::Region}:${AWS::AccountId}:group/${AWS::StackName}-cluster/*" + ] + } + ] + }' + - PolicyName: SecurityGroupsPolicy + PolicyDocument: !Sub '{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "ec2:DescribeSecurityGroups", + "ec2:DescribeSecurityGroupRules", + "ec2:DescribeTags" + ], + "Resource": "*" + }, + { + "Effect": "Allow", + "Action": [ + "ec2:AuthorizeSecurityGroupIngress", + "ec2:RevokeSecurityGroupIngress", + "ec2:AuthorizeSecurityGroupEgress", + "ec2:RevokeSecurityGroupEgress", + "ec2:ModifySecurityGroupRules", + "ec2:UpdateSecurityGroupRuleDescriptionsIngress", + "ec2:UpdateSecurityGroupRuleDescriptionsEgress" + ], + "Resource": [ + "arn:aws:ec2:${AWS::Region}:${AWS::AccountId}:security-group/*" + ] + }, + { + "Effect": "Allow", + "Action": [ + "ec2:ModifySecurityGroupRules" + ], + "Resource": [ + "arn:aws:ec2:${AWS::Region}:${AWS::AccountId}:security-group-rule/*" + ] + } + ] + }' + + EC2InstanceProfile: + Type: AWS::IAM::InstanceProfile + Properties: + InstanceProfileName: !Join + - '-' + - - 'EC2MMMSKCFProfile' + - !Ref 'AWS::StackName' + Roles: + - !Ref EC2Role + + + MSKCertAuthority: + Type: AWS::ACMPCA::CertificateAuthority + Condition: CreateProvisionedCluster + Properties: + KeyAlgorithm: "RSA_4096" + SigningAlgorithm: "SHA256WITHRSA" + Subject: + Country: "US" + Type: "ROOT" + + MSKCert: + Type: 
AWS::ACMPCA::Certificate + Condition: CreateProvisionedCluster + Properties: + CertificateAuthorityArn: !Ref MSKCertAuthority + CertificateSigningRequest: !GetAtt + - MSKCertAuthority + - CertificateSigningRequest + SigningAlgorithm: "SHA256WITHRSA" + TemplateArn: arn:aws:acm-pca:::template/RootCACertificate/V1 + Validity: + Type: YEARS + Value: 10 + + RootCAActivation: + Type: AWS::ACMPCA::CertificateAuthorityActivation + Condition: CreateProvisionedCluster + Properties: + CertificateAuthorityArn: + Ref: MSKCertAuthority + Certificate: + Fn::GetAtt: + - MSKCert + - Certificate + Status: ACTIVE + + RootCAPermission: + Type: AWS::ACMPCA::Permission + Condition: CreateProvisionedCluster + Properties: + Actions: + - IssueCertificate + - GetCertificate + - ListPermissions + CertificateAuthorityArn: !Ref MSKCertAuthority + Principal: acm.amazonaws.com + + CredentialsKMSKey: + Type: AWS::KMS::Key + Condition: CreateProvisionedCluster + Properties: + Description: "KMS key to use with credentials secret with KMS" + EnableKeyRotation: True + KeyPolicy: + Version: "2012-10-17" + Id: key-default-1 + Statement: + - Sid: Enable IAM User Permissions + Effect: Allow + Principal: + AWS: !Join + - '' + - - 'arn:aws:iam::' + - !Ref 'AWS::AccountId' + - ':root' + Action: 'kms:*' + Resource: '*' + - Sid: Enable Secret Manager Permissions + Effect: Allow + Principal: + AWS: "*" + Action: + - "kms:Decrypt" + - "kms:ReEncrypt*" + - "kms:GenerateDataKey*" + - "kms:CreateGrant" + - "kms:DescribeKey" + Resource: '*' + Condition: + StringEquals: + kms:CallerAccount: !Ref 'AWS::AccountId' + kms:ViaService: !Join + - '' + - - 'secretsmanager.' + - !Ref 'AWS::Region' + - '.amazonaws.com' + PendingWindowInDays: 7 + + CredentialsKMSKeyAlias: + Type: AWS::KMS::Alias + Condition: CreateProvisionedCluster + Properties: + AliasName: alias/mskstack_secret_manager_key + TargetKeyId: !Ref 'CredentialsKMSKey' + + CredentialsSecret: + Type: AWS::SecretsManager::Secret + Condition: CreateProvisionedCluster + Properties: + Description: "Secret to use for SCRAM Auth" + Name: "AmazonMSK_Credentials" + GenerateSecretString: + SecretStringTemplate: '{"username": "test-user"}' + GenerateStringKey: "password" + PasswordLength: 30 + ExcludeCharacters: '"@/\' + KmsKeyId: !Ref 'CredentialsKMSKey' + + MSKConfiguration: + Type: AWS::MSK::Configuration + Condition: CreateProvisionedCluster + Properties: + Description: "MSKConfiguration" + Name: "MSKConfiguration" + ServerProperties: | + auto.create.topics.enable=true + default.replication.factor=3 + min.insync.replicas=2 + num.io.threads=8 + num.network.threads=5 + num.partitions=1 + num.replica.fetchers=2 + replica.lag.time.max.ms=30000 + socket.receive.buffer.bytes=102400 + socket.request.max.bytes=104857600 + socket.send.buffer.bytes=102400 + unclean.leader.election.enable=true + zookeeper.session.timeout.ms=18000 + delete.topic.enable=true + log.retention.hours=8 + + MSKCluster: + Type: AWS::MSK::Cluster + Condition: CreateProvisionedCluster + Properties: + BrokerNodeGroupInfo: + ClientSubnets: + - !Ref PrivateSubnetMSKOne + - !Ref PrivateSubnetMSKTwo + - !Ref PrivateSubnetMSKThree + SecurityGroups: + - !GetAtt MSKSecurityGroup.GroupId + InstanceType: "kafka.m5.large" + StorageInfo: + EBSStorageInfo: + VolumeSize: 100 + ClientAuthentication: + Unauthenticated: + Enabled: False + Sasl: + Iam: + Enabled: True + Scram: + Enabled: True + Tls: + CertificateAuthorityArnList: + - !Ref MSKCertAuthority + Enabled: True + ClusterName: !Sub "${AWS::StackName}-cluster" + ConfigurationInfo: + Arn: !Ref 
MSKConfiguration
+        Revision: 1
+      EncryptionInfo:
+        EncryptionInTransit:
+          ClientBroker: TLS
+          InCluster: True
+      KafkaVersion: !Ref MSKKafkaVersion
+      NumberOfBrokerNodes: 3
+
+  SecretMSKAssociation:
+    Type: AWS::MSK::BatchScramSecret
+    Condition: CreateProvisionedCluster
+    Properties:
+      ClusterArn: !Ref MSKCluster
+      SecretArnList:
+        - !Ref CredentialsSecret
+
+  ServerlessMSKCluster:
+    Type: AWS::MSK::ServerlessCluster
+    Condition: CreateServerlessCluster
+    Properties:
+      ClientAuthentication:
+        Sasl:
+          Iam:
+            Enabled: True
+      ClusterName: !Sub "${AWS::StackName}-cluster"
+      VpcConfigs:
+        - SubnetIds:
+            - !Ref PrivateSubnetMSKOne
+            - !Ref PrivateSubnetMSKTwo
+            - !Ref PrivateSubnetMSKThree
+          SecurityGroups:
+            - !GetAtt MSKSecurityGroup.GroupId
+
+Outputs:
+  VPCId:
+    Description: The ID of the VPC created
+    Value: !Ref 'VPC'
+    Export:
+      Name: !Sub "${AWS::StackName}-VPCID"
+  PublicSubnetOne:
+    Description: The ID of the public subnet created
+    Value: !Ref 'PublicSubnetOne'
+    Export:
+      Name: !Sub "${AWS::StackName}-PublicSubnetOne"
+  PrivateSubnetMSKOne:
+    Description: The ID of private subnet one created
+    Value: !Ref 'PrivateSubnetMSKOne'
+    Export:
+      Name: !Sub "${AWS::StackName}-PrivateSubnetMSKOne"
+  PrivateSubnetMSKTwo:
+    Description: The ID of private subnet two created
+    Value: !Ref 'PrivateSubnetMSKTwo'
+    Export:
+      Name: !Sub "${AWS::StackName}-PrivateSubnetMSKTwo"
+  PrivateSubnetMSKThree:
+    Description: The ID of private subnet three created
+    Value: !Ref 'PrivateSubnetMSKThree'
+    Export:
+      Name: !Sub "${AWS::StackName}-PrivateSubnetMSKThree"
+  VPCStackName:
+    Description: The name of the VPC Stack
+    Value: !Ref 'AWS::StackName'
+    Export:
+      Name: !Sub "${AWS::StackName}-VPCStackName"
+  MSKArn:
+    Description: Provisioned MSK Cluster ARN.
+    Value: !Ref MSKCluster
+    Export:
+      Name: !Sub "${AWS::StackName}-MSKArn"
+    Condition: "CreateProvisionedCluster"
+  CredentialsSecretArn:
+    Description: ARN of the Secrets Manager secret with SCRAM credentials.
+    Value: !Ref CredentialsSecret
+    Export:
+      Name: !Sub "${AWS::StackName}-CredentialsSecret"
+    Condition: "CreateProvisionedCluster"
+  ServerlessMSKArn:
+    Description: Serverless MSK Cluster ARN.
+    Value: !Ref ServerlessMSKCluster
+    Export:
+      Name: !Sub "${AWS::StackName}-Serverless"
+    Condition: "CreateServerlessCluster"
+  SecurityGroupId:
+    Description: ID of the security group for MSK clients.
+    Value: !GetAtt MSKSecurityGroup.GroupId
+    Export:
+      Name: !Sub "${AWS::StackName}-SecurityGroupId"
+  EC2InstanceEndpointID:
+    Description: The ID of the EC2 Instance Connect Endpoint
+    Value: !Ref EC2InstanceEndpoint
+  KafkaTopicForLambda:
+    Description: The topic used by the Python Lambda functions
+    Value: !Ref KafkaTopicForLambda
+    Export:
+      Name: !Sub "${AWS::StackName}-KafkaTopicForLambda"
+  ContactSchemaArn:
+    Description: ARN of the Contact Avro schema in the Glue Schema Registry
+    Value: !Ref ContactSchema
+    Export:
+      Name: !Sub "${AWS::StackName}-ContactSchemaArn"
\ No newline at end of file
diff --git a/msk-lambda-schema-avro-python-sam/README.md b/msk-lambda-schema-avro-python-sam/README.md
new file mode 100644
index 000000000..bc9c6947e
--- /dev/null
+++ b/msk-lambda-schema-avro-python-sam/README.md
@@ -0,0 +1,181 @@
+# AWS Lambda function subscribed to an Amazon MSK Kafka topic with Avro and Schema Registry (Python)
+
+This pattern demonstrates Lambda functions that:
+1. Consume messages from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic
+2. 
Produce Avro-formatted messages to an Amazon MSK topic using Schema Registry
+
+Both functions use IAM authentication to connect to the MSK cluster and use AWS Glue Schema Registry for Avro schema management.
+
+This project contains source code and supporting files for a serverless application that you can deploy with the SAM CLI. It includes the following files and folders:
+
+- `kafka_event_consumer_function` - Code for the consumer Lambda function.
+- `kafka_event_producer_function` - Code for the Avro producer Lambda function.
+- `events` - Invocation events that you can use to invoke the functions.
+- `schemas` - Avro schema definitions.
+- `kafka_event_consumer_function/tests` - Unit tests for the consumer code.
+- `template.yaml` - A template that defines the application's Lambda functions.
+- `MSKAndKafkaClientEC2.yaml` - A CloudFormation template that deploys an MSK cluster and an EC2 instance with all prerequisites pre-installed, so you can build, deploy, and test the Lambda functions directly.
+- `requirements.txt` - Python dependencies for the entire project.
+- `deploy.sh` - Automated deployment script that detects the MSK CloudFormation stack and deploys the Lambda functions.
+
+> [!Important]
+> This application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred.
+
+## Requirements
+
+* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
+* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
+* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
+* [AWS Serverless Application Model](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) (AWS SAM) installed
+
+## Deployment Options
+
+You have two options to deploy and test this pattern:
+
+### Option A: Using EC2 Instance (Recommended for Testing)
+- **Best for**: Quick testing and evaluation
+- **Includes**: Pre-configured MSK cluster, EC2 instance with all tools installed
+- **Setup time**: ~15-20 minutes
+
+### Option B: Local Development Environment
+- **Best for**: Development and customization
+- **Requires**: Local setup of Python, SAM CLI, Docker
+- **Setup time**: ~10-15 minutes
+
+---
+
+## Option A: EC2 Instance Setup
+
+### Step 1: Deploy MSK Cluster and EC2 Instance
+
+1. **Deploy the CloudFormation template**:
+   - Browse to the AWS CloudFormation console
+   - Create a new stack using the file `MSKAndKafkaClientEC2.yaml`
+   - Keep the defaults for input parameters or modify them as necessary
+   - Wait for the CloudFormation stack to be created (~15-20 minutes)
+
+2. **What gets created**:
+   - MSK cluster (Provisioned or Serverless based on your selection)
+   - EC2 instance with all prerequisites installed
+   - VPC, subnets, and security groups
+   - Kafka topic for the Lambda functions
+
+### Step 2: Connect to EC2 Instance
+
+1. 
**Connect to the EC2 instance**:
+   - Go to the EC2 console
+   - Find your instance and click "Connect"
+   - Use **EC2 Instance Connect** or **EC2 Instance Connect Endpoint**
+
+### Step 3: Navigate and Deploy
+
+```bash
+# Change to the pattern directory
+cd serverless-patterns/msk-lambda-schema-avro-python-sam
+
+# Deploy using the automated script
+./deploy.sh
+```
+
+The deploy script automatically detects your MSK CloudFormation stack and configures all parameters.
+
+---
+
+## Option B: Local Development Setup
+
+### Step 1: Clone and Navigate
+
+```bash
+# Clone the serverless patterns repository
+git clone https://github.com/aws-samples/serverless-patterns.git
+cd serverless-patterns/msk-lambda-schema-avro-python-sam
+```
+
+### Step 2: Deploy
+
+#### Automated Deployment (If using MSKAndKafkaClientEC2 stack)
+
+```bash
+# Run the automated deployment script
+./deploy.sh
+```
+
+#### Manual Deployment
+
+```bash
+# Build the application
+sam build
+
+# Deploy with guided prompts
+sam deploy --capabilities CAPABILITY_IAM --guided
+```
+
+During deployment, you'll be prompted for:
+* **Stack Name**: `msk-lambda-schema-avro-python-sam`
+* **AWS Region**: Your current region
+* **Parameter MSKClusterName**: The name of the MSK cluster
+* **Parameter MSKClusterId**: The unique ID of the MSK cluster
+* **Parameter MSKTopic**: The Kafka topic name
+* **Parameter ContactSchemaName**: The name of the schema (default: ContactSchema)
+* **Parameter VpcId**: The VPC ID
+* **Parameter SubnetIds**: Comma-separated subnet IDs
+* **Parameter SecurityGroupIds**: Security group IDs
+
+---
+
+## Testing the Application
+
+### Invoke the Producer Function
+
+```bash
+sam remote invoke LambdaMSKProducerPythonFunction --region <your-region> --stack-name msk-lambda-schema-avro-python-sam
+```
+
+### Verify Consumer Processing
+
+```bash
+# View consumer function logs
+sam logs --name LambdaMSKConsumerPythonFunction --stack-name msk-lambda-schema-avro-python-sam --region <your-region>
+```
+
+### Expected Behavior
+
+- **Producer**: Generates messages with zip codes starting with both "1000" and "2000"
+- **Consumer**: Only processes messages with zip codes starting with "2000" (filtered by the event source mapping)
+- **Schema Registry**: Handles Avro serialization/deserialization automatically
+
+---
+
+## How it works
+
+The producer Lambda function generates sample contact data and publishes it to the MSK topic using Avro serialization with AWS Glue Schema Registry. The consumer Lambda function is triggered by messages from the MSK topic and, because of its event source mapping filter, processes only messages with zip codes starting with "2000" (a sketch of such a filter follows the Cleanup section).
+
+This demonstrates how event source mapping filters let you process only messages that match specific criteria, reducing Lambda invocation costs and processing overhead.
+
+---
+
+## Cleanup
+
+Delete the Lambda stack:
+
+```bash
+sam delete --stack-name msk-lambda-schema-avro-python-sam
+```
+
+If you deployed the MSK cluster using the CloudFormation template, delete that stack from the AWS Console.
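+
+---
+
+## Event source mapping filter (sketch)
+
+For reference, the zip-code filtering described above is the kind of rule that Lambda event source mapping `FilterCriteria` can express. The snippet below is an illustrative sketch rather than a copy of this pattern's `template.yaml`; the event name and the properties configured alongside the filter are assumptions.
+
+```yaml
+# Hypothetical excerpt from a SAM function definition: only records whose
+# decoded Avro value has a "zip" field starting with "2000" reach the function.
+Events:
+  MSKEvent:
+    Type: MSK
+    Properties:
+      # Stream, Topics, and schema registry settings omitted here
+      FilterCriteria:
+        Filters:
+          - Pattern: '{"value": {"zip": [{"prefix": "2000"}]}}'
+```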
+ +--- + +## Additional Resources + +- [AWS Lambda Developer Guide](https://docs.aws.amazon.com/lambda/latest/dg/) +- [Amazon MSK Developer Guide](https://docs.aws.amazon.com/msk/latest/developerguide/) +- [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html) +- [AWS Lambda Powertools Python](https://awslabs.github.io/aws-lambda-powertools-python/) +- [AWS SAM Developer Guide](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/) + +---- + +Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/msk-lambda-schema-avro-python-sam/deploy.sh b/msk-lambda-schema-avro-python-sam/deploy.sh new file mode 100755 index 000000000..abcea3f1e --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/deploy.sh @@ -0,0 +1,178 @@ +#!/bin/bash + +# Deployment script for MSK Lambda Schema Avro Python SAM +# This script helps replace placeholders in the template with actual values + +set -e + +echo "MSK Lambda Schema Avro Python SAM Deployment Script" +echo "==================================================" + +# Check if virtual environment exists +if [ ! -d "venv" ]; then + echo "Virtual environment not found. Setting up..." + ./setup_venv.sh +fi + +# Activate virtual environment +echo "Activating virtual environment..." +source venv/bin/activate + +# Verify Python environment +echo "Using Python: $(which python)" +echo "Python version: $(python --version)" + +# Check if AWS CLI is configured +if ! aws sts get-caller-identity > /dev/null 2>&1; then + echo "Error: AWS CLI is not configured or credentials are not available" + exit 1 +fi + +# Get current AWS region +AWS_REGION=$(aws configure get region) +if [ -z "$AWS_REGION" ]; then + # Try to get region from EC2 metadata if running on EC2 + AWS_REGION=$(curl -s http://169.254.169.254/latest/meta-data/placement/region 2>/dev/null || echo "") + if [ -n "$AWS_REGION" ]; then + echo "Detected region from EC2 metadata: $AWS_REGION" + aws configure set region "$AWS_REGION" + else + echo "Error: AWS region is not set. Please configure your AWS CLI with a default region." + exit 1 + fi +fi + +echo "Using AWS Region: $AWS_REGION" + +# Function to get CloudFormation stack outputs +get_stack_output() { + local stack_name=$1 + local output_key=$2 + aws cloudformation describe-stacks \ + --stack-name "$stack_name" \ + --query "Stacks[0].Outputs[?OutputKey=='$output_key'].OutputValue" \ + --output text \ + --region "$AWS_REGION" 2>/dev/null || echo "" +} + +# Check if MSKAndKafkaClientEC2 CloudFormation stack exists +echo "Checking for existing MSKAndKafkaClientEC2 CloudFormation stack..." 
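+# Detection approach: walk every healthy (CREATE_COMPLETE/UPDATE_COMPLETE) stack
+# in this region and pick the first one exposing the MSKArn or ServerlessMSKArn
+# output defined by MSKAndKafkaClientEC2.yaml. If several MSK stacks exist in
+# the region, the first match wins.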
+MSK_STACK_NAME="" +for stack in $(aws cloudformation list-stacks --stack-status-filter CREATE_COMPLETE UPDATE_COMPLETE --query 'StackSummaries[].StackName' --output text --region "$AWS_REGION"); do + # Check for MSKArn or ServerlessMSKArn outputs (from MSKAndKafkaClientEC2.yaml template) + if aws cloudformation describe-stacks --stack-name "$stack" --region "$AWS_REGION" --query 'Stacks[0].Outputs[?OutputKey==`MSKArn` || OutputKey==`ServerlessMSKArn`]' --output text 2>/dev/null | grep -q .; then + MSK_STACK_NAME=$stack + break + fi +done + +if [ -n "$MSK_STACK_NAME" ]; then + echo "Found MSKAndKafkaClientEC2 stack: $MSK_STACK_NAME" + + # Get values from CloudFormation outputs (based on MSKAndKafkaClientEC2.yaml template) + MSK_ARN=$(get_stack_output "$MSK_STACK_NAME" "MSKArn") + SERVERLESS_MSK_ARN=$(get_stack_output "$MSK_STACK_NAME" "ServerlessMSKArn") + KAFKA_TOPIC=$(get_stack_output "$MSK_STACK_NAME" "KafkaTopicForLambda") + VPC_ID=$(get_stack_output "$MSK_STACK_NAME" "VPCId") + SUBNET_IDS=$(get_stack_output "$MSK_STACK_NAME" "PrivateSubnetMSKOne"),$(get_stack_output "$MSK_STACK_NAME" "PrivateSubnetMSKTwo"),$(get_stack_output "$MSK_STACK_NAME" "PrivateSubnetMSKThree") + SECURITY_GROUP_ID=$(get_stack_output "$MSK_STACK_NAME" "SecurityGroupId") + CONTACT_SCHEMA_ARN=$(get_stack_output "$MSK_STACK_NAME" "ContactSchemaArn") + + # Get Glue Schema Registry name from stack parameters (since it's not in outputs) + GLUE_SCHEMA_REGISTRY_NAME=$(aws cloudformation describe-stacks \ + --stack-name "$MSK_STACK_NAME" \ + --query "Stacks[0].Parameters[?ParameterKey=='GlueSchemaRegistryForMSKName'].ParameterValue" \ + --output text \ + --region "$AWS_REGION" 2>/dev/null || echo "GlueSchemaRegistryForMSK") + + # Determine which MSK ARN to use (Provisioned or Serverless) + if [ -n "$MSK_ARN" ]; then + CLUSTER_ARN="$MSK_ARN" + CLUSTER_TYPE="Provisioned" + elif [ -n "$SERVERLESS_MSK_ARN" ]; then + CLUSTER_ARN="$SERVERLESS_MSK_ARN" + CLUSTER_TYPE="Serverless" + else + echo "Error: No MSK cluster ARN found in stack outputs" + exit 1 + fi + + # Extract cluster name and ID from ARN + CLUSTER_NAME=$(echo "$CLUSTER_ARN" | awk -F'/' '{print $2}') + CLUSTER_ID=$(echo "$CLUSTER_ARN" | awk -F'/' '{print $3}') + + echo "Retrieved parameters from CloudFormation stack:" + echo " Cluster Type: $CLUSTER_TYPE" + echo " Cluster ARN: $CLUSTER_ARN" + echo " Cluster Name: $CLUSTER_NAME" + echo " Cluster ID: $CLUSTER_ID" + echo " Kafka Topic: $KAFKA_TOPIC" + echo " VPC ID: $VPC_ID" + echo " Subnet IDs: $SUBNET_IDS" + echo " Security Group ID: $SECURITY_GROUP_ID" + echo " Contact Schema ARN: $CONTACT_SCHEMA_ARN" + echo " Glue Schema Registry Name: $GLUE_SCHEMA_REGISTRY_NAME" + + # Create template.yaml from template_original.yaml with replacements + echo "Creating template.yaml with actual values..." + cp template_original.yaml template.yaml + + # Replace placeholders with actual values from CloudFormation outputs + sed -i.bak "s/CLUSTER_NAME/$CLUSTER_NAME/g" template.yaml + sed -i.bak "s/CLUSTER_ID/$CLUSTER_ID/g" template.yaml + sed -i.bak "s/KAFKA_TOPIC/$KAFKA_TOPIC/g" template.yaml + sed -i.bak "s/VPC_ID/$VPC_ID/g" template.yaml + sed -i.bak "s/SUBNET_IDS/$SUBNET_IDS/g" template.yaml + sed -i.bak "s/LAMBDA_SECURITY_GROUP_ID/$SECURITY_GROUP_ID/g" template.yaml + sed -i.bak "s/GLUE_SCHEMA_REGISTRY_NAME/$GLUE_SCHEMA_REGISTRY_NAME/g" template.yaml + sed -i.bak "s/AVRO_SCHEMA/ContactSchema/g" template.yaml + + # Clean up backup file + rm template.yaml.bak + +else + echo "No MSKAndKafkaClientEC2 CloudFormation stack found." 
+ echo "Please deploy the MSKAndKafkaClientEC2.yaml template first, or provide parameters manually." + echo "You can manually edit template.yaml or provide parameters during deployment." + exit 1 +fi + +# Verify dependencies are installed +echo "Verifying Python dependencies..." +python -c "import boto3, kafka, avro; print('All dependencies verified successfully')" || { + echo "Error: Missing dependencies. Installing..." + pip install -r requirements.txt +} + +# Build the application +echo "Building SAM application..." +sam build + +# Deploy the application +echo "Deploying SAM application..." +# Deploy with parameters from MSKAndKafkaClientEC2 CloudFormation stack +sam deploy \ + --resolve-s3 \ + --capabilities CAPABILITY_IAM \ + --no-confirm-changeset \ + --no-disable-rollback \ + --region "$AWS_REGION" \ + --stack-name msk-lambda-schema-avro-python-sam \ + --parameter-overrides \ + MSKClusterName="$CLUSTER_NAME" \ + MSKClusterId="$CLUSTER_ID" \ + MSKTopic="$KAFKA_TOPIC" \ + ContactSchemaName="ContactSchema" \ + VpcId="$VPC_ID" \ + SubnetIds="$SUBNET_IDS" \ + SecurityGroupIds="$SECURITY_GROUP_ID" + +echo "Deployment completed successfully!" +echo "" +echo "To test the application:" +echo "1. Make sure virtual environment is activated: source venv/bin/activate" +echo "2. Invoke the producer function:" +echo " sam remote invoke LambdaMSKProducerPythonFunction --region $AWS_REGION --stack-name msk-lambda-schema-avro-python-sam" +echo "" +echo "3. Check consumer function logs:" +echo " sam logs --name LambdaMSKConsumerPythonFunction --stack-name msk-lambda-schema-avro-python-sam --region $AWS_REGION" diff --git a/msk-lambda-schema-avro-python-sam/events/event.json b/msk-lambda-schema-avro-python-sam/events/event.json new file mode 100644 index 000000000..413edeb21 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/events/event.json @@ -0,0 +1,14 @@ +{ + "firstname": "John", + "lastname": "Doe", + "company": "Example Corp", + "street": "123 Main Street", + "city": "Anytown", + "county": "Example County", + "state": "NY", + "zip": "12345", + "homePhone": "555-123-4567", + "cellPhone": "555-987-6543", + "email": "john.doe@example.com", + "website": "https://www.johndoe.com" +} diff --git a/msk-lambda-schema-avro-python-sam/example-pattern.json b/msk-lambda-schema-avro-python-sam/example-pattern.json new file mode 100644 index 000000000..c4fb32374 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/example-pattern.json @@ -0,0 +1,87 @@ +{ + "title": "Amazon MSK to Lambda with Avro and Schema Registry (Python)", + "description": "Create Lambda functions that consume and produce Avro messages from Amazon MSK using AWS Glue Schema Registry", + "language": "Python", + "level": "300", + "framework": "AWS SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern demonstrates how to create Lambda functions that consume and produce Avro-formatted messages from Amazon Managed Streaming for Kafka (MSK) using AWS Glue Schema Registry.", + "The consumer Lambda function is triggered by MSK events and automatically deserializes Avro messages using the schema from AWS Glue Schema Registry.", + "The producer Lambda function creates Avro-formatted messages and sends them to the MSK topic, with automatic schema registration in AWS Glue Schema Registry.", + "Both functions use IAM authentication to connect to the MSK cluster." 
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/msk-lambda-schema-avro-python-sam", + "templateURL": "serverless-patterns/msk-lambda-schema-avro-python-sam", + "projectFolder": "msk-lambda-schema-avro-python-sam", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Amazon MSK Lambda trigger", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html" + }, + { + "text": "Using AWS Lambda with Amazon MSK", + "link": "https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html" + }, + { + "text": "AWS Glue Schema Registry", + "link": "https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html" + }, + { + "text": "Apache Avro", + "link": "https://avro.apache.org/" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete." + ] + }, + "authors": [ + { + "name": "Indranil Banerjee", + "bio": "AWS - Senior Solutions Architect", + "linkedin": "indranil-banerjee-b00a261" + }, + { + "name": "Vaibhav Jain", + "bio": "AWS - Sr. Application Architect", + "linkedin": "vaibhavjainv" + }, + { + "name": "Adam Wagner", + "bio": "AWS - Principal Serverless Solutions Architect", + "linkedin": "adam-wagner-4bb412" + }, + { + "name": "Philipp Page", + "bio": "AWS - SA Engineer", + "linkedin": "philipp-page" + }, + { + "name": "Leandro Cavalcante Damascena", + "bio": "AWS - Sr. SA Engineer", + "linkedin": "leandrodamascena" + } + ] +} diff --git a/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/app.py b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/app.py new file mode 100644 index 000000000..6c001190b --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/app.py @@ -0,0 +1,94 @@ +""" +AWS Lambda function that consumes AVRO messages from Amazon MSK using AWS Lambda Powertools. 
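+Records arrive through an MSK event source mapping whose schema registry configuration points at AWS Glue Schema Registry, so values reach this handler already validated.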
+ +This function demonstrates: +- AVRO message deserialization using AWS Lambda Powertools +- Event source mapping with filtering (only processes messages with zip codes starting with "2000") +- Automatic dead letter queue handling for failed messages +- Structured logging with AWS Lambda Powertools +""" + +from aws_lambda_powertools import Logger +from aws_lambda_powertools.utilities.kafka import ConsumerRecords, SchemaConfig, kafka_consumer +from aws_lambda_powertools.utilities.typing import LambdaContext + +logger = Logger() + +# Contact AVRO schema definition (must match producer schema) +CONTACT_AVRO_SCHEMA = """ +{ + "type": "record", + "name": "Contact", + "fields": [ + {"name": "firstname", "type": "string"}, + {"name": "lastname", "type": "string"}, + {"name": "company", "type": "string"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "county", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "homePhone", "type": "string"}, + {"name": "cellPhone", "type": "string"}, + {"name": "email", "type": "string"}, + {"name": "website", "type": "string"} + ] +} +""" + +# Configure schema for automatic AVRO deserialization +schema_config = SchemaConfig( + value_schema_type="AVRO", + value_schema=CONTACT_AVRO_SCHEMA, +) + + +@kafka_consumer(schema_config=schema_config) +def lambda_handler(event: ConsumerRecords, context: LambdaContext): + """ + Lambda handler for processing MSK events with automatic AVRO deserialization. + + Note: This function only receives messages with zip codes starting with "2000" + due to the event source mapping filter configuration in the SAM template. + + Args: + event: ConsumerRecords containing Kafka records with automatic AVRO deserialization + context: Lambda context + + Returns: + Success response + """ + logger.info("=== MSK AVRO Consumer Lambda started ===") + + try: + for record in event.records: + logger.info(f"Processing record - Topic: {record.topic}, Partition: {record.partition}, Offset: {record.offset}") + logger.info(f"Timestamp: {record.timestamp}, TimestampType: {record.timestamp_type}") + + # Record key (automatically decoded from base64) + logger.info(f"Record key: {record.key}") + + # Record value (automatically deserialized from AVRO by AWS Lambda Powertools) + contact = record.value + logger.info(f"Contact data: {contact}") + + # Process the contact data + if contact: + name = f"{contact.get('firstname', '')} {contact.get('lastname', '')}".strip() + zip_code = contact.get('zip', '') + email = contact.get('email', '') + + logger.info(f"Contact details - Name: {name}, Zip: {zip_code}, Email: {email}") + + # Add your business logic here + # For example: save to database, send notifications, etc. 
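+                # Hypothetical illustration only (crm_client is not defined in this sample):
+                # if email:
+                #     crm_client.upsert_contact(email=email, name=name, zip_code=zip_code)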
+ + logger.info(f"Successfully processed {len(list(event.records))} records") + logger.info("=== MSK AVRO Consumer Lambda completed ===") + + return {"statusCode": 200} + + except Exception as e: + logger.exception(f"Error processing Kafka records: {str(e)}") + # Let the exception propagate to trigger DLQ handling + raise diff --git a/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/requirements.txt b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/requirements.txt new file mode 100644 index 000000000..bee3f05eb --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/requirements.txt @@ -0,0 +1,3 @@ +# Avro serialization +fastavro>=1.8.0 +avro>=1.11.0 diff --git a/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/tests/requirements.txt b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/tests/requirements.txt new file mode 100644 index 000000000..c937a911e --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/tests/requirements.txt @@ -0,0 +1,2 @@ +pytest>=7.4.0 +pytest-mock>=3.12.0 diff --git a/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/tests/test_app.py b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/tests/test_app.py new file mode 100644 index 000000000..6064b1969 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/kafka_event_consumer_function/tests/test_app.py @@ -0,0 +1,35 @@ +import json +import pytest +from unittest.mock import patch, MagicMock +import sys +import os + +# Add the parent directory to the path so we can import the app module +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from app import lambda_handler + +class TestLambdaHandler: + """Test cases for the Lambda handler function.""" + + def test_lambda_handler_empty_event(self): + """Test lambda handler with empty event.""" + event = {"records": {}} + context = MagicMock() + + result = lambda_handler(event, context) + assert result == {"statusCode": 200} + + def test_lambda_handler_function_exists(self): + """Test that lambda_handler function exists and is callable.""" + assert callable(lambda_handler) + + def test_lambda_handler_with_empty_records(self): + """Test lambda handler with empty records structure.""" + event = {"records": {}} + context = MagicMock() + + # Should not raise any exceptions + result = lambda_handler(event, context) + assert isinstance(result, dict) + assert result.get("statusCode") == 200 diff --git a/msk-lambda-schema-avro-python-sam/kafka_event_producer_function/app.py b/msk-lambda-schema-avro-python-sam/kafka_event_producer_function/app.py new file mode 100644 index 000000000..012bfd0ff --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/kafka_event_producer_function/app.py @@ -0,0 +1,262 @@ +""" +AWS Lambda function that produces AVRO messages to Amazon MSK using AWS Glue Schema Registry. 
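+Generated contacts alternate between zip prefixes "1000" and "2000", so only about half of them pass the consumer's event source mapping filter.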
+
+This function demonstrates:
+- AVRO message serialization using the aws-glue-schema-registry package
+- Kafka message production to MSK with IAM authentication
+- AWS Lambda Powertools for logging, tracing, and metrics
+- Event filtering demonstration with different zip code prefixes
+"""
+
+import json
+import os
+import random
+import boto3
+import io
+import fastavro
+from typing import Dict, Any
+from kafka import KafkaProducer
+
+# AWS Lambda Powertools
+from aws_lambda_powertools import Logger, Tracer, Metrics
+from aws_lambda_powertools.metrics import MetricUnit
+
+# AWS Glue Schema Registry
+from aws_schema_registry import SchemaRegistryClient
+from aws_schema_registry.serde import encode
+
+# Initialize Powertools
+logger = Logger()
+tracer = Tracer()
+metrics = Metrics(namespace="MSKProducer")
+
+# Contact AVRO schema definition - matches MSKAndKafkaClientEC2.yaml exactly
+CONTACT_AVRO_SCHEMA = {
+    "type": "record",
+    "name": "Contact",
+    "fields": [
+        {"name": "firstname", "type": "string"},
+        {"name": "lastname", "type": "string"},
+        {"name": "company", "type": "string"},
+        {"name": "street", "type": "string"},
+        {"name": "city", "type": "string"},
+        {"name": "county", "type": "string"},
+        {"name": "state", "type": "string"},
+        {"name": "zip", "type": "string"},
+        {"name": "homePhone", "type": "string"},
+        {"name": "cellPhone", "type": "string"},
+        {"name": "email", "type": "string"},
+        {"name": "website", "type": "string"}
+    ]
+}
+
+
+def get_schema_version_id(registry_name: str, schema_name: str) -> str:
+    """Get or register the schema version using AWS Glue Schema Registry."""
+    try:
+        glue_client = boto3.client('glue')
+        schema_registry_client = SchemaRegistryClient(
+            glue_client=glue_client,
+            registry_name=registry_name
+        )
+
+        schema_version = schema_registry_client.get_or_register_schema_version(
+            definition=json.dumps(CONTACT_AVRO_SCHEMA),
+            schema_name=schema_name,
+            data_format='AVRO'
+        )
+
+        logger.info("Schema version obtained",
+                    schema_version_id=str(schema_version.version_id),
+                    version_number=schema_version.version_number,
+                    schema_name=schema_name,
+                    registry_name=registry_name)
+
+        return schema_version.version_id
+
+    except Exception as e:
+        logger.exception("Failed to get schema version",
+                         error=str(e),
+                         registry_name=registry_name,
+                         schema_name=schema_name)
+        raise
+
+
+def serialize_avro_message(contact_data: Dict[str, Any], schema_version_id: str) -> bytes:
+    """Serialize contact data to AVRO format with an AWS Glue Schema Registry header."""
+    try:
+        # Serialize the record body using fastavro
+        avro_buffer = io.BytesIO()
+        fastavro.schemaless_writer(avro_buffer, CONTACT_AVRO_SCHEMA, contact_data)
+        avro_data = avro_buffer.getvalue()
+
+        # Prepend the AWS Glue Schema Registry header using the package
+        encoded_message = encode(avro_data, schema_version_id)
+
+        logger.debug("Message serialized",
+                     avro_data_size=len(avro_data),
+                     total_message_size=len(encoded_message),
+                     header_size=len(encoded_message) - len(avro_data))
+
+        return encoded_message
+
+    except Exception as e:
+        logger.exception("Failed to serialize message",
+                         error=str(e),
+                         contact_data=contact_data)
+        raise
+
+
+def create_sample_contact(index: int) -> Dict[str, Any]:
+    """Create a sample contact with realistic data."""
+    first_names = ["John", "Jane", "Michael", "Sarah", "David", "Emily", "Robert", "Lisa", "William", "Alice"]
+    last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis"]
+    companies = ["TechCorp", "DataSys", "CloudInc", "DevCo", "InfoTech", "SoftWare", "NetSolutions"]
+    cities = ["Seattle", "Portland", "San Francisco", "Los Angeles", "Denver"]
+    states = ["WA", "OR", "CA", "CO", "TX"]
+
+    # Alternate between zip codes starting with 1000 and 2000 for the filtering demo
+    zip_prefix = "1000" if index % 2 == 0 else "2000"
+    zip_suffix = f"{random.randint(0, 9)}"  # single trailing digit keeps zips at the usual five digits
+
+    return {
+        "firstname": first_names[index % len(first_names)],
+        "lastname": last_names[index % len(last_names)],
+        "company": companies[index % len(companies)],
+        "street": f"{random.randint(100, 9999)} {random.choice(['Main St', 'Oak Ave', 'Pine Rd', 'Elm Dr'])}",
+        "city": cities[index % len(cities)],
+        "county": f"{random.choice(['King', 'Pierce', 'Snohomish'])} County",
+        "state": states[index % len(states)],
+        "zip": f"{zip_prefix}{zip_suffix}",
+        "homePhone": f"({random.randint(200, 999)}) {random.randint(200, 999)}-{random.randint(1000, 9999)}",
+        "cellPhone": f"({random.randint(200, 999)}) {random.randint(200, 999)}-{random.randint(1000, 9999)}",
+        "email": f"user{index}@example.com",
+        "website": f"https://www.example{index}.com"
+    }
+
+
+def get_bootstrap_brokers(cluster_arn: str) -> str:
+    """Get the IAM bootstrap broker string for an MSK cluster."""
+    try:
+        kafka_client = boto3.client('kafka')
+        response = kafka_client.get_bootstrap_brokers(ClusterArn=cluster_arn)
+        return response['BootstrapBrokerStringSaslIam']
+    except Exception as e:
+        logger.exception("Failed to get bootstrap brokers", cluster_arn=cluster_arn)
+        raise
+
+
+# This function is invoked directly (for example via `sam remote invoke`), so no
+# correlation ID path is configured on the logger.
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+@metrics.log_metrics
+def lambda_handler(event: Dict[str, Any], context: Any) -> str:
+    """
+    Lambda function handler that produces AVRO messages to a Kafka topic.
+    Configuration (cluster ARN, topic, message count, registry and schema names) is read from environment variables.
+ + Args: + event: Lambda event containing configuration + context: Lambda context + + Returns: + Success message + """ + logger.info("=== MSK AVRO Producer Lambda started ===") + + try: + # Get configuration from environment variables + cluster_arn = os.environ.get('MSK_CLUSTER_ARN') + kafka_topic = os.environ.get('MSK_TOPIC', 'msk-serverless-topic') + message_count = int(os.environ.get('MESSAGE_COUNT', '10')) + registry_name = os.environ.get('REGISTRY_NAME', 'GlueSchemaRegistryForMSK') + schema_name = os.environ.get('CONTACT_SCHEMA_NAME', 'ContactSchema') + + if not cluster_arn: + raise ValueError("MSK_CLUSTER_ARN environment variable is required") + + logger.info("Configuration loaded", + cluster_arn=cluster_arn, + kafka_topic=kafka_topic, + message_count=message_count, + registry_name=registry_name, + schema_name=schema_name) + + # Get schema version ID + schema_version_id = get_schema_version_id(registry_name, schema_name) + + # Get bootstrap brokers and create Kafka producer + bootstrap_servers = get_bootstrap_brokers(cluster_arn) + producer = KafkaProducer( + bootstrap_servers=bootstrap_servers, + security_protocol='SASL_SSL', + sasl_mechanism='AWS_MSK_IAM', + key_serializer=lambda x: x.encode('utf-8') if x else None, + value_serializer=lambda x: x, # Raw bytes - AVRO data is already serialized + acks='all', + retries=3, + max_block_ms=120000, + request_timeout_ms=60000, + ) + + logger.info("Starting message production", + topic=kafka_topic, + message_count=message_count, + schema_version_id=str(schema_version_id)) + + # Track zip code distribution for filtering demo + zip_1000_count = 0 + zip_2000_count = 0 + + # Send messages + for i in range(message_count): + contact = create_sample_contact(i) + message_key = f"contact-{i+1}" + + # Track zip code distribution + if contact['zip'].startswith('1000'): + zip_1000_count += 1 + elif contact['zip'].startswith('2000'): + zip_2000_count += 1 + + # Serialize and send message + avro_message = serialize_avro_message(contact, schema_version_id) + future = producer.send(kafka_topic, key=message_key, value=avro_message) + record_metadata = future.get(timeout=60) + + logger.info("Message sent successfully", + message_number=i+1, + message_key=message_key, + partition=record_metadata.partition, + offset=record_metadata.offset, + message_size=len(avro_message)) + + # Add metrics + metrics.add_metric(name="AvroMessagesSent", unit=MetricUnit.Count, value=1) + metrics.add_metric(name="AvroMessageSize", unit=MetricUnit.Bytes, value=len(avro_message)) + + # Add distribution metrics for filtering demo + metrics.add_metric(name="AvroMessages1000Prefix", unit=MetricUnit.Count, value=zip_1000_count) + metrics.add_metric(name="AvroMessages2000Prefix", unit=MetricUnit.Count, value=zip_2000_count) + + # Close producer + producer.close() + + success_message = ( + f"Successfully sent {message_count} AVRO messages to Kafka topic: {kafka_topic} " + f"using schema {schema_name} (version {schema_version_id}) from registry {registry_name} " + f"(Zip codes: {zip_1000_count} with prefix 1000, {zip_2000_count} with prefix 2000)" + ) + + logger.info("MSK AVRO Producer Lambda completed successfully", + success_message=success_message, + total_messages_sent=message_count, + schema_version_id=str(schema_version_id)) + + return success_message + + except Exception as e: + logger.exception("Error in lambda_handler", + error=str(e), + error_type=type(e).__name__) + metrics.add_metric(name="AvroErrors", unit=MetricUnit.Count, value=1) + raise RuntimeError(f"Failed to send AVRO 
messages: {str(e)}") from e diff --git a/msk-lambda-schema-avro-python-sam/kafka_event_producer_function/requirements.txt b/msk-lambda-schema-avro-python-sam/kafka_event_producer_function/requirements.txt new file mode 100644 index 000000000..d6e1f2a58 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/kafka_event_producer_function/requirements.txt @@ -0,0 +1,10 @@ +# Core AWS and Lambda dependencies +boto3>=1.34.0 +aws-xray-sdk>=2.12.0 + +# Kafka and messaging +kafka-python>=2.2.14 + +# Avro serialization and AWS Glue Schema Registry +fastavro>=1.8.0 +aws-glue-schema-registry>=1.1.3 diff --git a/msk-lambda-schema-avro-python-sam/requirements.txt b/msk-lambda-schema-avro-python-sam/requirements.txt new file mode 100644 index 000000000..72f13680a --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/requirements.txt @@ -0,0 +1,24 @@ +# Core AWS and Lambda dependencies +boto3>=1.34.0 +botocore>=1.34.0 +aws-lambda-powertools>=3.15.0 + +# Kafka and messaging +kafka-python>=2.2.14 + +# Avro serialization and AWS Glue Schema Registry +fastavro>=1.8.0 +avro>=1.11.0 +aws-glue-schema-registry>=1.1.3 + +# AWS X-Ray tracing +aws-xray-sdk>=2.12.0 + +# Development and testing dependencies +pytest>=7.4.0 +pytest-mock>=3.12.0 + +# Additional utilities +requests>=2.31.0 +urllib3>=2.0.0 +six>=1.16.0 diff --git a/msk-lambda-schema-avro-python-sam/schemas/contact.avsc b/msk-lambda-schema-avro-python-sam/schemas/contact.avsc new file mode 100644 index 000000000..13a00de1e --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/schemas/contact.avsc @@ -0,0 +1,19 @@ +{ + "type": "record", + "name": "Contact", + "namespace": "com.amazonaws.services.lambda.samples.events.msk", + "fields": [ + {"name": "firstname", "type": ["null", "string"], "default": null}, + {"name": "lastname", "type": ["null", "string"], "default": null}, + {"name": "company", "type": ["null", "string"], "default": null}, + {"name": "street", "type": ["null", "string"], "default": null}, + {"name": "city", "type": ["null", "string"], "default": null}, + {"name": "county", "type": ["null", "string"], "default": null}, + {"name": "state", "type": ["null", "string"], "default": null}, + {"name": "zip", "type": ["null", "string"], "default": null}, + {"name": "homePhone", "type": ["null", "string"], "default": null}, + {"name": "cellPhone", "type": ["null", "string"], "default": null}, + {"name": "email", "type": ["null", "string"], "default": null}, + {"name": "website", "type": ["null", "string"], "default": null} + ] +} diff --git a/msk-lambda-schema-avro-python-sam/setup_venv.sh b/msk-lambda-schema-avro-python-sam/setup_venv.sh new file mode 100755 index 000000000..e97018ec9 --- /dev/null +++ b/msk-lambda-schema-avro-python-sam/setup_venv.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# Virtual Environment Setup Script for MSK Lambda Schema Avro Python SAM +# This script creates and configures a Python virtual environment + +set -e + +echo "Setting up Python Virtual Environment for MSK Lambda Schema Avro Python SAM" +echo "==========================================================================" + +# Check if Python 3 is available +if ! 
command -v python3 &> /dev/null; then
    echo "Error: Python 3 is not installed or not in PATH"
    echo "Please install Python 3.9 or later"
    exit 1
fi

# Check Python version
PYTHON_VERSION=$(python3 -c 'import sys; print(".".join(map(str, sys.version_info[:2])))')
echo "Found Python version: $PYTHON_VERSION"

# Check if version is 3.9 or later
if python3 -c 'import sys; exit(0 if sys.version_info >= (3, 9) else 1)'; then
    echo "Python version is compatible"
else
    echo "Error: Python 3.9 or later is required"
    exit 1
fi

# Create virtual environment
echo "Creating virtual environment..."
python3 -m venv venv

# Activate virtual environment
echo "Activating virtual environment..."
source venv/bin/activate

# Upgrade pip
echo "Upgrading pip..."
pip install --upgrade pip

# Install dependencies
echo "Installing dependencies from requirements.txt..."
pip install -r requirements.txt

echo ""
echo "Virtual environment setup completed successfully!"
echo ""
echo "To activate the virtual environment in the future, run:"
echo "  source venv/bin/activate"
echo ""
echo "To deactivate the virtual environment, run:"
echo "  deactivate"
echo ""
echo "To install additional dependencies:"
echo "  pip install <package-name>"
echo "  pip freeze > requirements.txt  # to update requirements.txt"
echo ""
echo "Next steps:"
echo "1. Make sure the virtual environment is activated: source venv/bin/activate"
echo "2. Run the deployment script: ./deploy.sh"
echo "3. Or build and deploy manually: sam build && sam deploy --guided"
diff --git a/msk-lambda-schema-avro-python-sam/template_original.yaml b/msk-lambda-schema-avro-python-sam/template_original.yaml
new file mode 100644
index 000000000..6e17cbc27
--- /dev/null
+++ b/msk-lambda-schema-avro-python-sam/template_original.yaml
@@ -0,0 +1,244 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: >
+  kafka_event_consumer_and_producer_functions
+
+  Sample SAM Template for MSK consumer and AVRO producer with IAM auth (Python)
+
+# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
+Globals:
+  Function:
+    Timeout: 15
+
+Resources:
+  # SQS Queue to use as Dead Letter Queue for the MSK event source mapping
+  ConsumerDLQ:
+    Type: AWS::SQS::Queue
+    Properties:
+      MessageRetentionPeriod: 1209600 # 14 days (maximum retention period)
+      VisibilityTimeout: 300 # 5 minutes
+      Tags:
+        - Key: Purpose
+          Value: MSKConsumerDLQ
+
+  LambdaMSKConsumerPythonFunction:
+    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
+    Properties:
+      CodeUri: kafka_event_consumer_function
+      Handler: app.lambda_handler
+      Runtime: python3.12
+      Architectures:
+        - x86_64
+      MemorySize: 512
+      Layers:
+        - !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-x86_64:18
+      VpcConfig:
+        SecurityGroupIds: !Ref SecurityGroupIds
+        SubnetIds: !Ref SubnetIds
+      Environment: # More info about Env Vars: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#environment-object
+        Variables:
+          PARAM1: VALUE
+          LOG_LEVEL: INFO
+      Events:
+        MSKEvent:
+          Type: MSK
+          Properties:
+            StartingPosition: LATEST
+            BatchSize: 10
+            MaximumBatchingWindowInSeconds: 20
+            Stream: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref
MSKClusterId] ] + Topics: + - !Ref MSKTopic + DestinationConfig: + OnFailure: + Destination: !GetAtt ConsumerDLQ.Arn + ProvisionedPollerConfig: + MaximumPollers: 3 + MinimumPollers: 1 + SchemaRegistryConfig: + SchemaRegistryURI: !Join [ '', ["arn:", "aws:", "glue:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "registry/", !Ref GlueSchemaRegistryName] ] + EventRecordFormat: SOURCE + SchemaValidationConfigs: + - Attribute: VALUE + FilterCriteria: + Filters: + - Pattern: '{"value": {"zip": [ { "prefix": "2000" } ]}}' + Policies: + - Statement: + - Sid: KafkaClusterPermissionsPolicy + Effect: Allow + Action: + - kafka-cluster:Connect + - kafka-cluster:DescribeGroup + - kafka-cluster:DescribeCluster + - kafka-cluster:AlterCluster + - kafka-cluster:AlterClusterDynamicConfiguration + - kafka-cluster:WriteDataIdempotently + - kafka-cluster:AlterGroup + - kafka-cluster:DescribeTopic + - kafka-cluster:ReadData + - kafka-cluster:DescribeClusterDynamicConfiguration + Resource: + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "group/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + + - Sid: KafkaPermissionsPolicy + Effect: Allow + Action: + - kafka:DescribeClusterV2 + - kafka:GetBootstrapBrokers + Resource: '*' + + - Sid: EC2PermissionsPolicy + Effect: Allow + Action: + - ec2:DescribeSecurityGroups + - ec2:DescribeSubnets + - ec2:DescribeVpcs + - ec2:CreateNetworkInterface + - ec2:DescribeNetworkInterfaces + - ec2:DeleteNetworkInterface + Resource: '*' + + - Sid: GlueSchemaRegistryPermissionsPolicy + Effect: Allow + Action: + - glue:GetSchemaByDefinition + - glue:GetSchemaVersion + - glue:GetRegistry + - glue:ListSchemas + - glue:ListSchemaVersions + - glue:RegisterSchemaVersion + - glue:PutSchemaVersionMetadata + - glue:GetSchemaVersionsDiff + - glue:QuerySchemaVersionMetadata + Resource: '*' + + - Sid: SQSPermissionsPolicy + Effect: Allow + Action: + - sqs:SendMessage + Resource: !GetAtt ConsumerDLQ.Arn + - VPCAccessPolicy: {} + + LambdaMSKProducerPythonFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: kafka_event_producer_function + Handler: app.lambda_handler + Runtime: python3.12 + Timeout: 300 + Architectures: + - x86_64 + MemorySize: 512 + Layers: + - !Sub arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python312-x86_64:18 + + VpcConfig: + SecurityGroupIds: !Ref SecurityGroupIds + SubnetIds: !Ref SubnetIds + Environment: + Variables: + MSK_CLUSTER_ARN: !Join [ '', ["arn:", "aws:", "kafka:", !Ref "AWS::Region" , ":" ,!Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId] ] + MSK_TOPIC: !Ref MSKTopic + REGISTRY_NAME: !Ref GlueSchemaRegistryName + CONTACT_SCHEMA_NAME: !Ref ContactSchemaName + LOG_LEVEL: INFO + Policies: + - Statement: + - Sid: KafkaClusterPermissionsPolicy + Effect: Allow + Action: + - kafka-cluster:Connect + - kafka-cluster:DescribeCluster + - kafka-cluster:WriteData + - kafka-cluster:DescribeTopic + Resource: + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref "AWS::AccountId", ":", "cluster/", !Ref MSKClusterName, "/" , !Ref MSKClusterId]] + - !Join ['', ["arn:", "aws:", "kafka:", !Ref "AWS::Region", ":", !Ref 
"AWS::AccountId", ":", "topic/", !Ref MSKClusterName, "/" , !Ref MSKClusterId, "/*"]] + + - Sid: KafkaPermissionsPolicy + Effect: Allow + Action: + - kafka:DescribeClusterV2 + - kafka:GetBootstrapBrokers + Resource: '*' + + - Sid: EC2PermissionsPolicy + Effect: Allow + Action: + - ec2:DescribeSecurityGroups + - ec2:DescribeSubnets + - ec2:DescribeVpcs + - ec2:CreateNetworkInterface + - ec2:DescribeNetworkInterfaces + - ec2:DeleteNetworkInterface + Resource: '*' + + - Sid: GlueSchemaRegistryPermissionsPolicy + Effect: Allow + Action: + - glue:GetSchemaByDefinition + - glue:GetSchemaVersion + - glue:GetRegistry + - glue:ListSchemas + - glue:ListSchemaVersions + - glue:GetSchemaVersionsDiff + - glue:QuerySchemaVersionMetadata + - glue:RegisterSchemaVersion + - glue:PutSchemaVersionMetadata + - glue:CreateSchema + - glue:CreateRegistry + Resource: + - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:registry/*" + - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:schema/*/*" + - VPCAccessPolicy: {} + +Parameters: + MSKClusterName: + Type: String + Description: Enter the name of the MSK Cluster + Default: CLUSTER_NAME + MSKClusterId: + Type: String + Description: Enter the ID of the MSK Cluster + Default: CLUSTER_ID + MSKTopic: + Type: String + Description: Enter the name of the MSK Topic + Default: KAFKA_TOPIC + GlueSchemaRegistryName: + Type: String + Description: Enter the name of the Glue Schema Registry + Default: GLUE_SCHEMA_REGISTRY_NAME + ContactSchemaName: + Type: String + Description: Enter the name of the Contact Schema + Default: AVRO_SCHEMA + VpcId: + Type: String + Description: Enter the VPC ID where the MSK cluster is deployed + Default: VPC_ID + SubnetIds: + Type: CommaDelimitedList + Description: Enter the subnet IDs where the MSK cluster is deployed (comma-separated) + Default: SUBNET_IDS + SecurityGroupIds: + Type: CommaDelimitedList + Description: Enter the security group IDs that allow access to the MSK cluster (comma-separated) + Default: LAMBDA_SECURITY_GROUP_ID + +Outputs: + MSKConsumerLambdaFunction: + Description: "Topic Consumer Lambda Function ARN" + Value: !GetAtt LambdaMSKConsumerPythonFunction.Arn + MSKProducerLambdaFunction: + Description: "AVRO Producer Lambda Function ARN" + Value: !GetAtt LambdaMSKProducerPythonFunction.Arn + ConsumerDLQUrl: + Description: "URL of the Dead Letter Queue for the MSK Consumer" + Value: !Ref ConsumerDLQ + ConsumerDLQArn: + Description: "ARN of the Dead Letter Queue for the MSK Consumer" + Value: !GetAtt ConsumerDLQ.Arn