#!/usr/bin/env python3
# Copyright © 2020 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Yusuf Zainee <[email protected]>
#
"""Script which synchronizes snyk CVEs from S3 to graph."""
from datetime import datetime, timedelta
import json
import os
from f8a_utils.versions import get_versions_and_latest_for_ep
from f8a_utils.golang_utils import GolangUtils
from f8a_utils.gh_utils import GithubUtils
from helper import Helper
import re
import logging
import time
from vulnerability_metadata import Vulnerability
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
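# Note: the Snyk feed keys its data by language (java, js, python, golang); those map to
# the graph ecosystems listed below (maven, npm, pypi, golang respectively).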
SUPPORTED_ECOSYSTEMS = ['maven', 'npm', 'pypi', 'golang']
class SnykCveSync:
"""Snyk class to sync cves to graph."""
def __init__(self, data):
"""Init method for helper class."""
self.helper = Helper()
self.is_retry = False
self.is_force_run = self.helper.force_run_ingestion()
self.utc_now = datetime.utcnow()
self.today = self.utc_now.replace(hour=0, minute=0, second=0, microsecond=0)
self.day = self.utc_now.weekday()
# Populate all default json structures.
self.SNYK_REPORT = self._populate_default_report()
self.CVE_DATA = {}
self.DELETE_CVE_DATA = {}
self.DELTA_FEED = {}
self.FAILED_DATA = {}
        # This is the offset in days, i.e. how many days' worth of old data we need to ingest.
delta_feed_offset = int(os.environ.get('SNYK_DELTA_FEED_OFFSET', '1'))
self.start_day = self.today - timedelta(days=delta_feed_offset)
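        # start_day is the oldest modification date that is still ingested when running in delta feed mode.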
# If we want to selectively run the ingestion for an ecosystem.
self.selective_eco_run = os.environ.get('SELECTIVE_ECOSYSTEM_SNYK_SYNC', '')
if not data:
self.snyk_data = self.helper.read_data_from_s3("latest-feed", "snyk-feed/")
            # For testing purposes we can use the sample feed:
"""with open('data/feed_sample.json', encoding='utf-8') as f:
x = json.load(f)
self.snyk_data = x"""
else:
self.snyk_data = data
        # Load the failed data only if the operation is a retry; otherwise ignore it.
if self.is_retry:
self.FAILED_DATA = self._populate_failed_data()
def _populate_failed_data(self):
"""Generate a failed data json."""
details = self.SNYK_REPORT['details']
failed_data = {}
# Run for each ecosystem
for eco in details:
failed_data[eco] = {}
# Scan all the records of the failed ingestion scenarios
ingested_data = details[eco]['ingest']
for vuln_id in ingested_data:
if ingested_data[vuln_id]['status'] != "success":
# Add all the failed ingestion data to the failed data json.
failed_data[eco][ingested_data[vuln_id]['name']] = "failed"
# Scan all the records of the failed deletion scenarios
deleted_data = details[eco]['delete']
for vuln_id in deleted_data:
if deleted_data[vuln_id]['status'] != "success":
# Add all the failed deletion data to the failed data json.
failed_data[eco][deleted_data[vuln_id]['name']] = "failed"
return failed_data
def _populate_default_report(self):
"""Generate a default report."""
# Read the default value of the report from a file.
retry_disabled = self.helper.is_snyk_retry_disabled()
if not retry_disabled:
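            # A daily report already stored in S3 means an earlier run happened today,
            # so this run is treated as a retry of that run's failures.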
            # Switch on the retry mode only if the daily report has already been generated for the day.
data = self.helper.read_data_from_s3(
self.utc_now.strftime('%d-%m-%Y'), "snyk-feed/daily-report/")
if data:
self.is_retry = True
else:
                # If it's the first operation of the day, load the default report data.
with open('data/default_report.json', encoding='utf-8') as f:
data = json.load(f)
else:
# If retry is disabled.
with open('data/default_report.json', encoding='utf-8') as f:
data = json.load(f)
return data
def _parse_data_for_eco(self, eco):
"""Return True/False depending on whether the ecosysytem data should be parsed or not."""
# For delta feed mode, run for all ecosystems.
if self.helper.is_delta_mode_on():
return True
if eco == self.selective_eco_run:
return True
elif self.selective_eco_run != "none":
return False
day = str(self.day)
# The weekdays here will be in use only when running it in bootstrap mode.
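        # weekday(): Monday=0 ... Sunday=6, so Mon/Fri -> java, Tue/Sat -> js, Wed/Sun -> python, Thu -> golang.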
if (day in ["0", "4"] and eco == "java") \
or (day in ["1", "5"] and eco == "js") \
or (day in ["2", "6"] and eco == "python") \
or (day in ["3"] and eco == "golang"):
return True
return False
def _check_retry_run(self, eco):
"""Check if retry needs to run."""
        # If retry is not set then run full ingestion. If set, run only if a failure object exists.
if self.is_force_run:
return True
if not self.is_retry or len(self.FAILED_DATA[eco]) > 0:
return True
logger.info("No {e} failed data to retry.".format(e=eco))
return False
def _extract_data_from_feed(self):
"""Fetch all the required info from the feed."""
for eco in self.snyk_data:
if eco == "java" and self._parse_data_for_eco(eco):
logger.info("Parsing feed for Maven.")
self._add_default_obj_for_eco("maven")
if self._check_retry_run("maven"):
self._parse_data(self.snyk_data[eco], "maven")
elif eco == "js" and self._parse_data_for_eco(eco):
logger.info("Parsing feed for Npm.")
self._add_default_obj_for_eco("npm")
if self._check_retry_run("npm"):
self._parse_data(self.snyk_data[eco], "npm")
elif eco == "python" and self._parse_data_for_eco(eco):
logger.info("Parsing feed for Pypi.")
self._add_default_obj_for_eco("pypi")
if self._check_retry_run("pypi"):
self._parse_data(self.snyk_data[eco], "pypi")
elif eco == "golang" and self._parse_data_for_eco(eco):
logger.info("Parsing feed for Golang.")
self._add_default_obj_for_eco("golang")
if self._check_retry_run("golang"):
self._parse_golang_data(self.snyk_data[eco], "golang")
else:
logger.info("Ignoring the ecosystem {} from the feed".format(eco))
def _is_date_in_range(self, date):
"""Check to see if the date of vuln falls in the range."""
date_obj = datetime.strptime(date.split('T')[0], '%Y-%m-%d')
"""When running under delta feed mode, we need to consider only those vulns which were
updated between the given offset date and today's date."""
return self.today > date_obj >= self.start_day
def _add_default_obj_for_eco(self, eco):
"""Add default entries for the ecosystem into the different lists."""
self.CVE_DATA[eco] = {}
self.DELETE_CVE_DATA[eco] = []
self.DELTA_FEED[eco] = []
def _add_data_for_false_positives(self, eco, data, pkg):
"""Update the records with false positive data."""
self.DELETE_CVE_DATA[eco].append(data)
self.DELTA_FEED[eco].append(data)
# Default status is skipped. When ingested, it gets updated with success or failed.
self.SNYK_REPORT['details'][eco]['delete'][data['id']] = {
'name': pkg,
'status': "skipped"
}
def _generate_default_cve_obj(self, eco, pkg, versions, latest, gh=None, lic=None, mod=None):
"""Generate the default cve object."""
# Generate the default snyk cve object.
obj = {
"affected": [],
"vulnerabilities": [],
"all_ver": versions,
"latest_version": latest,
"ecosystem": eco,
"package": pkg
}
        # 3 extra fields are needed in case of golang: GitHub URL, license details & module name.
if eco == "golang":
checked_versions = []
            # This is required to remove all commit hashes released as versions in golang.
for ver in versions:
if "v0.0.0-" not in ver and ver != "none":
checked_versions.append(ver)
obj['gh_link'] = gh
obj['license'] = lic
obj['moduleName'] = mod
obj['all_ver'] = checked_versions
return obj
def _set_additional_fields(self, data):
"""To set additional fields or modify some values before ingesting."""
        # Remove the rules data, which is not required.
if 'rules' in data:
del data['rules']
        # Strip quote characters so the description is a proper string.
data['description'] = re.sub("[\'\"]", "", data['description'])
# Calculate and update the premium field.
premium = str(data.get('premium', "false")).lower() == 'true'
data['pvtVuln'] = premium
return data
def __format_golang_version(self, version):
"""Return with a proper format v1.1.1."""
if '.' in version and version[0].isdigit():
version = 'v' + version
return version
def _set_commit_hash_rules(self, data, gh_link):
"""To set the commit hash rules."""
hashes_range = data['vulnerableHashRanges']
# If there is no range, it means all commits are vulnerable.
if not hashes_range:
data['commitRules'] = '*'
return data
gh = GithubUtils()
# This is needed to get the org and name from the gh link.
gh_splitted = gh_link.split("/")
length = len(gh_splitted)
org = gh_splitted[length - 2]
name = gh_splitted[length - 1]
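        # Splitting the range on regex_vr yields the operands (versions) and splitting on
        # regex_op yields the operators (e.g. ">", "<=").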
regex_vr = "[<>=*]+"
regex_op = "[0-9a-zA-Z\\_\\.\\-]+"
rules = ""
        for hash_range in hashes_range:
            # Remove any blank spaces.
            hash_range = hash_range.replace(" ", "")
            operands = re.split(regex_vr, hash_range)
            operators = re.split(regex_op, hash_range)
if len(operators) == 2 and len(operands) == 2:
# It means there is only 1 condition.
date = gh._get_commit_date(org, name, self.__format_golang_version(operands[1]))
if date:
rules = rules + operators[0] + "#" + date + ","
else:
logger.error("No such data present on Github. Contact Snyk.")
elif len(operators) == 3 and len(operands) == 3:
                # It means there is a nested condition, e.g. >x & <y.
date1 = gh._get_commit_date(org, name, self.__format_golang_version(operands[1]))
date2 = gh._get_commit_date(org, name, self.__format_golang_version(operands[2]))
if date1 and date2:
rules = rules + operators[0] + "#" + date1 +\
"&" + operators[1] + "#" + date2 + ","
else:
logger.error("No such data present on Github. Contact Snyk.")
else:
logger.error("Incorrect hashesRange data. Contact Snyk.")
        # Remove the extra ',' which gets appended at the end.
if rules:
rules = rules[:-1]
data['commitRules'] = rules
return data
def _retry_or_forced_run_check(self, eco, pkg):
"""Check retry or force run applicable."""
# Run the feed scan if the force flag is enabled.
if self.is_force_run or not self.is_retry:
return True
        # Run the feed scan only if it's a retry and the ingestion/deletion is in a failed state.
if pkg in self.FAILED_DATA[eco]:
return True
return False
def _parse_golang_data(self, vuln_data, eco):
"""Parse data for golang eco."""
total_vuln = 0
delta_mode = self.helper.is_delta_mode_on()
if len(vuln_data) != 0:
for data in vuln_data:
                # If delta mode is on & the modificationTime doesn't fall in the range, then ignore.
if delta_mode and not self._is_date_in_range(data['modificationTime']):
logger.debug("No new updates for {}".format(data['id']))
continue
pkg = data['package']
                # Pick only those pkgs which failed in the previous attempt.
if not self._retry_or_forced_run_check(eco, pkg):
continue
logger.debug("Fetching details for package: {}".format(pkg))
try:
if len(data['vulnerableVersions']) == 0:
# In this case, we use the data to remove the vuln from the graph
logger.info("False positive found. {i}".format(i=data['id']))
self._add_data_for_false_positives(eco, data, pkg)
continue
""" This is done so that we dont fetch the same pkg data again & again if more
than 1 vuln for the same pkg is present."""
if pkg not in self.CVE_DATA[eco]:
go_utils = GolangUtils(pkg)
versions = go_utils.get_all_versions()
""" From the available options we have in scraping, if we get the details
then only we can go ahead fetch and create nodes, else we need to ignore
for the time being."""
if versions:
# As we are relying on web scraping, we might get None in some cases.
latest_version = go_utils.get_latest_version() or ""
gh_link = go_utils.get_gh_link() or ""
lic = go_utils.get_license() or []
mod = go_utils.get_module() or []
self.CVE_DATA[eco][pkg] = self._generate_default_cve_obj(
eco, pkg, versions, latest_version, gh_link, lic, mod)
else:
                            # TODO: decide what we need to do when we don't find any data.
logger.info("No details about the pkg {} found "
"or pkg not on Github".format(pkg))
self.SNYK_REPORT['details'][eco]['pvt_pkgs'][data['id']] = {
'name': pkg
}
continue
logger.info("Processing {}".format(data['id']))
versions = self.CVE_DATA[eco][pkg]['all_ver']
data['ecosystem'] = eco
data['moduleName'] = self.CVE_DATA[eco][pkg]['moduleName']
data['affected'] = []
vuln_versions = data['vulnerableVersions']
if versions:
data['rules'] = self.helper.get_version_rules(vuln_versions)
data['affected'] = self.helper.get_affected_versions(
data['rules'], versions)
                        # Create edges for the vuln only when affected versions are found.
if len(data['affected']) != 0:
self.CVE_DATA[eco][pkg]['affected'].extend(data['affected'])
self.CVE_DATA[eco][pkg]['affected'] = list(
set(self.CVE_DATA[eco][pkg]['affected']))
else:
""" This will make sure vuln node gets created which can be used for
commit hash usecase, even when affected versions not found"""
logger.info("No affected versions for {}".format(data['id']))
total_vuln += 1
data = self._set_additional_fields(data)
data = self._set_commit_hash_rules(data, self.CVE_DATA[eco][pkg]['gh_link'])
                    # In the Snyk feed, some golang vulns don't have this field.
if 'vulnerableHashes' in data:
del data['vulnerableHashes']
self.SNYK_REPORT['details'][eco]['ingest'][data['id']] = {
'name': pkg,
'premium': data['pvtVuln'],
'affected_version_count': len(data['affected']),
'status': "skipped"
}
self.DELTA_FEED[eco].append(data)
self.CVE_DATA[eco][pkg]['vulnerabilities'].append(data)
except ValueError:
logger.error("Encountered a Value Error while trying to fetch versions for "
"{e} -> {p}".format(e=eco, p=pkg))
except AttributeError:
logger.error("Encountered an Attribute Error while trying to fetch details for "
"{e} -> {p}".format(e=eco, p=pkg))
logger.info("{} Data".format(eco).center(50, '-'))
logger.info("Total affected packages: {}".format(len(self.CVE_DATA[eco])))
logger.info("Total vulnerabilities: {}".format(total_vuln))
logger.debug(self.CVE_DATA[eco])
def _parse_data(self, vuln_data, eco):
"""Parse data for all eco."""
total_vuln = 0
delta_mode = self.helper.is_delta_mode_on()
if len(vuln_data) != 0:
for data in vuln_data:
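                # Normalize the package name: strip spaces, and lowercase it for pypi since PyPI names are case-insensitive.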
data['package'] = data['package'].replace(" ", "")
if eco == "pypi":
data['package'] = str.lower(data['package'])
if delta_mode and not self._is_date_in_range(data['modificationTime']):
logger.debug("No new updates for {}".format(data['id']))
continue
pkg = data['package']
                # Pick only those pkgs which failed in the previous attempt.
if not self._retry_or_forced_run_check(eco, pkg):
continue
logger.debug("Fetching details for package: {}".format(pkg))
try:
versions = None
if len(data['vulnerableVersions']) == 0:
                        # In this case, we use the data to remove the vuln from the graph.
logger.info("False positive found. {i}".format(i=data['id']))
self._add_data_for_false_positives(eco, data, pkg)
continue
""" This is done so that we dont fetch the same pkg data again & again if more
than 1 vuln for the same pkg is present."""
if pkg not in self.CVE_DATA[eco]:
resp_obj = get_versions_and_latest_for_ep(eco, pkg)
if resp_obj and (not isinstance(resp_obj, list)) \
and 'versions' in resp_obj and len(resp_obj['versions']) > 0:
versions = resp_obj['versions']
self.CVE_DATA[eco][pkg] = self._generate_default_cve_obj(
eco, pkg, versions, resp_obj['latest_version'])
else:
versions = self.CVE_DATA[eco][pkg]['all_ver']
logger.info("All versions: {}".format(versions))
if versions:
logger.info("Processing {}".format(data['id']))
self.SNYK_REPORT['details'][eco]['ingest'][data['id']] = {
'name': pkg,
'premium': False,
'affected_version_count': 0,
'status': "skipped"
}
data['ecosystem'] = eco
if eco in ['maven', 'pypi']:
vuln_versions = self.helper.get_semver_versions(
data['vulnerableVersions'])
else:
vuln_versions = data['vulnerableVersions']
data['rules'] = self.helper.get_version_rules(vuln_versions)
data['affected'] = self.helper.get_affected_versions(
data['rules'], versions)
if len(data['affected']) == 0:
logger.info("No active affected version found for {}. Ignored."
.format(pkg))
continue
total_vuln += 1
self.CVE_DATA[eco][pkg]['affected'].extend(data['affected'])
self.CVE_DATA[eco][pkg]['affected'] = list(
set(self.CVE_DATA[eco][pkg]['affected']))
data = self._set_additional_fields(data)
# Update the premium value and the affected version count in report.
rep_obj = self.SNYK_REPORT['details'][eco]['ingest'][data['id']]
rep_obj['premium'] = data['pvtVuln']
rep_obj['affected_version_count'] = len(data['affected'])
self.DELTA_FEED[eco].append(data)
self.CVE_DATA[eco][pkg]['vulnerabilities'].append(data)
else:
logger.info("Pvt package encountered {}".format(pkg))
self.SNYK_REPORT['details'][eco]['pvt_pkgs'][data['id']] = {
'name': pkg
}
except ValueError:
logger.error("Encountered an error while trying to fetch versions for "
"{e} -> {p}".format(e=eco, p=pkg))
logger.info("{} Data".format(eco).center(50, '-'))
logger.info("Total affected packages: {}".format(len(self.CVE_DATA[eco])))
logger.info("Total vulnerabilities: {}".format(total_vuln))
logger.debug(self.CVE_DATA[eco])
def _insert_cves(self):
"""Insert the cve data for each ecosystem."""
logger.info("Insertion of data begins".center(50, '-'))
dry_run = self.helper.is_dry_run()
if dry_run:
logger.info("Dry run mode is on. No ingestion will take place".center(30, '-'))
for eco in SUPPORTED_ECOSYSTEMS:
if eco in self.CVE_DATA:
logger.info("Inserting {} CVEs...".format(eco))
if len(self.CVE_DATA[eco]) > 0:
for pkg in self.CVE_DATA[eco]:
cves = self.CVE_DATA[eco][pkg]
# If there are no cves for a pkg, we need to ignore it.
if len(cves['vulnerabilities']) > 0:
logger.info("Inserting CVEs for pkg: {}".format(pkg))
logger.debug(cves)
if not dry_run:
resp = self.helper.make_api_call(cves, 'PUT')
for cve in cves['vulnerabilities']:
cve_id = cve['id']
rep_details = self.SNYK_REPORT['details']
rep_details[eco]['ingest'][cve_id]['status'] = resp
logger.info("Waiting for 1 second".center(30, '-'))
time.sleep(1)
else:
logger.info("Nothing to insert for {} CVEs...".format(eco))
else:
logger.info("Nothing to insert for {} CVEs...".format(eco))
return True
def _delete_cves(self):
"""Delete the cve data for each ecosystem."""
logger.info("Deletion of data begins".center(50, '-'))
dry_run = self.helper.is_dry_run()
if dry_run:
logger.info("Dry run mode is on. No ingestion will take place".center(30, '-'))
del_obj = {'id': ''}
for eco in SUPPORTED_ECOSYSTEMS:
if eco in self.DELETE_CVE_DATA:
logger.info("Deleting false positive {} CVEs...".format(eco))
if len(self.DELETE_CVE_DATA[eco]) > 0:
for vuln in self.DELETE_CVE_DATA[eco]:
logger.info("Deleting {}".format(vuln['id']))
del_obj['id'] = vuln['id']
if not dry_run:
resp = self.helper.make_api_call(del_obj, 'DELETE')
self.SNYK_REPORT['details'][eco]['delete'][vuln['id']]['status'] = resp
logger.info("Waiting for 1 second".center(30, '-'))
time.sleep(1)
else:
logger.info("Nothing to delete for {} CVEs...".format(eco))
else:
logger.info("Nothing to delete for {} CVEs...".format(eco))
logger.info("Deletion of data ends".center(50, '-'))
return True
def _generate_snyk_report(self):
"""Generate the ingestion report for snyk."""
details = self.SNYK_REPORT['details']
stats = self.SNYK_REPORT['stats']
for eco in SUPPORTED_ECOSYSTEMS:
eco_details = details[eco]
eco_stats = stats[eco]
# Calculate the number of vulnerabilities pointing to pvt pkgs.
eco_stats['pvt_pkg_vulnerability_count'] = \
len(eco_details['pvt_pkgs'])
# Calculate the stats for vulnerabilities deleted.
if len(eco_details['delete']) > 0:
success_del = 0
total_del = 0
for del_vuln in eco_details['delete']:
total_del += 1
if eco_details['delete'][del_vuln]['status'] == "success":
success_del += 1
# Deletion accuracy calculation.
eco_stats['successfully_deleted'] = success_del
eco_stats['to_be_deleted'] = total_del
eco_stats['deletion_accuracy'] = str(round((
(success_del * 100) / total_del), 2)) + "%"
else:
# When there is no data available for an eco, this default data is populated.
eco_stats['successfully_deleted'] = 0
eco_stats['deletion_accuracy'] = "NA"
# Calculate the stats for vulnerabilities ingested.
if len(eco_details['ingest']) > 0:
success_ing = 0
total_ing = 0
pkgs = []
ver_count = 0
hash_count = 0
for ing_vuln in eco_details['ingest']:
total_ing += 1
pkgs.append(eco_details['ingest'][ing_vuln]['name'])
ver_count += eco_details['ingest'][ing_vuln]['affected_version_count']
"""
if eco == "golang":
hash_count += eco_details['ingest'][ing_vuln]['affected_commit_hash_count']
"""
if eco_details['ingest'][ing_vuln]['status'] == "success":
success_ing += 1
if eco_details['ingest'][ing_vuln]['premium']:
eco_stats['premium_count'] += 1
# Ingestion accuracy calculation.
eco_stats['successfully_ingested'] = success_ing
eco_stats['to_be_ingested'] = total_ing
eco_stats['ingestion_accuracy'] = str(round((
(success_ing * 100) / total_ing), 2)) + "%"
# Total affected pkgs and versions count.
eco_stats['packages_affected'] = len(list(set(pkgs)))
eco_stats['versions_affected'] = ver_count
# The details of commit hash count is needed only in case of golang.
if eco == "golang":
eco_stats['commit_hash_affected'] = hash_count
else:
# When there is no data available for an eco, this default data is populated.
eco_stats['successfully_ingested'] = 0
eco_stats['ingestion_accuracy'] = "NA"
def run_custom_logic(self):
"""Run some customized logic here."""
logger.info("Custom logic for vulnerability ingestion begins".center(100, '-'))
all_query = []
to_update = 0
updated = 0
skipped = []
for eco in SUPPORTED_ECOSYSTEMS:
if eco in self.CVE_DATA:
logger.info("Customizing {} CVEs...".format(eco))
if len(self.CVE_DATA[eco]) > 0:
for pkg in self.CVE_DATA[eco]:
cves = self.CVE_DATA[eco][pkg]
# If there are no cves for a pkg, we need to ignore it.
total_cves = len(cves['vulnerabilities'])
if total_cves > 0:
for cve in cves['vulnerabilities']:
to_update += 1
cve_id = cve['id']
logger.info("{n}- {p} {i} in progress".format(
i=cve_id, p=pkg, n=to_update))
bindings = {
'vuln_id': cve_id,
'modified_date': (datetime.utcnow()).strftime('%Y%m%d'),
'pkg': pkg
}
                                # To make sure we clean up the existing fixed_in data.
query_str = "g.V().has('snyk_vuln_id', vuln_id)." \
"properties('fixed_in').drop().iterate();"
# query to add data to affected_version field.
query_str += "cve=g.V().has('snyk_vuln_id', vuln_id);"
query_str += "cve.property('package_name', pkg);"
# modification time to be updated for the vuln.
query_str += "cve.property('modified_date', modified_date);"
affected = cve['affected']
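                                # Build a comma-separated string of the affected versions for the vulnerable_versions property.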
aff_str = ""
for aff in affected:
if aff_str:
aff_str += "," + aff
else:
aff_str += aff
query_str += "cve.property('vulnerable_versions', vuln_ver);"
bindings['vuln_ver'] = aff_str
# bindings for fixed_in.
if cve.get('initiallyFixedIn'):
counter = 1
for fix in cve.get('initiallyFixedIn'):
query_str += "cve.property('fixed_in', " \
"fixedIn" + str(counter) + ");"
bindings["fixedIn" + str(counter)] = fix
counter += 1
payload = {
'gremlin': query_str,
'bindings': bindings
}
all_query.append(payload)
res = self.helper.execute_gremlin_dsl(payload)
if not res:
skipped.append(cve_id)
else:
updated += 1
logger.info("Waiting for 1 second".center(30, '-'))
time.sleep(0.5)
else:
logger.info("Nothing to insert for {} CVEs...".format(eco))
else:
logger.info("Nothing to insert for {} CVEs...".format(eco))
skipped = list(set(skipped))
logger.info("Custom logic for vulnerability ingestion ends".center(100, '-'))
logger.info("Number of Vulnerabilities to be updated: {}".format(to_update))
logger.info("Number of Vulnerabilities successfully updated: {}".format(updated))
logger.info("List of Vulnerabilities skipped/failed: {}".format(skipped))
return all_query
def run_snyk_sync(self):
"""Entrypoint for the snyk cve sync process."""
logger.info("Running Snyk Sync".center(50, '-'))
logger.info(self.utc_now)
if self.snyk_data:
# Step1. Fetch all the data from the snyk feed into different formats.
self._extract_data_from_feed()
if self.helper.is_custom_mode_enabled():
# Run only the customized task. Ignore all other steps.
self.run_custom_logic()
else:
# Step2. Delete all records which have 0 vulnerable versions.
self._delete_cves()
# Step3. Start ingestion of all the vulnerable data.
self._insert_cves()
# Step4. Generate the daily snyk ingestion report.
self._generate_snyk_report()
# Step5. Store the overall snyk report in S3.
self.helper.store_json_content(
self.SNYK_REPORT,
"snyk-feed/daily-report/" + self.utc_now.strftime('%d-%m-%Y') + ".json")
                # Step6. Check if it's not a retry operation.
if not self.is_retry:
# Save the delta information in S3.
self.helper.store_json_content(
self.DELTA_FEED, "snyk-feed/delta-feed-data/" + self.utc_now.strftime(
'%d-%m-%Y') + ".json")
# If the vuln metadata is not disabled, start the vuln process.
if not self.helper.is_vuln_mode_disabled():
Vulnerability().process(self.CVE_DATA)
else:
logger.info("No data found. Snyk sync aborted.".center(50, '-'))
return
logger.info("Snyk Sync Process Successfully Completed".center(50, '-'))
return "Success"