Conversation

vaibhavjainv
Contributor

Issue #, if available:

Description of changes:
Python implementation for the MSK - Glue Schema Registry demo.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

vaibhavjainv and others added 5 commits June 28, 2025 22:43
* python implementation

* working version

* working code

* simplified code

* final changes

@leandrodamascena leandrodamascena left a comment

That's super amazing work @vaibhavjainv! I just left some small comments.

logger.info("=== MSK AVRO Consumer Lambda started ===")

try:
record_count = 0

You can remove this.

try:
record_count = 0
for record in event.records:
record_count += 1

You can remove this.

# Add your business logic here
# For example: save to database, send notifications, etc.

logger.info(f"Successfully processed {record_count} records")

Suggested change
logger.info(f"Successfully processed {record_count} records")
logger.info(f"Successfully processed {len(list(event.records))} records")

Comment on lines 2 to 6
aws-lambda-powertools>=3.15.0

# Avro serialization
fastavro>=1.8.0
avro>=1.11.0

Suggested change
aws-lambda-powertools>=3.15.0
# Avro serialization
fastavro>=1.8.0
avro>=1.11.0
aws-lambda-powertools[kafka-consumer-avro]>=3.15.0

Comment on lines 267 to 270
logger.exception("Error in lambda_handler", extra={
"error": str(e),
"error_type": type(e).__name__
})

logger.exception already propagates the error type and name.

Suggested change
logger.exception("Error in lambda_handler", extra={
"error": str(e),
"error_type": type(e).__name__
})
logger.exception("Error in lambda_handler")

response = kafka_client.get_bootstrap_brokers(ClusterArn=cluster_arn)
return response['BootstrapBrokerStringSaslIam']
except Exception as e:
logger.exception("Failed to get bootstrap brokers", extra={"cluster_arn": cluster_arn})

Suggested change
logger.exception("Failed to get bootstrap brokers", extra={"cluster_arn": cluster_arn})
logger.exception("Failed to get bootstrap brokers", cluster_arn=cluster_arn)

@leandrodamascena leandrodamascena left a comment

Hey @vaibhavjainv, thanks a lot for working on this! I left some comments to improve the code and experience.

Comment on lines +27 to +35
def test_lambda_handler_with_empty_records(self):
"""Test lambda handler with empty records structure."""
event = {"records": {}}
context = MagicMock()

# Should not raise any exceptions
result = lambda_handler(event, context)
assert isinstance(result, dict)
assert result.get("statusCode") == 200

The test test_lambda_handler_empty_event is doing the same thing, no?

Comment on lines +55 to +108
def get_schema_version_id(registry_name: str, schema_name: str) -> str:
"""Get or register schema version using AWS Glue Schema Registry."""
try:
glue_client = boto3.client('glue')
schema_registry_client = SchemaRegistryClient(
glue_client=glue_client,
registry_name=registry_name
)

schema_version = schema_registry_client.get_or_register_schema_version(
definition=json.dumps(CONTACT_AVRO_SCHEMA),
schema_name=schema_name,
data_format='AVRO'
)

logger.info("Schema version obtained",
schema_version_id=str(schema_version.version_id),
version_number=schema_version.version_number,
schema_name=schema_name,
registry_name=registry_name)

return schema_version.version_id

except Exception as e:
logger.exception("Failed to get schema version",
error=str(e),
registry_name=registry_name,
schema_name=schema_name)
raise


def serialize_avro_message(contact_data: Dict[str, Any], schema_version_id: str) -> bytes:
"""Serialize contact data to AVRO format with AWS Glue Schema Registry header."""
try:
# Serialize data using fastavro
avro_buffer = io.BytesIO()
fastavro.schemaless_writer(avro_buffer, CONTACT_AVRO_SCHEMA, contact_data)
avro_data = avro_buffer.getvalue()

# Add AWS Glue Schema Registry header using the package
encoded_message = encode(avro_data, schema_version_id)

logger.debug("Message serialized",
avro_data_size=len(avro_data),
total_message_size=len(encoded_message),
header_size=len(encoded_message) - len(avro_data))

return encoded_message

except Exception as e:
logger.exception("Failed to serialize message",
error=str(e),
contact_data=contact_data)
raise

KafkaProducer can serialize automatically if you pass a SchemaRegistryClient as the serializer instance, so you can remove both functions. The code is something like this:

glue_client = boto3.client('glue', region_name="eu-west-3")
client = SchemaRegistryClient(glue_client,
                              registry_name='powertools-python-schema-registry')
serializer = KafkaSerializer(client)

producer = KafkaProducer(
    bootstrap_servers='....',
    security_protocol='....',
    sasl_mechanism='....',
    sasl_oauth_token_provider=...,
    client_id=socket.gethostname(),
    value_serializer=serializer # This pass the serializer and KafkaProducer automatic uses this.
)

Comment on lines +188 to +199
bootstrap_servers = get_bootstrap_brokers(cluster_arn)
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM',
key_serializer=lambda x: x.encode('utf-8') if x else None,
value_serializer=lambda x: x, # Raw bytes - AVRO data is already serialized
acks='all',
retries=3,
max_block_ms=120000,
request_timeout_ms=60000,
)

Move this outside of the Lambda handler. Whenever KafkaProducer is created, it will open a new connection to the Kafka cluster and can easily exhaust client connections. Additionally, connections are static and must be established outside of the handler.
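A minimal sketch of the reuse pattern, using a hypothetical `FakeProducer` stand-in instead of the real `KafkaProducer` (so the connection counting is purely illustrative):

```python
class FakeProducer:
    """Hypothetical stand-in for KafkaProducer that counts constructions,
    i.e. how many connections to the cluster would be opened."""
    instances = 0

    def __init__(self):
        FakeProducer.instances += 1  # each construction = one new connection

    def send(self, topic, value):
        return f"queued for {topic}"


# Created once at module import time (the Lambda cold start)...
producer = FakeProducer()


def lambda_handler(event, context):
    # ...and reused on every warm invocation, so repeated invocations
    # do not open new connections to the cluster.
    return producer.send("contacts", event)
```

With the producer created inside the handler, the instance count would grow with every invocation; at module scope it stays at one for the lifetime of the execution environment.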

sasl_mechanism='AWS_MSK_IAM',
key_serializer=lambda x: x.encode('utf-8') if x else None,
value_serializer=lambda x: x, # Raw bytes - AVRO data is already serialized
acks='all',

acks='all' can slow down message production. If acknowledgment isn't critical, I suggest changing it to 0, which means you don't wait for brokers to confirm delivery.
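For illustration, the two settings being contrasted, shown here as plain config dicts rather than a live producer (these are standard kafka-python `KafkaProducer` parameters):

```python
# acks='all' waits for all in-sync replicas to confirm each message;
# acks=0 is fire-and-forget with no broker confirmation.
durable_config = {
    "acks": "all",  # strongest delivery guarantee, highest per-message latency
    "retries": 3,
}
fast_config = {
    "acks": 0,      # no delivery guarantee, lowest latency
    "retries": 0,   # retries are meaningless without acknowledgments
}
```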

Comment on lines +259 to +260
error=str(e),
error_type=type(e).__name__)

You can remove both keys; logger.exception automatically includes the stack trace.
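A quick way to see why those extra keys are redundant, using stdlib logging (Powertools' Logger.exception records the traceback the same way):

```python
import io
import logging

# Capture log output so we can inspect what logging.exception records.
stream = io.StringIO()
logging.basicConfig(stream=stream, level=logging.ERROR, force=True)

try:
    raise ValueError("boom")
except ValueError:
    # No extra={"error": ..., "error_type": ...} needed:
    logging.exception("Error in lambda_handler")

output = stream.getvalue()
# The traceback already contains the exception type and message.
assert "ValueError" in output
assert "boom" in output
```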
