Skip to content

Commit 37356bf

Browse files
junrao authored and committed
kafka-1493; Use a well-documented LZ4 compression format and remove redundant LZ4HC option; patched by James Oliver; reviewed by Jun Rao
1 parent 4271ecb commit 37356bf

File tree

14 files changed

+698
-58
lines changed

14 files changed

+698
-58
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public class ProducerConfig extends AbstractConfig {
153153

154154
/** <code>compression.type</code> */
155155
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
156-
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. "
156+
private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. "
157157
+ "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
158158

159159
/** <code>metrics.sample.window.ms</code> */
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.common.message;
19+
20+
import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
21+
import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
22+
import static org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.MAGIC;
23+
24+
import java.io.FilterInputStream;
25+
import java.io.IOException;
26+
import java.io.InputStream;
27+
28+
import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.BD;
29+
import org.apache.kafka.common.message.KafkaLZ4BlockOutputStream.FLG;
30+
import org.apache.kafka.common.utils.Utils;
31+
32+
import net.jpountz.lz4.LZ4Exception;
33+
import net.jpountz.lz4.LZ4Factory;
34+
import net.jpountz.lz4.LZ4SafeDecompressor;
35+
import net.jpountz.xxhash.XXHash32;
36+
import net.jpountz.xxhash.XXHashFactory;
37+
38+
/**
39+
* A partial implementation of the v1.4.1 LZ4 Frame format.
40+
*
41+
* @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing Format Spec</a>
42+
*/
43+
public final class KafkaLZ4BlockInputStream extends FilterInputStream {
44+
45+
public static final String PREMATURE_EOS = "Stream ended prematurely";
46+
public static final String NOT_SUPPORTED = "Stream unsupported";
47+
public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
48+
public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
49+
50+
private final LZ4SafeDecompressor decompressor;
51+
private final XXHash32 checksum;
52+
private final byte[] buffer;
53+
private final byte[] compressedBuffer;
54+
private final int maxBlockSize;
55+
private FLG flg;
56+
private BD bd;
57+
private int bufferOffset;
58+
private int bufferSize;
59+
private boolean finished;
60+
61+
/**
62+
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
63+
*
64+
* @param in The stream to decompress
65+
* @throws IOException
66+
*/
67+
public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
68+
super(in);
69+
decompressor = LZ4Factory.fastestInstance().safeDecompressor();
70+
checksum = XXHashFactory.fastestInstance().hash32();
71+
readHeader();
72+
maxBlockSize = bd.getBlockMaximumSize();
73+
buffer = new byte[maxBlockSize];
74+
compressedBuffer = new byte[maxBlockSize];
75+
bufferOffset = 0;
76+
bufferSize = 0;
77+
finished = false;
78+
}
79+
80+
/**
81+
* Reads the magic number and frame descriptor from the underlying {@link InputStream}.
82+
*
83+
* @throws IOException
84+
*/
85+
private void readHeader() throws IOException {
86+
byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
87+
88+
// read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
89+
bufferOffset = 6;
90+
if (in.read(header, 0, bufferOffset) != bufferOffset) {
91+
throw new IOException(PREMATURE_EOS);
92+
}
93+
94+
if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset-6)) {
95+
throw new IOException(NOT_SUPPORTED);
96+
}
97+
flg = FLG.fromByte(header[bufferOffset-2]);
98+
bd = BD.fromByte(header[bufferOffset-1]);
99+
// TODO read uncompressed content size, update flg.validate()
100+
// TODO read dictionary id, update flg.validate()
101+
102+
// check stream descriptor hash
103+
byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
104+
header[bufferOffset++] = (byte) in.read();
105+
if (hash != header[bufferOffset-1]) {
106+
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
107+
}
108+
}
109+
110+
/**
111+
* Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum,
112+
* and writes the result to a buffer.
113+
*
114+
* @throws IOException
115+
*/
116+
private void readBlock() throws IOException {
117+
int blockSize = Utils.readUnsignedIntLE(in);
118+
119+
// Check for EndMark
120+
if (blockSize == 0) {
121+
finished = true;
122+
// TODO implement content checksum, update flg.validate()
123+
return;
124+
} else if (blockSize > maxBlockSize) {
125+
throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
126+
}
127+
128+
boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
129+
byte[] bufferToRead;
130+
if (compressed) {
131+
bufferToRead = compressedBuffer;
132+
} else {
133+
blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
134+
bufferToRead = buffer;
135+
bufferSize = blockSize;
136+
}
137+
138+
if (in.read(bufferToRead, 0, blockSize) != blockSize) {
139+
throw new IOException(PREMATURE_EOS);
140+
}
141+
142+
// verify checksum
143+
if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
144+
throw new IOException(BLOCK_HASH_MISMATCH);
145+
}
146+
147+
if (compressed) {
148+
try {
149+
bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
150+
} catch (LZ4Exception e) {
151+
throw new IOException(e);
152+
}
153+
}
154+
155+
bufferOffset = 0;
156+
}
157+
158+
@Override
159+
public int read() throws IOException {
160+
if (finished) {
161+
return -1;
162+
}
163+
if (available() == 0) {
164+
readBlock();
165+
}
166+
if (finished) {
167+
return -1;
168+
}
169+
int value = buffer[bufferOffset++] & 0xFF;
170+
171+
return value;
172+
}
173+
174+
@Override
175+
public int read(byte b[], int off, int len) throws IOException {
176+
net.jpountz.util.Utils.checkRange(b, off, len);
177+
if (finished) {
178+
return -1;
179+
}
180+
if (available() == 0) {
181+
readBlock();
182+
}
183+
if (finished) {
184+
return -1;
185+
}
186+
len = Math.min(len, available());
187+
System.arraycopy(buffer, bufferOffset, b, off, len);
188+
bufferOffset += len;
189+
return len;
190+
}
191+
192+
@Override
193+
public long skip(long n) throws IOException {
194+
if (finished) {
195+
return 0;
196+
}
197+
if (available() == 0) {
198+
readBlock();
199+
}
200+
if (finished) {
201+
return 0;
202+
}
203+
n = Math.min(n, available());
204+
bufferOffset += n;
205+
return n;
206+
}
207+
208+
@Override
209+
public int available() throws IOException {
210+
return bufferSize - bufferOffset;
211+
}
212+
213+
@Override
214+
public void close() throws IOException {
215+
in.close();
216+
}
217+
218+
@Override
219+
public synchronized void mark(int readlimit) {
220+
throw new RuntimeException("mark not supported");
221+
}
222+
223+
@Override
224+
public synchronized void reset() throws IOException {
225+
throw new RuntimeException("reset not supported");
226+
}
227+
228+
@Override
229+
public boolean markSupported() {
230+
return false;
231+
}
232+
233+
}

0 commit comments

Comments
 (0)