-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathput-aws-accounts.py
executable file
·161 lines (141 loc) · 6.61 KB
/
put-aws-accounts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
import argparse
import os
import sys
import re
import json
from schema import Schema, SchemaError, Optional, Use, And
import requests
# Base endpoint of the CloudHealth AWS-accounts REST resource. It is used
# as-is to list accounts and with "/<id>" appended to update one account.
API_URL = "https://chapi.cloudhealthtech.com/v1/aws_accounts"

# Validation schema (``schema`` library) for the JSON document that is sent
# as the body of the update request.
#
# NOTE(review): keys that are not wrapped in ``Optional`` are presumably
# treated as required by the ``schema`` library, which would mean both the
# access-key pair and the assume-role pair must always be supplied,
# whatever ``protocol`` is -- confirm this is intended.
# NOTE(review): ``Use(...)`` conversions and ``Optional(..., default=...)``
# values presumably only show up in the object returned by
# ``Schema.validate()``; the validated input itself is not modified --
# verify against the schema library documentation.
SCHEMA = {
    # String that specifies the unique display name of an AWS account.
    'name': str,
    # JSON field that specifies how to authenticate with your AWS accounts.
    # Use IAM Role (recommended) or IAM User (less secure) to authenticate.
    'authentication': {
        # Authentication protocol: 'access_key' or 'assume_role'
        # (compared after lower-casing).
        'protocol': And(str, Use(str.lower), lambda s: s in ('access_key', 'assume_role')),
        # Access Key ID for IAM User.
        'access_key': str,
        # Access Key secret for IAM User.
        'secret_key': str,
        # Role ARN for IAM Role.
        'assume_role_arn': str,
        # External ID generated by CloudHealth. See "How to get External ID"
        # in the CloudHealth documentation.
        'assume_role_external_id': str
    },
    # JSON field that specifies the location of the AWS Detailed Billing
    # Record (DBR).
    'billing': {
        # Name of the S3 bucket containing the DBR.
        'bucket': str
    },
    # JSON field that specifies whether CloudHealth should collect CloudTrail
    # Trails and the location of Trail files.
    'cloudtrail': {
        # Should CloudHealth collect CloudTrail trails? Default value is False.
        'enabled': bool,
        # Name of the S3 bucket containing the files.
        'bucket': str,
        # Prefix of Trail files.
        'prefix': str
    },
    # JSON field that specifies whether CloudHealth should collect AWS Config
    # files and the location of the files.
    'aws_config': {
        # Should CloudHealth collect AWS Config files? Default value is False.
        'enabled': bool,
        # Name of the S3 bucket containing the files.
        'bucket': str,
        # Prefix of files.
        'prefix': str
    },
    # JSON field that specifies whether CloudHealth should collect CloudWatch
    # data.
    'cloudwatch': {
        # Should CloudHealth collect CloudWatch data? Default value is True.
        'enabled': bool
    },
    # JSON array that specifies key-value pairs for tags. When you use this
    # field, the API restricts queries to AWS accounts that are tagged with
    # these key-value pairs.
    'tags': [{
        'key': str,
        'value': str
    }],
    # Boolean field that specifies whether CloudHealth should store public DNS
    # and IP. Default value is True.
    Optional('hide_public_fields', default=True): bool,
    # JSON field that specifies the type of AWS Account. Value can be 'global'
    # (default) or 'govcloud' (compared after lower-casing).
    Optional('region', default='global'): And(str, Use(str.lower), lambda s: s in ('global', 'govcloud'))
}

# Help text shown by argparse for the positional ``schema`` argument.
ARGPARSE_HELP_SCHEMA = 'File containing a JSON schema describing the account. Specification at https://apidocs.cloudhealthtech.com/#account_update-existing-aws-account'
def _fail(message):
    """Print *message* to stderr and terminate the program with exit status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def _find_cloudhealth_account_id(owner_id, headers, per_page, verbose=False):
    """Return the CloudHealth id of the account owned by *owner_id*, or None.

    Walks the paginated listing at ``API_URL`` page by page and stops at the
    first entry whose ``owner_id`` equals *owner_id*, or at the first empty
    page (which marks the end of the listing).

    Args:
        owner_id: AWS account identifier as a 12-digit, zero-padded string.
        headers: HTTP headers (authorization, content type) for each request.
        per_page: Number of accounts requested per page.
        verbose: When true, log each fetched URL to stderr.

    Exits the program with status 1 on any HTTP/network error or when the
    response body does not have the expected shape.
    """
    params = {"page": 1, "per_page": per_page}
    while True:
        try:
            response = requests.get(API_URL, headers=headers, params=params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            _fail(e)
        if verbose:
            # Diagnostics go to stderr so stdout only ever carries the final
            # JSON result.
            print(response.request.url, file=sys.stderr)

        try:
            accounts = json.loads(response.content).get('aws_accounts')
        except (ValueError, AttributeError):
            # ValueError: body is not JSON (JSONDecodeError is a subclass).
            # AttributeError: body is JSON but its top level is not an object.
            accounts = None
        if not isinstance(accounts, list):
            _fail("Malformed CloudHealth API response body.")

        if not accounts:
            return None

        for account in accounts:
            # Entries without an 'owner_id' are skipped rather than treated
            # as an error; they simply cannot match.
            if isinstance(account, dict) and account.get('owner_id') == owner_id:
                if 'id' not in account:
                    _fail("Malformed CloudHealth API response body.")
                # Stop paging as soon as the account is found.
                return account['id']

        params['page'] += 1


def main():
    """Update an AWS account's configuration through the CloudHealth API.

    Parses the command line, reads the account description (JSON) from the
    given file or stdin, validates it against ``SCHEMA``, looks up the
    CloudHealth id matching the AWS account identifier, and PUTs the document
    to ``API_URL/<id>``. On success the API response is printed to stdout as
    JSON. Every failure prints a diagnostic to stderr and exits with status 1.
    """
    parser = argparse.ArgumentParser(description='Update AWS Account configuration via CloudHealth API.')
    parser.add_argument('-i', '--aws-account-id', required=True, type=int, help='AWS account identifier.')
    parser.add_argument('-k', '--api-key', required=False, default=os.environ.get("CH_API_KEY", None), help='CloudHealth API key.')
    parser.add_argument('-p', '--per-page', required=False, default=30, type=int, help='Number of results per page.')
    parser.add_argument('-v', '--verbose', required=False, default=False, action="store_true", help='Set verbose.')
    parser.add_argument('schema', nargs='?', type=argparse.FileType('r'), default=sys.stdin, help=ARGPARSE_HELP_SCHEMA)
    args = parser.parse_args()

    # Validate every command-line value before reading the (possibly
    # interactive) input document, so bad options fail fast.
    if not 0 < args.aws_account_id <= 999999999999:
        _fail("'{}' is an invalid AWS account identifier.".format(args.aws_account_id))
    if args.api_key is None:
        _fail("API key not set. Set environnement variable CH_API_KEY or invoke program with option -k/--api-key")
    # fullmatch (unlike match + '$') also rejects a key followed by a newline.
    if re.fullmatch(r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", args.api_key) is None:
        print("API key is not valid. It must be a a globally unique identifier (GUID).", file=sys.stderr)
        print("See https://apidocs.cloudhealthtech.com/#how-cloudhealth-validates-api-requests for more details.", file=sys.stderr)
        sys.exit(1)
    if args.per_page <= 0:
        _fail('-p/--per-page argument must be a positive number.')

    try:
        d = json.load(args.schema)
    except json.decoder.JSONDecodeError as e:
        _fail("Error in input JSON document at line {0}, column {1}: {2}".format(e.lineno, e.colno, e.msg))

    # NOTE: only the pass/fail outcome of the validation is used; the document
    # is sent exactly as supplied (the value returned by validate() is
    # intentionally not substituted, so the request body is unchanged from
    # what the user wrote).
    try:
        Schema(SCHEMA).validate(d)
    except SchemaError as e:
        _fail(e)

    headers = {"Authorization": "Bearer " + args.api_key, "Content-Type": "application/json"}
    # CloudHealth reports the owner id as a string, so compare against the
    # 12-digit zero-padded form of the numeric identifier.
    owner_id = format(args.aws_account_id, '012d')

    cht_account_id = _find_cloudhealth_account_id(owner_id, headers, args.per_page, args.verbose)
    if cht_account_id is None:
        _fail("AWS account '{0}' does not exist in CloudHealth.".format(args.aws_account_id))

    try:
        response = requests.put("{0}/{1}".format(API_URL, cht_account_id), headers=headers, json=d)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(e, file=sys.stderr)
        # Take the request from the exception: if requests.put() itself
        # raised, no response object exists for this call.
        failed_request = e.request
        if args.verbose and failed_request is not None:
            print("Request URL", failed_request.url, file=sys.stderr)
            print("Request body\n", getattr(failed_request, "body", None), file=sys.stderr)
        sys.exit(1)

    # raise_for_status() did not raise, so the update succeeded; echo the
    # API's answer as compact JSON.
    try:
        print(json.dumps(json.loads(response.content)))
    except json.decoder.JSONDecodeError:
        _fail("Malformed CloudHealth API response body.")


if __name__ == "__main__":
    main()