-
Notifications
You must be signed in to change notification settings - Fork 0
AWS Lambda to Kafka
Vaquar Khan edited this page Sep 20, 2021
·
2 revisions
import json from kafka
import KafkaClient from kafka
import SimpleProducer from kafka
import KafkaProducer
def lambda_handler(event, context):
kafka = KafkaClient("XXXX.XXX.XX.XX:XXXX")
print(kafka)
producer = SimpleProducer(kafka, async = False)
print(producer)
task_op = {
"'message": "Calling from AWS Lambda"
}
print(json.dumps(task_op))
producer.send_messages("topic_update",json.dumps(task_op).encode('utf-8'))
print(producer.send_messages)
return ("Messages Sent to Kafka Topic")
This is a demo Lambda function that produces events to a Kafka topic, notifying consumers about new files in S3 buckets.
from __future__ import print_function
import json
import boto3
from kafka import KafkaProducer
import urllib
import ssl
import logging
root = logging.getLogger()
if root.handlers:
for handler in root.handlers:
root.removeHandler(handler)
logging.basicConfig(format='%(asctime)s %(message)s',level=logging.DEBUG)
print('Loading function')
s3 = boto3.client('s3')
context = ssl.create_default_context()
context.options &= ssl.OP_NO_TLSv1
context.options &= ssl.OP_NO_TLSv1_1
producer = KafkaProducer(
bootstrap_servers=['pkc-loyje.us-central1.gcp.confluent.cloud:9092'],
value_serializer=lambda m: json.dumps(m).encode('ascii'),
retry_backoff_ms=500,
request_timeout_ms=20000,
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
ssl_context=context,
sasl_plain_username='YYYY',
sasl_plain_password='XXXXXXXXXXXXXXXXXXXXX')
def lambda_handler(event, context):
print("Received event: " + json.dumps(event, indent=2))
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8'))
try:
print("We have new object. In bucket {}, with key {}".format(bucket, key))
future = producer.send("webapp","We have new object. In bucket {}, with key {}".format(bucket, key))
record_metadata = future.get(timeout=10)
print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
-
Apache Kafka cluster.
-
Create a deployment package - a zip that contains the lambda_s3_kafka.py file and all the dependencies or lambda layer. the dependency is kafka-python, and you can pull it into the zip by running:
pip install kafka-python -t /Users/XYZ/your_workspaces/lambda_s3_kafka