Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
edc2745
add support for fair queue in SQS
MartinSiby Jul 29, 2025
cebbab3
fix lint issue
MartinSiby Jul 29, 2025
73fd67e
add documentation for fair queue SQS
MartinSiby Jul 29, 2025
88269fb
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Oct 7, 2025
53dfef8
Add more tests for feature: support_SQS_fair_queue
vaibhavcerta Oct 7, 2025
7cdc95f
Add tests for feature: support_SQS_fair_queue
MartinSiby Oct 14, 2025
906ddeb
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Oct 14, 2025
0634419
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 4, 2025
22dc282
Update docs/reference/kombu.transport.SQS.rst
MartinSiby Nov 4, 2025
3baa6e1
Update kombu/transport/SQS.py
MartinSiby Nov 4, 2025
3cfc027
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 5, 2025
b6f9e0f
fix: remove unnecessary blank lines in test_SQS.py
MartinSiby Nov 5, 2025
ee92f4d
docs: clarify Fair Queue Support availability in SQS documentation
MartinSiby Nov 5, 2025
e81a4ff
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 18, 2025
de73e1a
docs: fix formatting in SQS Fair Queue Support section
MartinSiby Dec 5, 2025
4fce953
Update docs/reference/kombu.transport.SQS.rst
MartinSiby Dec 5, 2025
3af332b
docs: enhance clarity on MessageGroupId requirement for FIFO and Fair…
MartinSiby Dec 5, 2025
d949190
Merge branch 'martin/support_SQS_fair_queue' of github.com:MartinSiby…
MartinSiby Dec 5, 2025
294fd56
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion docs/reference/kombu.transport.SQS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,31 @@ Message Attributes

SQS supports sending message attributes along with the message body.
To use this feature, you can pass a 'message_attributes' as keyword argument
to `basic_publish` method.
to `basic_publish` method.

Fair Queue Support (only available from version 5.7.0+)
------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will need a versionadded 5.7 annotation here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@auvipy both issues are taken care of, please check and let me know

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder on this @auvipy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will start merging v5.7 from january 2026


Kombu supports Amazon SQS Fair Queues, which provide improved message processing fairness by ensuring that messages from different message groups
are processed in a balanced manner.

Fair Queues are designed to prevent a single message group (or tenant) from monopolizing
consumer resources, which can happen with standard queues that handle multi-tenant
workloads with unbalanced message distribution.

When publishing messages to a Fair Queue, you should provide a `MessageGroupId`. This can be done by passing it as a
keyword argument to the `publish` method. While the Kombu implementation only sends `MessageGroupId` if it is present, AWS requires it for FIFO and Fair Queues. If omitted, AWS will reject the message or fairness will not be guaranteed. For standard queues, `MessageGroupId` is optional.::

producer.publish(
message,
routing_key='my-fair-queue',
MessageGroupId='customer-123' # Required for FIFO queues; needed for Fair queue functionality on standard queues
)

Benefits of using Fair Queues with Kombu:
- Improved message processing fairness across message groups
- Better workload distribution among consumers
- Eliminates noisy neighbor problem

For more information, refer to the AWS documentation on Fair Queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html

10 changes: 6 additions & 4 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,13 @@ def _put(self, queue, message, **kwargs):
# we don't want to want to have the attribute in the body
kwargs['MessageAttributes'] = \
message['properties'].pop('message_attributes')
# Support SQS fair queue system.
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
message['properties']['MessageGroupId']
# Support FIFO queues.
if queue.endswith('.fifo'):
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
message['properties']['MessageGroupId']
else:
if 'MessageGroupId' not in kwargs:
kwargs['MessageGroupId'] = 'default'
if 'MessageDeduplicationId' in message['properties']:
kwargs['MessageDeduplicationId'] = \
Expand Down
189 changes: 189 additions & 0 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,195 @@ def test_predefined_queues_put_to_fifo_queue(self):
assert 'MessageDeduplicationId' in \
sqs_queue_mock.send_message.call_args[1]

def test_predefined_queues_put_with_message_group_id(self):
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-1'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish('message', MessageGroupId='test-group-id')

sqs_queue_mock.send_message.assert_called_once()
assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1]
assert sqs_queue_mock.send_message.call_args[1]['MessageGroupId'] == 'test-group-id'

def test_non_fifo_queue_without_message_group_id(self):
"""Test that non-FIFO queues don't get MessageGroupId when not provided"""
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-1'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish('message')

sqs_queue_mock.send_message.assert_called_once()
assert 'MessageGroupId' not in sqs_queue_mock.send_message.call_args[1]

def test_fifo_queue_with_custom_message_group_id(self):
"""Test that FIFO queues respect custom MessageGroupId and don't override with 'default'"""
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-3.fifo'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish('message', MessageGroupId='custom-group-123')

sqs_queue_mock.send_message.assert_called_once()
call_kwargs = sqs_queue_mock.send_message.call_args[1]
assert 'MessageGroupId' in call_kwargs
assert call_kwargs['MessageGroupId'] == 'custom-group-123'
# FIFO queue should also have MessageDeduplicationId
assert 'MessageDeduplicationId' in call_kwargs

def test_fifo_queue_with_custom_message_group_id_and_deduplication_id(self):
"""Test FIFO queue with both custom MessageGroupId and MessageDeduplicationId"""
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-3.fifo'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish(
'message',
MessageGroupId='tenant-456',
MessageDeduplicationId='unique-msg-789'
)

sqs_queue_mock.send_message.assert_called_once()
call_kwargs = sqs_queue_mock.send_message.call_args[1]
assert call_kwargs['MessageGroupId'] == 'tenant-456'
assert call_kwargs['MessageDeduplicationId'] == 'unique-msg-789'

def test_message_group_id_with_special_characters(self):
"""Test MessageGroupId with special characters"""
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-1'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock

# Test with hyphens, underscores, and alphanumeric
group_id = 'customer-123_tenant-abc-XYZ'
p.publish('message', MessageGroupId=group_id)

sqs_queue_mock.send_message.assert_called_once()
call_kwargs = sqs_queue_mock.send_message.call_args[1]
assert call_kwargs['MessageGroupId'] == group_id

def test_standard_queue_with_message_group_id_and_delay(self):
"""Test that non-FIFO queue can have both MessageGroupId and DelaySeconds"""
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-2'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish('message', MessageGroupId='group-1', DelaySeconds=5)

sqs_queue_mock.send_message.assert_called_once()
call_kwargs = sqs_queue_mock.send_message.call_args[1]
assert call_kwargs['MessageGroupId'] == 'group-1'
assert call_kwargs['DelaySeconds'] == 5
# Non-FIFO queue should not have MessageDeduplicationId
assert 'MessageDeduplicationId' not in call_kwargs

def test_message_group_id_with_message_attributes(self):
"""Test that MessageGroupId works alongside message_attributes"""
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-1'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock

p.publish(
'message',
MessageGroupId='group-2',
message_attributes={
'CustomAttr': {'DataType': 'String', 'StringValue': 'test'}
}
)

sqs_queue_mock.send_message.assert_called_once()
call_kwargs = sqs_queue_mock.send_message.call_args[1]
assert call_kwargs['MessageGroupId'] == 'group-2'
assert 'MessageAttributes' in call_kwargs
assert call_kwargs['MessageAttributes']['CustomAttr']['StringValue'] == 'test'

def test_predefined_queues_put_to_queue(self):
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
Expand Down
Loading