Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
edc2745
add support for fair queue in SQS
MartinSiby Jul 29, 2025
cebbab3
fix lint issue
MartinSiby Jul 29, 2025
73fd67e
add documentation for fair queue SQS
MartinSiby Jul 29, 2025
88269fb
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Oct 7, 2025
53dfef8
Add more tests for feature: support_SQS_fair_queue
vaibhavcerta Oct 7, 2025
7cdc95f
Add tests for feature: support_SQS_fair_queue
MartinSiby Oct 14, 2025
906ddeb
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Oct 14, 2025
0634419
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 4, 2025
22dc282
Update docs/reference/kombu.transport.SQS.rst
MartinSiby Nov 4, 2025
3baa6e1
Update kombu/transport/SQS.py
MartinSiby Nov 4, 2025
3cfc027
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 5, 2025
b6f9e0f
fix: remove unnecessary blank lines in test_SQS.py
MartinSiby Nov 5, 2025
ee92f4d
docs: clarify Fair Queue Support availability in SQS documentation
MartinSiby Nov 5, 2025
e81a4ff
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Nov 18, 2025
de73e1a
docs: fix formatting in SQS Fair Queue Support section
MartinSiby Dec 5, 2025
4fce953
Update docs/reference/kombu.transport.SQS.rst
MartinSiby Dec 5, 2025
3af332b
docs: enhance clarity on MessageGroupId requirement for FIFO and Fair…
MartinSiby Dec 5, 2025
d949190
Merge branch 'martin/support_SQS_fair_queue' of github.com:MartinSiby…
MartinSiby Dec 5, 2025
294fd56
Merge branch 'main' into martin/support_SQS_fair_queue
MartinSiby Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions kombu/transport/SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,13 @@ def _put(self, queue, message, **kwargs):
# we don't want to want to have the attribute in the body
kwargs['MessageAttributes'] = \
message['properties'].pop('message_attributes')
# support SQS fair queue system.
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
message['properties']['MessageGroupId']
# Support FIFO queues.
if queue.endswith('.fifo'):
if 'MessageGroupId' in message['properties']:
kwargs['MessageGroupId'] = \
message['properties']['MessageGroupId']
else:
if 'MessageGroupId' not in kwargs:
kwargs['MessageGroupId'] = 'default'
if 'MessageDeduplicationId' in message['properties']:
kwargs['MessageDeduplicationId'] = \
Expand Down
23 changes: 23 additions & 0 deletions t/unit/transport/test_SQS.py
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,29 @@ def test_predefined_queues_put_to_fifo_queue(self):
assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1]
assert 'MessageDeduplicationId' in \
sqs_queue_mock.send_message.call_args[1]

def test_predefined_queues_put_with_message_group_id(self):
connection = Connection(transport=SQS.Transport, transport_options={
'predefined_queues': example_predefined_queues,
})
channel = connection.channel()

queue_name = 'queue-1'

exchange = Exchange('test_SQS', type='direct')
p = messaging.Producer(channel, exchange, routing_key=queue_name)

queue = Queue(queue_name, exchange, queue_name)
queue(channel).declare()

channel.sqs = Mock()
sqs_queue_mock = Mock()
channel.sqs.return_value = sqs_queue_mock
p.publish('message', MessageGroupId='test-group-id')

sqs_queue_mock.send_message.assert_called_once()
assert 'MessageGroupId' in sqs_queue_mock.send_message.call_args[1]
assert sqs_queue_mock.send_message.call_args[1]['MessageGroupId'] == 'test-group-id'

def test_predefined_queues_put_to_queue(self):
connection = Connection(transport=SQS.Transport, transport_options={
Expand Down