diff --git a/README.md b/README.md index 600e2e9..4710eb2 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,121 @@ # GXF Java utilities -This repository contains utility libraries for GXF java applications. +This repository contains utility libraries for GXF java applications. ## kafka-azure-oauth + The library makes it possible to configure OAUTH Kafka authentication using environment variables OAuth is enabled by setting the `kafka-oauth` profile. The following environment variables are required: + - `AZURE_CLIENT_ID`: client id of the oauth server -- `AZURE_AUTHORITY_HOST`: url of the oauth server (ex: http://oauth-server.com/) +- `AZURE_AUTHORITY_HOST`: url of the oauth server (ex: http://oauth-server.com/) - `AZURE_TENANT_ID`: Tenant id of the oauth resource - `AZURE_FEDERATED_TOKEN_FILE`: File containing the oauth token. - `OAUTH_SCOPE`: Scope of the oauth client - ## kafka-avro + Library containing a Kafka serializer and deserializer for avro objects. It can be configured with the encoder / decoder of a specific object using: + ```java new DefaultKafkaConsumerFactory( - kafkaProperties.buildConsumerProperties(), - ErrorHandlingDeserializer(AvroSerializer(AvroMessage.getEncoder())), - ErrorHandlingDeserializer(AvroDeserializer(AvroMessage.getDecoder())) + kafkaProperties.buildConsumerProperties(), + ErrorHandlingDeserializer(AvroSerializer(AvroMessage.getEncoder())), + ErrorHandlingDeserializer(AvroDeserializer(AvroMessage.getDecoder())) ) ``` ## kafka-message-signing + Library for signing Kafka messages and for verification of signed Kafka messages. Two variations are supported: + - The signature is set on the message, via `SignableMessageWrapper`'s `signature` field; - The signature is set as a `signature` header on the Kafka `ProducerRecord`. The `MessageSigner` class is used for both signing and verifying a signature. -To sign a message, use `MessageSigner`'s `sign()` method: choose between `SignableMessageWrapper` or `ProducerRecord`. +To sign a message, use one of `MessageSigner`'s sign methods: +- `signUsingField(...)` using the `SignableMessageWrapper` to wrap your Avro object; +- `signUsingHeader(...)` to add the signature to the Avro object's header. -To verify a signature, use `MessageSigner`'s `verify()` method: choose between `SignableMessageWrapper` or `ProducerRecord`. - -The `MessageSigner` class can be created using `MessageSigner.newBuilder()` with the following configuration options: -- signingEnabled -- stripAvroHeader -- signatureAlgorithm -- signatureProvider -- signatureKeyAlgorithm -- signatureKeySize -- signingKey: from `java.security.PrivateKey` object, from a byte array or from a pem file -- verificationKey: `from java.security.PrivateKey` object, from a byte array or from a pem file +To verify a signature, use one of `MessageSigner`'s verify methods: +- `verifyUsingField(...)` using the `SignableMessageWrapper` to wrap your Avro object; +- `verifyUsingHeader(...)` to read the signature from the Avro object's header. ### Spring Auto Configuration + You can configure the settings in your `application.yaml` (or properties), for example: + ```yaml message-signing: - enabled: true - strip-headers: true - signature: - algorithm: SHA256withRSA - provider: SunRsaSign - key: - algorithm: RSA - size: 2048 + signing-enabled: true + strip-avro-header: true + signature-algorithm: SHA256withRSA + signature-provider: SunRsaSign + key-algorithm: RSA + private-key-file: classpath:rsa-private.pem + public-key-file: classpath:rsa-public.pem ``` ### Custom or multiple certificates configuration + You can create your own `MessageSigningProperties` object and use `MessageSigner.newMessageSigner(props)`. -Spring Boot users can extend the `MessageSigningProperties` to add @ConfigurationProperties capabilities and/or to support multiple configurations +Spring Boot users can extend the `MessageSigningProperties` to add `@ConfigurationProperties` capabilities and/or to +support multiple configurations. + +If you want to support multiple keys, you also have to instantiate multiple `MessageSigner` beans. + +Auto configuration (see above) will see when you defined your own MessageSigner or MessageSigningProperties bean and will not auto-create one. + +```java +@ConfigurationProperties(prefix = "your-app.your-message-signing") +class YourMessageSigningProperties extends MessageSigningProperties { +} + +@Configuration +class MessageSigningConfiguration { + @Bean + public MessageSigner yourMessageSigner(YourMessageSigningProperties yourMessageSigningProperties) { + return new MessageSigner(yourMessageSigningProperties); + } +} +``` +### Creating a public/private key for signing + +To generate a public/private keypair, use these two commands: + +```shell +openssl genrsa -out rsa-private.pem 2048 +openssl rsa -in rsa-private.pem -pubout -out rsa-public.pem +``` + +The private key (PKCS#8) and public key can then be set in the configuration as shown above. +To view the generated files, you can use + +```shell +openssl rsa -noout -text -in rsa-private.pem +openssl rsa -noout -text -pubin -in rsa-public.pem +``` ## oauth-token-client -Library that easily configures the [msal4j](https://github.com/AzureAD/microsoft-authentication-library-for-java) oauth token provider. + +Library that easily configures the [msal4j](https://github.com/AzureAD/microsoft-authentication-library-for-java) oauth +token provider. The client requires the following properties: + ```properties # If not set to true no other configuration is required oauth.client.enabled=true - oauth.client.token-endpoint=https://localhost:56788/token oauth.client.client-id=client-id oauth.client.scope=client-scope - # Resources oauth.client.private-key=classpath:keys/private-key.key oauth.client.certificate=classpath:keys/certificate.crt diff --git a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/MessageSigner.java b/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/MessageSigner.java deleted file mode 100644 index a8e6b55..0000000 --- a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/MessageSigner.java +++ /dev/null @@ -1,564 +0,0 @@ -// SPDX-FileCopyrightText: Copyright Contributors to the GXF project -// -// SPDX-License-Identifier: Apache-2.0 - -package com.alliander.osgp.kafka.message.signing; - -import com.alliander.osgp.kafka.message.wrapper.SignableMessageWrapper; -import org.apache.avro.message.BinaryMessageEncoder; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.header.Header; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.ByteBuffer; -import java.security.*; -import java.security.spec.PKCS8EncodedKeySpec; -import java.security.spec.X509EncodedKeySpec; -import java.util.Arrays; -import java.util.Base64; -import java.util.Objects; -import java.util.Optional; -import java.util.regex.Pattern; - -public class MessageSigner { - public static final String DEFAULT_SIGNATURE_ALGORITHM = "SHA256withRSA"; - public static final String DEFAULT_SIGNATURE_PROVIDER = "SunRsaSign"; - public static final String DEFAULT_SIGNATURE_KEY_ALGORITHM = "RSA"; - public static final int DEFAULT_SIGNATURE_KEY_SIZE = 2048; - - // Two magic bytes (0xC3, 0x01) followed by an 8-byte fingerprint - public static final int AVRO_HEADER_LENGTH = 10; - - public static final String RECORD_HEADER_KEY_SIGNATURE = "signature"; - - private final boolean signingEnabled; - - private boolean stripAvroHeader; - - private String signatureAlgorithm; - private String signatureProvider; - private String signatureKeyAlgorithm; - private int signatureKeySize; - - private final Signature signingSignature; - private final Signature verificationSignature; - - private PrivateKey signingKey; - private PublicKey verificationKey; - - private MessageSigner(final MessageSigningProperties properties) { - this.signingSignature = - signatureInstance( - properties.getSignatureAlgorithm(), properties.getSignatureProvider(), properties.getSigningKey()); - this.verificationSignature = - signatureInstance( - properties.getSignatureAlgorithm(), properties.getSignatureProvider(), properties.getVerificationKey()); - this.signingEnabled = properties.getSigningEnabled(); - if (!this.signingEnabled) { - return; - } - this.stripAvroHeader = properties.getStripAvroHeader(); - this.signatureAlgorithm = properties.getSignatureAlgorithm(); - this.signatureKeyAlgorithm = properties.getSignatureKeyAlgorithm(); - this.signatureKeySize = properties.getSignatureKeySize(); - if (properties.getSigningKey() == null && properties.getVerificationKey() == null) { - throw new IllegalArgumentException( - "A signing key (PrivateKey) or verification key (PublicKey) must be provided"); - } - this.signingKey = properties.getSigningKey(); - this.verificationKey = properties.getVerificationKey(); - if (properties.getSignatureProvider() != null) { - this.signatureProvider = properties.getSignatureProvider(); - } else if (this.signingSignature != null) { - this.signatureProvider = this.signingSignature.getProvider().getName(); - } else if (this.verificationSignature != null) { - this.signatureProvider = this.verificationSignature.getProvider().getName(); - } else { - // Should not happen, set to null and ignore. - this.signatureProvider = null; - } - } - - public boolean canSignMessages() { - return this.signingEnabled && this.signingSignature != null; - } - - /** - * Signs the provided {@code message}, overwriting an existing signature, if a non-null value is - * already set. - * - * @param message the message to be signed - * @throws IllegalStateException if this message signer has a public key for signature - * verification, but does not have the private key needed for signing messages. - * @throws UncheckedIOException if determining the bytes for the message throws an IOException. - * @throws UncheckedSecurityException if the signing process throws a SignatureException. - */ - public void sign(final SignableMessageWrapper message) { - if (this.signingEnabled) { - final byte[] signatureBytes = this.signature(message); - message.setSignature(ByteBuffer.wrap(signatureBytes)); - } - } - - /** - * Signs the provided {@code producerRecord} in the header, overwriting an existing signature, if a non-null value is - * already set. - * - * @param producerRecord the record to be signed - * @throws IllegalStateException if this message signer has a public key for signature - * verification, but does not have the private key needed for signing messages. - * @throws UncheckedIOException if determining the bytes for the message throws an IOException. - * @throws UncheckedSecurityException if the signing process throws a SignatureException. - */ - public void sign(final ProducerRecord producerRecord) { - if (this.signingEnabled) { - final byte[] signature = this.signature(producerRecord); - producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature); - } - } - - /** - * Determines the signature for the given {@code message}. - * - *

The value for the signature in the message will be set to {@code null} to properly determine - * the signature, but is restored to its original value before this method returns. - * - * @param message the message to be signed - * @return the signature for the message - * @throws IllegalStateException if this message signer has a public key for signature - * verification, but does not have the private key needed for signing messages. - * @throws UncheckedIOException if determining the bytes for the message throws an IOException. - * @throws UncheckedSecurityException if the signing process throws a SignatureException. - */ - public byte[] signature(final SignableMessageWrapper message) { - if (!this.canSignMessages()) { - throw new IllegalStateException( - "This MessageSigner is not configured for signing, it can only be used for verification"); - } - final ByteBuffer oldSignature = message.getSignature(); - try { - message.setSignature(null); - synchronized (this.signingSignature) { - final byte[] messageBytes; - if (this.stripAvroHeader) { - messageBytes = this.stripAvroHeader(this.toByteBuffer(message)); - } else { - messageBytes = this.toByteBuffer(message).array(); - } - this.signingSignature.update(messageBytes); - return this.signingSignature.sign(); - } - } catch (final SignatureException e) { - throw new UncheckedSecurityException("Unable to sign message", e); - } finally { - message.setSignature(oldSignature); - } - } - - /** - * Determines the signature for the given {@code producerRecord}. - * - *

The value for the signature in the record will be set to {@code null} to properly determine - * the signature, but is restored to its original value before this method returns. - * - * @param producerRecord the record to be signed - * @return the signature for the record - * @throws IllegalStateException if this message signer has a public key for signature - * verification, but does not have the private key needed for signing messages. - * @throws UncheckedIOException if determining the bytes throws an IOException. - * @throws UncheckedSecurityException if the signing process throws a SignatureException. - */ - public byte[] signature(final ProducerRecord producerRecord) { - if (!this.canSignMessages()) { - throw new IllegalStateException( - "This MessageSigner is not configured for signing, it can only be used for verification"); - } - final Header oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE); - try { - producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE); - synchronized (this.signingSignature) { - final byte[] messageBytes; - final SpecificRecordBase specificRecordBase = producerRecord.value(); - if (this.stripAvroHeader) { - messageBytes = this.stripAvroHeader(this.toByteBuffer(specificRecordBase)); - } else { - messageBytes = this.toByteBuffer(specificRecordBase).array(); - } - this.signingSignature.update(messageBytes); - return this.signingSignature.sign(); - } - } catch (final SignatureException e) { - throw new UncheckedSecurityException("Unable to sign message", e); - } finally { - if (oldSignatureHeader != null) { - producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, oldSignatureHeader.value()); - } - } - } - - public boolean canVerifyMessageSignatures() { - return this.signingEnabled && this.verificationSignature != null; - } - - /** - * Verifies the signature of the provided {@code message}. - * - * @param message the message to be verified - * @return {@code true} if the signature of the given {@code message} was verified; {@code false} - * if not. - * @throws IllegalStateException if this message signer has a private key needed for signing - * messages, but does not have the public key for signature verification. - * @throws UncheckedIOException if determining the bytes for the message throws an IOException. - * @throws UncheckedSecurityException if the signature verification process throws a - * SignatureException. - */ - public boolean verify(final SignableMessageWrapper message) { - if (!this.canVerifyMessageSignatures()) { - throw new IllegalStateException( - "This MessageSigner is not configured for verification, it can only be used for signing"); - } - - final ByteBuffer messageSignature = message.getSignature(); - if (messageSignature == null) { - return false; - } - messageSignature.mark(); - final byte[] signatureBytes = new byte[messageSignature.remaining()]; - messageSignature.get(signatureBytes); - - try { - message.setSignature(null); - synchronized (this.verificationSignature) { - return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(message)); - } - } catch (final SignatureException e) { - throw new UncheckedSecurityException("Unable to verify message signature", e); - } finally { - messageSignature.reset(); - message.setSignature(messageSignature); - } - } - - /** - * Verifies the signature of the provided {@code consumerRecord}. - * - * @param consumerRecord the record to be verified - * @return {@code true} if the signature of the given {@code consumerRecord} was verified; {@code false} - * if not. - * @throws IllegalStateException if this message signer has a private key needed for signing - * messages, but does not have the public key for signature verification. - * @throws UncheckedIOException if determining the bytes throws an IOException. - * @throws UncheckedSecurityException if the signature verification process throws a - * SignatureException. - */ - public boolean verify(final ConsumerRecord consumerRecord) { - if (!this.canVerifyMessageSignatures()) { - throw new IllegalStateException( - "This MessageSigner is not configured for verification, it can only be used for signing"); - } - - final Header header = consumerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE); - if (header == null) { - throw new IllegalStateException( - "This ProducerRecord does not contain a signature header"); - } - final byte[] signatureBytes = header.value(); - if (signatureBytes == null || signatureBytes.length == 0) { - return false; - } - - try { - consumerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE); - synchronized (this.verificationSignature) { - final SpecificRecordBase specificRecordBase = consumerRecord.value(); - return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(specificRecordBase)); - } - } catch (final SignatureException e) { - throw new UncheckedSecurityException("Unable to verify message signature", e); - } - } - - private boolean verifySignatureBytes(final byte[] signatureBytes, final ByteBuffer messageByteBuffer) throws SignatureException { - final byte[] messageBytes; - if (this.stripAvroHeader) { - messageBytes = this.stripAvroHeader(messageByteBuffer); - } else { - messageBytes = messageByteBuffer.array(); - } - this.verificationSignature.update(messageBytes); - return this.verificationSignature.verify(signatureBytes); - } - - private boolean hasAvroHeader(final byte[] bytes) { - return bytes.length >= AVRO_HEADER_LENGTH - && (bytes[0] & 0xFF) == 0xC3 - && (bytes[1] & 0xFF) == 0x01; - } - - private byte[] stripAvroHeader(final ByteBuffer byteBuffer) { - final byte[] bytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(bytes); - if (this.hasAvroHeader(bytes)) { - return Arrays.copyOfRange(bytes, AVRO_HEADER_LENGTH, bytes.length); - } - return bytes; - } - - private ByteBuffer toByteBuffer(final SignableMessageWrapper message) { - try { - return message.toByteBuffer(); - } catch (final IOException e) { - throw new UncheckedIOException("Unable to determine ByteBuffer for Message", e); - } - } - - private ByteBuffer toByteBuffer(final SpecificRecordBase message) { - try { - return new BinaryMessageEncoder<>(message.getSpecificData(), message.getSchema()).encode(message); - } catch (final IOException e) { - throw new UncheckedIOException("Unable to determine ByteBuffer for Message", e); - } - } - - public boolean isSigningEnabled() { - return this.signingEnabled; - } - - public Optional signingKey() { - return Optional.ofNullable(this.signingKey); - } - - public Optional signingKeyPem() { - return this.signingKey().map(key -> this.keyAsMem(key, key.getAlgorithm() + " PRIVATE KEY")); - } - - public Optional verificationKey() { - return Optional.ofNullable(this.verificationKey); - } - - public Optional verificationKeyPem() { - return this.verificationKey() - .map(key -> this.keyAsMem(key, key.getAlgorithm() + " PUBLIC KEY")); - } - - private String keyAsMem(final Key key, final String label) { - return "-----BEGIN " + label + "-----" + "\r\n" - + Base64.getMimeEncoder().encodeToString(key.getEncoded()) + "\r\n" - + "-----END " + label + "-----" + "\r\n"; - } - - @Override - public String toString() { - return String.format( - "MessageSigner[algorithm=\"%s\"-\"%s\", provider=\"%s\", keySize=%d, sign=%b, verify=%b]", - this.signatureAlgorithm, - this.signatureKeyAlgorithm, - this.signatureProvider, - this.signatureKeySize, - this.canSignMessages(), - this.canVerifyMessageSignatures()); - } - - public String descriptionWithKeys() { - final StringBuilder sb = new StringBuilder(this.toString()); - this.signingKeyPem().ifPresent(key -> sb.append(System.lineSeparator()).append(key)); - this.verificationKeyPem().ifPresent(key -> sb.append(System.lineSeparator()).append(key)); - return sb.toString(); - } - - private static Signature signatureInstance( - final String signatureAlgorithm, - final String signatureProvider, - final PrivateKey signingKey) { - - if (signingKey == null) { - return null; - } - - final Signature signature = signatureInstance(signatureAlgorithm, signatureProvider); - try { - signature.initSign(signingKey); - } catch (final InvalidKeyException e) { - throw new UncheckedSecurityException(e); - } - return signature; - } - - private static Signature signatureInstance( - final String signatureAlgorithm, - final String signatureProvider, - final PublicKey verificationKey) { - - if (verificationKey == null) { - return null; - } - - final Signature signature = signatureInstance(signatureAlgorithm, signatureProvider); - try { - signature.initVerify(verificationKey); - } catch (final InvalidKeyException e) { - throw new UncheckedSecurityException(e); - } - return signature; - } - - private static Signature signatureInstance( - final String signatureAlgorithm, final String signatureProvider) { - try { - if (signatureProvider == null) { - return Signature.getInstance(signatureAlgorithm); - } - return Signature.getInstance(signatureAlgorithm, signatureProvider); - } catch (final GeneralSecurityException e) { - throw new UncheckedSecurityException("Unable to create Signature for Avro Messages", e); - } - } - - public static KeyPair generateKeyPair( - final String signatureKeyAlgorithm, - final String signatureProvider, - final int signatureKeySize) { - final KeyPairGenerator keyPairGenerator; - try { - if (signatureProvider == null) { - keyPairGenerator = KeyPairGenerator.getInstance(signatureKeyAlgorithm); - } else { - keyPairGenerator = KeyPairGenerator.getInstance(signatureKeyAlgorithm, signatureProvider); - } - } catch (final GeneralSecurityException e) { - throw new UncheckedSecurityException(e); - } - keyPairGenerator.initialize(signatureKeySize); - return keyPairGenerator.generateKeyPair(); - } - - public static Builder newBuilder() { - return new Builder(); - } - - public static MessageSigner newMessageSigner(final MessageSigningProperties messageSigningProperties) { - return new Builder(messageSigningProperties).build(); - } - - public static final class Builder { - - private static final Pattern PEM_REMOVAL_PATTERN = - Pattern.compile("-----(?:BEGIN|END) .*?-----|\\r|\\n"); - - private final MessageSigningProperties properties; - - private Builder() { - this.properties = new MessageSigningProperties(); - } - private Builder(final MessageSigningProperties properties) { - this.properties = properties; - } - - public Builder signingEnabled(final boolean signingEnabled) { - this.properties.setSigningEnabled(signingEnabled); - return this; - } - - public Builder stripAvroHeader(final boolean stripAvroHeader) { - this.properties.setStripAvroHeader(stripAvroHeader); - return this; - } - - public Builder signatureAlgorithm(final String signatureAlgorithm) { - this.properties.setSignatureAlgorithm(Objects.requireNonNull(signatureAlgorithm)); - return this; - } - - public Builder signatureProvider(final String signatureProvider) { - this.properties.setSignatureProvider(signatureProvider); - return this; - } - - public Builder signatureKeyAlgorithm(final String signatureKeyAlgorithm) { - this.properties.setSignatureKeyAlgorithm(Objects.requireNonNull(signatureKeyAlgorithm)); - return this; - } - - public Builder signatureKeySize(final int signatureKeySize) { - this.properties.setSignatureKeySize(signatureKeySize); - return this; - } - - public Builder signingKey(final PrivateKey signingKey) { - this.properties.setSigningKey(signingKey); - return this; - } - - public Builder signingKey(final String signingKeyPem) { - if (signingKeyPem == null) { - this.properties.setSigningKey(null); - return this; - } - final String base64 = PEM_REMOVAL_PATTERN.matcher(signingKeyPem).replaceAll(""); - final byte[] bytes = Base64.getDecoder().decode(base64); - return this.signingKey(bytes); - } - - public Builder signingKey(final byte[] signingKeyBytes) { - if (signingKeyBytes == null) { - this.properties.setSigningKey(null); - return this; - } - final PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(signingKeyBytes); - try { - this.properties.setSigningKey(KeyFactory.getInstance(this.properties.getSignatureKeyAlgorithm()).generatePrivate(keySpec)); - } catch (final GeneralSecurityException e) { - throw new UncheckedSecurityException(e); - } - return this; - } - - public Builder verificationKey(final PublicKey verificationKey) { - this.properties.setVerificationKey(verificationKey); - return this; - } - - public Builder verificationKey(final String verificationKeyPem) { - if (verificationKeyPem == null) { - this.properties.setVerificationKey(null); - return this; - } - final String base64 = PEM_REMOVAL_PATTERN.matcher(verificationKeyPem).replaceAll(""); - final byte[] bytes = Base64.getDecoder().decode(base64); - return this.verificationKey(bytes); - } - - public Builder verificationKey(final byte[] verificationKeyBytes) { - if (verificationKeyBytes == null) { - this.properties.setVerificationKey(null); - return this; - } - final X509EncodedKeySpec keySpec = new X509EncodedKeySpec(verificationKeyBytes); - try { - this.properties.setVerificationKey(KeyFactory.getInstance(this.properties.getSignatureKeyAlgorithm()).generatePublic(keySpec)); - } catch (final GeneralSecurityException e) { - throw new UncheckedSecurityException(e); - } - return this; - } - - public Builder keyPair(final KeyPair keyPair) { - this.properties.setSigningKey(keyPair.getPrivate()); - this.properties.setVerificationKey(keyPair.getPublic()); - return this; - } - - public Builder generateKeyPair() { - return this.keyPair( - MessageSigner.generateKeyPair( - this.properties.getSignatureKeyAlgorithm(), this.properties.getSignatureProvider(), this.properties.getSignatureKeySize())); - } - - public MessageSigner build() { - return new MessageSigner(this.properties); - } - } -} diff --git a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/MessageSigningAutoConfiguration.java b/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/MessageSigningAutoConfiguration.java deleted file mode 100644 index 5c68703..0000000 --- a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/MessageSigningAutoConfiguration.java +++ /dev/null @@ -1,68 +0,0 @@ -package com.alliander.osgp.kafka.message.signing; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.io.Resource; - -@Configuration -public class MessageSigningAutoConfiguration { - - @Value("${message-signing.enabled}") - private boolean signingEnabled; - - @Value("${message-signing.strip-headers}") - private boolean stripHeaders; - - @Value("${message-signing.signature.algorithm:SHA256withRSA}") - private String signatureAlgorithm; - - @Value("${message-signing.signature.provider:SunRsaSign}") - private String signatureProvider; - - @Value("${message-signing.signature.key.algorithm:RSA}") - private String signatureKeyAlgorithm; - - @Value("${message-signing.signature.key.size:2048}") - private int signatureKeySize; - - @Value("${message-signing.signature.key.private:#{null}}") - private Resource signingKeyResource; - - @Value("${message-signing.signature.key.public:#{null}}") - private Resource verificationKeyResource; - - @Bean - public MessageSigner messageSigner() { - if(this.signingEnabled) { - return MessageSigner.newBuilder() - .signingEnabled(this.signingEnabled) - .stripAvroHeader(this.stripHeaders) - .signatureAlgorithm(this.signatureAlgorithm) - .signatureProvider(this.signatureProvider) - .signatureKeyAlgorithm(this.signatureKeyAlgorithm) - .signatureKeySize(this.signatureKeySize) - .signingKey(this.readKeyFromPemResource(this.signingKeyResource)) - .verificationKey(this.readKeyFromPemResource(this.verificationKeyResource)) - .build(); - } else { - return MessageSigner.newBuilder() - .signingEnabled(false) - .build(); - } - } - - private String readKeyFromPemResource(final Resource keyResource) { - if (keyResource == null) { - return null; - } - try { - return keyResource.getContentAsString(StandardCharsets.ISO_8859_1); - } catch (final IOException e) { - throw new UncheckedIOException("Unable to read " + keyResource.getFilename() + " as ISO-LATIN-1 PEM text", e); - } - } -} diff --git a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/UncheckedSecurityException.java b/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/UncheckedSecurityException.java deleted file mode 100644 index 86de903..0000000 --- a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/signing/UncheckedSecurityException.java +++ /dev/null @@ -1,38 +0,0 @@ -// SPDX-FileCopyrightText: Copyright Contributors to the GXF project -// -// SPDX-License-Identifier: Apache-2.0 - -package com.alliander.osgp.kafka.message.signing; - -import java.io.Serial; -import java.security.GeneralSecurityException; -import java.util.Objects; - -/** Wraps a {@link GeneralSecurityException} with an unchecked exception. */ -public class UncheckedSecurityException extends RuntimeException { - - @Serial - private static final long serialVersionUID = 5152038114753546167L; - - /** - * @throws NullPointerException if the cause is {@code null} - */ - public UncheckedSecurityException(final String message, final GeneralSecurityException cause) { - super(message, Objects.requireNonNull(cause)); - } - - /** - * @throws NullPointerException if the cause is {@code null} - */ - public UncheckedSecurityException(final GeneralSecurityException cause) { - super(Objects.requireNonNull(cause)); - } - - /** - * @return the {@code GeneralSecurityException} wrapped by this exception. - */ - @Override - public synchronized GeneralSecurityException getCause() { - return (GeneralSecurityException) super.getCause(); - } -} diff --git a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/wrapper/SignableMessageWrapper.java b/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/wrapper/SignableMessageWrapper.java deleted file mode 100644 index 9f45703..0000000 --- a/kafka-message-signing/src/main/java/com/alliander/osgp/kafka/message/wrapper/SignableMessageWrapper.java +++ /dev/null @@ -1,23 +0,0 @@ -// SPDX-FileCopyrightText: Copyright Contributors to the GXF project -// -// SPDX-License-Identifier: Apache-2.0 - -package com.alliander.osgp.kafka.message.wrapper; - -public abstract class SignableMessageWrapper { - protected final T message; - - protected SignableMessageWrapper(final T message) { - this.message = message; - } - - public T getMessage() { - return this.message; - } - - public abstract java.nio.ByteBuffer toByteBuffer() throws java.io.IOException; - - public abstract java.nio.ByteBuffer getSignature(); - - public abstract void setSignature(java.nio.ByteBuffer signature); -} diff --git a/kafka-message-signing/src/main/kotlin/com/alliander/osgp/kafka/message/signing/MessageSigningProperties.kt b/kafka-message-signing/src/main/kotlin/com/alliander/osgp/kafka/message/signing/MessageSigningProperties.kt deleted file mode 100644 index d48f242..0000000 --- a/kafka-message-signing/src/main/kotlin/com/alliander/osgp/kafka/message/signing/MessageSigningProperties.kt +++ /dev/null @@ -1,21 +0,0 @@ -// SPDX-FileCopyrightText: Copyright Contributors to the GXF project -// -// SPDX-License-Identifier: Apache-2.0 - -package com.alliander.osgp.kafka.message.signing - -import java.security.PrivateKey -import java.security.PublicKey - -open class MessageSigningProperties { - var signingEnabled: Boolean = false - var stripAvroHeader: Boolean = false - - var signatureAlgorithm: String = MessageSigner.DEFAULT_SIGNATURE_ALGORITHM - var signatureProvider: String? = MessageSigner.DEFAULT_SIGNATURE_PROVIDER - var signatureKeyAlgorithm: String = MessageSigner.DEFAULT_SIGNATURE_KEY_ALGORITHM - var signatureKeySize: Int = MessageSigner.DEFAULT_SIGNATURE_KEY_SIZE - - var signingKey: PrivateKey? = null - var verificationKey: PublicKey? = null -} diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt new file mode 100644 index 0000000..90960e5 --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigner.kt @@ -0,0 +1,391 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package com.gxf.utilities.kafka.message.signing + +import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper +import org.apache.avro.message.BinaryMessageEncoder +import org.apache.avro.specific.SpecificRecordBase +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.ssl.pem.PemContent +import org.springframework.core.io.Resource +import org.springframework.stereotype.Component +import java.io.IOException +import java.io.UncheckedIOException +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.security.* +import java.security.spec.X509EncodedKeySpec +import java.util.* +import java.util.regex.Pattern + +@Component +// Only instantiate when no other bean has been configured +@ConditionalOnMissingBean(MessageSigner::class) +class MessageSigner(properties: MessageSigningProperties) { + + val signingEnabled: Boolean = properties.signingEnabled + private val stripAvroHeader: Boolean = properties.stripAvroHeader + + private val signatureAlgorithm: String = properties.signatureAlgorithm + private val signatureProvider: String? = determineProvider(properties) + private val keyAlgorithm: String = properties.keyAlgorithm + + private var signingKey: PrivateKey? = readPrivateKey(properties.privateKeyFile) + private var verificationKey: PublicKey? = readPublicKey(keyAlgorithm, properties.publicKeyFile) + + private val signingSignature: Signature? = signatureInstance(signatureAlgorithm, signatureProvider, signingKey) + private val verificationSignature: Signature? = signatureInstance(signatureAlgorithm, signatureProvider, verificationKey) + + init { + if (properties.signingEnabled) { + require(!(signingKey == null && verificationKey == null)) { "A signing key (PrivateKey) or verification key (PublicKey) must be provided" } + } + } + + fun canSignMessages(): Boolean { + return this.signingEnabled && this.signingSignature != null + } + + /** + * Signs the provided `message`, overwriting an existing signature field inside the message object. + * + * @param message the message to be signed + * @return Returns the signed (unwrapped) message. If signing is disabled through configuration, the message will be returned unchanged. + * + * @throws IllegalStateException if this message signer has a public key for signature + * verification, but does not have the private key needed for signing messages. + * @throws UncheckedIOException if determining the bytes for the message throws an IOException. + * @throws UncheckedSecurityException if the signing process throws a SignatureException. + */ + fun signUsingField(message: SignableMessageWrapper): T { + if (this.signingEnabled) { + val signatureBytes = this.signature(message) + message.setSignature(ByteBuffer.wrap(signatureBytes)) + } + return message.message + } + + /** + * Signs the provided `producerRecord` in the header, overwriting an existing signature, if a non-null value is + * already set. + * + * @param producerRecord the record to be signed + * @throws IllegalStateException if this message signer has a public key for signature + * verification, but does not have the private key needed for signing messages. + * @throws UncheckedIOException if determining the bytes for the message throws an IOException. + * @throws UncheckedSecurityException if the signing process throws a SignatureException. + */ + fun signUsingHeader(producerRecord: ProducerRecord): ProducerRecord { + if (this.signingEnabled) { + val signature = this.signature(producerRecord) + producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, signature) + } + return producerRecord + } + + /** + * Determines the signature for the given `message`. + * + * + * The value for the signature in the message will be set to `null` to properly determine + * the signature, but is restored to its original value before this method returns. + * + * @param message the message to be signed + * @return the signature for the message + * @throws IllegalStateException if this message signer has a public key for signature + * verification, but does not have the private key needed for signing messages. + * @throws UncheckedIOException if determining the bytes for the message throws an IOException. + * @throws UncheckedSecurityException if the signing process throws a SignatureException. + */ + private fun signature(message: SignableMessageWrapper<*>): ByteArray { + check(this.canSignMessages()) { "This MessageSigner is not configured for signing, it can only be used for verification" } + val oldSignature = message.getSignature() + try { + message.setSignature(null) + synchronized(signingSignature!!) { + val messageBytes: ByteArray = if (this.stripAvroHeader) { + this.stripAvroHeader(this.toByteBuffer(message)) + } else { + this.toByteBuffer(message)!!.array() + } + signingSignature.update(messageBytes) + return signingSignature.sign() + } + } catch (e: SignatureException) { + throw UncheckedSecurityException("Unable to sign message", e) + } finally { + message.setSignature(oldSignature) + } + } + + /** + * Determines the signature for the given `producerRecord`. + * + * + * The value for the signature in the record will be set to `null` to properly determine + * the signature, but is restored to its original value before this method returns. + * + * @param producerRecord the record to be signed + * @return the signature for the record + * @throws IllegalStateException if this message signer has a public key for signature + * verification, but does not have the private key needed for signing messages. + * @throws UncheckedIOException if determining the bytes throws an IOException. + * @throws UncheckedSecurityException if the signing process throws a SignatureException. + */ + private fun signature(producerRecord: ProducerRecord): ByteArray { + check(this.canSignMessages()) { "This MessageSigner is not configured for signing, it can only be used for verification" } + val oldSignatureHeader = producerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) + try { + producerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE) + synchronized(signingSignature!!) { + val specificRecordBase = producerRecord.value() + val messageBytes: ByteArray = if (this.stripAvroHeader) { + this.stripAvroHeader(this.toByteBuffer(specificRecordBase)) + } else { + this.toByteBuffer(specificRecordBase).array() + } + signingSignature.update(messageBytes) + return signingSignature.sign() + } + } catch (e: SignatureException) { + throw UncheckedSecurityException("Unable to sign message", e) + } finally { + if (oldSignatureHeader != null) { + producerRecord.headers().add(RECORD_HEADER_KEY_SIGNATURE, oldSignatureHeader.value()) + } + } + } + + fun canVerifyMessageSignatures(): Boolean { + return this.signingEnabled && this.verificationSignature != null + } + + /** + * Verifies the signature of the provided `message` using the signature in a message field. + * + * @param message the message to be verified + * @return `true` if the signature of the given `message` was verified; `false` + * if not. + * @throws IllegalStateException if this message signer has a private key needed for signing + * messages, but does not have the public key for signature verification. + * @throws UncheckedIOException if determining the bytes for the message throws an IOException. + * @throws UncheckedSecurityException if the signature verification process throws a + * SignatureException. + */ + fun verifyUsingField(message: SignableMessageWrapper<*>): Boolean { + check(this.canVerifyMessageSignatures()) { "This MessageSigner is not configured for verification, it can only be used for signing" } + + val messageSignature = message.getSignature() ?: return false + messageSignature.mark() + val signatureBytes = ByteArray(messageSignature.remaining()) + messageSignature.get(signatureBytes) + + try { + message.setSignature(null) + synchronized((verificationSignature)!!) { + return this.verifySignatureBytes(signatureBytes, this.toByteBuffer(message)) + } + } catch (e: SignatureException) { + throw UncheckedSecurityException("Unable to verify message signature", e) + } finally { + messageSignature.reset() + message.setSignature(messageSignature) + } + } + + /** + * Verifies the signature of the provided `consumerRecord` using the signature from the message header. + * + * @param consumerRecord the record to be verified + * @return `true` if the signature of the given `consumerRecord` was verified; `false` + * if not. + * @throws IllegalStateException if this message signer has a private key needed for signing + * messages, but does not have the public key for signature verification. + * @throws UncheckedIOException if determining the bytes throws an IOException. + * @throws UncheckedSecurityException if the signature verification process throws a + * SignatureException. + */ + fun verifyUsingHeader(consumerRecord: ConsumerRecord): Boolean { + check(this.canVerifyMessageSignatures()) { "This MessageSigner is not configured for verification, it can only be used for signing" } + + val header = consumerRecord.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE) + ?: throw IllegalStateException( + "This ProducerRecord does not contain a signature header" + ) + val signatureBytes = header.value() + if (signatureBytes == null || signatureBytes.isEmpty()) { + return false + } + + try { + consumerRecord.headers().remove(RECORD_HEADER_KEY_SIGNATURE) + synchronized(verificationSignature!!) { + val specificRecordBase: SpecificRecordBase = consumerRecord.value() + return this.verifySignatureBytes( + signatureBytes, + this.toByteBuffer(specificRecordBase) + ) + } + } catch (e: SignatureException) { + throw UncheckedSecurityException("Unable to verify message signature", e) + } + } + + @Throws(SignatureException::class) + private fun verifySignatureBytes(signatureBytes: ByteArray, messageByteBuffer: ByteBuffer?): Boolean { + val messageBytes: ByteArray = if (this.stripAvroHeader) { + this.stripAvroHeader(messageByteBuffer) + } else { + messageByteBuffer!!.array() + } + verificationSignature!!.update(messageBytes) + return verificationSignature.verify(signatureBytes) + } + + private fun hasAvroHeader(bytes: ByteArray): Boolean { + return (bytes.size >= AVRO_HEADER_LENGTH) + && ((bytes[0].toInt() and 0xFF) == 0xC3) + && ((bytes[1].toInt() and 0xFF) == 0x01) + } + + private fun stripAvroHeader(byteBuffer: ByteBuffer?): ByteArray { + val bytes = ByteArray(byteBuffer!!.remaining()) + byteBuffer.get(bytes) + if (this.hasAvroHeader(bytes)) { + return Arrays.copyOfRange(bytes, AVRO_HEADER_LENGTH, bytes.size) + } + return bytes + } + + private fun toByteBuffer(message: SignableMessageWrapper<*>): ByteBuffer? { + try { + return message.toByteBuffer() + } catch (e: IOException) { + throw UncheckedIOException("Unable to determine ByteBuffer for Message", e) + } + } + + private fun toByteBuffer(message: SpecificRecordBase): ByteBuffer { + try { + return BinaryMessageEncoder(message.specificData, message.schema).encode(message) + } catch (e: IOException) { + throw UncheckedIOException("Unable to determine ByteBuffer for Message", e) + } + } + + private fun determineProvider(properties: MessageSigningProperties): String? { + return when { + properties.signatureProvider != null -> properties.signatureProvider + this.signingSignature != null -> signingSignature.getProvider()?.name + this.verificationSignature != null -> verificationSignature.getProvider()?.name + // Should not happen, set to null and ignore. + else -> null + } + } + + override fun toString(): String { + return String.format( + "MessageSigner[algorithm=\"%s\"-\"%s\", provider=\"%s\", sign=%b, verify=%b]", + this.signatureAlgorithm, + this.keyAlgorithm, + this.signatureProvider, + this.canSignMessages(), + this.canVerifyMessageSignatures() + ) + } + + companion object { + // Two magic bytes (0xC3, 0x01) followed by an 8-byte fingerprint + const val AVRO_HEADER_LENGTH: Int = 10 + + const val DEFAULT_SIGNATURE_ALGORITHM: String = "SHA256withRSA" + const val DEFAULT_SIGNATURE_PROVIDER: String = "SunRsaSign" + const val DEFAULT_KEY_ALGORITHM: String = "RSA" + + const val RECORD_HEADER_KEY_SIGNATURE: String = "signature" + + private val PEM_REMOVAL_PATTERN: Pattern = Pattern.compile("-----(?:BEGIN|END) .*?-----|\\r|\\n") + + @JvmStatic + private fun signatureInstance( + signatureAlgorithm: String, + signatureProvider: String?, + signingKey: PrivateKey? + ): Signature? { + if (signingKey == null) { + return null + } + + val signature = signatureInstance(signatureAlgorithm, signatureProvider) + try { + signature.initSign(signingKey) + } catch (e: InvalidKeyException) { + throw UncheckedSecurityException(cause = e) + } + return signature + } + + @JvmStatic + private fun signatureInstance( + signatureAlgorithm: String, + signatureProvider: String?, + verificationKey: PublicKey? + ): Signature? { + if (verificationKey == null) { + return null + } + + val signature = signatureInstance(signatureAlgorithm, signatureProvider) + try { + signature.initVerify(verificationKey) + } catch (e: InvalidKeyException) { + throw UncheckedSecurityException(cause = e) + } + return signature + } + + @JvmStatic + private fun signatureInstance(signatureAlgorithm: String, signatureProvider: String?): Signature { + try { + if (signatureProvider == null) { + return Signature.getInstance(signatureAlgorithm) + } + return Signature.getInstance(signatureAlgorithm, signatureProvider) + } catch (e: GeneralSecurityException) { + throw UncheckedSecurityException("Unable to create Signature for Avro Messages", e) + } + } + + fun readPrivateKey(privateKeyFile: Resource?): PrivateKey? { + if (privateKeyFile == null) { + return null + } + try { + val content = privateKeyFile.getContentAsString(StandardCharsets.ISO_8859_1) + return PemContent.of(content).privateKey + } catch (e: IOException) { + throw UncheckedIOException("Unable to read ${privateKeyFile.filename} as ISO-LATIN-1 PEM text", e) + } + } + + private fun readPublicKey(keyAlgorithm: String, publicKeyFile: Resource?): PublicKey? { + if (publicKeyFile == null) { + return null + } + val content = publicKeyFile.getContentAsString(StandardCharsets.ISO_8859_1) + val base64 = PEM_REMOVAL_PATTERN.matcher(content).replaceAll("") + val bytes = Base64.getDecoder().decode(base64) + val keySpec = X509EncodedKeySpec(bytes) + return try { + KeyFactory.getInstance(keyAlgorithm).generatePublic(keySpec) + } catch (e: GeneralSecurityException) { + throw UncheckedSecurityException(cause = e) + } + } + } + +} diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningAutoConfiguration.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningAutoConfiguration.kt new file mode 100644 index 0000000..b23ea10 --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningAutoConfiguration.kt @@ -0,0 +1,23 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package com.gxf.utilities.kafka.message.signing + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration + +@Configuration +@EnableConfigurationProperties(MessageSigningProperties::class) +@ComponentScan("com.gxf.utilities.kafka.message.signing") +// Only instantiate when no other bean has been configured +@ConditionalOnMissingBean(MessageSigner::class) +class MessageSigningAutoConfiguration { + @Bean + fun messageSigner(signingProperties: MessageSigningProperties): MessageSigner { + return MessageSigner(signingProperties) + } +} diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningProperties.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningProperties.kt new file mode 100644 index 0000000..ebde4b6 --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningProperties.kt @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package com.gxf.utilities.kafka.message.signing + +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.core.io.Resource + +@ConfigurationProperties(prefix = "message-signing") +// Only instantiate when no other bean has been configured +@ConditionalOnMissingBean(MessageSigningProperties::class) +open class MessageSigningProperties( + /** Enable or disable signing */ + var signingEnabled: Boolean = false, + /** Strip the Avro header containing the schema fingerprint */ + var stripAvroHeader: Boolean = false, + + /** Signature algorithm */ + var signatureAlgorithm: String = MessageSigner.DEFAULT_SIGNATURE_ALGORITHM, + /** Signature algorithm provider */ + var signatureProvider: String? = MessageSigner.DEFAULT_SIGNATURE_PROVIDER, + /** Public key algorithm */ + var keyAlgorithm: String = MessageSigner.DEFAULT_KEY_ALGORITHM, + /** PEM file containing the private key */ + var privateKeyFile: Resource? = null, + /** PEM file containing the public key */ + var publicKeyFile: Resource? = null +) diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/UncheckedSecurityException.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/UncheckedSecurityException.kt new file mode 100644 index 0000000..52c6cd4 --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/signing/UncheckedSecurityException.kt @@ -0,0 +1,19 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package com.gxf.utilities.kafka.message.signing + +import java.io.Serial +import java.security.GeneralSecurityException + + +class UncheckedSecurityException @JvmOverloads constructor(message: String? = null, cause: GeneralSecurityException) : + RuntimeException(message, cause) { + + @Serial + private val serialVersionUID = 5152038114753546167L + + override val cause: GeneralSecurityException + get() = super.cause as GeneralSecurityException +} diff --git a/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt new file mode 100644 index 0000000..e496a94 --- /dev/null +++ b/kafka-message-signing/src/main/kotlin/com/gxf/utilities/kafka/message/wrapper/SignableMessageWrapper.kt @@ -0,0 +1,31 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package com.gxf.utilities.kafka.message.wrapper + +import java.io.IOException +import java.nio.ByteBuffer + +/** + * Wrapper for signable messages. Because these messages are generated from Avro schemas, they can't be changed. + * This wrapper unifies them for the MessageSigner. + */ +abstract class SignableMessageWrapper(val message: T) { + + /** + * @return ByteBuffer of the whole message + */ + @Throws(IOException::class) + abstract fun toByteBuffer(): ByteBuffer? + + /** + * @return ByteBuffer of the signature in the message + */ + abstract fun getSignature(): ByteBuffer? + + /** + * @param signature The signature in ByteBuffer form to be set on the message + */ + abstract fun setSignature(signature: ByteBuffer?) +} diff --git a/kafka-message-signing/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/kafka-message-signing/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index cac4a7b..2357da6 100644 --- a/kafka-message-signing/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/kafka-message-signing/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1 @@ -com.alliander.osgp.kafka.message.signing.MessageSigningAutoConfiguration +com.gxf.utilities.kafka.message.signing.MessageSigningAutoConfiguration diff --git a/kafka-message-signing/src/test/java/com/alliander/osgp/kafka/message/signing/AutoConfigurationIntegrationTest.java b/kafka-message-signing/src/test/java/com/alliander/osgp/kafka/message/signing/AutoConfigurationIntegrationTest.java deleted file mode 100644 index 23c6537..0000000 --- a/kafka-message-signing/src/test/java/com/alliander/osgp/kafka/message/signing/AutoConfigurationIntegrationTest.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.alliander.osgp.kafka.message.signing; - -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.TestPropertySource; - -@SpringBootTest(classes = MessageSigningAutoConfiguration.class) -@EnableAutoConfiguration -@TestPropertySource("classpath:/application.yaml") -class AutoConfigurationIntegrationTest { - - @Autowired private MessageSigner messageSigner; - - @Test - void autoConfigurationIntegrationTest() { - assertTrue(this.messageSigner.isSigningEnabled()); - assertTrue(this.messageSigner.canSignMessages()); - assertTrue(this.messageSigner.canVerifyMessageSignatures()); - assertNotNull(this.messageSigner.signingKey().orElseThrow(AssertionError::new)); - assertNotNull(this.messageSigner.verificationKey().orElseThrow(AssertionError::new)); - } -} diff --git a/kafka-message-signing/src/test/java/com/alliander/osgp/kafka/message/signing/MessageSignerTest.java b/kafka-message-signing/src/test/java/com/alliander/osgp/kafka/message/signing/MessageSignerTest.java deleted file mode 100644 index 3655577..0000000 --- a/kafka-message-signing/src/test/java/com/alliander/osgp/kafka/message/signing/MessageSignerTest.java +++ /dev/null @@ -1,334 +0,0 @@ -// SPDX-FileCopyrightText: Copyright Contributors to the GXF project -// -// SPDX-License-Identifier: Apache-2.0 - -package com.alliander.osgp.kafka.message.signing; - -import static com.alliander.osgp.kafka.message.signing.MessageSigner.RECORD_HEADER_KEY_SIGNATURE; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import com.alliander.osgp.kafka.message.wrapper.SignableMessageWrapper; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.security.KeyPair; -import java.security.SecureRandom; -import java.util.Random; -import java.util.stream.Collectors; -import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificRecordBase; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.jupiter.api.Test; - -class MessageSignerTest { - - private static final boolean SIGNING_ENABLED = true; - - private static final boolean STRIP_AVRO_HEADER = true; - - private static final String SIGNATURE_ALGORITHM = "SHA256withRSA"; - private static final String SIGNATURE_PROVIDER = "SunRsaSign"; - private static final String SIGNATURE_KEY_ALGORITHM = "RSA"; - private static final int SIGNATURE_KEY_SIZE = 2048; - private static final int SIGNATURE_KEY_SIZE_BYTES = SIGNATURE_KEY_SIZE / 8; - - private static final KeyPair KEY_PAIR = - MessageSigner.generateKeyPair( - SIGNATURE_KEY_ALGORITHM, SIGNATURE_PROVIDER, SIGNATURE_KEY_SIZE); - - private static final Random RANDOM = new SecureRandom(); - - private final MessageSigner messageSigner = - MessageSigner.newBuilder() - .signingEnabled(SIGNING_ENABLED) - .stripAvroHeader(STRIP_AVRO_HEADER) - .signatureAlgorithm(SIGNATURE_ALGORITHM) - .signatureProvider(SIGNATURE_PROVIDER) - .signatureKeyAlgorithm(SIGNATURE_KEY_ALGORITHM) - .signatureKeySize(SIGNATURE_KEY_SIZE) - .keyPair(KEY_PAIR) - .build(); - - @Test - void signsMessageWithoutSignature() { - final SignableMessageWrapper messageWrapper = this.messageWrapper(); - - this.messageSigner.sign(messageWrapper); - - assertThat(messageWrapper.getSignature()).isNotNull(); - } - - @Test - void signsRecordHeaderWithoutSignature() { - final ProducerRecord record = this.producerRecord(); - - this.messageSigner.sign(record); - - assertThat(record.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE)).isNotNull(); - } - - @Test - void signsMessageReplacingSignature() { - final byte[] randomSignature = this.randomSignature(); - final TestableWrapper messageWrapper = this.messageWrapper(); - messageWrapper.setSignature(ByteBuffer.wrap(randomSignature)); - - final byte[] actualSignatureBefore = this.bytes(messageWrapper.getSignature()); - assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature); - - this.messageSigner.sign(messageWrapper); - - final byte[] actualSignatureAfter = this.bytes(messageWrapper.getSignature()); - assertThat(actualSignatureAfter).isNotNull().isNotEqualTo(randomSignature); - } - - @Test - void signsRecordHeaderReplacingSignature() { - final byte[] randomSignature = this.randomSignature(); - final ProducerRecord record = this.producerRecord(); - record.headers().add(RECORD_HEADER_KEY_SIGNATURE, randomSignature); - - final byte[] actualSignatureBefore = record.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE).value(); - assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature); - - this.messageSigner.sign(record); - - final byte[] actualSignatureAfter = record.headers().lastHeader(RECORD_HEADER_KEY_SIGNATURE).value(); - assertThat(actualSignatureAfter).isNotNull().isNotEqualTo(randomSignature); - } - - @Test - void verifiesMessagesWithValidSignature() { - final TestableWrapper message = this.properlySignedMessage(); - - final boolean signatureWasVerified = this.messageSigner.verify(message); - - assertThat(signatureWasVerified).isTrue(); - } - - @Test - void verifiesRecordsWithValidSignature() { - final ConsumerRecord signedRecord = this.properlySignedRecord(); - - final boolean signatureWasVerified = this.messageSigner.verify(signedRecord); - - assertThat(signatureWasVerified).isTrue(); - } - - @Test - void doesNotVerifyMessagesWithoutSignature() { - final TestableWrapper messageWrapper = this.messageWrapper(); - - final boolean signatureWasVerified = this.messageSigner.verify(messageWrapper); - - assertThat(signatureWasVerified).isFalse(); - } - - @Test - void doesNotVerifyRecordsWithoutSignature() { - final String expectedMessage = "This ProducerRecord does not contain a signature header"; - final ConsumerRecord consumerRecord = this.consumerRecord(); - - final Exception exception = assertThrows(IllegalStateException.class, () -> - this.messageSigner.verify(consumerRecord) - ); - final String actualMessage = exception.getMessage(); - - assertTrue(actualMessage.contains(expectedMessage)); - } - - @Test - void doesNotVerifyMessagesWithIncorrectSignature() { - final byte[] randomSignature = this.randomSignature(); - final TestableWrapper messageWrapper = this.messageWrapper(randomSignature); - - final boolean signatureWasVerified = this.messageSigner.verify(messageWrapper); - - assertThat(signatureWasVerified).isFalse(); - } - - @Test - void verifiesMessagesPreservingTheSignatureAndItsProperties() { - final TestableWrapper message = this.properlySignedMessage(); - final ByteBuffer originalSignature = message.getSignature(); - final int originalPosition = originalSignature.position(); - final int originalLimit = originalSignature.limit(); - final int originalRemaining = originalSignature.remaining(); - - this.messageSigner.verify(message); - - final ByteBuffer verifiedSignature = message.getSignature(); - assertThat(verifiedSignature).isEqualTo(originalSignature); - assertThat(verifiedSignature.position()).isEqualTo(originalPosition); - assertThat(verifiedSignature.limit()).isEqualTo(originalLimit); - assertThat(verifiedSignature.remaining()).isEqualTo(originalRemaining); - } - - private String fromPemResource(final String name) { - return new BufferedReader( - new InputStreamReader( - this.getClass().getResourceAsStream(name), StandardCharsets.ISO_8859_1)) - .lines() - .collect(Collectors.joining(System.lineSeparator())); - } - - @Test - void worksWithKeysFromPemEncodedResources() { - - final MessageSigner messageSignerWithKeysFromResources = - MessageSigner.newBuilder() - .signingEnabled(SIGNING_ENABLED) - .signatureAlgorithm(SIGNATURE_ALGORITHM) - .signatureProvider(SIGNATURE_PROVIDER) - .signatureKeyAlgorithm(SIGNATURE_KEY_ALGORITHM) - .signatureKeySize(SIGNATURE_KEY_SIZE) - .signingKey(this.fromPemResource("/rsa-private.pem")) - .verificationKey(this.fromPemResource("/rsa-public.pem")) - .build(); - - final TestableWrapper messageWrapper = this.messageWrapper(); - messageSignerWithKeysFromResources.sign(messageWrapper); - final boolean signatureWasVerified = messageSignerWithKeysFromResources.verify(messageWrapper); - - assertThat(signatureWasVerified).isTrue(); - } - - @Test - void recordHeaderSigningWorksWithKeysFromPemEncodedResources() { - - final MessageSigner messageSignerWithKeysFromResources = - MessageSigner.newBuilder() - .signingEnabled(SIGNING_ENABLED) - .signatureAlgorithm(SIGNATURE_ALGORITHM) - .signatureProvider(SIGNATURE_PROVIDER) - .signatureKeyAlgorithm(SIGNATURE_KEY_ALGORITHM) - .signatureKeySize(SIGNATURE_KEY_SIZE) - .signingKey(this.fromPemResource("/rsa-private.pem")) - .verificationKey(this.fromPemResource("/rsa-public.pem")) - .build(); - - final ProducerRecord producerRecord = this.producerRecord(); - messageSignerWithKeysFromResources.sign(producerRecord); - final ConsumerRecord consumerRecord = this.producerRecordToConsumerRecord(producerRecord); - final boolean signatureWasVerified = messageSignerWithKeysFromResources.verify(consumerRecord); - - assertThat(signatureWasVerified).isTrue(); - } - - @Test - void signingCanBeDisabled() { - final MessageSigner messageSignerSigningDisabled = - MessageSigner.newBuilder().signingEnabled(!SIGNING_ENABLED).build(); - - assertThat(messageSignerSigningDisabled.canSignMessages()).isFalse(); - assertThat(messageSignerSigningDisabled.canVerifyMessageSignatures()).isFalse(); - } - - private TestableWrapper messageWrapper() { - return new TestableWrapper(); - } - - private TestableWrapper messageWrapper(final byte[] signature) { - final TestableWrapper testableWrapper = new TestableWrapper(); - testableWrapper.setSignature(ByteBuffer.wrap(signature)); - return testableWrapper; - } - - private TestableWrapper properlySignedMessage() { - final TestableWrapper messageWrapper = this.messageWrapper(); - this.messageSigner.sign(messageWrapper); - return messageWrapper; - } - - private ConsumerRecord properlySignedRecord() { - final ProducerRecord producerRecord = this.producerRecord(); - this.messageSigner.sign(producerRecord); - return this.producerRecordToConsumerRecord(producerRecord); - } - - private ConsumerRecord producerRecordToConsumerRecord(final ProducerRecord producerRecord) { - final ConsumerRecord consumerRecord = new ConsumerRecord<>(producerRecord.topic(), 0, 123L, producerRecord.key(), producerRecord.value()); - producerRecord.headers().forEach(header -> consumerRecord.headers().add(header)); - return consumerRecord; - } - - private byte[] randomSignature() { - final byte[] signature = new byte[SIGNATURE_KEY_SIZE_BYTES]; - RANDOM.nextBytes(signature); - return signature; - } - - private byte[] bytes(final ByteBuffer byteBuffer) { - if (byteBuffer == null) { - return null; - } - final byte[] bytes = new byte[byteBuffer.remaining()]; - byteBuffer.get(bytes); - return bytes; - } - - private ProducerRecord producerRecord() { - return new ProducerRecord<>("topic", this.message()); - } - - private ConsumerRecord consumerRecord() { - return new ConsumerRecord<>("topic", 0, 123L, null, this.message()); - } - - private Message message() { - return new Message("super special message"); - } - - static class Message extends SpecificRecordBase { - private String message; - - public Message() { - } - - Message(final String message) { - this.message = message; - } - - @Override - public Schema getSchema() { - return new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Message\",\"namespace\":\"com.alliander.osgp.kafka.message.signing\",\"fields\":[{\"name\":\"message\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); - } - - @Override - public Object get(final int field) { - return this.message; - } - - @Override - public void put(final int field, final Object value) { - this.message = value != null ? value.toString() : null; - } - } - - private static class TestableWrapper extends SignableMessageWrapper { - private ByteBuffer signature; - - protected TestableWrapper() { - super("Some test message"); - } - - @Override - public ByteBuffer toByteBuffer() { - return ByteBuffer.wrap(this.message.getBytes(StandardCharsets.UTF_8)); - } - - @Override - public ByteBuffer getSignature() { - return this.signature; - } - - @Override - public void setSignature(final ByteBuffer signature) { - this.signature = signature; - } - } -} diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt new file mode 100644 index 0000000..988578f --- /dev/null +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSignerTest.kt @@ -0,0 +1,257 @@ +// SPDX-FileCopyrightText: Copyright Contributors to the GXF project +// +// SPDX-License-Identifier: Apache-2.0 + +package com.gxf.utilities.kafka.message.signing + +import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper +import org.apache.avro.Schema +import org.apache.avro.specific.SpecificRecordBase +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.header.Header +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.springframework.core.io.ClassPathResource +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.security.SecureRandom +import java.util.* +import java.util.function.Consumer + +class MessageSignerTest { + + private val messageSignerProperties = MessageSigningProperties( + signingEnabled = true, + stripAvroHeader = true, + signatureAlgorithm = "SHA256withRSA", + signatureProvider = "SunRsaSign", + keyAlgorithm = "RSA", + privateKeyFile = ClassPathResource("/rsa-private.pem"), + publicKeyFile = ClassPathResource("/rsa-public.pem") + ) + + private val messageSigner = MessageSigner(messageSignerProperties) + + @Test + fun signsMessageWithoutSignature() { + val messageWrapper: SignableMessageWrapper<*> = this.messageWrapper() + + messageSigner.signUsingField(messageWrapper) + + assertThat(messageWrapper.getSignature()).isNotNull() + } + + @Test + fun signsRecordHeaderWithoutSignature() { + val record = this.producerRecord() + + messageSigner.signUsingHeader(record) + + assertThat(record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE)).isNotNull() + } + + @Test + fun signsMessageReplacingSignature() { + val randomSignature = this.randomSignature() + val messageWrapper = this.messageWrapper() + messageWrapper.setSignature(ByteBuffer.wrap(randomSignature)) + + val actualSignatureBefore = this.bytes(messageWrapper.getSignature()) + assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature) + + messageSigner.signUsingField(messageWrapper) + + val actualSignatureAfter = this.bytes(messageWrapper.getSignature()) + assertThat(actualSignatureAfter).isNotNull().isNotEqualTo(randomSignature) + } + + @Test + fun signsRecordHeaderReplacingSignature() { + val randomSignature = this.randomSignature() + val record = this.producerRecord() + record.headers().add(MessageSigner.RECORD_HEADER_KEY_SIGNATURE, randomSignature) + + val actualSignatureBefore = record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE).value() + assertThat(actualSignatureBefore).isNotNull().isEqualTo(randomSignature) + + messageSigner.signUsingHeader(record) + + val actualSignatureAfter = record.headers().lastHeader(MessageSigner.RECORD_HEADER_KEY_SIGNATURE).value() + assertThat(actualSignatureAfter).isNotNull().isNotEqualTo(randomSignature) + } + + @Test + fun verifiesMessagesWithValidSignature() { + val message = this.properlySignedMessage() + + val signatureWasVerified = messageSigner.verifyUsingField(message) + + assertThat(signatureWasVerified).isTrue() + } + + @Test + fun verifiesRecordsWithValidSignature() { + val signedRecord = this.properlySignedRecord() + + val signatureWasVerified: Boolean = messageSigner.verifyUsingHeader(signedRecord) + + assertThat(signatureWasVerified).isTrue() + } + + @Test + fun doesNotVerifyMessagesWithoutSignature() { + val messageWrapper = this.messageWrapper() + + val signatureWasVerified = messageSigner.verifyUsingField(messageWrapper) + + assertThat(signatureWasVerified).isFalse() + } + + @Test + fun doesNotVerifyRecordsWithoutSignature() { + val expectedMessage = "This ProducerRecord does not contain a signature header" + val consumerRecord = this.consumerRecord() + + val exception: Exception = org.junit.jupiter.api.Assertions.assertThrows( + IllegalStateException::class.java + ) { + messageSigner.verifyUsingHeader( + consumerRecord + ) + } + val actualMessage = exception.message + + assertThat(actualMessage).contains(expectedMessage) + } + + @Test + fun doesNotVerifyMessagesWithIncorrectSignature() { + val randomSignature = this.randomSignature() + val messageWrapper = this.messageWrapper(randomSignature) + + val signatureWasVerified = messageSigner.verifyUsingField(messageWrapper) + + assertThat(signatureWasVerified).isFalse() + } + + @Test + fun verifiesMessagesPreservingTheSignatureAndItsProperties() { + val message = this.properlySignedMessage() + val originalSignature = message.getSignature() + val originalPosition = originalSignature!!.position() + val originalLimit = originalSignature.limit() + val originalRemaining = originalSignature.remaining() + + messageSigner.verifyUsingField(message) + + val verifiedSignature = message.getSignature() + assertThat(verifiedSignature).isEqualTo(originalSignature) + assertThat(verifiedSignature!!.position()).isEqualTo(originalPosition) + assertThat(verifiedSignature.limit()).isEqualTo(originalLimit) + assertThat(verifiedSignature.remaining()).isEqualTo(originalRemaining) + } + + @Test + fun signingCanBeDisabled() { + val signingDisabledProperties = MessageSigningProperties(signingEnabled = false) + val messageSignerSigningDisabled = MessageSigner(signingDisabledProperties) + + assertThat(messageSignerSigningDisabled.canSignMessages()).isFalse() + assertThat(messageSignerSigningDisabled.canVerifyMessageSignatures()).isFalse() + } + + private fun messageWrapper(): TestableWrapper { + return TestableWrapper() + } + + private fun messageWrapper(signature: ByteArray): TestableWrapper { + val testableWrapper = TestableWrapper() + testableWrapper.setSignature(ByteBuffer.wrap(signature)) + return testableWrapper + } + + private fun properlySignedMessage(): TestableWrapper { + val messageWrapper = this.messageWrapper() + messageSigner.signUsingField(messageWrapper) + return messageWrapper + } + + private fun properlySignedRecord(): ConsumerRecord { + val producerRecord = this.producerRecord() + messageSigner.signUsingHeader(producerRecord) + return this.producerRecordToConsumerRecord(producerRecord) + } + + private fun producerRecordToConsumerRecord(producerRecord: ProducerRecord): ConsumerRecord { + val consumerRecord = + ConsumerRecord(producerRecord.topic(), 0, 123L, producerRecord.key(), producerRecord.value()) + producerRecord.headers().forEach(Consumer { header: Header? -> + consumerRecord.headers().add(header) + }) + return consumerRecord + } + + private fun randomSignature(): ByteArray { + val random: Random = SecureRandom() + val keySize = 2048 + + val signature = ByteArray(keySize / 8) + random.nextBytes(signature) + + return signature + } + + private fun bytes(byteBuffer: ByteBuffer?): ByteArray? { + if (byteBuffer == null) { + return null + } + val bytes = ByteArray(byteBuffer.remaining()) + byteBuffer[bytes] + return bytes + } + + private fun producerRecord(): ProducerRecord { + return ProducerRecord("topic", this.message()) + } + + private fun consumerRecord(): ConsumerRecord { + return ConsumerRecord("topic", 0, 123L, null, this.message()) + } + + private fun message(): Message { + return Message("super special message") + } + + internal class Message(private var message: String?) : SpecificRecordBase() { + + override fun getSchema(): Schema { + return Schema.Parser() + .parse("""{"type":"record","name":"Message","namespace":"com.alliander.osgp.kafka.message.signing","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}""") + } + + override fun get(field: Int): Any { + return message!! + } + + override fun put(field: Int, value: Any) { + this.message = value.toString() + } + } + + private class TestableWrapper : SignableMessageWrapper("Some test message") { + private var signature: ByteBuffer? = null + + override fun toByteBuffer(): ByteBuffer? { + return ByteBuffer.wrap(message.toByteArray(StandardCharsets.UTF_8)) + } + + override fun getSignature(): ByteBuffer? { + return this.signature + } + + override fun setSignature(signature: ByteBuffer?) { + this.signature = signature + } + } +} diff --git a/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningAutoConfigurationTest.kt b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningAutoConfigurationTest.kt new file mode 100644 index 0000000..6474d2e --- /dev/null +++ b/kafka-message-signing/src/test/kotlin/com/gxf/utilities/kafka/message/signing/MessageSigningAutoConfigurationTest.kt @@ -0,0 +1,25 @@ +package com.gxf.utilities.kafka.message.signing + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.TestPropertySource + +@SpringBootTest(classes = [MessageSigningAutoConfiguration::class]) +@EnableAutoConfiguration +@EnableConfigurationProperties(MessageSigningProperties::class) +@TestPropertySource("classpath:/application.yaml") +class MessageSigningAutoConfigurationTest { + @Autowired + private lateinit var messageSigner: MessageSigner + + @Test + fun autoConfigurationIntegrationTest() { + assertThat(messageSigner.signingEnabled).isTrue() + assertThat(messageSigner.canSignMessages()).isTrue() + assertThat(messageSigner.canVerifyMessageSignatures()).isTrue() + } +} diff --git a/kafka-message-signing/src/test/resources/application.yaml b/kafka-message-signing/src/test/resources/application.yaml index fc45440..89fe863 100644 --- a/kafka-message-signing/src/test/resources/application.yaml +++ b/kafka-message-signing/src/test/resources/application.yaml @@ -1,16 +1,9 @@ message-signing: - enabled: true - strip-headers: true - signature: - algorithm: SHA256withRSA - provider: SunRsaSign - key: - algorithm: RSA - size: 2048 - private: /rsa-private.pem - public: /rsa-public.pem - -# To be able to integrate with other systems, a conversion of the private key may be required. -# Converting a PKCS#1 key to PKCS#8 is possible using OpenSSL. -# -# openssl pkcs8 -topk8 -inform PEM -in -out -nocrypt + signing-enabled: true + strip-avro-header: true + algorithm: SHA256withRSA + provider: SunRsaSign + key-algorithm: RSA + key-size: 2048 + private-key-file: classpath:rsa-private.pem + public-key-file: classpath:rsa-public.pem diff --git a/kafka-message-signing/src/test/resources/rsa-private.pem b/kafka-message-signing/src/test/resources/rsa-private.pem index 1f84db9..9e4a2f7 100644 --- a/kafka-message-signing/src/test/resources/rsa-private.pem +++ b/kafka-message-signing/src/test/resources/rsa-private.pem @@ -1,25 +1,28 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC2nu5a6Ys3QHhGLW8UjpBxxpPw -Icuvogm8RBL7q0nZ7YspaZhBRpEb1ygUB9EXNsNcIugf4P7EzgIzozd4oFwCOI/bbPm7oPBAJgXf -l3jrpknxI/TcRJXBNqDopXelRgaK1R8hG5BMf6NSxzUBMQEodNy1W6lL6umtGmLUY09W/kdPue8/ -G7cj0/ftp5gZSA48clOyh//h9x9CRz/4fQeOFpnTCpl+6JYQ7u+FHsCaRfS6ZSymJWjHmjI0YSsj -ykeJ7F17smwlOamGTjRuGePvG5VQgd8MWkvCgYp5PgfsPDgGTMSsnztPum5cfbOuLJV7M1vAjmqc -XlmPkaOLxw3DAgMBAAECggEBAIvl0pjIgkKQW9L+6TJpSFQwmJIDgcMJMcYMrDIpdMjCxbGy19Vh -lrYqK+S0XEQZSq1hfEs3lFP1sRAXv93jkriM1f91SxamYoXx2tv/cL2tRMW7EtBOph4+mCPA5pgw -vcBLJa66K9++g8JdIsjH3qg8Zft0vYuP6PUX2o/ziAsOLqTmlTljRAM86pST9wo4B1CAJpSIUctH -eavbwp8igGgOl5suG8bwiTYKMeY4660nM3ywyl1fLP5k1rPwBOkgoicT6Ky8exjxFduOz0ct77Y+ -sTj7pI+XDFzn2dM2ZlCegURb1sIkQYpMj1Ik9tz0FUvgTd1VLhBeoh5iDMwvHckCgYEA/pGVNmnb -TotR6APaQVKdgPSGk7Q0njWHRSIJBA7ZkjsVTNE39ZnaXdxYWFxIMKPaFQ24N55IH4LGg0tZdF3z -ti615UBN5anPi020LWAgQCDaFVi3LzeYO2g6wArxlDSuKF6Ww0Ch6TtmM4RY6ZEZPmwdjDdY6/sU -ylCNF6j5T20CgYEAt6XJ/POzbUJDKSGBxbifeqF/2P71YMnTpD374dZzMp1zrNnRGGyRRUV+qSWh -uJD4bpzkexxNzQq1UVFK76McTTQKjWuo74QjXTJcsP9LNJ0qMdNzgT6ctUnFCRVFvSoNB3d3x90F -ZORpjmOtz/PUSduKXfnVGW+KyOBD+m2SI+8CgYEAinMtHsHlt0sISdJGkn5XEPpscspwT5c3MX84 -Pg/BfslJZVToRVfermuXVL8jt+h1RDwI857PBOxAAMorJaGvWWcAIGWfuAdpzA5/rqn4AEidszxj -rHdlAPJH+Yg6KOuZyHThM+Hj7RAUHnKdVLJIc22jiE3Vu8n7Xaj/g12v8eUCgYA9SLgFD5Y6ybf7 -y9CwmJGvrKErWrmr2O4liwG5NYUvyNdHQVDDo8c+pJhF/ebf3pDo6LZeVu2nlQE457XoDjhtkwZK -dzji5OegPCQudKM2JZRlGDkdUjWdUcbM5ypkm9nJOhbgvWMFbivDdoQUNzwKgZbFEZAJcu2PZzeI -JHR2RQKBgBNHh7bmzlaxDLbM7lFQEOpxhdA9GCykYOw6rVJAE8Y7yVjY8+haQuoV8/jrtwPdmdBk -S6jWD25tkhGHbaCnhK+wU7++H1QMEpLhQyhmgFDBobfJt1GsbOr2b91tF11N5FKq+IWSyWSU+zS8 -7Vt5sL3neHodJNoZ3bTL2rMjyPml ------END RSA PRIVATE KEY----- - +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDRHNZyEN0Gs8Cb +mDcbQNvlcMYtKgCmYGXrF9T8oQutN4qiNH8KEHLidzi2wFzerxQZ5Py60NKXtHx6 +V2JZNFby9nPaTDOWQgtufBrwWYG2kJUF4JoRUo26b2ysrszBpzY902JNeD0isI2p +rwMQqtI43k/bVja8YlZZ+WBMfbvGRCuQAxuS94J8PykH5j3lsbEPPQ5ueVoK1zHO +VeZV66D5+93U4JQ0bplWCO1Pt3KeMcPuE/Qmq8+c2laAt1dgWdJNPoTnb85UiWEU +kGF82/85Ck636//+guVeG9pCS9AnvfUmXRpQIs/yaifisDQWwmUVLrdSibb1AGOh +Xj/EIiIzAgMBAAECggEAKX7sexQnmCXhab7p6ImVWVFoshusp1dnwiAnjr4yf/wO +kaOTNh8PfSslYPaJN1NMzNqo4DWCJMKXFPv0Mzl0q6AGW2JVtlKCzDjqa5mHaQAF +4nWwfRGBuGZWDWpejWsuE1S4JCFhGFoqsmpdVFhhgRF0jVTtZbKp6g7XcjZdJbmT +VaXMZc4jC9WKGkMVGXepdwool+7dn6Lx3p1kQe1RGLjH6T8SKoTQebv/+9tOZIJd +DrLXNNv2Mdn174II/7214fvBTfKdfHfrdTWx5+/bt6VVvYnFy0XwwY7K6vkdsvr8 +s51XpfCHvPlNvypBKX3jzjIGBpLOeoOWy7uT9qnZ0QKBgQD3mu5tJJxchBnlHMvx +BHFJS9LerCHtOIqKrMJYm29aXUrrlvXSWiUnKL49X3Ec4HJ/J2UJrGU+FhidmpUD +Pz82tzatgj+mc0ttu8nMRvWPMY3pWjVqYitjuu1Zm950M6/5ewmBm54LvMjUpb/q +mkgpjN/S4L3MiIHale7G1jUVKQKBgQDYM9A8lRXIQDg0CWTW2+v1iedFRdOV37ug +HMSmXhLxPRW4Ovv+MckzNt5tP9/FxUqL9HPI68tSPl910ucWV+RsTzFAnbRenxfh +ufTM4t/jr9QpoBmIdd0HAurbPm6bFJR5t+yFpRAFygNNNI2KvwLHm3wGbaV4bFXh +WBSYC9Sr+wKBgF7hHoXyJnyJHkceHsQPeL2mcXpkKWf2Z5g2FQ7RGP1ejGv0X+Qo +KN51jViFLxnqo9U3qk7nobAF1Y7nAjzJSpbT4/7ezzcgLQIymGRPuJ8TVRbbO2a1 +kXzYz0SsJrUObVtDG2Rv1kMXJznc/cqPLiHF2qq69d/fWysAkwvcX2DpAoGBAMX+ +BThFy3Hcpdu5spg1RIxS4vGEdbTt9FQ3pDs7MZustiRGGBl8nGYDXISuFw51k8RB +dHtOvJ8vsZokLy4+BVjO2Sr0gYaZqXQ1KKMtLZ3EmsYX8OoQDf7qxAtNzFrG6QXi +0n69X8P9cTKV6v1h2XjUwirc5Mk4ZRl94TYwt21bAoGAReDu/a3hIyFmlzaMQm27 +Uf9qtW8vVvK++XJIEt9piGgvvrNW6XMKWZY1GspMOD5GBlGw1LvbwsD0MWw6vgHI +yPTDmhx39RE59EgCwwrZ02nlLaZthu+ZWsq+Tfa5TJNUZfTUnBk8g9cOdu8oezPB +w0UW6KTv9fvE8Ub2nZ/lKQM= +-----END PRIVATE KEY----- diff --git a/kafka-message-signing/src/test/resources/rsa-public.pem b/kafka-message-signing/src/test/resources/rsa-public.pem index fd57d14..3dc0fc8 100644 --- a/kafka-message-signing/src/test/resources/rsa-public.pem +++ b/kafka-message-signing/src/test/resources/rsa-public.pem @@ -1,9 +1,9 @@ ------BEGIN RSA PUBLIC KEY----- -MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtp7uWumLN0B4Ri1vFI6QccaT8CHLr6IJ -vEQS+6tJ2e2LKWmYQUaRG9coFAfRFzbDXCLoH+D+xM4CM6M3eKBcAjiP22z5u6DwQCYF35d466ZJ -8SP03ESVwTag6KV3pUYGitUfIRuQTH+jUsc1ATEBKHTctVupS+rprRpi1GNPVv5HT7nvPxu3I9P3 -7aeYGUgOPHJTsof/4fcfQkc/+H0HjhaZ0wqZfuiWEO7vhR7AmkX0umUspiVox5oyNGErI8pHiexd -e7JsJTmphk40bhnj7xuVUIHfDFpLwoGKeT4H7Dw4BkzErJ87T7puXH2zriyVezNbwI5qnF5Zj5Gj -i8cNwwIDAQAB ------END RSA PUBLIC KEY----- - +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA0RzWchDdBrPAm5g3G0Db +5XDGLSoApmBl6xfU/KELrTeKojR/ChBy4nc4tsBc3q8UGeT8utDSl7R8eldiWTRW +8vZz2kwzlkILbnwa8FmBtpCVBeCaEVKNum9srK7Mwac2PdNiTXg9IrCNqa8DEKrS +ON5P21Y2vGJWWflgTH27xkQrkAMbkveCfD8pB+Y95bGxDz0ObnlaCtcxzlXmVeug ++fvd1OCUNG6ZVgjtT7dynjHD7hP0JqvPnNpWgLdXYFnSTT6E52/OVIlhFJBhfNv/ +OQpOt+v//oLlXhvaQkvQJ731Jl0aUCLP8mon4rA0FsJlFS63Uom29QBjoV4/xCIi +MwIDAQAB +-----END PUBLIC KEY-----