forked from mludvig/aws-utils
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ssm-session
executable file
·238 lines (182 loc) · 8.76 KB
/
ssm-session
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/env python3
# Convenience wrapper around 'aws ssm start-session'
# can resolve instance id from Name tag, hostname, IP address, etc.
#
# See https://aws.nz/aws-utils/ssm-session for more info.
#
# Author: Michael Ludvig (https://aws.nz)
# The script can list available instances, resolve instance names,
# and host names, etc. In the end it executes 'aws' to actually
# start the session.
import os
import sys
import re
import logging
import argparse
import botocore.exceptions
import botocore.credentials
import botocore.session
import boto3
def configure_logging(level):
streamHandler = logging.StreamHandler()
formatter = logging.Formatter(
"[%(name)s] %(levelname)s: %(message)s"
)
streamHandler.setFormatter(formatter)
logger = logging.getLogger("ssm-session")
logger.setLevel(level)
logger.addHandler(streamHandler)
logger.debug("Logging level set to DEBUG")
return logger
def parse_args(argv):
"""
Parse command line arguments.
"""
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
group_general = parser.add_argument_group('General Options')
group_general.add_argument('--profile', '-p', dest='profile', type=str, help='Configuration profile from ~/.aws/{credentials,config}')
group_general.add_argument('--region', '-r', dest='region', type=str, help='Set / override AWS region.')
group_general.add_argument('--verbose', '-v', action='store_const', dest='log_level', const=logging.INFO, default=logging.WARN, help='Increase log_level level')
group_general.add_argument('--debug', '-d', action='store_const', dest='log_level', const=logging.DEBUG, help='Increase log_level level')
group_general.add_argument('--help', '-h', action="help", help='Print this help and exit')
group_instance_wrapper = parser.add_argument_group('Instance Selection')
group_instance = group_instance_wrapper.add_mutually_exclusive_group(required=True)
group_instance.add_argument('INSTANCE', nargs='?', help='Instance ID, Name, Host name or IP address')
group_instance.add_argument('--list', '-l', dest='list', action="store_true", help='List instances available for SSM Session')
parser.description = 'Start SSM Shell Session to a given instance'
parser.epilog = f'''
IMPORTANT: instances must be registered in AWS Systems Manager (SSM)
before you can start a shell session! Instances not registered in SSM
will not be recognised by {parser.prog} nor show up in --list output.
Visit https://aws.nz/aws-utils/ssm-session for more info and usage examples.
Author: Michael Ludvig
'''
# Parse supplied arguments
return parser.parse_known_args(argv)
def update_env(env, var_names, var_value):
if var_names and var_value:
for var_name in var_names.split(','):
env[var_name] = var_value
def start_session(instance_id, extras, profile=None, region=None):
extra_args = ""
if profile:
extra_args += f"--profile {profile} "
if region:
extra_args += f"--region {region} "
parameters = ""
for extra in extras:
parameters += f" {extra}"
command = f'aws {extra_args} ssm start-session --target {instance_id}{parameters}'
logger.info("Running: %s", command)
os.system(command)
class InstanceResolver():
def __init__(self, args):
# aws-cli compatible MFA cache
cli_cache = os.path.join(os.path.expanduser('~'),'.aws/cli/cache')
# Construct boto3 session with MFA cache
session = boto3.session.Session(profile_name=args.profile, region_name=args.region)
session._session.get_component('credential_provider').get_provider('assume-role').cache = botocore.credentials.JSONFileCache(cli_cache)
# Create boto3 clients from session
self.ssm_client = session.client('ssm')
self.ec2_client = session.client('ec2')
def get_list(self):
def _try_append(_list, _dict, _key):
if _key in _dict:
_list.append(_dict[_key])
items = {}
# List instances from SSM
logger.debug("Fetching SSM inventory")
inventory_paginator = self.ssm_client.get_paginator('get_inventory')
inventory_iterator = inventory_paginator.paginate()
for inventory in inventory_iterator:
for entity in inventory["Entities"]:
try:
content = entity['Data']['AWS:InstanceInformation']["Content"][0]
# At the moment we only support EC2 Instances
assert content["ResourceType"] == "EC2Instance"
# Ignore Terminated instances
if content.get("InstanceStatus") == "Terminated":
logger.debug("Ignoring terminated instance: %s", entity)
continue
# Add to the list
instance_id = content['InstanceId']
items[instance_id] = {
"InstanceId": instance_id,
"HostName": content.get("ComputerName"),
}
logger.debug("Added instance: %s: %r", instance_id, items[instance_id])
except (KeyError, ValueError):
logger.debug("SSM inventory entity not recognised: %s", entity)
continue
# Add attributes from EC2
reservations_paginator = self.ec2_client.get_paginator('describe_instances')
reservations_iterator = reservations_paginator.paginate(InstanceIds=list(items.keys()))
for reservations in reservations_iterator:
for reservation in reservations['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
if not instance_id in items:
continue
# Find instance IPs
items[instance_id]['Addresses'] = []
_try_append(items[instance_id]['Addresses'], instance, 'PrivateIpAddress')
_try_append(items[instance_id]['Addresses'], instance, 'PublicIpAddress')
# Find instance name from tag Name
items[instance_id]['InstanceName'] = None
for tag in instance['Tags']:
if tag['Key'] == 'Name':
items[instance_id]['InstanceName'] = tag['Value']
logger.debug("Updated instance: %s: %r", instance_id, items[instance_id])
return items
def print_list(self):
hostname_len = 0
instname_len = 0
items = self.get_list().values()
if not items:
logger.warning("No instances registered in SSM!")
return
items = list(items)
items.sort(key=lambda x: x.get('InstanceName') or x.get('HostName'))
for item in items:
hostname_len = max(hostname_len, len(item['HostName']))
instname_len = max(instname_len, len(item['InstanceName']))
for item in items:
print(f"{item['InstanceId']} {item['HostName']:{hostname_len}} {item['InstanceName']:{instname_len}} {' '.join(item['Addresses'])}")
def resolve_instance(self, instance):
# Is it a valid Instance ID?
if re.match('^i-[a-f0-9]+$', instance):
return instance
# It is not - find it in the list
instances = []
items = self.get_list()
for instance_id in items:
item = items[instance_id]
if instance.lower() in [item['HostName'].lower(), item['InstanceName'].lower()] + item['Addresses']:
instances.append(instance_id)
if not instances:
return None
if len(instances) > 1:
logger.warning("Found %d instances for '%s': %s", len(instances), instance, " ".join(instances))
logger.warning("Use INSTANCE_ID to connect to a specific one")
quit(1)
# Found only one instance - return it
return instances[0]
if __name__ == "__main__":
## Split command line to main args and optional command to run
args, extras = parse_args(sys.argv[1:])
logger = configure_logging(args.log_level)
try:
instance = None
if args.list:
InstanceResolver(args).print_list()
quit(0)
instance = InstanceResolver(args).resolve_instance(args.INSTANCE)
if not instance:
logger.warning("Could not resolve Instance ID for '%s'", args.INSTANCE)
logger.warning("Perhaps the '%s' is not registered in SSM?", args.INSTANCE)
quit(1)
start_session(instance, extras, profile=args.profile, region=args.region)
except (botocore.exceptions.BotoCoreError,
botocore.exceptions.ClientError) as e:
logger.error(e)
quit(1)