Skip to content

Conversation

@ozeranskii
Copy link
Contributor

@ozeranskii ozeranskii commented Oct 15, 2025

Description

Allows specifying a message key when publishing a batch of messages, enabling partition control.
Introduces the option to set a default key for a batch publisher and override it per message if required.

Usage examples (post-update)

Below are concise, copy-ready examples showing how to publish Kafka batches with and without keys, including per-message keys and mixed cases. Behavior is identical for AIOKafka and Confluent on the producer side. See the last section for how to read keys on the consumer side.


1) No keys (legacy behavior)

  • No default key on publisher/factory.
  • Batch contains plain items (not wrapped).
  • Result: every record gets key=None.
publisher = broker.publisher("topic-A", batch=True)

await publisher.publish([b"msg1", b"msg2", b"msg3"])  # keys --> [None, None, None]

2) Single default key on the publisher (or factory)

  • Default key is provided once (at publisher creation or via factory default).
  • Batch contains plain items.
  • Result: all records get the same key.
publisher = broker.publisher("topic-B", batch=True, key=b"default-key")
await publisher.publish([{"a": 1}, {"a": 2}, {"a": 3}])
# keys --> [b"default-key", b"default-key", b"default-key"]

(If you set the default on a factory, the effect is the same.)


3) Per-message keys (each element wrapped)

  • No default key on publisher.
  • Each batch item uses the wrapper {"message": <payload>, "key": <bytes|None>}.
  • Result: each record uses its own key from the wrapper.
publisher = broker.publisher("topic-C", batch=True)

await publisher.publish([
    {"message": {"id": 1}, "key": b"a"},
    {"message": {"id": 2}, "key": b"b"},
    {"message": {"id": 3}, "key": b"c"},
])
# keys --> [b"a", b"b", b"c"]

4) Mixed batch: some items have per-message keys, some don’t (no default key)

  • Items are a mix of wrapped and plain.
  • No default key on publisher.
  • Result: wrapped items use their key; others have None.
publisher = broker.publisher("topic-D", batch=True)

await publisher.publish([
    {"message": "m1", "key": b"k1"},  # has per-message key
    "m2",                             # plain --> no key
    {"message": "m3"},                # wrapped, but no key --> no key
    "m4",
])
# keys --> [b"k1", None, None, None]

5) Mixed with both default key and per-message keys

  • Publisher has a default key.
  • Some items override with a per-message key.
  • Priority: per-message key > default key.
publisher = broker.publisher("topic-one", batch=True, key=b"default")

await publisher.publish([
    {"message": 1, "key": b"a"},  # per-message wins --> b"a"
    2,                            # plain --> uses default
    {"message": 3},               # wrapped, no key --> uses default
    4,                            # plain --> uses default
])
# keys --> [b"a", b"default", b"default", b"default"]

6) Notes and edge cases

  • Wrapper format: {"message": <payload>, "key": <bytes|None>}. Prefer always including "message" in wrapped items.
  • {"key": None} behaves the same as no "key": it falls back to the default key if one exists; otherwise None.
  • If wrappers appear only for the first N items and the rest are plain, the first N use their per-message keys (or None), and the rest use the default key (if provided) or None.
  • Single-message publish (non-batch) is unchanged; per-message wrapper keys apply only to batch publishing.

Reading keys on the consumer side

  • Confluent:
    • Single message: msg.raw_message.key()
    • Batch: keys = [m.key() for m in msg.raw_message]
  • AIOKafka:
    • Single message: msg.raw_message.key
    • Batch: keys = [r.key for r in msg.raw_message]

A small helper if you want backend-agnostic access in your handler/tests:

def get_key(raw):
    k = getattr(raw, "key", None)
    return k() if callable(k) else k

Priority rule (quick recap)

  • If an item has a per-message key, use it.
  • Else, if the publisher/factory provides a default key, use it.
  • Else, None.

Fixes #2514

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (just lint shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running just test-coverage
  • I have ensured that static analysis tests are passing by running just static-analysis
  • I have included code examples to illustrate the modifications

Allows specifying a message key when publishing a batch of
messages, enabling partition control.

Introduces the option to set a default key for a batch publisher
and override it per message if required.

Closes ag2ai#2514

refactor(confluent): Removes unused blank line
@ozeranskii ozeranskii requested a review from Lancetnik as a code owner October 15, 2025 10:55
@github-actions github-actions bot added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module labels Oct 15, 2025
Copy link
Member

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but magic {"message": ..., "key": ... } doesn't look as a good API.
One reason - we may want to add more additional information to each method instead of just key (headers, publish timestamp, etc)

This reason we have a feature already - Response (or KafkaResponse)

@broker.subscriber(...)
@broker.publisher(...)
async def handler():
     return Response("body", key=b"key")

This object allows to setup any information of outgoing message you want

So, I suggest to support smth like this Response object - Message (we can discuss naming)

await broker.publish_batch(KafkaMessage("body", key=b"1"), KafkaMessage("body", key=b"2"), "just a body message")

Probably, we can reuse the KafkaMessage class we have already. Also, please take a look at KafkaPublishCommand class

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AioKafka Issues related to `faststream.kafka` module Confluent Issues related to `faststream.confluent` module

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Set the key for batch producing

2 participants