<!-- should always link to the latest release's documentation -->
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<html xmlns:og="http://ogp.me/ns#">
<head>
<title>Apache Kafka</title>
<link rel='stylesheet' href='./styles.css' type='text/css'>
<link rel="icon" type="image/gif" href="./images/apache_feather.gif">
<link href='http://fonts.googleapis.com/css?family=Source+Sans+Pro:400,400italic' rel='stylesheet' type='text/css'>
<meta name="robots" content="index,follow" />
<meta name="language" content="en" />
<meta name="keywords" content="apache kafka messaging queuing distributed stream processing">
<meta name="description" content="Apache Kafka: A high-throughput, distributed, publish-subscribe messaging system.">
<meta http-equiv='Content-Type' content='text/html;charset=utf-8' />
<meta property="og:title" value="Kafka" />
<meta property="og:image" value="" />
<meta property="og:description" value="A high-throughput, distributed, publish-subscribe messaging system." />
<meta property="og:site_name" value="sna-projects" />
<meta property="og:type" value="website" />
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-7818013-4', 'apache.org');
ga('send', 'pageview');
</script>
</head>
<body>
<div id="everything">
<div id="header">
<table>
<tr>
<td><a href="http://kafka.apache.org/"><img src="./images/kafka_logo.png"></a></td>
<td class="title">
<a href="http://kafka.apache.org/">Apache Kafka</a>
<br>
<span class="subtitle"><a href="http://kafka.apache.org/">A high-throughput distributed messaging system.</a></span>
</td>
</tr>
</table>
</div>
<div class="lsidebar">
<ul>
<li><a href="http://kafka.apache.org/downloads.html">download</a></li>
<li><a href="documentation.html#introduction">introduction</a></li>
<li><a href="documentation.html#uses">uses</a></li>
<li><a href="documentation.html">documentation</a></li>
<li><a href="http://kafka.apache.org/performance.html">performance</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">clients</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem">ecosystem</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/FAQ">faq</a></li>
<li>project
<ul>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA">wiki</a></li>
<li><a href="https://issues.apache.org/jira/browse/KAFKA">bugs</a></li>
<li><a href="http://kafka.apache.org/contact.html">mailing lists</a></li>
<li><a href="http://kafka.apache.org/committers.html">committers</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Powered+By">powered by</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+papers+and+presentations">papers & talks</a></li>
</ul>
</li>
<li>developers
<ul>
<li><a href="http://kafka.apache.org/code.html">code</a></li>
<li><a href="http://cwiki.apache.org/confluence/display/KAFKA/Projects">projects</a></li>
<li><a href="http://kafka.apache.org/contributing.html">contributing</a></li>
<li><a href="http://kafka.apache.org/coding-guide.html">coding guide</a></li>
<li><a href="https://builds.apache.org">unit tests</a></li>
</ul>
</li>
</ul>
</div>
<div class='content'>
<h1>Kafka 0.8.1 Documentation</h1>
Prior releases: <a href="http://kafka.apache.org/07/documentation.html">0.7.x</a>, <a href="http://kafka.apache.org/08/documentation.html">0.8.0</a>.
<ul class="toc">
<li><a href="documentation.html#gettingStarted">1. Getting Started</a>
<ul>
<li><a href="documentation.html#introduction">1.1 Introduction</a>
<li><a href="documentation.html#uses">1.2 Use Cases</a>
<li><a href="documentation.html#quickstart">1.3 Quick Start</a>
<li><a href="documentation.html#ecosystem">1.4 Ecosystem</a>
<li><a href="documentation.html#upgrade">1.5 Upgrading</a>
</ul>
<li><a href="documentation.html#api">2. API</a>
<ul>
<li><a href="documentation.html#producerapi">2.1 Producer API</a>
<li><a href="documentation.html#highlevelconsumerapi">2.2 High Level Consumer API</a>
<li><a href="documentation.html#simpleconsumerapi">2.3 Simple Consumer API</a>
<li><a href="documentation.html#kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a>
</ul>
<li><a href="documentation.html#configuration">3. Configuration</a>
<ul>
<li><a href="documentation.html#brokerconfigs">3.1 Broker Configs</a>
<li><a href="documentation.html#consumerconfigs">3.2 Consumer Configs</a>
<li><a href="documentation.html#producerconfigs">3.3 Producer Configs</a>
<li><a href="documentation.html#newproducerconfigs">3.4 New Producer Configs</a>
</ul>
<li><a href="documentation.html#design">4. Design</a>
<ul>
<li><a href="documentation.html#majordesignelements">4.1 Motivation</a>
<li><a href="documentation.html#persistence">4.2 Persistence</a>
<li><a href="documentation.html#maximizingefficiency">4.3 Efficiency</a>
<li><a href="documentation.html#theproducer">4.4 The Producer</a>
<li><a href="documentation.html#theconsumer">4.5 The Consumer</a>
<li><a href="documentation.html#semantics">4.6 Message Delivery Semantics</a>
<li><a href="documentation.html#replication">4.7 Replication</a>
<li><a href="documentation.html#compaction">4.8 Log Compaction</a>
</ul>
<li><a href="documentation.html#implementation">5. Implementation</a>
<ul>
<li><a href="documentation.html#apidesign">5.1 API Design</a>
<li><a href="documentation.html#networklayer">5.2 Network Layer</a>
<li><a href="documentation.html#messages">5.3 Messages</a>
<li><a href="documentation.html#messageformat">5.4 Message format</a>
<li><a href="documentation.html#log">5.5 Log</a>
<li><a href="documentation.html#distributionimpl">5.6 Distribution</a>
</ul>
<li><a href="documentation.html#operations">6. Operations</a>
<ul>
<li><a href="documentation.html#basic_ops">6.1 Basic Kafka Operations</a>
<ul>
<li><a href="documentation.html#basic_ops_add_topic">Adding and removing topics</a>
<li><a href="documentation.html#basic_ops_modify_topic">Modifying topics</a>
<li><a href="documentation.html#basic_ops_restarting">Graceful shutdown</a>
<li><a href="documentation.html#basic_ops_leader_balancing">Balancing leadership</a>
<li><a href="documentation.html#basic_ops_consumer_lag">Checking consumer position</a>
<li><a href="documentation.html#basic_ops_mirror_maker">Mirroring data between clusters</a>
<li><a href="documentation.html#basic_ops_cluster_expansion">Expanding your cluster</a>
<li><a href="documentation.html#basic_ops_decommissioning_brokers">Decommissioning brokers</a>
<li><a href="documentation.html#basic_ops_increase_replication_factor">Increasing replication factor</a>
</ul>
<li><a href="documentation.html#datacenters">6.2 Datacenters</a>
<li><a href="documentation.html#config">6.3 Important Configs</a>
<ul>
<li><a href="documentation.html#serverconfig">Important Server Configs</a>
<li><a href="documentation.html#clientconfig">Important Client Configs</a>
<li><a href="documentation.html#prodconfig">A Production Server Configs</a>
</ul>
<li><a href="documentation.html#java">6.4 Java Version</a>
<li><a href="documentation.html#hwandos">6.5 Hardware and OS</a>
<ul>
<li><a href="documentation.html#os">OS</a>
<li><a href="documentation.html#diskandfs">Disks and Filesystems</a>
<li><a href="documentation.html#appvsosflush">Application vs OS Flush Management</a>
<li><a href="documentation.html#linuxflush">Linux Flush Behavior</a>
<li><a href="documentation.html#ext4">Ext4 Notes</a>
</ul>
<li><a href="documentation.html#monitoring">6.6 Monitoring</a>
<li><a href="documentation.html#zk">6.7 ZooKeeper</a>
<ul>
<li><a href="documentation.html#zkversion">Stable Version</a>
<li><a href="documentation.html#zkops">Operationalization</a>
</ul>
</ul>
</ul>
<h2><a id="gettingStarted">1. Getting Started</a></h2>
<h3><a id="introduction">1.1 Introduction</a></h3>
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
<p>
What does all that mean?
<p>
First let's review some basic messaging terminology:
<ul>
<li>Kafka maintains feeds of messages in categories called <i>topics</i>.
<li>We'll call processes that publish messages to a Kafka topic <i>producers</i>.
<li>We'll call processes that subscribe to topics and process the feed of published messages <i>consumers</i>.
<li>Kafka is run as a cluster comprised of one or more servers each of which is called a <i>broker</i>.
</ul>
So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:
<div style="text-align: center; width: 100%">
<img src="images/producer_consumer.png">
</div>
Communication between the clients and the servers is done with a simple, high-performance, language agnostic <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">TCP protocol</a>. We provide a Java client for Kafka, but clients are available in <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">many languages</a>.
<h4>Topics and Logs</h4>
Let's first dive into the high-level abstraction Kafka provides—the topic.
<p>
A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:
<div style="text-align: center; width: 100%">
<img src="images/log_anatomy.png">
</div>
Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the <i>offset</i> that uniquely identifies each message within the partition.
<p>
The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.
<p>
In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.
<p>
This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.
<p>
The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.
<h4>Distribution</h4>
The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.
<p>
Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.
<h4>Producers</h4>
Producers publish data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the message). More on the use of partitioning in a second.
<h4><a id="intro_consumers">Consumers</a></h4>
Messaging traditionally has two models: <a href="http://en.wikipedia.org/wiki/Message_queue">queuing</a> and <a href="http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern">publish-subscribe</a>. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these—the <i>consumer group</i>.
<p>
Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
<p>
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
<p>
If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.
<p>
More commonly, however, we have found that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.
<p>
<div style="float: right; margin: 20px; width: 500px" class="caption">
<img src="images/consumer-groups.png"><br>
A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
</div>
<p>
Kafka has stronger ordering guarantees than a traditional messaging system, too.
<p>
A traditional queue retains messages in-order on the server, and if multiple consumers consume from the queue then the server hands out messages in the order they are stored. However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.
<p>
Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.
<p>
Kafka only provides a total order over messages <i>within</i> a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over messages this can be achieved with a topic that has only one partition, though this will mean only one consumer process.
<h4>Guarantees</h4>
At a high-level Kafka gives the following guarantees:
<ul>
<li>Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
<li>A consumer instance sees messages in the order they are stored in the log.
<li>For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any messages committed to the log.
</ul>
More details on these guarantees are given in the design section of the documentation.
<h3><a id="uses">1.2 Use Cases</a></h3>
Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see <a href="http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">this blog post</a>.
<h4>Messaging</h4>
Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc). In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.
<p>
In our experience messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.
<p>
In this domain Kafka is comparable to traditional messaging systems such as <a href="http://activemq.apache.org">ActiveMQ</a> or <a href="https://www.rabbitmq.com">RabbitMQ</a>.
<h4>Website Activity Tracking</h4>
The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.
<p>
Activity tracking is often very high volume as many activity messages are generated for each user page view.
<h4>Metrics</h4>
Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.
<h4>Log Aggregation</h4>
Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption.
In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency.
<h4>Stream Processing</h4>
Many users end up doing stage-wise processing of data where data is consumed from topics of raw data and then aggregated, enriched, or otherwise transformed into new Kafka topics for further consumption. For example a processing flow for article recommendation might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might help normalize or deduplicate this content to a topic of cleaned article content; a final stage might attempt to match this content to users. This creates a graph of real-time data flow out of the individual topics. <a href="https://github.com/nathanmarz/storm">Storm</a> and <a href="http://samza.incubator.apache.org/">Samza</a> are popular frameworks for implementing these kinds of transformations.
<h4>Event Sourcing</h4>
<a href="http://martinfowler.com/eaaDev/EventSourcing.html">Event sourcing</a> is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.
<h4>Commit Log</h4>
Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. The <a href="documentation.html#compaction">log compaction</a> feature in Kafka helps support this usage. In this usage Kafka is similar to the <a href="http://zookeeper.apache.org/bookkeeper/">Apache BookKeeper</a> project.
<h3><a id="quickstart">1.3 Quick Start</a></h3>
This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data.
<h4> Step 1: Download the code </h4>
<a href="https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz" title="Kafka downloads">Download</a> the 0.8.1.1 release and un-tar it.
<pre>
> <b>tar -xzf kafka_2.9.2-0.8.1.1.tgz</b>
> <b>cd kafka_2.9.2-0.8.1.1</b>
</pre>
<h4>Step 2: Start the server</h4>
<p>
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.
<pre>
> <b>bin/zookeeper-server-start.sh config/zookeeper.properties</b>
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
</pre>
Now start the Kafka server:
<pre>
> <b>bin/kafka-server-start.sh config/server.properties</b>
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
</pre>
<h4>Step 3: Create a topic</h4>
Let's create a topic named "test" with a single partition and only one replica:
<pre>
> <b>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test</b>
</pre>
We can now see that topic if we run the list topic command:
<pre>
> <b>bin/kafka-topics.sh --list --zookeeper localhost:2181</b>
test
</pre>
Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
<h4>Step 4: Send some messages</h4>
Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.
<p>
Run the producer and then type a few messages into the console to send to the server.
<pre>
> <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test</b>
<b>This is a message</b>
<b>This is another message</b>
</pre>
<h4>Step 5: Start a consumer</h4>
Kafka also has a command line consumer that will dump out messages to standard output.
<pre>
> <b>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning</b>
This is a message
This is another message
</pre>
<p>
If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
</p>
<p>
All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
</p>
<h4>Step 6: Setting up a multi-broker cluster</h4>
So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (still all on our local machine).
<p>
First we make a config file for each of the brokers:
<pre>
> <b>cp config/server.properties config/server-1.properties</b>
> <b>cp config/server.properties config/server-2.properties</b>
</pre>
Now edit these new files and set the following properties:
<pre>
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
</pre>
The <code>broker.id</code> property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each other's data.
<p>
We already have ZooKeeper and our single node started, so we just need to start the two new nodes:
<pre>
> <b>bin/kafka-server-start.sh config/server-1.properties &</b>
...
> <b>bin/kafka-server-start.sh config/server-2.properties &</b>
...
</pre>
Now create a new topic with a replication factor of three:
<pre>
> <b>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic</b>
</pre>
Okay, but now that we have a cluster, how can we know which broker is doing what? To see that, run the "describe topics" command:
<pre>
> <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic</b>
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
</pre>
Here is an explanation of the output. The first line gives a summary of all the partitions; each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.
<ul>
<li>"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
<li>"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
<li>"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
</ul>
Note that in my example node 1 is the leader for the only partition of the topic.
<p>
We can run the same command on the original topic we created to see where it is:
<pre>
> <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test</b>
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
</pre>
So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.
<p>
Let's publish a few messages to our new topic:
<pre>
> <b>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic</b>
...
<b>my test message 1</b>
<b>my test message 2</b>
<b>^C</b>
</pre>
Now let's consume these messages:
<pre>
> <b>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic</b>
...
my test message 1
my test message 2
<b>^C</b>
</pre>
Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:
<pre>
> <b>ps | grep server-1.properties</b>
<i>7564</i> ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> <b>kill -9 7564</b>
</pre>
Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:
<pre>
> <b>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic</b>
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
</pre>
But the messages are still available for consumption even though the leader that took the writes originally is down:
<pre>
> <b>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic</b>
...
my test message 1
my test message 2
<b>^C</b>
</pre>
<h3><a id="upgrade">1.4 Ecosystem</a></h3>
There are a plethora of tools that integrate with Kafka outside the main distribution. The <a href="https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem"> ecosystem page</a> lists many of these, including stream processing systems, Hadoop integration, monitoring, and deployment tools.
<h3><a id="upgrade">1.5 Upgrading From Previous Versions</a></h3>
<h4>Upgrading from 0.8.0 to 0.8.1</h4>
0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.
<h4>Upgrading from 0.7</h4>
0.8, the release that added replication, was our first backwards-incompatible release: major changes were made to the API, ZooKeeper data structures, protocol, and configuration. The upgrade from 0.7 to 0.8.x requires a <a href="https://cwiki.apache.org/confluence/display/KAFKA/Migrating+from+0.7+to+0.8">special tool</a> for migration. This migration can be done without downtime.
<h2><a id="api">2. API</a></h2>
<h3><a id="producerapi">2.1 Producer API</a></h3>
<pre>
/**
* V: type of the message
* K: type of the optional key associated with the message
*/
class kafka.javaapi.producer.Producer<K,V> {
public Producer(ProducerConfig config);
/**
* Sends the data to a single topic, partitioned by key, using either the
* synchronous or the asynchronous producer
* @param message the producer data object that encapsulates the topic, key and message data
*/
public void send(KeyedMessage<K,V> message);
/**
* Use this API to send data to multiple topics
* @param messages list of producer data objects that encapsulate the topic, key and message data
*/
public void send(List<KeyedMessage<K,V>> messages);
/**
* Close API to close the producer pool connections to all Kafka brokers.
*/
public void close();
}
</pre>
You can follow
<a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example" title="Kafka 0.8 producer example">this example</a> to learn how to use the producer api.
<h3><a id="highlevelconsumerapi">2.2 High Level Consumer API</a></h3>
<pre>
class Consumer {
/**
* Create a ConsumerConnector
*
* @param config at the minimum, need to specify the groupid of the consumer and the zookeeper
* connection string zookeeper.connect.
*/
public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}
/**
* V: type of the message
* K: type of the optional key associated with the message
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
* Create a list of message streams of type T for each topic.
*
* @param topicCountMap a map of (topic, #streams) pair
* @param decoder a decoder that converts from Message to T
* @return a map of (topic, list of KafkaStream) pairs.
* The number of items in the list is #streams. Each stream supports
* an iterator over message/metadata pairs.
*/
public <K,V> Map<String, List<KafkaStream<K,V>>>
createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams of type T for each topic, using the default decoder.
*/
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
* Create a list of message streams for topics matching a wildcard.
*
* @param topicFilter a TopicFilter that specifies which topics to
* subscribe to (encapsulates a whitelist or a blacklist).
* @param numStreams the number of message streams to return.
* @param keyDecoder a decoder that decodes the message key
* @param valueDecoder a decoder that decodes the message itself
* @return a list of KafkaStream. Each stream supports an
* iterator over its MessageAndMetadata elements.
*/
public <K,V> List<KafkaStream<K,V>>
createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder.
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);
/**
* Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
*/
public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);
/**
* Commit the offsets of all topic/partitions connected by this connector.
*/
public void commitOffsets();
/**
* Shut down the connector
*/
public void shutdown();
}
</pre>
You can follow
<a href="https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example" title="Kafka 0.8 consumer example">this example</a> to learn how to use the high level consumer api.
<h3><a id="simpleconsumerapi">2.3 Simple Consumer API</a></h3>
<pre>
class kafka.javaapi.consumer.SimpleConsumer {
/**
* Fetch a set of messages from a topic.
*
* @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
* @return a set of fetched messages
*/
public FetchResponse fetch(kafka.javaapi.FetchRequest request);
/**
* Fetch metadata for a sequence of topics.
*
* @param request specifies the versionId, clientId, sequence of topics.
* @return metadata for each topic in the request.
*/
public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);
/**
* Get a list of valid offsets (up to maxSize) before the given time.
*
* @param request a [[kafka.javaapi.OffsetRequest]] object.
* @return a [[kafka.javaapi.OffsetResponse]] object.
*/
public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
/**
* Close the SimpleConsumer.
*/
public void close();
}
</pre>
For most applications, the high level consumer API is good enough. Some applications want features not yet exposed by the high level consumer (e.g., setting the initial offset when restarting the consumer). They can instead use our low level SimpleConsumer API. The logic will be a bit more complicated and you can follow the example
<a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example" title="Kafka 0.8 SimpleConsumer example">here</a>.
<h3><a id="kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a></h3>
<p>
Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
</p>
<p>
Usage information on the Hadoop consumer can be found <a href="https://github.com/linkedin/camus/tree/camus-kafka-0.8/">here</a>.
</p>
<h2><a id="configuration">3. Configuration</a></h2>
Kafka uses key-value pairs in the <a href="http://en.wikipedia.org/wiki/.properties">property file format</a> for configuration. These values can be supplied either from a file or programmatically.
<h3><a id="brokerconfigs">3.1 Broker Configs</a></h3>
The essential configurations are the following:
<ul>
<li><code>broker.id</code>
<li><code>log.dirs</code>
<li><code>zookeeper.connect</code>
</ul>
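As a minimal illustration, a bare-bones broker configuration covering just these essential settings might look like the following; the log directory and ZooKeeper address are placeholders, not recommendations.
<pre>
# Unique, permanent id for this broker
broker.id=0
# Comma-separated list of directories in which partition data is stored
log.dirs=/tmp/kafka-logs
# ZooKeeper connection string (may list several hosts and an optional chroot path)
zookeeper.connect=localhost:2181
</pre>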
Topic-level configurations and defaults are discussed in more detail <a href="documentation.html#topic-config">below</a>.
<table class="data-table">
<tbody><tr>
<th>Property</th>
<th>Default</th>
<th>Description</th>
</tr>
<tr>
<td>broker.id</td>
<td></td>
<td>Each broker is uniquely identified by a non-negative integer id. This id serves as the broker's "name" and allows the broker to be moved to a different host/port without confusing consumers. You can choose any number you like so long as it is unique.
</td>
</tr>
<tr>
<td>log.dirs</td>
<td nowrap>/tmp/kafka-logs</td>
<td>A comma-separated list of one or more directories in which Kafka data is stored. Each new partition that is created will be placed in the directory which currently has the fewest partitions.</td>
</tr>
<tr>
<td>port</td>
<td>6667</td>
<td>The port on which the server accepts client connections.</td>
</tr>
<tr>
<td>zookeeper.connect</td>
<td>null</td>
<td>Specifies the ZooKeeper connection string in the form <code>hostname:port</code>, where hostname and port are the host and port for a node in your ZooKeeper cluster. To allow connecting through other ZooKeeper nodes when that host is down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.
<p>
ZooKeeper also allows you to add a "chroot" path which will make all Kafka data for this cluster appear under a particular path. This is a way to set up multiple Kafka clusters or other applications on the same ZooKeeper cluster. To do this, give a connection string in the form <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code> which would put all this cluster's data under the path <code>/chroot/path</code>. Note that you must create this path yourself prior to starting the broker, and consumers must use the same connection string.</td>
</tr>
<tr>
<td>message.max.bytes</td>
<td>1000000</td>
<td>The maximum size of a message that the server can receive. It is important that this property be in sync with the maximum fetch size your consumers use or else an unruly producer will be able to publish messages too large for consumers to consume.</td>
</tr>
<tr>
<td>num.network.threads</td>
<td>3</td>
<td>The number of network threads that the server uses for handling network requests. You probably don't need to change this.</td>
</tr>
<tr>
<td>num.io.threads</td>
<td>8</td>
<td>The number of I/O threads that the server uses for executing requests. You should have at least as many threads as you have disks.</td>
</tr>
<tr>
<td>background.threads</td>
<td>4</td>
<td>The number of threads to use for various background processing tasks such as file deletion. You should not need to change this.</td>
</tr>
<tr>
<td>queued.max.requests</td>
<td>500</td>
<td>The number of requests that can be queued up for processing by the I/O threads before the network threads stop reading in new requests.</td>
</tr>
<tr>
<td>host.name</td>
<td>null</td>
<td>
<p>Hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces, and publish one of them to ZooKeeper.</p>
</td>
</tr>
<tr>
<td>advertised.host.name</td>
<td>null</td>
<td>
<p>If this is set this is the hostname that will be given out to producers, consumers, and other brokers to connect to.</p>
</td>
</tr>
<tr>
<td>advertised.port</td>
<td>null</td>
<td>
<p>The port to give out to producers, consumers, and other brokers to use in establishing connections. This only needs to be set if this port is different from the port the server should bind to.</p>
</td>
</tr>
<tr>
<td>socket.send.buffer.bytes</td>
<td>100 * 1024</td>
<td>The SO_SNDBUF buffer the server prefers for socket connections.</td>
</tr>
<tr>
<td>socket.receive.buffer.bytes</td>
<td>100 * 1024</td>
<td>The SO_RCVBUF buffer the server prefers for socket connections.</td>
</tr>
<tr>
<td>socket.request.max.bytes</td>
<td>100 * 1024 * 1024</td>
<td>The maximum request size the server will allow. This prevents the server from running out of memory and should be smaller than the Java heap size.</td>
</tr>
<tr>
<td>num.partitions</td>
<td>1</td>
<td>The default number of partitions per topic if a partition count isn't given at topic creation time.</td>
</tr>
<tr>
<td>log.segment.bytes</td>
<td nowrap>1024 * 1024 * 1024</td>
<td>The log for a topic partition is stored as a directory of segment files. This setting controls the size to which a segment file will grow before a new segment is rolled over in the log. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.roll.hours</td>
<td>24 * 7</td>
<td>This setting will force Kafka to roll a new log segment even if the log.segment.bytes size has not been reached. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.cleanup.policy</td>
<td>delete</td>
<td>This can take either the value <i>delete</i> or <i>compact</i>. If <i>delete</i> is set, log segments will be deleted when they reach the size or time limits set. If <i>compact</i> is set <a href="documentation.html#compaction">log compaction</a> will be used to clean out obsolete records. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.retention.{minutes,hours}</td>
<td>7 days</td>
<td>The amount of time to keep a log segment before it is deleted, i.e. the default data retention window for all topics. Note that if both log.retention.minutes and log.retention.bytes are set, we delete a segment when either limit is exceeded. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.retention.bytes</td>
<td>-1</td>
<td>The amount of data to retain in the log for each topic-partition. Note that this is the limit per-partition, so multiply by the number of partitions to get the total data retained for the topic. Also note that if both log.retention.hours and log.retention.bytes are set, we delete a segment when either limit is exceeded. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.retention.check.interval.ms</td>
<td>5 minutes</td>
<td>The period with which we check whether any log segment is eligible for deletion to meet the retention policies.</td>
</tr>
<tr>
<td>log.cleaner.enable</td>
<td>false</td>
<td>This configuration must be set to true for log compaction to run.</td>
</tr>
<tr>
<td>log.cleaner.threads</td>
<td>1</td>
<td>The number of threads to use for cleaning logs in log compaction.</td>
</tr>
<tr>
<td>log.cleaner.io.max.bytes.per.second</td>
<td>None</td>
<td>The maximum amount of I/O the log cleaner can do while performing log compaction. This setting allows setting a limit for the cleaner to avoid impacting live request serving.</td>
</tr>
<tr>
<td>log.cleaner.dedupe.buffer.size</td>
<td>500*1024*1024</td>
<td>The size of the buffer the log cleaner uses for indexing and deduplicating logs during cleaning. Larger is better provided you have sufficient memory.</td>
</tr>
<tr>
<td>log.cleaner.io.buffer.size</td>
<td>512*1024</td>
<td>The size of the I/O chunk used during log cleaning. You probably don't need to change this.</td>
</tr>
<tr>
<td>log.cleaner.io.buffer.load.factor</td>
<td>0.9</td>
<td>The load factor of the hash table used in log cleaning. You probably don't need to change this.</td>
</tr>
<tr>
<td>log.cleaner.backoff.ms</td>
<td>15000</td>
<td>The interval between checks to see if any logs need cleaning.</td>
</tr>
<tr>
<td>log.cleaner.min.cleanable.ratio</td>
<td>0.5</td>
<td>This configuration controls how frequently the log compactor will attempt to clean the log (assuming <a href="documentation.html#compaction">log compaction</a> is enabled). By default we will avoid cleaning a log where more than 50% of the log has been compacted. This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates). A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.cleaner.delete.retention.ms</td>
<td>1 day</td>
<td>The amount of time to retain delete tombstone markers for <a href="documentation.html#compaction">log compacted</a> topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final state (otherwise delete tombstones may be collected before they complete their scan). This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.index.size.max.bytes</td>
<td>10 * 1024 * 1024</td>
<td>The maximum size in bytes we allow for the offset index for each log segment. Note that we will always pre-allocate a sparse file with this much space and shrink it down when the log rolls. If the index fills up we will roll a new log segment even if we haven't reached the log.segment.bytes limit. This setting can be overridden on a per-topic basis (see <a href="documentation.html#topic-config">the per-topic configuration section</a>).</td>
</tr>
<tr>
<td>log.index.interval.bytes</td>
<td>4096</td>
<td>The byte interval at which we add an entry to the offset index. When executing a fetch request the server must do a linear scan for up to this many bytes to find the correct position in the log to begin and end the fetch. So setting this value to be larger will mean larger index files (and a bit more memory usage) but less scanning. However the server will never add more than one index entry per log append (even if more than log.index.interval worth of messages are appended). In general you probably don't need to mess with this value.</td>
</tr>
<tr>
<td>log.flush.interval.messages</td>
<td>None</td>
<td>The number of messages written to a log partition before we force an fsync on the log. Setting this lower will sync data to disk more often but will have a major impact on performance. We generally recommend that people make use of replication for durability rather than depending on single-server fsync, however this setting can be used to be extra certain.</td>
</tr>
<tr>
<td>log.flush.scheduler.interval.ms</td>
<td>3000</td>
<td>The frequency in ms that the log flusher checks whether any log is eligible to be flushed to disk.</td>
</tr>
<tr>
<td>log.flush.interval.ms</td>
<td>None</td>
<td>The maximum time between fsync calls on the log. If used in conjunction with log.flush.interval.messages the log will be flushed when either criterion is met.</td>
</tr>
<tr>
<td>log.delete.delay.ms</td>
<td>60000</td>
<td>The period of time we hold log files around after they are removed from the in-memory segment index. This period of time allows any in-progress reads to complete uninterrupted without locking. You generally don't need to change this.</td>
</tr>
<tr>
<td>log.flush.offset.checkpoint.interval.ms</td>
<td>60000</td>
<td>The frequency with which we checkpoint the last flush point for logs for recovery. You should not need to change this.</td>
</tr>
<tr>
<td>auto.create.topics.enable</td>
<td>true</td>
<td>Enable auto creation of topics on the server. If this is set to true then attempts to produce, consume, or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.</td>
</tr>
<tr>
<td>controller.socket.timeout.ms</td>
<td>30000</td>
<td>The socket timeout for commands from the partition management controller to the replicas.</td>
</tr>
<tr>
<td>controller.message.queue.size</td>
<td>10</td>
<td>The buffer size for controller-to-broker channels.</td>
</tr>
<tr>
<td>default.replication.factor</td>
<td>1</td>
<td>The default replication factor for automatically created topics.</td>
</tr>
<tr>
<td>replica.lag.time.max.ms</td>
<td>10000</td>
<td>If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead.</td>
</tr>
<tr>
<td>replica.lag.max.messages</td>
<td>4000</td>
<td>If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead.</td>
</tr>
<tr>
<td>replica.socket.timeout.ms</td>
<td>30 * 1000</td>
<td>The socket timeout for network requests to the leader for replicating data.</td>
</tr>
<tr>
<td>replica.socket.receive.buffer.bytes</td>
<td>64 * 1024</td>
<td>The socket receive buffer for network requests to the leader for replicating data.</td>
</tr>
<tr>
<td>replica.fetch.max.bytes</td>
<td nowrap>1024 * 1024</td>
<td>The number of bytes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.</td>
</tr>
<tr>
<td>replica.fetch.wait.max.ms</td>
<td>500</td>
<td>The maximum amount of time to wait for data to arrive on the leader in the fetch requests sent by the replicas to the leader.</td>
</tr>
<tr>
<td>replica.fetch.min.bytes</td>
<td>1</td>
<td>Minimum bytes expected for each fetch response for the fetch requests from the replica to the leader. If not enough bytes, wait up to replica.fetch.wait.max.ms for this many bytes to arrive.</td>
</tr>
<tr>
<td>num.replica.fetchers</td>
<td>1</td>
<td>
<p>Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.</p>
</td>
</tr>
<tr>
<td>replica.high.watermark.checkpoint.interval.ms</td>
<td>5000</td>
<td>The frequency with which each replica saves its high watermark to disk to handle recovery.</td>
</tr>
<tr>
<td>fetch.purgatory.purge.interval.requests</td>
<td>10000</td>
<td>The purge interval (in number of requests) of the fetch request purgatory.</td>
</tr>
<tr>
<td>producer.purgatory.purge.interval.requests</td>
<td>10000</td>
<td>The purge interval (in number of requests) of the producer request purgatory.</td>
</tr>
<tr>
<td>zookeeper.session.timeout.ms</td>
<td>6000</td>
<td>ZooKeeper session timeout. If the server fails to heartbeat to ZooKeeper within this period of time it is considered dead. If you set this too low the server may be falsely considered dead; if you set it too high it may take too long to recognize a truly dead server.</td>
</tr>
<tr>
<td>zookeeper.connection.timeout.ms</td>
<td>6000</td>
<td>The maximum amount of time that the client waits to establish a connection to zookeeper.</td>
</tr>
<tr>
<td>zookeeper.sync.time.ms</td>
<td>2000</td>
<td>How far a ZK follower can be behind a ZK leader.</td>
</tr>
<tr>
<td>controlled.shutdown.enable</td>
<td>false</td>
<td>Enable controlled shutdown of the broker. If enabled, the broker will move all leaders on it to some other brokers before shutting itself down. This reduces the unavailability window during shutdown.</td>
</tr>
<tr>
<td>controlled.shutdown.max.retries</td>
<td>3</td>
<td>Number of retries to complete the controlled shutdown successfully before executing an unclean shutdown.</td>
</tr>
<tr>
<td>controlled.shutdown.retry.backoff.ms</td>
<td>5000</td>
<td>Backoff time between shutdown retries.</td>
</tr>
<tr>
<td>auto.leader.rebalance.enable</td>
<td>false</td>
<td>If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the "preferred" replica for each partition if it is available.</td>
</tr>
<tr>
<td>leader.imbalance.per.broker.percentage</td>
<td>10</td>
<td>The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above
the configured value per broker.</td>
</tr>
<tr>
<td>leader.imbalance.check.interval.seconds</td>
<td>300</td>
<td>The frequency with which to check for leader imbalance.</td>
</tr>
<tr>
<td>offset.metadata.max.bytes</td>
<td>1024</td>
<td>The maximum amount of metadata to allow clients to save with their offsets.</td>
</tr>
</tbody></table>
<p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>
<h4><a id="topic-config">Topic-level configuration</a></h3>
Configurations pertinent to topics have both a global default as well an optional per-topic override. If no per-topic configuration is given the global default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
<pre>
<b> > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1</b>
</pre>
Overrides can also be changed or set later using the alter topic command. This example updates the max message size for <i>my-topic</i>:
<pre>
<b> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
--config max.message.bytes=128000</b>
</pre>
To remove an override you can do
<pre>
<b> > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
--deleteConfig max.message.bytes</b>
</pre>
The following are the topic-level configurations. The server's default configuration for each property is given under the Server Default Property heading. Setting this default in the server config allows you to change the default given to topics that have no override specified.
<table class="data-table">
<tbody>
<tr>