-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #5487 from artemislena/master
Add a helper script for generating CAA records
- Loading branch information
Showing
1 changed file
with
122 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
#!/usr/bin/env python3 | ||
# Based on github.com/diafygi/acme-tiny, original copyright: | ||
# Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny | ||
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging | ||
try: | ||
from urllib.request import urlopen, Request # Python 3 | ||
except ImportError: # pragma: no cover | ||
from urllib2 import urlopen, Request # Python 2 | ||
|
||
DEFAULT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory" | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
LOGGER.addHandler(logging.StreamHandler()) | ||
LOGGER.setLevel(logging.INFO) | ||
|
||
def get_id(account_key, log=LOGGER, directory_url=DEFAULT_DIRECTORY_URL, contact=None): | ||
directory, acct_headers, alg, jwk = None, None, None, None # global variables | ||
|
||
# helper functions - base64 encode for jose spec | ||
def _b64(b): | ||
return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") | ||
|
||
# helper function - run external commands | ||
def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): | ||
proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | ||
out, err = proc.communicate(cmd_input) | ||
if proc.returncode != 0: | ||
raise IOError("{0}\n{1}".format(err_msg, err)) | ||
return out | ||
|
||
# helper function - make request and automatically parse json response | ||
def _do_request(url, data=None, err_msg="Error", depth=0): | ||
try: | ||
resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"})) | ||
resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers | ||
except IOError as e: | ||
resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e) | ||
code, headers = getattr(e, "code", None), {} | ||
try: | ||
resp_data = json.loads(resp_data) # try to parse json results | ||
except ValueError: | ||
pass # ignore json parsing errors | ||
if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce": | ||
raise IndexError(resp_data) # allow 100 retrys for bad nonces | ||
if code not in [200, 201, 204]: | ||
raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data)) | ||
return resp_data, code, headers | ||
|
||
# helper function - make signed requests | ||
def _send_signed_request(url, payload, err_msg, depth=0): | ||
payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8')) | ||
new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce'] | ||
protected = {"url": url, "alg": alg, "nonce": new_nonce} | ||
protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']}) | ||
protected64 = _b64(json.dumps(protected).encode('utf8')) | ||
protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8') | ||
out = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error") | ||
data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)}) | ||
try: | ||
return _do_request(url, data=data.encode('utf8'), err_msg=err_msg, depth=depth) | ||
except IndexError: # retry bad nonces (they raise IndexError) | ||
return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) | ||
|
||
# helper function - poll until complete | ||
def _poll_until_not(url, pending_statuses, err_msg): | ||
result, t0 = None, time.time() | ||
while result is None or result['status'] in pending_statuses: | ||
assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout | ||
time.sleep(0 if result is None else 2) | ||
result, _, _ = _send_signed_request(url, None, err_msg) | ||
return result | ||
|
||
# parse account key to get public key | ||
log.info("Parsing account key...") | ||
out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error") | ||
pub_pattern = r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)" | ||
pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups() | ||
pub_exp = "{0:x}".format(int(pub_exp)) | ||
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp | ||
alg, jwk = "RS256", { | ||
"e": _b64(binascii.unhexlify(pub_exp.encode("utf-8"))), | ||
"kty": "RSA", | ||
"n": _b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), | ||
} | ||
accountkey_json = json.dumps(jwk, sort_keys=True, separators=(',', ':')) | ||
thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest()) | ||
|
||
# get the ACME directory of urls | ||
log.info("Getting directory...") | ||
directory, _, _ = _do_request(directory_url, err_msg="Error getting directory") | ||
log.info("Directory found!") | ||
|
||
# create account and get the global key identifier | ||
log.info("Registering account...") | ||
reg_payload = {"termsOfServiceAgreed": True} if contact is None else {"termsOfServiceAgreed": True, "contact": contact} | ||
account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering") | ||
log.info("Registered!" if code == 201 else "Already registered!") | ||
|
||
return acct_headers['Location'] | ||
|
||
def main(argv=None): | ||
parser = argparse.ArgumentParser( | ||
formatter_class=argparse.RawDescriptionHelpFormatter, | ||
description=textwrap.dedent("""\ | ||
Generate a CAA record for Mailcow. | ||
Example Usage: python mailcow_gencaa.py --account-key data/assets/ssl/acme/account.pem | ||
""") | ||
) | ||
parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key") | ||
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors") | ||
parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt") | ||
parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:[email protected]) for your account-key") | ||
|
||
args = parser.parse_args(argv) | ||
LOGGER.setLevel(args.quiet or LOGGER.level) | ||
id = get_id(args.account_key, log=LOGGER, directory_url=args.directory_url, contact=args.contact) | ||
print("Use this as your CAA record:") | ||
print('issue 128 "letsencrypt.org;accounturi={}"'.format(id)) | ||
|
||
if __name__ == "__main__": # pragma: no cover | ||
main(sys.argv[1:]) |