Skip to content

Commit

Permalink
Added the ability to decrypt AES256 text encrypted by the standard AP…
Browse files Browse the repository at this point in the history
…I, or by NS4Kafka (#5)
  • Loading branch information
yraffin authored Jan 31, 2023
1 parent 6c46427 commit 7cbbbc4
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 20 deletions.
12 changes: 8 additions & 4 deletions doc/config-providers/aes256-config-provider.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ Nice to have when using Kafka Connect in a multi-tenant environment.
config.providers.aes256.class=com.michelin.kafka.AES256ConfigProvider
config.providers.aes256.param.key=0000111122223333
config.providers.aes256.param.salt=0000111122223333
````
2. Provide an API to your users so that they can encrypt their passwords with your key
https://github.com/twobeeb/aes-256-vault-api
````

2. Use NS4Kafka to encrypt your password, or provide an API to your users so that they can encrypt their passwords with your key
* NS4Kafka: https://github.com/michelin/kafkactl/blob/main/README.md
* AES256 API: https://github.com/twobeeb/aes-256-vault-api


3. Keep your key safe !

## Connector configuration

Encode your password using API:
Encode your password using the AES256 API:
````console
curl -X POST http://admin-api/vault -d '{"password": "mypassword"}'
> ${aes256:mfw43l96122yZiDhu2RevQ==}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.types.Password;

import javax.crypto.BadPaddingException;
import javax.crypto.Cipher;
import javax.crypto.IllegalBlockSizeException;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.util.*;

/**
* AES256 encrypted Kafka config provider.
Expand All @@ -33,6 +33,26 @@ public class AES256ConfigProvider implements ConfigProvider {
private static final String AES_KEY_CONFIG = "key";
private static final String SALT_CONFIG = "salt";

/**
* The AES encryption algorithm.
*/
private static final String ENCRYPT_ALGO = "AES/GCM/NoPadding";

/**
* The authentication tag length.
*/
private static final int TAG_LENGTH_BIT = 128;

/**
* The Initial Value length.
*/
private static final int IV_LENGTH_BYTE = 12;

/**
* The NS4KAFKA prefix.
*/
private static final String NS4KAFKA_PREFIX = "NS4K";

/**
* Definition of accepted parameters: key and salt.
*/
Expand Down Expand Up @@ -67,11 +87,10 @@ public ConfigData get(final String pPath) {
@Override
public ConfigData get(final String pPath, final Set<String> pKeys) {
final var decoded = new HashMap<String, String>();
final var cipher = this.getCipher();
pKeys.forEach(key -> {
try {
decoded.put(key, new String(cipher.doFinal(Base64.getDecoder().decode(key)), StandardCharsets.UTF_8));
} catch (IllegalArgumentException | IllegalBlockSizeException | BadPaddingException e) {
decoded.put(key, decrypt(key, this.aesKey, this.salt));
} catch (IllegalArgumentException e) {
throw new ConfigException("Error while decrypting " + key, e);
}
});
Expand All @@ -85,24 +104,106 @@ public void close() {
}

/**
* Gets the cipher instance for decryption.
* Decrypt text with the right algorithm.
*
* @param encryptedText The text to decrypt.
* @param aesKey The encryption key.
* @param salt The encryption salt.
* @return The encrypted password.
*/
public static String decrypt(final String encryptedText, final Password aesKey, final String salt) {
if (encryptedText == null || encryptedText.isEmpty()) {
return encryptedText;
}

final byte[] prefix = NS4KAFKA_PREFIX.getBytes(StandardCharsets.UTF_8);
final var byteBuffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedText));
final byte[] encryptedPrefix = new byte[prefix.length];
byteBuffer.get(encryptedPrefix);

// Check if text has been encoded by NS4KAFKA vault endpoint.
if (Arrays.equals(prefix, encryptedPrefix)) {
return decryptAESFromNs4Kafka(encryptedText, aesKey, salt);
}

return decryptAESFromPreviousAPI(encryptedText, aesKey, salt);
}

/**
* Decrypt text with the given key and salt encoded by the aes256 api.
*
* @return The cipher used to decrypt the keys.
* @param encryptedText The text to decrypt.
* @param aesKey The encryption key.
* @param salt The encryption salt.
* @return The encrypted password.
*/
private Cipher getCipher() {
public static String decryptAESFromPreviousAPI(final String encryptedText, final Password aesKey, final String salt) {
if (encryptedText == null || encryptedText.isEmpty()) {
return encryptedText;
}

try {
final var ivspec = new IvParameterSpec(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0});

final var factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
final var spec = new PBEKeySpec(this.aesKey.value().toCharArray(), this.salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
final var spec = new PBEKeySpec(aesKey.value().toCharArray(), salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
final var tmp = factory.generateSecret(spec);
final var secretKey = new SecretKeySpec(tmp.getEncoded(), "AES");

final var cipher = Cipher.getInstance("AES/CBC/PKCS5PADDING");

cipher.init(Cipher.DECRYPT_MODE, secretKey, ivspec);
return cipher;
return new String(cipher.doFinal(Base64.getDecoder().decode(encryptedText)), StandardCharsets.UTF_8);
} catch (final Exception e) {
throw new ConfigException("Error during Cipher initialization", e);
}
}

/**
* Decrypt text with the given key and salt encoded by NS4Kafka.
*
* @param encryptedText The text to decrypt.
* @param aesKey The encryption key.
* @param salt The encryption salt.
* @return The encrypted password.
*/
public static String decryptAESFromNs4Kafka(final String encryptedText, final Password aesKey, final String salt) {
if (encryptedText == null || encryptedText.isEmpty()) {
return encryptedText;
}

try {
// Get IV and cipherText from encrypted text.
final byte[] prefix = NS4KAFKA_PREFIX.getBytes(StandardCharsets.UTF_8);
final var byteBuffer = ByteBuffer.wrap(Base64.getDecoder().decode(encryptedText));
final byte[] iv = new byte[IV_LENGTH_BYTE];
byteBuffer.position(prefix.length);
byteBuffer.get(iv);
final byte[] cipherText = new byte[byteBuffer.remaining()];
byteBuffer.get(cipherText);

// decrypt the cipher text.
final SecretKey secret = getAESSecretKey(aesKey.value(), salt);
final var cipher = Cipher.getInstance(ENCRYPT_ALGO);
cipher.init(Cipher.DECRYPT_MODE, secret, new GCMParameterSpec(TAG_LENGTH_BIT, iv));
return new String(cipher.doFinal(cipherText), StandardCharsets.UTF_8);
} catch (Exception e) {
throw new ConfigException("An error occurred during Connect cluster AES256 NS4KAFKA string decryption", e);
}
}


/**
* Gets the secret key derived AES 256 bits key
*
* @param key The encryption key
* @param salt The encryption salt
* @return The encryption secret key.
* @throws NoSuchAlgorithmException No such algorithm exception.
* @throws InvalidKeySpecException Invalid key spec exception.
*/
private static SecretKey getAESSecretKey(final String key, final String salt)
throws NoSuchAlgorithmException, InvalidKeySpecException {
var factory = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
var spec = new PBEKeySpec(key.toCharArray(), salt.getBytes(StandardCharsets.UTF_8), 65536, 256);
return new SecretKeySpec(factory.generateSecret(spec).getEncoded(), "AES");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,33 @@ void DecryptSuccess() {
}
}

@Test
void DecryptSuccessBothAlgorithm() {
final var originalPassword = "hello !";
final var encodedPassword = "hgkWF2Gp3qPxcPnVifDgJA==";
final var encodedPasswordNS4K1 = "TlM0SxO5N1cXueLtuDRVEBCAacMVm/4dwYN0/SMjKGlnkNXFDDt7";
final var encodedPasswordNS4K2 = "TlM0S9g7l18K6q6pTXs5NGVL2vRVDyHbQ6NDliGPh6UlgL4+6MEX";
try (final var configProvider = new AES256ConfigProvider()) {
final var configs = new HashMap<String, String>();
configs.put("key", "key-aaaabbbbccccdddd");
configs.put("salt", "salt-aaaabbbbccccdddd");
configProvider.configure(configs);

// String encoded = AES256Helper.encrypt("aaaabbbbccccdddd",AES256ConfigProvider.DEFAULT_SALT, originalPassword);
// System.out.println(encoded);

final var rightKeys = new HashSet<String>();
rightKeys.add(encodedPassword);
rightKeys.add(encodedPasswordNS4K1);
rightKeys.add(encodedPasswordNS4K2);

var result = configProvider.get("", rightKeys).data();
assertEquals(originalPassword, result.get(encodedPassword));
assertEquals(originalPassword, result.get(encodedPasswordNS4K1));
assertEquals(originalPassword, result.get(encodedPasswordNS4K2));
}
}

@Test
void MissingConfig_key() {
try (final var configProvider = new AES256ConfigProvider()) {
Expand Down

0 comments on commit 7cbbbc4

Please sign in to comment.