@@ -7,6 +7,7 @@ import com.gxf.utilities.kafka.avro.AvroEncoder
7
7
import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper
8
8
import java.io.IOException
9
9
import java.io.UncheckedIOException
10
+ import java.nio.ByteBuffer
10
11
import java.nio.charset.StandardCharsets
11
12
import java.security.*
12
13
import java.security.spec.X509EncodedKeySpec
@@ -83,7 +84,7 @@ class MessageSigner(properties: MessageSigningProperties) {
83
84
): ProducerRecord <String , out SpecificRecordBase > {
84
85
if (this .signingEnabled) {
85
86
val signature = this .signature(producerRecord)
86
- producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE , signature)
87
+ producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE , signature.array() )
87
88
}
88
89
return producerRecord
89
90
}
@@ -101,15 +102,15 @@ class MessageSigner(properties: MessageSigningProperties) {
101
102
* @throws UncheckedIOException if determining the bytes for the message throws an IOException.
102
103
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
103
104
*/
104
- private fun signature (message : SignableMessageWrapper <* >): ByteArray {
105
+ private fun signature (message : SignableMessageWrapper <* >): ByteBuffer {
105
106
check(this .canSignMessages()) {
106
107
" This MessageSigner is not configured for signing, it can only be used for verification"
107
108
}
108
109
val oldSignature = message.getSignature()
109
110
message.setSignature(null )
110
- val byteArray = this .toByteArray (message)
111
+ val byteBuffer = this .toByteBuffer (message)
111
112
try {
112
- return signature(byteArray )
113
+ return signature(byteBuffer )
113
114
} catch (e: SignatureException ) {
114
115
throw UncheckedSecurityException (" Unable to sign message" , e)
115
116
} finally {
@@ -130,16 +131,16 @@ class MessageSigner(properties: MessageSigningProperties) {
130
131
* @throws UncheckedIOException if determining the bytes throws an IOException.
131
132
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
132
133
*/
133
- private fun signature (producerRecord : ProducerRecord <String , out SpecificRecordBase >): ByteArray {
134
+ private fun signature (producerRecord : ProducerRecord <String , out SpecificRecordBase >): ByteBuffer {
134
135
check(this .canSignMessages()) {
135
136
" This MessageSigner is not configured for signing, it can only be used for verification"
136
137
}
137
138
val oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE )
138
139
producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE )
139
140
val specificRecordBase = producerRecord.value()
140
- val byteArray = this .toByteArray (specificRecordBase)
141
+ val byteBuffer = this .toByteBuffer (specificRecordBase)
141
142
try {
142
- return signature(byteArray )
143
+ return signature(byteBuffer )
143
144
} catch (e: SignatureException ) {
144
145
throw UncheckedSecurityException (" Unable to sign message" , e)
145
146
} finally {
@@ -149,16 +150,16 @@ class MessageSigner(properties: MessageSigningProperties) {
149
150
}
150
151
}
151
152
152
- private fun signature (byteArray : ByteArray ): ByteArray {
153
- val messageBytes: ByteArray =
153
+ private fun signature (byteBuffer : ByteBuffer ): ByteBuffer {
154
+ val messageBytes: ByteBuffer =
154
155
if (this .stripAvroHeader) {
155
- this .stripAvroHeader(byteArray )
156
+ this .stripAvroHeader(byteBuffer )
156
157
} else {
157
- byteArray
158
+ byteBuffer
158
159
}
159
160
val signingSignature = signatureInstance(signatureAlgorithm, signatureProvider, signingKey!! )
160
161
signingSignature.update(messageBytes)
161
- return signingSignature.sign()
162
+ return ByteBuffer .wrap( signingSignature.sign() )
162
163
}
163
164
164
165
fun canVerifyMessageSignatures (): Boolean {
@@ -179,14 +180,14 @@ class MessageSigner(properties: MessageSigningProperties) {
179
180
180
181
val messageSignature = message.getSignature()
181
182
182
- if (messageSignature == null || messageSignature.isEmpty() ) {
183
+ if (messageSignature == null ) {
183
184
logger.error(" This message does not contain a signature" )
184
185
return false
185
186
}
186
187
187
188
try {
188
189
message.setSignature(null )
189
- return this .verifySignatureBytes(messageSignature, this .toByteArray (message))
190
+ return this .verifySignatureBytes(messageSignature, this .toByteBuffer (message))
190
191
} catch (e: Exception ) {
191
192
logger.error(" Unable to verify message signature" , e)
192
193
return false
@@ -221,50 +222,50 @@ class MessageSigner(properties: MessageSigningProperties) {
221
222
222
223
try {
223
224
val specificRecordBase: SpecificRecordBase = consumerRecord.value()
224
- return this .verifySignatureBytes(signatureBytes, this .toByteArray (specificRecordBase))
225
+ return this .verifySignatureBytes(ByteBuffer .wrap( signatureBytes) , this .toByteBuffer (specificRecordBase))
225
226
} catch (e: Exception ) {
226
227
logger.error(" Unable to verify message signature" , e)
227
228
return false
228
229
}
229
230
}
230
231
231
232
@Throws(SignatureException ::class )
232
- private fun verifySignatureBytes (signatureBytes : ByteArray , messageByteArray : ByteArray ): Boolean {
233
- val messageBytes: ByteArray =
233
+ private fun verifySignatureBytes (signatureBytes : ByteBuffer , messageByteBuffer : ByteBuffer ): Boolean {
234
+ val messageBytes: ByteBuffer =
234
235
if (this .stripAvroHeader) {
235
- this .stripAvroHeader(messageByteArray )
236
+ this .stripAvroHeader(messageByteBuffer )
236
237
} else {
237
- messageByteArray
238
+ messageByteBuffer
238
239
}
239
240
val verificationSignature = signatureInstance(signatureAlgorithm, signatureProvider, verificationKey!! )
240
241
verificationSignature.update(messageBytes)
241
- return verificationSignature.verify(signatureBytes)
242
+ return verificationSignature.verify(signatureBytes.array() )
242
243
}
243
244
244
- private fun hasAvroHeader (bytes : ByteArray ): Boolean {
245
- return (bytes.size >= AVRO_HEADER_LENGTH ) &&
245
+ private fun hasAvroHeader (bytes : ByteBuffer ): Boolean {
246
+ return (bytes.array(). size >= AVRO_HEADER_LENGTH ) &&
246
247
((bytes[0 ].toInt() and 0xFF ) == 0xC3 ) &&
247
248
((bytes[1 ].toInt() and 0xFF ) == 0x01 )
248
249
}
249
250
250
- private fun stripAvroHeader (bytes : ByteArray ): ByteArray {
251
+ private fun stripAvroHeader (bytes : ByteBuffer ): ByteBuffer {
251
252
if (this .hasAvroHeader(bytes)) {
252
- return Arrays .copyOfRange(bytes, AVRO_HEADER_LENGTH , bytes.size)
253
+ return ByteBuffer .wrap( Arrays .copyOfRange(bytes.array() , AVRO_HEADER_LENGTH , bytes.array(). size) )
253
254
}
254
255
return bytes
255
256
}
256
257
257
- private fun toByteArray (message : SignableMessageWrapper <* >): ByteArray {
258
+ private fun toByteBuffer (message : SignableMessageWrapper <* >): ByteBuffer {
258
259
try {
259
- return message.toByteArray ()
260
+ return message.toByteBuffer ()
260
261
} catch (e: IOException ) {
261
262
throw UncheckedIOException (" Unable to determine bytes for message" , e)
262
263
}
263
264
}
264
265
265
- private fun toByteArray (message : SpecificRecordBase ): ByteArray {
266
+ private fun toByteBuffer (message : SpecificRecordBase ): ByteBuffer {
266
267
try {
267
- return AvroEncoder .encode(message)
268
+ return ByteBuffer .wrap( AvroEncoder .encode(message) )
268
269
} catch (e: IOException ) {
269
270
throw UncheckedIOException (" Unable to determine bytes for message" , e)
270
271
}
0 commit comments