-
Notifications
You must be signed in to change notification settings - Fork 5
/
group.py
133 lines (112 loc) · 5.75 KB
/
group.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/python3
# Cisco Cyber Vision V4.0
# Group Management
import argparse
import requests
import json
import csv
import sys
import cvconfig
import api
def main():
    """Parse the command line and run the requested group command.

    Connection and CSV settings are taken from the command line when given
    and fall back to the matching ``cvconfig`` values otherwise. At most one
    of ``--export``, ``--import`` and ``--delete`` may be selected; when none
    is selected the usage help is printed.

    Returns:
        The return value of the selected command function, or ``None`` when
        only the help text is printed.

    Raises:
        SystemExit: with status 1 when no token or no center address is
            configured (also raised by argparse itself on invalid arguments).
    """
    parser = argparse.ArgumentParser(prog="group.py",
                                     description="Group Management")

    # Options parsing
    parser.add_argument("--token", dest="token", help="Use this token")
    parser.add_argument("--center-ip", dest="center_ip",
                        help="Specified the center FQDN or IPv4 address"
                        " (default:'cybervision')")
    parser.add_argument("--center-port", dest="center_port",
                        help="Specified the center port (default: %d)"%cvconfig.center_port,
                        default=cvconfig.center_port)
    parser.add_argument("--proxy", dest="proxy",
                        help="Specified the proxy to use (default: %s)"%cvconfig.proxy,
                        default=cvconfig.proxy)
    parser.add_argument("--encoding", dest="csv_encoding",
                        help="CSV file encoding, default is %s" % cvconfig.csv_encoding)
    parser.add_argument("--delimiter", dest="csv_delimiter",
                        help="CSV file delimiter, default is %s" % cvconfig.csv_delimiter)
    parser.add_argument("--filename", dest="filename", help="Use this filename", default="groups.csv")

    # Main Command Parsing
    command_group = parser.add_mutually_exclusive_group()
    command_group.add_argument("--export",
                               help="Export all groups into a CSV file\n",
                               action="store_true", default=False, dest="command_export")
    command_group.add_argument("--import",
                               help="Create groups from a CSV file\n",
                               action="store_true", default=False, dest="command_import")
    command_group.add_argument("--delete",
                               help="Delete all groups of the Cyber Vision center\n",
                               action="store_true", default=False, dest="command_delete")

    args = parser.parse_args()

    # Handle Cybervision configuration: command-line values win over cvconfig
    token = set_conf(args.token, cvconfig.token)
    center_ip = set_conf(args.center_ip, cvconfig.center_ip)
    center_port = set_conf(args.center_port, cvconfig.center_port)
    proxy = set_conf(args.proxy, cvconfig.proxy)
    csv_encoding = set_conf(args.csv_encoding, cvconfig.csv_encoding)
    csv_delimiter = set_conf(args.csv_delimiter, cvconfig.csv_delimiter)

    if not token or not center_ip:
        print("TOKEN and CENTER_IP are mandatory, check cvconfig.py or use --token/--center-ip")
        # Stop here: every command needs both values to reach the center.
        sys.exit(1)

    if args.command_export:
        return group_export(center_ip, center_port, token, proxy, args.filename, csv_delimiter, csv_encoding)
    elif args.command_import:
        return group_import(center_ip, center_port, token, proxy, args.filename, csv_delimiter, csv_encoding)
    elif args.command_delete:
        return group_delete_all(center_ip, center_port, token, proxy)

    # No command selected: show the usage instead of doing nothing silently.
    parser.print_help()
def set_conf(arg, conf):
    """Select the effective value of one setting.

    Args:
        arg: Value supplied by the caller (e.g. from the command line).
        conf: Fallback value (e.g. from the configuration module).

    Returns:
        ``arg`` when it is truthy and differs from ``conf``; ``conf`` in
        every other case.
    """
    # A falsy argument (None, "", 0, ...) means "not provided".
    if not arg:
        return conf
    return arg if arg != conf else conf
def group_delete_all(center_ip, center_port, token, proxy):
    """Delete every group returned by the center's group listing.

    Opens an ``api.APISession`` with the given connection settings, reads
    the groups from ``/api/3.0/groups`` and issues one DELETE per group.
    Each deletion is logged; any response whose status code is not 200 is
    reported as an error and the loop moves on to the next group.
    """
    with api.APISession(center_ip, center_port, token, proxy) as session:
        for entry in api.get_route(session, '/api/3.0/groups'):
            label = entry['label']
            print(f"LOG: Deleting {label}")
            group_id = entry['id']
            response = session.delete(f"/api/1.0/group/{group_id}")
            if response.status_code == 200:
                continue
            print(f"ERROR: Group [{label}][{group_id}] was not deleted, return code: {response.status_code}")
def group_export(center_ip, center_port, token, proxy, filename, csv_delimiter, csv_encoding):
    """Export all groups of the center into a CSV file.

    Reads the groups from ``/api/3.0/groups`` through an ``api.APISession``
    and writes one row per group with the columns ``group-name``,
    ``group-description``, ``group-color`` and ``group-industrial-impact``.
    The last column is left empty for groups without a ``criticalness`` key.

    Args:
        center_ip, center_port, token, proxy: Connection settings passed to
            ``api.APISession``.
        filename: Path of the CSV file to create (overwritten if it exists).
        csv_delimiter: Field delimiter used by the CSV writer.
        csv_encoding: Text encoding of the output file.

    Raises:
        KeyError: if a group lacks ``label``, ``description`` or ``color``.
    """
    fieldnames = ['group-name', 'group-description', 'group-color', 'group-industrial-impact']
    with api.APISession(center_ip, center_port, token, proxy) as session:
        groups = api.get_route(session, '/api/3.0/groups')
        # newline='' lets the csv module control line endings itself; without
        # it, platforms that translate '\n' on write end up with '\r\r\n'
        # (blank rows between records).
        with open(filename, 'w', encoding=csv_encoding, newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter=csv_delimiter)
            writer.writeheader()
            for g in groups:
                row = {
                    'group-name': g['label'],
                    'group-description': g['description'],
                    'group-color': g['color'],
                }
                # Only 'criticalness' is treated as optional; DictWriter fills
                # the missing column with an empty string.
                if 'criticalness' in g:
                    row['group-industrial-impact'] = g['criticalness']
                writer.writerow(row)
def group_import(center_ip, center_port, token, proxy, filename, csv_delimiter, csv_encoding):
    """Create groups on the center from the rows of a CSV file.

    Opens ``filename`` with the requested encoding, opens an
    ``api.APISession`` and hands a ``csv.DictReader`` over the file to
    ``group_import_lib``, which performs the actual creation.

    Args:
        center_ip, center_port, token, proxy: Connection settings passed to
            ``api.APISession``.
        filename: Path of the CSV file to read.
        csv_delimiter: Field delimiter used by the CSV reader.
        csv_encoding: Text encoding of the input file.
    """
    # Honour csv_encoding (it was previously accepted but never used, so the
    # file was decoded with the platform default) and open with newline='' as
    # the csv module requires, so newlines inside quoted fields are preserved.
    with open(filename, 'r', encoding=csv_encoding, newline='') as csvfile:
        with api.APISession(center_ip, center_port, token, proxy) as session:
            reader = csv.DictReader(csvfile, delimiter=csv_delimiter)
            group_import_lib(session, reader)
def group_import_lib(session, reader):
    """Create one group per row of ``reader`` through the groups API.

    Rows without a non-empty ``group-name`` are skipped silently. The
    optional ``group-industrial-impact`` column is converted to an integer
    (0 when absent or empty); a row whose value is not a valid integer is
    reported and skipped instead of aborting the whole import. A 409
    response is logged as "already exists"; any other status different from
    200 is logged as an error.

    Args:
        session: Session object handed to ``api.post_route``.
        reader: Iterable of dict-like rows (e.g. a ``csv.DictReader``).

    Raises:
        KeyError: if a processed row has no ``group-description`` or
            ``group-color`` key.
    """
    # The target route is the same for every row, so build it once.
    route = "/api/3.0/groups"
    for row in reader:
        name = row.get('group-name')
        if not name:
            continue
        print(f"LOG: Group '{name}' - Creating...")

        industrial_impact = 0
        raw_impact = row.get('group-industrial-impact')
        if raw_impact:
            try:
                industrial_impact = int(raw_impact)
            except ValueError:
                # One malformed cell must not stop the remaining rows.
                print(f"ERROR: Group '{name}' - Invalid group-industrial-impact '{raw_impact}', group skipped")
                continue

        # Named 'payload' so the imported 'json' module is not shadowed.
        payload = {
            "label": name,
            "description": row['group-description'],
            "color": row['group-color'],
            "criticalness": industrial_impact,
        }
        ret = api.post_route(session, route, payload)
        if ret.status_code == 409:
            print(f"LOG: Group '{name}' - Already exists")
        elif ret.status_code != 200:
            print(f"ERROR: Calling [POST] {route} got error code {ret.status_code}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()