Skip to content

Commit ec59669

Browse files
kislerdmerikdw
andauthored
Add the field to avoid creating missing topic. (#873)
* Add the field to avoid creating missing topic. Add field AllowAutoTopicCreation to Writer struct. It prevents the side-effect of creating missing topic. Closes #872 * Fix the test when missing topic creation is expected Closes #872 * Add documentation to create missing topic. Add the feature on the Writer's features list. Add a code snippet to create a topic before messages publication. Closes #872 * Fixes missing word. * Update writer_test.go Co-authored-by: Erik Weathers <[email protected]> * Update README.md language adjustment @erikdw Co-authored-by: Erik Weathers <[email protected]> * Update README.md Adjust styling by @erikdw Co-authored-by: Erik Weathers <[email protected]> Co-authored-by: Erik Weathers <[email protected]>
1 parent 227809c commit ec59669

File tree

3 files changed

+84
-2
lines changed

3 files changed

+84
-2
lines changed

README.md

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ to use in most cases as it provides additional features:
337337
- Synchronous or asynchronous writes of messages to Kafka.
338338
- Asynchronous cancellation using contexts.
339339
- Flushing of pending messages on close to support graceful shutdowns.
340+
- Creation of a missing topic before publishing a message. *Note!* it was the default behaviour up to the version `v0.4.30`.
340341

341342
```go
342343
// make a writer that produces to topic-A, using the least-bytes distribution
@@ -369,6 +370,55 @@ if err := w.Close(); err != nil {
369370
}
370371
```
371372

373+
### Missing topic creation before publication
374+
375+
```go
376+
// Make a writer that publishes messages to topic-A.
377+
// The topic will be created if it is missing.
378+
w := &Writer{
379+
Addr: TCP("localhost:9092"),
380+
Topic: "topic-A",
381+
AllowAutoTopicCreation: true,
382+
}
383+
384+
messages := []kafka.Message{
385+
{
386+
Key: []byte("Key-A"),
387+
Value: []byte("Hello World!"),
388+
},
389+
{
390+
Key: []byte("Key-B"),
391+
Value: []byte("One!"),
392+
},
393+
{
394+
Key: []byte("Key-C"),
395+
Value: []byte("Two!"),
396+
},
397+
}
398+
399+
var err error
400+
const retries = 3
401+
for i := 0; i < retries; i++ {
402+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
403+
defer cancel()
404+
405+
// attempt to create topic prior to publishing the message
406+
err = w.WriteMessages(ctx, messages...)
407+
if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
408+
time.Sleep(time.Millisecond * 250)
409+
continue
410+
}
411+
412+
if err != nil {
413+
log.Fatalf("unexpected error %v", err)
414+
}
415+
}
416+
417+
if err := w.Close(); err != nil {
418+
log.Fatal("failed to close writer:", err)
419+
}
420+
```
421+
372422
### Writing to multiple topics
373423

374424
Normally, the `WriterConfig.Topic` is used to initialize a single-topic writer.

writer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,9 @@ type Writer struct {
185185
// If nil, DefaultTransport is used.
186186
Transport RoundTripper
187187

188+
// AllowAutoTopicCreation notifies writer to create topic is missing.
189+
AllowAutoTopicCreation bool
190+
188191
// Manages the current set of partition-topic writers.
189192
group sync.WaitGroup
190193
mutex sync.Mutex
@@ -733,7 +736,7 @@ func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
733736
// caching recent results (the kafka.Transport types does).
734737
r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
735738
TopicNames: []string{topic},
736-
AllowAutoTopicCreation: true,
739+
AllowAutoTopicCreation: w.AllowAutoTopicCreation,
737740
})
738741
if err != nil {
739742
return 0, err
@@ -941,7 +944,7 @@ func newBatchQueue(initialSize int) batchQueue {
941944
bq := batchQueue{
942945
queue: make([]*writeBatch, 0, initialSize),
943946
mutex: &sync.Mutex{},
944-
cond: &sync.Cond{},
947+
cond: &sync.Cond{},
945948
}
946949

947950
bq.cond.L = bq.mutex

writer_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,10 @@ func TestWriter(t *testing.T) {
160160
scenario: "writing a message to a non-existant topic creates the topic",
161161
function: testWriterAutoCreateTopic,
162162
},
163+
{
164+
scenario: "terminates on an attempt to write a message to a nonexistent topic",
165+
function: testWriterTerminateMissingTopic,
166+
},
163167
}
164168

165169
for _, test := range tests {
@@ -712,6 +716,7 @@ func testWriterAutoCreateTopic(t *testing.T) {
712716
Topic: topic,
713717
Balancer: &RoundRobin{},
714718
})
719+
w.AllowAutoTopicCreation = true
715720
defer w.Close()
716721

717722
msg := Message{Key: []byte("key"), Value: []byte("Hello World")}
@@ -737,6 +742,30 @@ func testWriterAutoCreateTopic(t *testing.T) {
737742
}
738743
}
739744

745+
func testWriterTerminateMissingTopic(t *testing.T) {
746+
topic := makeTopic()
747+
748+
transport := &Transport{}
749+
defer transport.CloseIdleConnections()
750+
751+
writer := &Writer{
752+
Addr: TCP("localhost:9092"),
753+
Topic: topic,
754+
Balancer: &RoundRobin{},
755+
RequiredAcks: RequireNone,
756+
AllowAutoTopicCreation: false,
757+
Transport: transport,
758+
}
759+
defer writer.Close()
760+
761+
msg := Message{Value: []byte("FooBar")}
762+
763+
if err := writer.WriteMessages(context.Background(), msg); err == nil {
764+
t.Fatal("Kafka error [3] UNKNOWN_TOPIC_OR_PARTITION is expected")
765+
return
766+
}
767+
}
768+
740769
type staticBalancer struct {
741770
partition int
742771
}

0 commit comments

Comments
 (0)