Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FDP-1737: enable signing and verifying in ProducerRecord signature header #9

Merged
merged 14 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ new DefaultKafkaConsumerFactory(
)
```

## kafka-message-signing
Library for signing Kafka messages and for verification of signed Kafka messages.

Two variations are supported:
- The signature is set on the message, via `SignableMessageWrapper`'s `signature` field;
- The signature is set as a `signature` header on the Kafka `ProducerRecord`.

The `MessageSigner` class is used for both signing and verifying a signature.

To sign a message, use `MessageSigner`'s `sign()` method: choose between `SignableMessageWrapper` or `ProducerRecord`.

To verify a signature, use `MessageSigner`'s `verify()` method: choose between `SignableMessageWrapper` or `ProducerRecord`.

The `MessageSigner` class can be created with the following configuration options:
- signingEnabled
- stripAvroHeader
- signatureAlgorithm
- signatureProvider
- signatureKeyAlgorithm
- signatureKeySize
- signingKey: from `java.security.PrivateKey` object, from a byte array or from a pem file
- verificationKey: `from java.security.PrivateKey` object, from a byte array or from a pem file

## oauth-token-client
Library that easily configures the [msal4j](https://github.com/AzureAD/microsoft-authentication-library-for-java) oauth token provider.
Expand Down
7 changes: 7 additions & 0 deletions kafka-message-signing/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@

dependencies {
implementation("org.springframework:spring-context")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.springframework.boot:spring-boot-autoconfigure")

api(libs.avro)

testImplementation("org.junit.jupiter:junit-jupiter-api")
testImplementation("org.junit.jupiter:junit-jupiter-engine")
testImplementation("org.assertj:assertj-core")
testImplementation("org.springframework:spring-test")
testImplementation("org.springframework.boot:spring-boot-test")
testImplementation("org.springframework.boot:spring-boot-starter")
}

tasks.test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class MessageSigner {
public static final String DEFAULT_SIGNATURE_ALGORITHM = "SHA256withRSA";
Expand All @@ -35,27 +39,35 @@ public class MessageSigner {
// Two magic bytes (0xC3, 0x01) followed by an 8-byte fingerprint
public static final int AVRO_HEADER_LENGTH = 10;

public static final String RECORD_HEADER_KEY_SIGNATURE = "signature";

private final boolean signingEnabled;

private boolean stripHeaders;
private boolean stripAvroHeader;

private String signatureAlgorithm;
private String signatureProvider;
private String signatureKeyAlgorithm;
private int signatureKeySize;

private Signature signingSignature;
private Signature verificationSignature;
private final Signature signingSignature;
private final Signature verificationSignature;

private PrivateKey signingKey;
private PublicKey verificationKey;

private MessageSigner(final Builder builder) {
this.signingSignature =
signatureInstance(
builder.signatureAlgorithm, builder.signatureProvider, builder.signingKey);
this.verificationSignature =
signatureInstance(
builder.signatureAlgorithm, builder.signatureProvider, builder.verificationKey);
this.signingEnabled = builder.signingEnabled;
if (!this.signingEnabled) {
return;
}
this.stripHeaders = builder.stripHeaders;
this.stripAvroHeader = builder.stripAvroHeader;
this.signatureAlgorithm = builder.signatureAlgorithm;
this.signatureKeyAlgorithm = builder.signatureKeyAlgorithm;
this.signatureKeySize = builder.signatureKeySize;
Expand All @@ -65,12 +77,6 @@ private MessageSigner(final Builder builder) {
}
this.signingKey = builder.signingKey;
this.verificationKey = builder.verificationKey;
this.signingSignature =
signatureInstance(
builder.signatureAlgorithm, builder.signatureProvider, builder.signingKey);
this.verificationSignature =
signatureInstance(
builder.signatureAlgorithm, builder.signatureProvider, builder.verificationKey);
if (builder.signatureProvider != null) {
this.signatureProvider = builder.signatureProvider;
} else if (this.signingSignature != null) {
Expand Down Expand Up @@ -104,6 +110,23 @@ public void sign(final SignableMessageWrapper<?> message) {
}
}

/**
* Signs the provided {@code producerRecord} in the header, overwriting an existing signature, if a non-null value is
* already set.
*
* @param producerRecord the record to be signed
* @throws IllegalStateException if this message signer has a public key for signature
* verification, but does not have the private key needed for signing messages.
* @throws UncheckedIOException if determining the bytes for the message throws an IOException.
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
*/
public void sign(final ProducerRecord<String, ? extends SpecificRecordBase> producerRecord) {
if (this.signingEnabled) {
final byte[] signature = this.signature(producerRecord);
producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature);
}
}

/**
* Determines the signature for the given {@code message}.
*
Expand All @@ -127,8 +150,8 @@ public byte[] signature(final SignableMessageWrapper<?> message) {
message.setSignature(null);
synchronized (this.signingSignature) {
final byte[] messageBytes;
if (this.stripHeaders) {
messageBytes = this.stripHeaders(this.toByteBuffer(message));
if (this.stripAvroHeader) {
messageBytes = this.stripAvroHeader(this.toByteBuffer(message));
} else {
messageBytes = this.toByteBuffer(message).array();
}
Expand All @@ -142,6 +165,47 @@ public byte[] signature(final SignableMessageWrapper<?> message) {
}
}

/**
* Determines the signature for the given {@code producerRecord}.
*
* <p>The value for the signature in the record will be set to {@code null} to properly determine
* the signature, but is restored to its original value before this method returns.
*
* @param producerRecord the record to be signed
* @return the signature for the record
* @throws IllegalStateException if this message signer has a public key for signature
* verification, but does not have the private key needed for signing messages.
* @throws UncheckedIOException if determining the bytes throws an IOException.
* @throws UncheckedSecurityException if the signing process throws a SignatureException.
*/
public byte[] signature(final ProducerRecord<String, ? extends SpecificRecordBase> producerRecord) {
if (!this.canSignMessages()) {
throw new IllegalStateException(
"This MessageSigner is not configured for signing, it can only be used for verification");
}
final Header oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE);
try {
producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE);
synchronized (this.signingSignature) {
final byte[] messageBytes;
final SpecificRecordBase specificRecordBase = producerRecord.value();
if (this.stripAvroHeader) {
messageBytes = this.stripAvroHeader(this.toByteBuffer(specificRecordBase));
} else {
messageBytes = this.toByteBuffer(specificRecordBase).array();
}
this.signingSignature.update(messageBytes);
return this.signingSignature.sign();
}
} catch (final SignatureException e) {
throw new UncheckedSecurityException("Unable to sign message", e);
} finally {
if(oldSignatureHeader != null) {
producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, oldSignatureHeader.value());
}
}
}

public boolean canVerifyMessageSignatures() {
return this.signingEnabled && this.verificationSignature != null;
}
Expand Down Expand Up @@ -175,14 +239,7 @@ public boolean verify(final SignableMessageWrapper<?> message) {
try {
message.setSignature(null);
synchronized (this.verificationSignature) {
final byte[] messageBytes;
if (this.stripHeaders) {
messageBytes = this.stripHeaders(this.toByteBuffer(message));
} else {
messageBytes = this.toByteBuffer(message).array();
}
this.verificationSignature.update(messageBytes);
return this.verificationSignature.verify(signatureBytes);
return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(message));
}
} catch (final SignatureException e) {
throw new UncheckedSecurityException("Unable to verify message signature", e);
Expand All @@ -192,13 +249,63 @@ public boolean verify(final SignableMessageWrapper<?> message) {
}
}

/**
* Verifies the signature of the provided {@code producerRecord}.
*
* @param producerRecord the record to be verified
* @return {@code true} if the signature of the given {@code producerRecord} was verified; {@code false}
* if not.
* @throws IllegalStateException if this message signer has a private key needed for signing
* messages, but does not have the public key for signature verification.
* @throws UncheckedIOException if determining the bytes throws an IOException.
* @throws UncheckedSecurityException if the signature verification process throws a
* SignatureException.
*/
public boolean verify(final ProducerRecord<String, ? extends SpecificRecordBase> producerRecord) {
if (!this.canVerifyMessageSignatures()) {
throw new IllegalStateException(
"This MessageSigner is not configured for verification, it can only be used for signing");
}

final Header header = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE);
if(header == null) {
throw new IllegalStateException(
"This ProducerRecord does not contain a signature header");
}
final byte[] signatureBytes = header.value();
if (signatureBytes == null || signatureBytes.length == 0) {
return false;
}

try {
producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE);
synchronized (this.verificationSignature) {
final SpecificRecordBase specificRecordBase = producerRecord.value();
return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(specificRecordBase));
}
} catch (final SignatureException e) {
throw new UncheckedSecurityException("Unable to verify message signature", e);
}
}

private boolean verifySignatureBytes(final byte[] signatureBytes, final ByteBuffer messageByteBuffer) throws SignatureException {
final byte[] messageBytes;
if (this.stripAvroHeader) {
messageBytes = this.stripAvroHeader(messageByteBuffer);
} else {
messageBytes = messageByteBuffer.array();
}
this.verificationSignature.update(messageBytes);
return this.verificationSignature.verify(signatureBytes);
}

private boolean hasAvroHeader(final byte[] bytes) {
return bytes.length >= AVRO_HEADER_LENGTH
&& (bytes[0] & 0xFF) == 0xC3
&& (bytes[1] & 0xFF) == 0x01;
}

private byte[] stripHeaders(final ByteBuffer byteBuffer) {
private byte[] stripAvroHeader(final ByteBuffer byteBuffer) {
final byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
if (this.hasAvroHeader(bytes)) {
Expand All @@ -215,6 +322,14 @@ private ByteBuffer toByteBuffer(final SignableMessageWrapper<?> message) {
}
}

private ByteBuffer toByteBuffer(final SpecificRecordBase message) {
try {
return new BinaryMessageEncoder<>(message.getSpecificData(), message.getSchema()).encode(message);
} catch (final IOException e) {
throw new UncheckedIOException("Unable to determine ByteBuffer for Message", e);
}
}

public boolean isSigningEnabled() {
return this.signingEnabled;
}
Expand Down Expand Up @@ -338,7 +453,7 @@ public static final class Builder {

private boolean signingEnabled;

private boolean stripHeaders;
private boolean stripAvroHeader;

private String signatureAlgorithm = DEFAULT_SIGNATURE_ALGORITHM;
private String signatureProvider = DEFAULT_SIGNATURE_PROVIDER;
Expand All @@ -353,8 +468,8 @@ public Builder signingEnabled(final boolean signingEnabled) {
return this;
}

public Builder stripHeaders(final boolean stripHeaders) {
this.stripHeaders = stripHeaders;
public Builder stripAvroHeader(final boolean stripAvroHeader) {
this.stripAvroHeader = stripAvroHeader;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.alliander.osgp.kafka.message.signing;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

@Configuration
public class MessageSigningAutoConfiguration {

@Value("${message-signing.enabled}")
private boolean signingEnabled;

@Value("${message-signing.strip-headers}")
private boolean stripHeaders;

@Value("${message-signing.signature.algorithm:SHA256withRSA}")
private String signatureAlgorithm;

@Value("${message-signing.signature.provider:SunRsaSign}")
private String signatureProvider;

@Value("${message-signing.signature.key.algorithm:RSA}")
private String signatureKeyAlgorithm;

@Value("${message-signing.signature.key.size:2048}")
private int signatureKeySize;

@Value("${message-signing.signature.key.private:#{null}}")
private Resource signingKeyResource;

@Value("${message-signing.signature.key.public:#{null}}")
private Resource verificationKeyResource;

@Bean
public MessageSigner messageSigner() {
if(this.signingEnabled) {
return MessageSigner.newBuilder()
.signingEnabled(this.signingEnabled)
.stripAvroHeader(this.stripHeaders)
.signatureAlgorithm(this.signatureAlgorithm)
.signatureProvider(this.signatureProvider)
.signatureKeyAlgorithm(this.signatureKeyAlgorithm)
.signatureKeySize(this.signatureKeySize)
.signingKey(this.readKeyFromPemResource(this.signingKeyResource))
.verificationKey(this.readKeyFromPemResource(this.verificationKeyResource))
.build();
} else {
return MessageSigner.newBuilder()
.signingEnabled(false)
.build();
}
}

private String readKeyFromPemResource(final Resource keyResource) {
if (keyResource == null) {
return null;
}
try {
return keyResource.getContentAsString(StandardCharsets.ISO_8859_1);
} catch (final IOException e) {
throw new UncheckedIOException("Unable to read " + keyResource.getFilename() + " as ISO-LATIN-1 PEM text", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.alliander.osgp.kafka.message.signing.MessageSigningAutoConfiguration
Loading
Loading