Skip to content

Commit 48e249d

Browse files
committed
add discard_kafka_delivery_failed_regex to nullify specific delivery failed events by regexp
Signed-off-by: kubotat <[email protected]>
1 parent 708cc07 commit 48e249d

File tree

1 file changed

+34
-2
lines changed

1 file changed

+34
-2
lines changed

lib/fluent/plugin/out_rdkafka2.rb

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ class Fluent::Rdkafka2Output < Output
6565
config_param :topic_key, :string, :default => 'topic', :desc => "Field for kafka topic"
6666
config_param :default_topic, :string, :default => nil,
6767
:desc => "Default output topic when record doesn't have topic field"
68+
config_param :use_default_for_unknown_topic, :bool, :default => false, :desc => "If true, default_topic is used when topic not found"
69+
config_param :use_default_for_unknown_partition_error, :bool, :default => false, :desc => "If true, default_topic is used when received unknown_partition error"
6870
config_param :message_key_key, :string, :default => 'message_key', :desc => "Field for kafka message key"
6971
config_param :default_message_key, :string, :default => nil
7072
config_param :partition_key, :string, :default => 'partition', :desc => "Field for kafka partition"
@@ -234,6 +236,9 @@ def add(level, message = nil)
234236
@rdkafka = Rdkafka::Config.new(config)
235237

236238
if @default_topic.nil?
239+
if @use_default_for_unknown_topic || @use_default_for_unknown_partition_error
240+
raise Fluent::ConfigError, "default_topic must be set when use_default_for_unknown_topic or use_default_for_unknown_partition_error is true"
241+
end
237242
if @chunk_keys.include?(@topic_key) && !@chunk_key_tag
238243
log.warn "Use '#{@topic_key}' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer #{@topic_key},tag>"
239244
end
@@ -471,24 +476,51 @@ def write(chunk)
471476

472477
def enqueue_with_retry(producer, topic, record_buf, message_key, partition, headers, time)
473478
attempt = 0
479+
actual_topic = topic
480+
474481
loop do
475482
begin
476483
@enqueue_rate.raise_if_limit_exceeded(record_buf.bytesize) if @enqueue_rate
477-
return producer.produce(topic: topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
484+
return producer.produce(topic: actual_topic, payload: record_buf, key: message_key, partition: partition, headers: headers, timestamp: @use_event_time ? Time.at(time) : nil)
478485
rescue EnqueueRate::LimitExceeded => e
479486
@enqueue_rate.revert if @enqueue_rate
480487
duration = e.next_retry_clock - Fluent::Clock.now
481488
sleep(duration) if duration > 0.0
482489
rescue Exception => e
483490
@enqueue_rate.revert if @enqueue_rate
484-
if e.respond_to?(:code) && e.code == :queue_full
491+
492+
if !e.respond_to?(:code)
493+
raise e
494+
end
495+
496+
case e.code
497+
when :queue_full
485498
if attempt <= @max_enqueue_retries
486499
log.warn "Failed to enqueue message; attempting retry #{attempt} of #{@max_enqueue_retries} after #{@enqueue_retry_backoff}s"
487500
sleep @enqueue_retry_backoff
488501
attempt += 1
489502
else
490503
raise "Failed to enqueue message although tried retry #{@max_enqueue_retries} times"
491504
end
505+
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#LL309C9-L309C41
506+
# RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC
507+
when :unknown_topic
508+
if @use_default_for_unknown_topic && actual_topic != @default_topic
509+
log.debug "'#{actual_topic}' topic not found. Retry with '#{@default_topic}' topic"
510+
actual_topic = @default_topic
511+
retry
512+
end
513+
raise e
514+
# https://github.com/confluentinc/librdkafka/blob/c282ba2423b2694052393c8edb0399a5ef471b3f/src/rdkafka.h#L305
515+
# RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
516+
when :unknown_partition
517+
if @use_default_for_unknown_partition_error && actual_topic != @default_topic
518+
log.debug "failed writing to topic '#{actual_topic}' with error '#{e.to_s}'. Writing message to topic '#{@default_topic}'"
519+
actual_topic = @default_topic
520+
retry
521+
end
522+
523+
raise e
492524
else
493525
raise e
494526
end

0 commit comments

Comments
 (0)