forked from cyberark/BlobHunter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BlobHunter.py
222 lines (161 loc) · 7.86 KB
/
BlobHunter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import azure.core.exceptions
from datetime import date
from azure.identity import AzureCliCredential
from azure.mgmt.resource import SubscriptionClient, ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob import BlobServiceClient, ContainerClient
import subprocess
import csv
import os
ENDPOINT_URL = '{}.blob.core.windows.net'
CONTAINER_URL = '{}.blob.core.windows.net/{}/'
EXTENSIONS = ["txt", "csv", "pdf", "docx", "xlsx"]
def get_credentials():
try:
username = subprocess.check_output("az account show --query user.name", shell=True,
stderr=subprocess.DEVNULL).decode("utf-8")
except subprocess.CalledProcessError:
subprocess.check_output("az login", shell=True, stderr=subprocess.DEVNULL)
username = subprocess.check_output("az account show --query user.name", shell=True,
stderr=subprocess.DEVNULL).decode("utf-8")
print("[+] Logged in as user {}".format(username.replace('"', '').replace("\n", '')), flush=True)
return AzureCliCredential()
def get_tenants_and_subscriptions(creds):
subscription_client = SubscriptionClient(creds)
tenants_ids = list()
tenants_names = list()
subscriptions_ids = list()
subscription_names = list()
for sub in subscription_client.subscriptions.list():
tenants_ids.append(sub.tenant_id)
subscriptions_ids.append(sub.id[15:])
subscription_names.append(sub.display_name)
# Getting tenant name from given tenant id
for ten_id in tenants_ids:
for ten in subscription_client.tenants.list():
if ten_id == ten.id[9:]:
tenants_names.append(ten.display_name)
return tenants_ids, tenants_names, subscriptions_ids, subscription_names
def check_storage_account(account_name, key):
blob_service_client = BlobServiceClient(ENDPOINT_URL.format(account_name), credential=key)
containers = blob_service_client.list_containers(timeout=15)
public_containers = list()
for cont in containers:
if cont.public_access is not None:
public_containers.append(cont)
return public_containers
def check_subscription(tenant_id, tenant_name, sub_id, sub_name, creds):
print("\n\t[*] Checking subscription {}:".format(sub_name), flush=True)
storage_client = StorageManagementClient(creds, sub_id)
# Obtain the management object for resources
resource_client = ResourceManagementClient(creds, sub_id)
# Retrieve the list of resource groups
group_list = resource_client.resource_groups.list()
resource_groups = [group.name for group in list(group_list)]
print("\t\t[+] Found {} resource groups".format(len(resource_groups)), flush=True)
group_to_names_dict = {group: dict() for group in resource_groups}
accounts_counter = 0
for group in resource_groups:
for item in storage_client.storage_accounts.list_by_resource_group(group):
accounts_counter += 1
group_to_names_dict[group][item.name] = ''
print("\t\t[+] Found {} storage accounts".format(accounts_counter), flush=True)
for group in resource_groups:
for account in group_to_names_dict[group].keys():
try:
storage_keys = storage_client.storage_accounts.list_keys(group, account)
storage_keys = {v.key_name: v.value for v in storage_keys.keys}
group_to_names_dict[group][account] = storage_keys['key1']
except azure.core.exceptions.HttpResponseError:
print("\t\t[-] User do not have permissions to retrieve storage accounts keys in the given"
" subscription", flush=True)
print("\t\t Can not scan storage accounts", flush=True)
return
output_list = list()
for group in resource_groups:
for account in group_to_names_dict[group].keys():
key = group_to_names_dict[group][account]
public_containers = check_storage_account(account, key)
for cont in public_containers:
access_level = cont.public_access
container_client = ContainerClient(ENDPOINT_URL.format(account), cont.name, credential=key)
files = [f.name for f in container_client.list_blobs()]
ext_dict = count_files_extensions(files, EXTENSIONS)
row = [tenant_id, tenant_name, sub_id, sub_name, group, account, cont.name, access_level,
CONTAINER_URL.format(account, cont.name), len(files)]
for ext in ext_dict.keys():
row.append(ext_dict[ext])
output_list.append(row)
print("\t\t[+] Scanned all storage accounts successfully", flush=True)
if len(output_list) > 0:
print("\t\t[+] Found {} PUBLIC containers".format(len(output_list)), flush=True)
else:
print("\t\t[+] No PUBLIC containers found")
header = ["Tenant ID", "Tenant Name", "Subscription ID", "Subscription Name", "Resource Group", "Storage Account", "Container",
"Public Access Level", "URL", "Total Files"]
for ext in EXTENSIONS:
header.append(ext)
header.append("others")
write_csv('public-containers-{}.csv'.format(date.today()), header, output_list)
def delete_csv():
for file in os.listdir("."):
if os.path.isfile(file) and file.startswith("public"):
os.remove(file)
def write_csv(file_name, header, rows):
file_exists = os.path.isfile(file_name)
with open(file_name, 'a', newline='', encoding="utf-8") as csv_file:
writer = csv.writer(csv_file)
if not file_exists:
writer.writerow(header)
for r in rows:
writer.writerow(r)
def count_files_extensions(files, extensions):
counter_dict = dict()
others_cnt = 0
for extension in extensions:
counter_dict[extension] = 0
for f_name in files:
in_extensions = False
for extension in extensions:
if f_name.endswith(extension):
in_extensions = True
counter_dict[extension] += 1
break
if not in_extensions:
if f_name.endswith("doc"):
counter_dict['docx'] += 1
elif f_name.endswith("xls"):
counter_dict['xlsx'] += 1
else:
others_cnt += 1
counter_dict['other'] = others_cnt
return counter_dict
def print_logo():
logo = '''
-------------------------------------------------------------
______ _ _ _ _ _
| ___ \ | | | | | | | | |
| |_/ / | ___ | |__ | |_| |_ _ _ __ | |_ ___ _ __
| ___ \ |/ _ \| '_ \| _ | | | | '_ \| __/ _ \ '__|
| |_/ / | (_) | |_) | | | | |_| | | | | || __/ |
\____/|_|\___/|_.__/\_| |_/\__,_|_| |_|\__\___|_|
-------------------------------------------------------------
Author: Daniel Niv
-------------------------------------------------------------
'''
print(logo, flush=True)
def main():
print_logo()
credentials = get_credentials()
delete_csv()
if credentials is None:
print("[-] Unable to login to a valid Azure user", flush=True)
return
tenants_ids, tenants_names, subs_ids, subs_names = get_tenants_and_subscriptions(credentials)
print("[+] Found {} subscriptions".format(len(subs_ids)), flush=True)
for i in range(0, len(subs_ids)):
check_subscription(tenants_ids[i], tenants_names[i], subs_ids[i], subs_names[i], credentials)
print("\n[+] Scanned all subscriptions successfully", flush=True)
print("[+] Check out public-containers-{}.csv file for a fully detailed report".format(date.today()), flush=True)
if __name__ == '__main__':
main()