-
Notifications
You must be signed in to change notification settings - Fork 50
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This manager leverages the Samba implementation of certificate autoenrollment. It contains a few patches and fixes on top of the samba#master version of `gp_cert_auto_enroll_ext.py` file, that will ultimately be upstreamed. Autoenrollment is performed via a separate policy manager that runs a helper Python script (`cert-autoenroll`) which communicates with the Windows CEP/CES services through Samba. For better control and to avoid unexpected behavior we vendor the required Samba files, which are confirmed to work on all Ubuntu versions starting with (and including) Jammy (22.04). Samba has its own cache mechanism which stores information concerning the applied GPOs which we are using in order to ensure idempotency. By default, Samba would parse the .reg file itself (see the process_group_policy method from the vendored code). However, it is better to have this functionality entirely within adsys so we can provide samba the pre-parsed list of GPO entries and override the entry point of the extension. This ensures we don't operate on disk files which can change at anytime (even during adsys policy application). Doing this we also have better knowledge on the enabled/disabled state of the GPO entry used to configure the policy. The advanced configuration entries are passed via JSON to the external script which then takes care to create the proper PReg entries that can be used by Samba to apply additional logic when determining the policy servers to use. Fixes UDENG-1056
- Loading branch information
1 parent
fdd9b74
commit 265d8a1
Showing
23 changed files
with
2,384 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
#!/usr/bin/python3 | ||
|
||
import argparse | ||
import json | ||
import os | ||
import sys | ||
import tempfile | ||
|
||
from collections import namedtuple | ||
|
||
from samba import param | ||
from samba.credentials import MUST_USE_KERBEROS, Credentials | ||
from samba.dcerpc import preg | ||
|
||
from vendor_samba.gp.gpclass import GPOStorage | ||
from vendor_samba.gp import gp_cert_auto_enroll_ext as cae | ||
|
||
class adsys_cert_auto_enroll(cae.gp_cert_auto_enroll_ext): | ||
def enroll(self, guid, entries, trust_dir, private_dir, global_trust_dir): | ||
self._gp_cert_auto_enroll_ext__enroll(guid, entries, trust_dir, private_dir, global_trust_dir) | ||
|
||
def unenroll(self, guid): | ||
ca_attrs = self.cache_get_all_attribute_values(guid) | ||
self.clean(guid, remove=list(ca_attrs.keys())) | ||
|
||
def smb_config(realm, enable_debug): | ||
config = "[global]\nrealm = %s\n" % realm | ||
if enable_debug: | ||
config += "log level = 10\n" | ||
return config | ||
|
||
def main(): | ||
parser = argparse.ArgumentParser(description='Certificate autoenrollment via Samba') | ||
parser.add_argument('action', type=str, | ||
help='Action to perform (one of: enroll, unenroll)', | ||
choices=['enroll', 'unenroll']) | ||
parser.add_argument('object_name', type=str, | ||
help='The computer name to enroll/unenroll, e.g. keypress') | ||
parser.add_argument('realm', type=str, | ||
help='The realm of the domain, e.g. example.com') | ||
|
||
parser.add_argument('--policy_servers_json', type=str, | ||
help='GPO entries for advanced configuration of the policy servers. \ | ||
Must be in JSON format.') | ||
parser.add_argument('--state_dir', type=str, | ||
default='/var/lib/adsys', | ||
help='Directory to store GPO state in.') | ||
parser.add_argument('--private_dir', type=str, | ||
default='/var/lib/adsys/private/certs', | ||
help='Directory to store private keys in.') | ||
parser.add_argument('--trust_dir', type=str, | ||
default='/var/lib/adsys/certs', | ||
help='Directory to store trusted certificates in.') | ||
parser.add_argument('--global_trust_dir', type=str, | ||
default='/usr/local/share/ca-certificates', | ||
help='Directory to symlink root CA certificates to.') | ||
parser.add_argument('--debug', action='store_true', | ||
help='Enable samba debug output.') | ||
|
||
args = parser.parse_args() | ||
|
||
state_dir = args.state_dir | ||
trust_dir = args.trust_dir | ||
private_dir = args.private_dir | ||
global_trust_dir = args.global_trust_dir | ||
|
||
# Create needed directories if they don't exist | ||
for directory in [state_dir, trust_dir, private_dir]: | ||
if not os.path.exists(directory): | ||
os.makedirs(directory) | ||
|
||
with tempfile.NamedTemporaryFile(prefix='smb_conf') as smb_conf: | ||
smb_conf.write(smb_config(args.realm, args.debug).encode('utf-8')) | ||
smb_conf.flush() | ||
|
||
lp = param.LoadParm(smb_conf.name) | ||
c = Credentials() | ||
c.set_kerberos_state(MUST_USE_KERBEROS) | ||
c.guess(lp) | ||
username = c.get_username() | ||
store = GPOStorage(os.path.join(state_dir, f'cert_gpo_state_{args.object_name}.tdb')) | ||
|
||
ext = adsys_cert_auto_enroll(lp, c, username, store) | ||
guid = f'adsys-cert-autoenroll-{args.object_name}' | ||
if args.action == 'enroll': | ||
entries = gpo_entries(args.policy_servers_json) | ||
ext.enroll(guid, entries, trust_dir, private_dir, global_trust_dir) | ||
else: | ||
ext.unenroll(guid) | ||
|
||
def gpo_entries(entries_json): | ||
""" | ||
Convert JSON string to list of GPO entries | ||
JSON must be an array of objects with the following keys: | ||
keyname (str): Registry key name | ||
valuename (str): Registry value name | ||
type (int): Registry value type | ||
data (any): Registry value data | ||
Parameters: | ||
entries_json (str): JSON string of GPO entries | ||
Returns: | ||
list: List of GPO entries, or empty list if entries_json is empty | ||
""" | ||
|
||
if not entries_json: | ||
return [] | ||
|
||
entries_dict = json.loads(entries_json) | ||
if not entries_dict: | ||
return [] | ||
|
||
entries = [] | ||
for entry in entries_dict: | ||
try: | ||
e = preg.entry() | ||
e.keyname = entry['keyname'] | ||
e.valuename = entry['valuename'] | ||
e.type = entry['type'] | ||
e.data = entry['data'] | ||
entries.append(e) | ||
except KeyError as exc: | ||
raise ValueError(f'Could not find key {exc} in GPO entry') from exc | ||
except TypeError as exc: | ||
raise ValueError(f'GPO data must be a JSON array of objects') from exc | ||
return entries | ||
|
||
|
||
if __name__ == "__main__": | ||
sys.exit(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package certificate_test | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"os" | ||
"os/exec" | ||
"path/filepath" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"github.com/ubuntu/adsys/internal/testutils" | ||
) | ||
|
||
const advancedConfigurationJSON = `[ | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\37c9dc30f207f27f61a2f7c3aed598a6e2920b54", | ||
"valuename": "AuthFlags", | ||
"data": 2, | ||
"type": 4 | ||
}, | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\37c9dc30f207f27f61a2f7c3aed598a6e2920b54", | ||
"valuename": "Cost", | ||
"data": 2147483645, | ||
"type": 4 | ||
}, | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\37c9dc30f207f27f61a2f7c3aed598a6e2920b54", | ||
"valuename": "Flags", | ||
"data": 20, | ||
"type": 4 | ||
}, | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\37c9dc30f207f27f61a2f7c3aed598a6e2920b54", | ||
"valuename": "FriendlyName", | ||
"data": "ActiveDirectoryEnrollmentPolicy", | ||
"type": 1 | ||
}, | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\37c9dc30f207f27f61a2f7c3aed598a6e2920b54", | ||
"valuename": "PolicyID", | ||
"data": "{A5E9BF57-71C6-443A-B7FC-79EFA6F73EBD}", | ||
"type": 1 | ||
}, | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\37c9dc30f207f27f61a2f7c3aed598a6e2920b54", | ||
"valuename": "URL", | ||
"data": "LDAP:", | ||
"type": 1 | ||
}, | ||
{ | ||
"keyname": "Software\\Policies\\Microsoft\\Cryptography\\PolicyServers", | ||
"valuename": "Flags", | ||
"data": 0, | ||
"type": 4 | ||
} | ||
]` | ||
|
||
func TestCertAutoenrollScript(t *testing.T) { | ||
t.Parallel() | ||
|
||
coverageOn := testutils.PythonCoverageToGoFormat(t, "cert-autoenroll", false) | ||
certAutoenrollCmd := "./cert-autoenroll" | ||
if coverageOn { | ||
certAutoenrollCmd = "cert-autoenroll" | ||
} | ||
|
||
compactedJSON := &bytes.Buffer{} | ||
err := json.Compact(compactedJSON, []byte(advancedConfigurationJSON)) | ||
require.NoError(t, err, "Failed to compact JSON") | ||
|
||
// Setup samba mock | ||
pythonPath, err := filepath.Abs("../../testutils/admock") | ||
require.NoError(t, err, "Setup: Failed to get current absolute path for mock") | ||
|
||
tests := map[string]struct { | ||
args []string | ||
|
||
readOnlyPath bool | ||
autoenrollError bool | ||
|
||
wantErr bool | ||
}{ | ||
"Enroll with simple configuration": {args: []string{"enroll", "keypress", "example.com"}}, | ||
"Enroll with simple configuration and debug enabled": {args: []string{"enroll", "keypress", "example.com", "--debug"}}, | ||
"Enroll with empty advanced configuration": {args: []string{"enroll", "keypress", "example.com", "--policy_servers_json", "null"}}, | ||
"Enroll with valid advanced configuration": {args: []string{"enroll", "keypress", "example.com", "--policy_servers_json", compactedJSON.String()}}, | ||
|
||
"Unenroll": {args: []string{"unenroll", "keypress", "example.com"}}, | ||
|
||
// Error cases | ||
"Error on missing arguments": {args: []string{"enroll"}, wantErr: true}, | ||
"Error on invalid flags": {args: []string{"enroll", "keypress", "example.com", "--invalid_flag"}, wantErr: true}, | ||
"Error on invalid JSON": {args: []string{"enroll", "keypress", "example.com", "--policy_servers_json", "invalid_json"}, wantErr: true}, | ||
"Error on invalid JSON keys": { | ||
args: []string{"enroll", "keypress", "example.com", "--policy_servers_json", `[{"key":"Software\\Policies\\Microsoft","value":"MyValue"}]`}, wantErr: true}, | ||
"Error on invalid JSON structure": { | ||
args: []string{"enroll", "keypress", "example.com", "--policy_servers_json", `{"key":"Software\\Policies\\Microsoft","value":"MyValue"}`}, wantErr: true}, | ||
"Error on read-only path": {readOnlyPath: true, args: []string{"enroll", "keypress", "example.com"}, wantErr: true}, | ||
"Error on enroll failure": {autoenrollError: true, args: []string{"enroll", "keypress", "example.com"}, wantErr: true}, | ||
"Error on unenroll failure": {autoenrollError: true, args: []string{"unenroll", "keypress", "example.com"}, wantErr: true}, | ||
} | ||
|
||
for name, tc := range tests { | ||
tc := tc | ||
name := name | ||
t.Run(name, func(t *testing.T) { | ||
t.Parallel() | ||
|
||
tmpdir := t.TempDir() | ||
stateDir := filepath.Join(tmpdir, "state") | ||
privateDir := filepath.Join(tmpdir, "private") | ||
trustDir := filepath.Join(tmpdir, "trust") | ||
globalTrustDir := filepath.Join(tmpdir, "ca-certificates") | ||
|
||
if tc.readOnlyPath { | ||
testutils.MakeReadOnly(t, tmpdir) | ||
} | ||
|
||
args := append(tc.args, "--state_dir", stateDir, "--private_dir", privateDir, "--trust_dir", trustDir, "--global_trust_dir", globalTrustDir) | ||
|
||
cmd := exec.Command(certAutoenrollCmd, args...) | ||
cmd.Env = append(os.Environ(), "PYTHONPATH="+pythonPath) | ||
if tc.autoenrollError { | ||
cmd.Env = append(os.Environ(), "ADSYS_WANT_AUTOENROLL_ERROR=1") | ||
} | ||
out, err := cmd.CombinedOutput() | ||
if tc.wantErr { | ||
require.Error(t, err, "cert-autoenroll should have failed but didn’t") | ||
return | ||
} | ||
require.NoErrorf(t, err, "cert-autoenroll should have exited successfully: %s", string(out)) | ||
|
||
got := strings.ReplaceAll(string(out), tmpdir, "#TMPDIR#") | ||
want := testutils.LoadWithUpdateFromGolden(t, got) | ||
require.Equal(t, want, got, "Unexpected output from cert-autoenroll script") | ||
}) | ||
} | ||
} |
Oops, something went wrong.