Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes from linters (and some cosmetics) #246

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 44 additions & 43 deletions acme_tiny.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
#!/usr/bin/env python
# Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny
import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging
Copy link
Author

@socketpair socketpair Apr 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy is not used.

import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, textwrap, logging
try:
from urllib.request import urlopen, Request # Python 3
except ImportError:
from urllib2 import urlopen, Request # Python 2

DEFAULT_CA = "https://acme-v02.api.letsencrypt.org" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD
DEFAULT_CA = "https://acme-v02.api.letsencrypt.org" # DEPRECATED! USE DEFAULT_DIRECTORY_URL INSTEAD
DEFAULT_DIRECTORY_URL = "https://acme-v02.api.letsencrypt.org/directory"

LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.StreamHandler())
Copy link
Author

@socketpair socketpair Apr 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding handlers spoils logic in applications that use this file as a library.

LOGGER.setLevel(logging.INFO)


def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None):
directory, acct_headers, alg, jwk = None, None, None, None # global variables
directory, acct_headers, alg, jwk = None, None, None, None # global variables

# helper functions - base64 encode for jose spec
def _b64(b):
Expand All @@ -23,28 +22,28 @@ def _b64(b):
# helper function - run external commands
def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"):
proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate(cmd_input)
stdout, err = proc.communicate(cmd_input)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shadows upper variable

if proc.returncode != 0:
raise IOError("{0}\n{1}".format(err_msg, err))
return out
return stdout

# helper function - make request and automatically parse json response
def _do_request(url, data=None, err_msg="Error", depth=0):
try:
resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"}))
resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
except IOError as e:
resp_data = e.read().decode("utf8") if hasattr(e, "read") else str(e)
code, headers = getattr(e, "code", None), {}
resp_data, resp_code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers
except IOError as error:
resp_data = error.read().decode("utf8") if hasattr(error, "read") else str(error)
resp_code, headers = getattr(error, "code", None), {}
try:
resp_data = json.loads(resp_data) # try to parse json results
resp_data = json.loads(resp_data) # try to parse json results
except ValueError:
pass # ignore json parsing errors
if depth < 100 and code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce":
raise IndexError(resp_data) # allow 100 retrys for bad nonces
if code not in [200, 201, 204]:
raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data))
return resp_data, code, headers
pass # ignore json parsing errors
if depth < 100 and resp_code == 400 and resp_data['type'] == "urn:ietf:params:acme:error:badNonce":
raise IndexError(resp_data) # allow 100 retries for bad nonces
if resp_code not in [200, 201, 204]:
raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, resp_code, resp_data))
return resp_data, resp_code, headers

# helper function - make signed requests
def _send_signed_request(url, payload, err_msg, depth=0):
Expand All @@ -54,27 +53,27 @@ def _send_signed_request(url, payload, err_msg, depth=0):
protected.update({"jwk": jwk} if acct_headers is None else {"kid": acct_headers['Location']})
protected64 = _b64(json.dumps(protected).encode('utf8'))
protected_input = "{0}.{1}".format(protected64, payload64).encode('utf8')
out = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error")
data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(out)})
stdout = _cmd(["openssl", "dgst", "-sha256", "-sign", account_key], stdin=subprocess.PIPE, cmd_input=protected_input, err_msg="OpenSSL Error")
data = json.dumps({"protected": protected64, "payload": payload64, "signature": _b64(stdout)})
try:
return _do_request(url, data=data.encode('utf8'), err_msg=err_msg, depth=depth)
except IndexError: # retry bad nonces (they raise IndexError)
except IndexError: # retry bad nonces (they raise IndexError)
return _send_signed_request(url, payload, err_msg, depth=(depth + 1))

# helper function - poll until complete
def _poll_until_not(url, pending_statuses, err_msg):
result, t0 = None, time.time()
while result is None or result['status'] in pending_statuses:
assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout
assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout
time.sleep(0 if result is None else 2)
result, _, _ = _send_signed_request(url, None, err_msg)
return result

# parse account key to get public key
log.info("Parsing account key...")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ellipses and exclamation marks are not a good style in logging messages.

log.info("Parsing account key.")
out = _cmd(["openssl", "rsa", "-in", account_key, "-noout", "-text"], err_msg="OpenSSL Error")
pub_pattern = r"modulus:[\s]+?00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)"
pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE|re.DOTALL).groups()
pub_hex, pub_exp = re.search(pub_pattern, out.decode('utf8'), re.MULTILINE | re.DOTALL).groups()
pub_exp = "{0:x}".format(int(pub_exp))
pub_exp = "0{0}".format(pub_exp) if len(pub_exp) % 2 else pub_exp
alg = "RS256"
Expand All @@ -87,45 +86,45 @@ def _poll_until_not(url, pending_statuses, err_msg):
thumbprint = _b64(hashlib.sha256(accountkey_json.encode('utf8')).digest())

# find domains
log.info("Parsing CSR...")
log.info("Parsing CSR.")
out = _cmd(["openssl", "req", "-in", csr, "-noout", "-text"], err_msg="Error loading {0}".format(csr))
domains = set([])
common_name = re.search(r"Subject:.*? CN\s?=\s?([^\s,;/]+)", out.decode('utf8'))
if common_name is not None:
domains.add(common_name.group(1))
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE|re.DOTALL)
subject_alt_names = re.search(r"X509v3 Subject Alternative Name: (?:critical)?\n +([^\n]+)\n", out.decode('utf8'), re.MULTILINE | re.DOTALL)
if subject_alt_names is not None:
for san in subject_alt_names.group(1).split(", "):
if san.startswith("DNS:"):
domains.add(san[4:])
log.info("Found domains: {0}".format(", ".join(domains)))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.format() formatting in logging is a bad style, at least because formatting occurs even if (according to loglevel) message should not be logged.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason GitHub isn't showing me the additions for this line unless I look at the file directly. Just a heads up if anyone else is confused by the seemingly broken change.

log.info("Found domains: %s.", ", ".join(domains))

# get the ACME directory of urls
log.info("Getting directory...")
directory_url = CA + "/directory" if CA != DEFAULT_CA else directory_url # backwards compatibility with deprecated CA kwarg
log.info("Getting directory.")
directory_url = CA + "/directory" if CA != DEFAULT_CA else directory_url # backwards compatibility with deprecated CA kwarg
directory, _, _ = _do_request(directory_url, err_msg="Error getting directory")
log.info("Directory found!")
log.info("Directory found.")

# create account, update contact details (if any), and set the global key identifier
log.info("Registering account...")
log.info("Registering account.")
reg_payload = {"termsOfServiceAgreed": True}
account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering")
log.info("Registered!" if code == 201 else "Already registered!")
log.info("Registered.") if code == 201 else log.info("Already registered.")
if contact is not None:
account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details")
log.info("Updated contact details:\n{0}".format("\n".join(account['contact'])))
log.info("Updated contact details: %s.", "; ".join(account['contact']))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newlines in logging may not work in syslog.


# create a new order
log.info("Creating new order...")
log.info("Creating new order.")
order_payload = {"identifiers": [{"type": "dns", "value": d} for d in domains]}
order, _, order_headers = _send_signed_request(directory['newOrder'], order_payload, "Error creating new order")
log.info("Order created!")
log.info("Order created.")

# get the authorizations that need to be completed
for auth_url in order['authorizations']:
authorization, _, _ = _send_signed_request(auth_url, None, "Error getting challenges")
domain = authorization['identifier']['value']
log.info("Verifying {0}...".format(domain))
log.info("Verifying %s.", domain)

# find the http-01 challenge and write the challenge file
challenge = [c for c in authorization['challenges'] if c['type'] == "http-01"][0]
Expand All @@ -136,8 +135,8 @@ def _poll_until_not(url, pending_statuses, err_msg):
wellknown_file.write(keyauthorization)

# check that the file is in place
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to old code, exception theoretically may happen in .format(). In this case code in the except block will use undefined variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More specifically, it could raise a ValueError, which would be mistakenly caught.

try:
wellknown_url = "http://{0}/.well-known/acme-challenge/{1}".format(domain, token)
assert (disable_check or _do_request(wellknown_url)[0] == keyauthorization)
except (AssertionError, ValueError) as e:
raise ValueError("Wrote file to {0}, but couldn't download {1}: {2}".format(wellknown_path, wellknown_url, e))
Expand All @@ -148,10 +147,10 @@ def _poll_until_not(url, pending_statuses, err_msg):
if authorization['status'] != "valid":
raise ValueError("Challenge did not pass for {0}: {1}".format(domain, authorization))
os.remove(wellknown_path)
log.info("{0} verified!".format(domain))
log.info("Domain %s verified.", domain)

# finalize the order with the csr
log.info("Signing certificate...")
log.info("Signing certificate.")
csr_der = _cmd(["openssl", "req", "-in", csr, "-outform", "DER"], err_msg="DER Export Error")
_send_signed_request(order['finalize'], {"csr": _b64(csr_der)}, "Error finalizing order")

Expand All @@ -162,9 +161,10 @@ def _poll_until_not(url, pending_statuses, err_msg):

# download the certificate
certificate_pem, _, _ = _send_signed_request(order['certificate'], None, "Certificate download failed")
log.info("Certificate signed!")
log.info("Certificate signed.")
return certificate_pem


def main(argv=None):
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
Expand All @@ -183,16 +183,17 @@ def main(argv=None):
parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key")
parser.add_argument("--csr", required=True, help="path to your certificate signing request")
parser.add_argument("--acme-dir", required=True, help="path to the .well-known/acme-challenge/ directory")
parser.add_argument("--quiet", action="store_const", const=logging.ERROR, help="suppress output except for errors")
parser.add_argument("--quiet", action="store_true", help="suppress output except for errors")
parser.add_argument("--disable-check", default=False, action="store_true", help="disable checking if the challenge file is hosted correctly before telling the CA")
parser.add_argument("--directory-url", default=DEFAULT_DIRECTORY_URL, help="certificate authority directory url, default is Let's Encrypt")
parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!")
parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:[email protected]) for your account-key")

args = parser.parse_args(argv)
LOGGER.setLevel(args.quiet or LOGGER.level)
logging.basicConfig(level=logging.ERROR if args.quiet else logging.INFO)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding handlers by hand.

signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact)
sys.stdout.write(signed_crt)

if __name__ == "__main__": # pragma: no cover

if __name__ == "__main__": # pragma: no cover
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, 200 lines limit still honored.

main(sys.argv[1:])