25
25
import java .util .Objects ;
26
26
import java .util .Optional ;
27
27
import java .util .regex .Pattern ;
28
+ import org .apache .avro .message .BinaryMessageEncoder ;
29
+ import org .apache .avro .specific .SpecificRecordBase ;
30
+ import org .apache .kafka .clients .producer .ProducerRecord ;
31
+ import org .apache .kafka .common .header .Header ;
28
32
29
33
public class MessageSigner {
30
34
public static final String DEFAULT_SIGNATURE_ALGORITHM = "SHA256withRSA" ;
@@ -35,27 +39,35 @@ public class MessageSigner {
35
39
// Two magic bytes (0xC3, 0x01) followed by an 8-byte fingerprint
36
40
public static final int AVRO_HEADER_LENGTH = 10 ;
37
41
42
+ public static final String RECORD_HEADER_KEY_SIGNATURE = "signature" ;
43
+
38
44
private final boolean signingEnabled ;
39
45
40
- private boolean stripHeaders ;
46
+ private boolean stripAvroHeader ;
41
47
42
48
private String signatureAlgorithm ;
43
49
private String signatureProvider ;
44
50
private String signatureKeyAlgorithm ;
45
51
private int signatureKeySize ;
46
52
47
- private Signature signingSignature ;
48
- private Signature verificationSignature ;
53
+ private final Signature signingSignature ;
54
+ private final Signature verificationSignature ;
49
55
50
56
private PrivateKey signingKey ;
51
57
private PublicKey verificationKey ;
52
58
53
59
private MessageSigner (final Builder builder ) {
60
+ this .signingSignature =
61
+ signatureInstance (
62
+ builder .signatureAlgorithm , builder .signatureProvider , builder .signingKey );
63
+ this .verificationSignature =
64
+ signatureInstance (
65
+ builder .signatureAlgorithm , builder .signatureProvider , builder .verificationKey );
54
66
this .signingEnabled = builder .signingEnabled ;
55
67
if (!this .signingEnabled ) {
56
68
return ;
57
69
}
58
- this .stripHeaders = builder .stripHeaders ;
70
+ this .stripAvroHeader = builder .stripAvroHeader ;
59
71
this .signatureAlgorithm = builder .signatureAlgorithm ;
60
72
this .signatureKeyAlgorithm = builder .signatureKeyAlgorithm ;
61
73
this .signatureKeySize = builder .signatureKeySize ;
@@ -65,12 +77,6 @@ private MessageSigner(final Builder builder) {
65
77
}
66
78
this .signingKey = builder .signingKey ;
67
79
this .verificationKey = builder .verificationKey ;
68
- this .signingSignature =
69
- signatureInstance (
70
- builder .signatureAlgorithm , builder .signatureProvider , builder .signingKey );
71
- this .verificationSignature =
72
- signatureInstance (
73
- builder .signatureAlgorithm , builder .signatureProvider , builder .verificationKey );
74
80
if (builder .signatureProvider != null ) {
75
81
this .signatureProvider = builder .signatureProvider ;
76
82
} else if (this .signingSignature != null ) {
@@ -104,6 +110,23 @@ public void sign(final SignableMessageWrapper<?> message) {
104
110
}
105
111
}
106
112
113
+ /**
114
+ * Signs the provided {@code producerRecord} in the header, overwriting an existing signature, if a non-null value is
115
+ * already set.
116
+ *
117
+ * @param producerRecord the record to be signed
118
+ * @throws IllegalStateException if this message signer has a public key for signature
119
+ * verification, but does not have the private key needed for signing messages.
120
+ * @throws UncheckedIOException if determining the bytes for the message throws an IOException.
121
+ * @throws UncheckedSecurityException if the signing process throws a SignatureException.
122
+ */
123
+ public void sign (final ProducerRecord <String , ? extends SpecificRecordBase > producerRecord ) {
124
+ if (this .signingEnabled ) {
125
+ final byte [] signature = this .signature (producerRecord );
126
+ producerRecord .headers ().add (RECORD_HEADER_KEY_SIGNATURE , signature );
127
+ }
128
+ }
129
+
107
130
/**
108
131
* Determines the signature for the given {@code message}.
109
132
*
@@ -127,8 +150,8 @@ public byte[] signature(final SignableMessageWrapper<?> message) {
127
150
message .setSignature (null );
128
151
synchronized (this .signingSignature ) {
129
152
final byte [] messageBytes ;
130
- if (this .stripHeaders ) {
131
- messageBytes = this .stripHeaders (this .toByteBuffer (message ));
153
+ if (this .stripAvroHeader ) {
154
+ messageBytes = this .stripAvroHeader (this .toByteBuffer (message ));
132
155
} else {
133
156
messageBytes = this .toByteBuffer (message ).array ();
134
157
}
@@ -142,6 +165,47 @@ public byte[] signature(final SignableMessageWrapper<?> message) {
142
165
}
143
166
}
144
167
168
+ /**
169
+ * Determines the signature for the given {@code producerRecord}.
170
+ *
171
+ * <p>The value for the signature in the record will be set to {@code null} to properly determine
172
+ * the signature, but is restored to its original value before this method returns.
173
+ *
174
+ * @param producerRecord the record to be signed
175
+ * @return the signature for the record
176
+ * @throws IllegalStateException if this message signer has a public key for signature
177
+ * verification, but does not have the private key needed for signing messages.
178
+ * @throws UncheckedIOException if determining the bytes throws an IOException.
179
+ * @throws UncheckedSecurityException if the signing process throws a SignatureException.
180
+ */
181
+ public byte [] signature (final ProducerRecord <String , ? extends SpecificRecordBase > producerRecord ) {
182
+ if (!this .canSignMessages ()) {
183
+ throw new IllegalStateException (
184
+ "This MessageSigner is not configured for signing, it can only be used for verification" );
185
+ }
186
+ final Header oldSignatureHeader = producerRecord .headers ().lastHeader (RECORD_HEADER_KEY_SIGNATURE );
187
+ try {
188
+ producerRecord .headers ().remove (RECORD_HEADER_KEY_SIGNATURE );
189
+ synchronized (this .signingSignature ) {
190
+ final byte [] messageBytes ;
191
+ final SpecificRecordBase specificRecordBase = producerRecord .value ();
192
+ if (this .stripAvroHeader ) {
193
+ messageBytes = this .stripAvroHeader (this .toByteBuffer (specificRecordBase ));
194
+ } else {
195
+ messageBytes = this .toByteBuffer (specificRecordBase ).array ();
196
+ }
197
+ this .signingSignature .update (messageBytes );
198
+ return this .signingSignature .sign ();
199
+ }
200
+ } catch (final SignatureException e ) {
201
+ throw new UncheckedSecurityException ("Unable to sign message" , e );
202
+ } finally {
203
+ if (oldSignatureHeader != null ) {
204
+ producerRecord .headers ().add (RECORD_HEADER_KEY_SIGNATURE , oldSignatureHeader .value ());
205
+ }
206
+ }
207
+ }
208
+
145
209
public boolean canVerifyMessageSignatures () {
146
210
return this .signingEnabled && this .verificationSignature != null ;
147
211
}
@@ -175,14 +239,7 @@ public boolean verify(final SignableMessageWrapper<?> message) {
175
239
try {
176
240
message .setSignature (null );
177
241
synchronized (this .verificationSignature ) {
178
- final byte [] messageBytes ;
179
- if (this .stripHeaders ) {
180
- messageBytes = this .stripHeaders (this .toByteBuffer (message ));
181
- } else {
182
- messageBytes = this .toByteBuffer (message ).array ();
183
- }
184
- this .verificationSignature .update (messageBytes );
185
- return this .verificationSignature .verify (signatureBytes );
242
+ return this .verifySignatureBytes (signatureBytes , this .toByteBuffer (message ));
186
243
}
187
244
} catch (final SignatureException e ) {
188
245
throw new UncheckedSecurityException ("Unable to verify message signature" , e );
@@ -192,13 +249,63 @@ public boolean verify(final SignableMessageWrapper<?> message) {
192
249
}
193
250
}
194
251
252
+ /**
253
+ * Verifies the signature of the provided {@code producerRecord}.
254
+ *
255
+ * @param producerRecord the record to be verified
256
+ * @return {@code true} if the signature of the given {@code producerRecord} was verified; {@code false}
257
+ * if not.
258
+ * @throws IllegalStateException if this message signer has a private key needed for signing
259
+ * messages, but does not have the public key for signature verification.
260
+ * @throws UncheckedIOException if determining the bytes throws an IOException.
261
+ * @throws UncheckedSecurityException if the signature verification process throws a
262
+ * SignatureException.
263
+ */
264
+ public boolean verify (final ProducerRecord <String , ? extends SpecificRecordBase > producerRecord ) {
265
+ if (!this .canVerifyMessageSignatures ()) {
266
+ throw new IllegalStateException (
267
+ "This MessageSigner is not configured for verification, it can only be used for signing" );
268
+ }
269
+
270
+ final Header header = producerRecord .headers ().lastHeader (RECORD_HEADER_KEY_SIGNATURE );
271
+ if (header == null ) {
272
+ throw new IllegalStateException (
273
+ "This ProducerRecord does not contain a signature header" );
274
+ }
275
+ final byte [] signatureBytes = header .value ();
276
+ if (signatureBytes == null || signatureBytes .length == 0 ) {
277
+ return false ;
278
+ }
279
+
280
+ try {
281
+ producerRecord .headers ().remove (RECORD_HEADER_KEY_SIGNATURE );
282
+ synchronized (this .verificationSignature ) {
283
+ final SpecificRecordBase specificRecordBase = producerRecord .value ();
284
+ return this .verifySignatureBytes (signatureBytes , this .toByteBuffer (specificRecordBase ));
285
+ }
286
+ } catch (final SignatureException e ) {
287
+ throw new UncheckedSecurityException ("Unable to verify message signature" , e );
288
+ }
289
+ }
290
+
291
+ private boolean verifySignatureBytes (final byte [] signatureBytes , final ByteBuffer messageByteBuffer ) throws SignatureException {
292
+ final byte [] messageBytes ;
293
+ if (this .stripAvroHeader ) {
294
+ messageBytes = this .stripAvroHeader (messageByteBuffer );
295
+ } else {
296
+ messageBytes = messageByteBuffer .array ();
297
+ }
298
+ this .verificationSignature .update (messageBytes );
299
+ return this .verificationSignature .verify (signatureBytes );
300
+ }
301
+
195
302
private boolean hasAvroHeader (final byte [] bytes ) {
196
303
return bytes .length >= AVRO_HEADER_LENGTH
197
304
&& (bytes [0 ] & 0xFF ) == 0xC3
198
305
&& (bytes [1 ] & 0xFF ) == 0x01 ;
199
306
}
200
307
201
- private byte [] stripHeaders (final ByteBuffer byteBuffer ) {
308
+ private byte [] stripAvroHeader (final ByteBuffer byteBuffer ) {
202
309
final byte [] bytes = new byte [byteBuffer .remaining ()];
203
310
byteBuffer .get (bytes );
204
311
if (this .hasAvroHeader (bytes )) {
@@ -215,6 +322,14 @@ private ByteBuffer toByteBuffer(final SignableMessageWrapper<?> message) {
215
322
}
216
323
}
217
324
325
+ private ByteBuffer toByteBuffer (final SpecificRecordBase message ) {
326
+ try {
327
+ return new BinaryMessageEncoder <>(message .getSpecificData (), message .getSchema ()).encode (message );
328
+ } catch (final IOException e ) {
329
+ throw new UncheckedIOException ("Unable to determine ByteBuffer for Message" , e );
330
+ }
331
+ }
332
+
218
333
public boolean isSigningEnabled () {
219
334
return this .signingEnabled ;
220
335
}
@@ -338,7 +453,7 @@ public static final class Builder {
338
453
339
454
private boolean signingEnabled ;
340
455
341
- private boolean stripHeaders ;
456
+ private boolean stripAvroHeader ;
342
457
343
458
private String signatureAlgorithm = DEFAULT_SIGNATURE_ALGORITHM ;
344
459
private String signatureProvider = DEFAULT_SIGNATURE_PROVIDER ;
@@ -353,8 +468,8 @@ public Builder signingEnabled(final boolean signingEnabled) {
353
468
return this ;
354
469
}
355
470
356
- public Builder stripHeaders (final boolean stripHeaders ) {
357
- this .stripHeaders = stripHeaders ;
471
+ public Builder stripAvroHeader (final boolean stripAvroHeader ) {
472
+ this .stripAvroHeader = stripAvroHeader ;
358
473
return this ;
359
474
}
360
475
0 commit comments