-
Notifications
You must be signed in to change notification settings - Fork 19
/
filter-ip-ranges
executable file
·185 lines (153 loc) · 6.28 KB
/
filter-ip-ranges
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
# Parse, filter and lookup the published Amazon IP Ranges.
# Useful for creating Security Groups and other firewall rules.
# E.g. incoming only from CloudFront
# or outgoing only to non-EC2 ranges in US-WEST-1 region
# Author: Michael Ludvig <[email protected]>
import json
import sys
import argparse
import socket
import ipaddress
from httplib2 import Http
def fatal(message):
print("ERROR: %s" % message, file=sys.stderr)
sys.exit(1)
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter)
parser.description="""\
Parse and Filter the published Amazon IP Ranges.
"""
parser.add_argument('--url', '-u', metavar="URL", default="https://ip-ranges.amazonaws.com/ip-ranges.json", help='URL for ip-ranges.json download. Default: %(default)s')
parser.add_argument('--file', '-f', type=argparse.FileType('r'), help='Path to ip-ranges.json. Overrides --url')
parser.add_argument('--min-serial', type=int, metavar="SERIAL", help='Only process the file id serial number (aka syncToken) is greater than SERIAL. No output if less or equal.')
parser.add_argument('--ipv4', '-4', action="store_true", help='Lookup / display only IPv4 addresses. By default both IPv4 and IPv6 are considered.')
parser.add_argument('--ipv6', '-6', action="store_true", help='Lookup / display only IPv6 addresses. By default both IPv4 and IPv6 are considered.')
parser.add_argument('--quiet', action="store_true", help='Do not print any extra info, e.g. serial number, etc.')
parser.add_argument('--verbose', '-v', action="store_true", help='Print the Region and Services along with each IP range.')
parser.usage = parser.format_usage().rstrip()[7:]
parser.usage += " [FILTER [...]]"
parser.epilog = """
FILTER Syntax
The IP Range prefixes can be filtered based on Services and Regions.
Keywords can be combined, for example: +AMAZON -EC2 us-west-1
+KEYWORD Include prefixes with matching keyword (AMAZON).
The '+' sign is optional, +AMAZON and AMAZON works the same.
-KEYWORD Exclude prefixes with service or region matching KEYWORD.
=KEYWORD Include prefixes that have only one service matching KEYWORD.
Sometimes prefixes overlap and belong to multiple services.
The KEYWORD can be a Service (e.g. AMAZON, EC2, ...) or Region (us-west-1,
ap-southeast-2, GLOBAL).
KEYWORD can also be an IP address in which case the matching subnet
will be selected.
Supports both IPv4 (default) and IPv6 addresses (with --ipv6 parameter).
EXAMPLE
$ filter-ip-ranges ap-southeast-2 =AMAZON
52.119.210.0/23
54.239.0.112/28
52.144.224.64/26
54.240.204.0/22
...
$ filter-ip-ranges -v 52.119.211.123
52.119.210.0/23 ap-southeast-2 AMAZON
AUTHOR
Michael Ludvig -- https://aws.nz
"""
args, extra = parser.parse_known_args()
if args.file:
try:
ipranges = json.load(args.file)
except ValueError:
fatal("File is not ip-ranges.json")
else:
try:
resp, content = Http().request(args.url)
if resp.status != 200:
fatal("Unable to load %s - %d %s" % (args.url, resp.status, resp.reason))
content = content.decode('latin1')
ipranges = json.loads(content)
except Exception as e:
fatal("Unable to load %s - %s" % (args.url, e))
if len(ipranges['prefixes']) + len(ipranges['ipv6_prefixes']) < 1:
fatal("No prefixes found")
if args.min_serial and int(ipranges['syncToken']) <= args.min_serial:
# Serial number is not greater than required by --min-serial=NNN
sys.exit(0)
def list_prefixes(ipv4=True):
if ipv4:
address_family = socket.AF_INET
prefixes_label = 'prefixes'
ip_prefix_label = 'ip_prefix'
else:
address_family = socket.AF_INET6
prefixes_label = 'ipv6_prefixes'
ip_prefix_label = 'ipv6_prefix'
pfx_dict = {}
for prefix in ipranges[prefixes_label]:
ip_prefix = prefix[ip_prefix_label]
if ip_prefix not in pfx_dict:
pfx_dict[ip_prefix] = {}
pfx_dict[ip_prefix]['net'] = ip_prefix
pfx_dict[ip_prefix]['rgn'] = prefix['region']
pfx_dict[ip_prefix]['svc'] = [ prefix['service'] ]
else:
pfx_dict[ip_prefix]['svc'].append(prefix['service'])
pfx_vals = list(pfx_dict.values())
pfx_vals = sorted(pfx_vals, key=lambda x: socket.inet_pton(address_family, x['net'].split('/')[0]))
return pfx_vals
prefixes = []
if not args.ipv6:
prefixes.extend(list_prefixes(ipv4=True))
if not args.ipv4:
prefixes.extend(list_prefixes(ipv4=False))
ips = []
for xarg in extra:
try:
# If xarg is an IP address we store it for later
# and move on to the next xarg
ips.append(ipaddress.ip_network(xarg))
continue
except ValueError:
pass
_pfx = []
if xarg.startswith('='):
# Filter records that have ONLY this service/region
_arg = xarg[1:]
for prefix in prefixes:
if prefix['svc'].count(_arg) and len(prefix['svc']) == 1:
_pfx.append(prefix)
elif prefix['rgn'] == _arg:
_pfx.append(prefix)
elif xarg.startswith('-'):
# Exclude this service/region
_arg = xarg[1:]
for prefix in prefixes:
if prefix['svc'].count(_arg) == 0 and prefix['rgn'] != _arg:
_pfx.append(prefix)
else:
# Include this service/region
_arg = xarg.startswith('+') and xarg[1:] or xarg
for prefix in prefixes:
if prefix['svc'].count(_arg):
_pfx.append(prefix)
elif prefix['rgn'] == _arg:
_pfx.append(prefix)
prefixes = _pfx
if not args.quiet:
print('# SERIAL=%s' % ipranges['syncToken'])
# Now it's the time to process the IPs found in 'extra', if any
if ips:
_pfx = []
for ip in ips:
for prefix in prefixes:
net = ipaddress.ip_network(prefix['net'])
if net.overlaps(ip):
_pfx.append(prefix)
prefixes = _pfx
if args.verbose and not args.quiet:
print('# %d prefixes found / %d prefixes matching' % (
len(ipranges['prefixes']), len(prefixes)))
for prefix in prefixes:
if args.verbose:
print("%s %s %s" % (prefix['net'], prefix['rgn'], " ".join(prefix['svc'])))
else:
print("%s" % prefix['net'])