Skip to content

Commit

Permalink
Feature: Certificate passphrase check (#238)
Browse files Browse the repository at this point in the history
#42 
- Introduces a new parameter to cert_info module named
`passphrase_check`
- If `passphrase_check` is set to true, the module will only check if
current passphrase is working and returns the result of it, but will not
return any information about the certificate
- Anyway the result will also be returned if `passphrase_check` is set
to false, but the task will fail in this case
  • Loading branch information
danopt authored Sep 25, 2023
1 parent 96c04c0 commit b410656
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 72 deletions.
18 changes: 18 additions & 0 deletions molecule/plugins/converge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,21 @@
- name: Test no parameters
cert_info:
ignore_errors: true
- name: Test wrong passphrase with passphrase_check parameter
cert_info:
path: files/es-ca/elastic-stack-ca.p12
passphrase: PleaseChangeMe-wrong
passphrase_check: true
register: pass_check
- name: Debug
debug:
msg: "{{ pass_check }}"
- name: Test correct passphrase with passphrase_check parameter
cert_info:
path: files/es-ca/elastic-stack-ca.p12
passphrase: PleaseChangeMe
passphrase_check: true
register: pass_check
- name: Debug
debug:
msg: "{{ pass_check }}"
39 changes: 8 additions & 31 deletions plugins/README.md
Original file line number Diff line number Diff line change
@@ -1,31 +1,8 @@
# Collections Plugins Directory

This directory can be used to ship various plugins inside an Ansible collection. Each plugin is placed in a folder that
is named after the type of plugin it is in. It can also include the `module_utils` and `modules` directory that
would contain module utils and modules respectively.

Here is an example directory of the majority of plugins currently supported by Ansible:

```
└── plugins
├── action
├── become
├── cache
├── callback
├── cliconf
├── connection
├── filter
├── httpapi
├── inventory
├── lookup
├── module_utils
├── modules
├── netconf
├── shell
├── strategy
├── terminal
├── test
└── vars
```

A full list of plugin types can be found at [Working With Plugins](https://docs.ansible.com/ansible-core/2.13/plugins/plugins.html).
# `netways.elasticstack` Plugins Directory

## Overview
- [module_utils](https://github.com/NETWAYS/ansible-collection-elasticstack/tree/main/plugins/module_utils)
- [`certs` module util](https://github.com/NETWAYS/ansible-collection-elasticstack/tree/main/plugins/module_utils)
- [modules](https://github.com/NETWAYS/ansible-collection-elasticstack/tree/main/plugins/modules)
- [`cert_info` module](https://github.com/NETWAYS/ansible-collection-elasticstack/tree/main/plugins/modules)

7 changes: 2 additions & 5 deletions plugins/module_utils/README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
# Documentation: netways.elasticstack module_utils

## Overview
- [`certs` module_util](#cert_info-module)

## `netways.elasticstack.certs` function
## `netways.elasticstack.certs` functions

### `bytes_to_hex()` function

Since binascii.hexlify doesn't support a second parameter, which would define a seperator (e.g. ":") for hex strings in older Python versions like 2.6 and 2.7, we implemeted a small function to get similar results.

**Parameter:** A __bytes__ object that represent a hexadecimal value (e.g. b'\\x82S \\x11\\xc7s\\xa7^*w\\xc1\\xdf\"\\xe4#\\xb4\\xc4P\\xba\\xcf')
**Parameter:** A __bytes__ string that represent a hexadecimal value (e.g. b'\\x82S \\x11\\xc7s\\xa7^*w\\xc1\\xdf\"\\xe4#\\xb4\\xc4P\\xba\\xcf')

**Return:** A hexadecimal __string__ seperated by colons (e.g. "82:53:20:11:C7:73:A7:5E:2A:77:C1:DF:22:E4:23:B4:C4:50:BA:CF")

Expand Down
53 changes: 35 additions & 18 deletions plugins/module_utils/certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ class AnalyzeCertificate():
def __init__(self, module, result):
self.module = module
self.result = result
self.__passphrase_check = self.module.params['passphrase_check']
self.__passphrase = self.module.params['passphrase']
self.__path = self.module.params['path']
self.__cert = None
self.__private_key = None
self.__additional_certs = None
self.load_certificate()
self.load_info()
if not self.__passphrase_check:
self.load_info()

def load_certificate(self):
# track if module can load pkcs12
Expand All @@ -95,7 +97,7 @@ def load_certificate(self):
# read the pkcs12 file
try:
with open(self.__path, 'rb') as f:
pkcs12_data = f.read()
__pkcs12_data = f.read()
except IOError as e:
self.module.fail_json(
msg='IOError: %s' % (to_native(e))
Expand All @@ -104,32 +106,47 @@ def load_certificate(self):
# for cryptography >= 3.1.x
try:
__pkcs12_tuple = pkcs12.load_key_and_certificates(
pkcs12_data,
__pkcs12_data,
to_bytes(self.__passphrase),
)
loaded = True
except ValueError as e:
self.result["passphrase_check"] = False
if self.__passphrase_check:
self.module.exit_json(**self.result)
else:
self.module.fail_json(msg='ValueError: %s' % to_native(e))
except Exception:
self.module.log(
msg="Couldn't load certificate without backend. Trying with backend."
)
# try to load with 3 parameters for
# cryptography >= 2.5.x and <= 3.0.x
if not loaded:
# create backend object
backend = default_backend()
# call load_key_and_certificates with 3 paramters
__pkcs12_tuple = pkcs12.load_key_and_certificates(
pkcs12_data,
to_bytes(self.__passphrase),
backend
)
self.module.log(
msg="Loaded certificate with backend."
)
# map loaded certificate to object
self.__private_key = __pkcs12_tuple[0]
self.__cert = __pkcs12_tuple[1]
self.__additional_certs = __pkcs12_tuple[2]
try:
# create backend object
backend = default_backend()
# call load_key_and_certificates with 3 paramters
__pkcs12_tuple = pkcs12.load_key_and_certificates(
__pkcs12_data,
to_bytes(self.__passphrase),
backend
)
self.module.log(
msg="Loaded certificate with backend."
)
loaded = True
except ValueError as e:
self.result["passphrase_check"] = False
if self.__passphrase_check:
self.module.exit_json(**self.result)
else:
self.module.fail_json(msg='ValueError: %s' % to_native(e))
if loaded and not self.__passphrase_check:
# map loaded certificate to object
self.__private_key = __pkcs12_tuple[0]
self.__cert = __pkcs12_tuple[1]
self.__additional_certs = __pkcs12_tuple[2]

def load_info(self):
self.general_info()
Expand Down
49 changes: 45 additions & 4 deletions plugins/modules/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

## `cert_info` module

The netways.elasticstack.cert_info module gathers information about pkcs12 certificates generated by the Elastic stack cert util.
The netways.elasticstack.cert_info module gathers information about pkcs12 certificates generated by the Elastic Stack cert util.

### Dependencies
- python-cryptography >= 2.5.0 on the remote node
Expand Down Expand Up @@ -71,9 +71,11 @@ Currently, the information of the following extensions and values will be return
`path`:
Absolute path to certificate. (**Default:** undefined, required)

`password`:
The password of the pkcs12 certificate. (**Default:** No default, optional)
`passphrase`:
The passphrase of the pkcs12 certificate. (**Default:** No default, optional)

`passphrase_check`:
This will only check the passphrase and returns a bool in the results. If enabled it won't return any certificate information, only the passphrase_check result. (**Default:** False, optional)

### Returns
All keys and values that will be returned with the results variable of the module:
Expand Down Expand Up @@ -101,12 +103,15 @@ The serial number of the certificate as **str** which represents an integer.
- `critical`: The value of critical as **str** which represents a bool.
- `values`: The keys and their values of the extension as **str**. (See: Supported extensions)

`passphrase_check`:
A **bool** that will be `True` if the passphrase check was positive and `False`, if not. It's also possible that it returns `False` if the certificate is corrupted, since Python can't differentiate it and handles exceptions like this as a "VauleError".

### Example
```
- name: Test
cert_info:
path: /opt/es-ca/elasticsearch-ca.pkcs12
password: PleaseChangeMe
passphrase: PleaseChangeMe
register: test
- name: Debug
Expand Down Expand Up @@ -156,3 +161,39 @@ ok: [localhost] => {
}
}
```

### Example of passphrase_check
```
- name: Test correct passphrase wit passphrase_check parameter
cert_info:
path: /opt/es-ca/elasticsearch-ca.pkcs12
passphrase: PleaseChangeMe
passphrase_check: True
register: test
- name: Debug
debug:
msg: "{{ test }}"
```

**Output**:
```
TASK [Test correct passphrase wit passphrase_check parameter] ******************
ok: [localhost]
TASK [Debug] *******************************************************************
ok: [localhost] => {
"msg": {
"changed": false,
"extensions": {},
"failed": false,
"issuer": "",
"not_valid_after": "",
"not_valid_before": "",
"passphrase_check": true,
"serial_number": "",
"subject": "",
"version": ""
}
}
```
15 changes: 6 additions & 9 deletions plugins/modules/cert_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
def run_module():
module_args = dict(
path=dict(type='str', no_log=True, required=True),
passphrase=dict(type='str', no_log=True, required=False, default=None)
passphrase=dict(type='str', no_log=True, required=False, default=None),
passphrase_check=dict(type='bool', no_log=True, required=False, default=False)
)

# seed the result dict
Expand All @@ -34,7 +35,8 @@ def run_module():
not_valid_before='',
serial_number='',
subject='',
version=''
version='',
passphrase_check=True
)

# the AnsibleModule object
Expand All @@ -47,13 +49,8 @@ def run_module():
if module.check_mode:
module.exit_json(**result)

try:
cert_info = AnalyzeCertificate(module, result)
result = cert_info.return_result()
except ValueError as e:
module.fail_json(msg='ValueError: %s' % to_native(e))
except Exception as e:
module.fail_json(msg='Exception: %s: %s' % (to_native(type(e)), to_native(e)))
cert_info = AnalyzeCertificate(module, result)
result = cert_info.return_result()

module.exit_json(**result)

Expand Down
41 changes: 36 additions & 5 deletions tests/unit/plugins/modules/test_cert_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from unittest.mock import patch
from ansible.module_utils import basic
from ansible.module_utils.common.text.converters import to_bytes

sys.path.append('/home/runner/.ansible/collections/')
from ansible_collections.netways.elasticstack.plugins.modules import cert_info

Expand Down Expand Up @@ -55,6 +56,7 @@ class AnsibleExitJson(Exception):
"""Exception class to be raised by module.exit_json and caught by the test case"""
pass


class AnsibleFailJson(Exception):
"""Exception class to be raised by module.fail_json and caught by the test case"""
pass
Expand All @@ -70,12 +72,22 @@ def exit_json(*args, **kwargs):

checks_passed = True

# check every item in certificate if it matches with the result
# and if that fails, don't catch the Exception, so the test will fail
for item in certificate:
if certificate[item] != kwargs[item]:
# only if passphrase_check mode is disabled
if args[0].params['passphrase_check'] is not True:
# check every item in certificate if it matches with the result
# and if that fails, don't catch the Exception, so the test will fail
for item in certificate:
if certificate[item] != kwargs[item]:
checks_passed = False
# if passphrase_check mode is enabled
else:
# fail checks, if passphrase is wrong and passphrase_check kwarg is not False
if args[0].params['passphrase'] == 'PleaseChangeMe-Wrong' and kwargs['passphrase_check'] is True:
checks_passed = False

# fail checks, if passphrase is correct and passphrase_check kwarg is not True
if args[0].params['passphrase'] == 'PleaseChangeMe' and kwargs['passphrase_check'] is False:
checks_passed = False

if checks_passed:
raise AnsibleExitJson(kwargs)

Expand Down Expand Up @@ -131,6 +143,25 @@ def test_module_exit_when_path_and_password_correct(self):
})
cert_info.main()

# Tests with passphrase_check mode set to True (default is False)
def test_module_exit_when_password_wrong_with_passphrase_check(self):
with self.assertRaises(AnsibleExitJson):
set_module_args({
'path': 'molecule/plugins/files/es-ca/elastic-stack-ca.p12',
'passphrase': 'PleaseChangeMe-Wrong',
'passphrase_check': True
})
cert_info.main()

def test_module_exit_when_password_correct_with_passphrase_check(self):
with self.assertRaises(AnsibleExitJson):
set_module_args({
'path': 'molecule/plugins/files/es-ca/elastic-stack-ca.p12',
'passphrase': 'PleaseChangeMe',
'passphrase_check': True
})
cert_info.main()


if __name__ == '__main__':
unittest.main()

0 comments on commit b410656

Please sign in to comment.