Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
edc2745
add support for fair queue in SQS
MartinSiby Jul 29, 2025
cebbab3
fix lint issue
MartinSiby Jul 29, 2025
73fd67e
add documentation for fair queue SQS
MartinSiby Jul 29, 2025
88269fb
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Oct 7, 2025
53dfef8
Add more tests for feature: support_SQS_fair_queue
vaibhavcerta Oct 7, 2025
7cdc95f
Add tests for feature: support_SQS_fair_queue
MartinSiby Oct 14, 2025
906ddeb
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Oct 14, 2025
0634419
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 4, 2025
22dc282
Update docs/reference/kombu.transport.SQS.rst
MartinSiby Nov 4, 2025
3baa6e1
Update kombu/transport/SQS.py
MartinSiby Nov 4, 2025
3cfc027
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 5, 2025
b6f9e0f
fix: remove unnecessary blank lines in test_SQS.py
MartinSiby Nov 5, 2025
ee92f4d
docs: clarify Fair Queue Support availability in SQS documentation
MartinSiby Nov 5, 2025
e81a4ff
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 18, 2025
de73e1a
docs: fix formatting in SQS Fair Queue Support section
MartinSiby Dec 5, 2025
4fce953
Update docs/reference/kombu.transport.SQS.rst
MartinSiby Dec 5, 2025
3af332b
docs: enhance clarity on MessageGroupId requirement for FIFO and Fair…
MartinSiby Dec 5, 2025
d949190
Merge branch 'martin/support_SQS_fair_queue' of github.com:MartinSiby…
MartinSiby Dec 5, 2025
294fd56
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion docs/reference/kombu.transport.SQS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,30 @@ Message Attributes

SQS supports sending message attributes along with the message body.
To use this feature, you can pass a 'message_attributes' as keyword argument
to `basic_publish` method.
to `basic_publish` method.

Fair Queue Support
------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will need a versionadded 5.7 annotation here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@auvipy both issues are taken care of, please check and let me know

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder on this @auvipy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will start merging v5.7 from january 2026


Kombu supports Amazon SQS Fair Queues, which provide improved message processing fairness by ensuring that messages from different message groups
are processed in a balanced manner.

Fair Queues are designed to prevent a single message group (or tenant) from monopolizing consumer resources, which can happen with standard queues
that handle multi-tenant workloads with unbalanced message distribution.

When publishing messages to a Fair Queue, you must provide a `MessageGroupId`. This can be done by passing it as a
keyword argument to the `publish` method::

producer.publish(
message,
routing_key='my-fair-queue',
MessageGroupId='customer-123' # Required for FIFO and Fair queues
)

Benefits of using Fair Queues with Kombu:
- Improved message processing fairness across message groups
- Better workload distribution among consumers
- Eliminates noisy neighbor problem

For more information, refer to the AWS documentation on Fair Queues: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fair-queues.html

10 changes: 6 additions & 4 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,13 @@ def _put(self, queue, message, **kwargs):
# we don't want to want to have the attribute in the body
kwargs['MessageAttributes'] = \
message['properties'].pop('message_attributes')
# support SQS fair queue system.
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
message['properties']['MessageGroupId']
# Support FIFO queues.
if queue.endswith('.fifo'):
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
message['properties']['MessageGroupId']
else:
if 'MessageGroupId' not in kwargs:
kwargs['MessageGroupId'] = 'default'
if 'MessageDeduplicationId' in message['properties']:
kwargs['MessageDeduplicationId'] = \
Expand Down
23 changes: 23 additions & 0 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,29 @@ def test_predefined_queues_put_to_fifo_queue(self):
assert 'MessageDeduplicationId' in \
sqs_queue_mock.send_message.call_args[1]

def test_predefined_queues_put_with_message_group_id(self):
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-1'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish('message', MessageGroupId='test-group-id')

sqs_queue_mock.send_message.assert_called_once()
assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1]
assert sqs_queue_mock.send_message.call_args[1]['MessageGroupId'] == 'test-group-id'

def test_predefined_queues_put_to_queue(self):
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
Expand Down