Skip to content

Commit

Permalink
Merge pull request #183 from fosslight/ossitem
Browse files Browse the repository at this point in the history
Refactoring OSS Item
  • Loading branch information
dd-jy authored Sep 6, 2024
2 parents 3877214 + 0921181 commit 53fe700
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 98 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ pyparsing
scancode-toolkit>=32.0.2,==32.0.*
scanoss
XlsxWriter
fosslight_util~=1.4.47
fosslight_util>=2.0.0
PyYAML
wheel>=0.38.1
intbitset
fosslight_binary
fosslight_binary>=5.0.0
typecode-libmagic;sys_platform!="darwin"
6 changes: 3 additions & 3 deletions src/fosslight_source/_parsing_scancode_file_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import re
import fosslight_util.constant as constant
from ._license_matched import MatchedLicense
from ._scan_item import ScanItem
from ._scan_item import SourceItem
from ._scan_item import is_exclude_dir
from ._scan_item import is_exclude_file
from ._scan_item import replace_word
Expand Down Expand Up @@ -75,7 +75,7 @@ def parsing_scancode_32_earlier(scancode_file_list, has_error=False):
licenses = file.get("licenses", [])
copyright_list = file.get("copyrights", [])

result_item = ScanItem(file_path)
result_item = SourceItem(file_path)

if has_error and "scan_errors" in file:
error_msg = file.get("scan_errors", [])
Expand Down Expand Up @@ -201,7 +201,7 @@ def parsing_scancode_32_later(scancode_file_list, has_error=False):
if (not file_path) or is_binary or is_dir:
continue

result_item = ScanItem(file_path)
result_item = SourceItem(file_path)

if has_error:
error_msg = file.get("scan_errors", [])
Expand Down
8 changes: 4 additions & 4 deletions src/fosslight_source/_parsing_scanoss_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import logging
import fosslight_util.constant as constant
from ._scan_item import ScanItem
from ._scan_item import SourceItem
from ._scan_item import is_exclude_file
from ._scan_item import replace_word

Expand All @@ -22,14 +22,14 @@ def parsing_extraInfo(scanned_result):
license_w_source = scan_item.scanoss_reference
if scan_item.matched_lines:
if license_w_source:
extra_item = [scan_item.file, ','.join(license_w_source['component_declared']),
extra_item = [scan_item.source_name_or_path, ','.join(license_w_source['component_declared']),
','.join(license_w_source['file_spdx_tag']),
','.join(license_w_source['file_header']),
','.join(license_w_source['license_file']),
','.join(license_w_source['scancode']),
scan_item.matched_lines, scan_item.fileURL]
else:
extra_item = [scan_item.file, '', '', '', '', '', scan_item.matched_lines, scan_item.fileURL]
extra_item = [scan_item.source_name_or_path, '', '', '', '', '', scan_item.matched_lines, scan_item.fileURL]
scanoss_extra_info.append(extra_item)
scanoss_extra_info.insert(0, SCANOSS_INFO_HEADER)
return scanoss_extra_info
Expand All @@ -43,7 +43,7 @@ def parsing_scanResult(scanoss_report, path_to_scan="", path_to_exclude=[]):
abs_file_path = os.path.abspath(os.path.join(path_to_scan, file_path))
if any(os.path.commonpath([abs_file_path, exclude_path]) == exclude_path for exclude_path in abs_path_to_exclude):
continue
result_item = ScanItem(file_path)
result_item = SourceItem(file_path)
if 'id' in findings[0]:
if "none" == findings[0]['id']:
continue
Expand Down
73 changes: 33 additions & 40 deletions src/fosslight_source/_scan_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import logging
import re
import fosslight_util.constant as constant
from fosslight_util.oss_item import FileItem, OssItem

logger = logging.getLogger(constant.LOGGER_NAME)
replace_word = ["-only", "-old-style", "-or-later", "licenseref-scancode-", "licenseref-"]
Expand All @@ -25,43 +26,28 @@
SUBSTRING_LICENSE_COMMENT = "Maximum character limit (License)"


class ScanItem:
file = ""
scanoss_reference = {}
exclude = False
is_license_text = False
oss_name = ""
oss_version = ""
download_location = []
matched_lines = "" # Only for SCANOSS results
fileURL = "" # Only for SCANOSS results
license_reference = ""
class SourceItem(FileItem):

def __init__(self, value):
self.file = value
self._copyright = []
self._licenses = []
self.download_location = []
self.comment = ""
self.exclude = False
super().__init__("")
self.source_name_or_path = value
self.is_license_text = False
self.license_reference = ""
self.scanoss_reference = {}
self.matched_lines = "" # Only for SCANOSS results
self.fileURL = "" # Only for SCANOSS results
self.download_location = []
self.copyright = []
self._licenses = []
self.oss_name = ""
self.oss_version = ""

def __del__(self):
    """Finalizer hook; performs no cleanup."""
    return None

def __hash__(self):
    """Return a hash derived from the item's source path.

    The hash is computed from ``source_name_or_path``, the same attribute
    that ``__eq__`` compares, so two items that compare equal also hash
    equally (required for correct use in sets and as dict keys).
    """
    # The constructor assigns ``source_name_or_path`` and no longer assigns
    # ``file``, so hashing ``self.file`` would not reflect the item's path.
    return hash(self.source_name_or_path)

@property
def copyright(self):
return self._copyright

@copyright.setter
def copyright(self, value):
self._copyright.extend(value)
if len(self._copyright) > 0:
self._copyright = list(set(self._copyright))

@property
def licenses(self):
    """Licenses recorded for this item, as stored in ``_licenses``."""
    stored = self._licenses
    return stored
Expand All @@ -84,27 +70,34 @@ def licenses(self, value):
if max_length_exceed and (SUBSTRING_LICENSE_COMMENT not in self.comment):
self.comment = f"{self.comment}/ {SUBSTRING_LICENSE_COMMENT}" if self.comment else SUBSTRING_LICENSE_COMMENT

def get_file(self):
return self.file
def set_oss_item(self):
    """Rebuild ``self.oss_items`` from this item's OSS fields.

    One ``OssItem`` is created per entry in ``self.download_location``;
    when there are no download locations, a single ``OssItem`` is created
    without one. Every created item receives the copyright entries joined
    with newlines and this item's comment.
    """
    def _make_entry(*location):
        # ``location`` is either empty or holds exactly one download URL.
        entry = OssItem(self.oss_name, self.oss_version, self.licenses, *location)
        entry.copyright = "\n".join(self.copyright)
        entry.comment = self.comment
        return entry

    # Start from an empty list so repeated calls do not accumulate items.
    self.oss_items = []
    if not self.download_location:
        self.oss_items.append(_make_entry())
        return
    for url in self.download_location:
        self.oss_items.append(_make_entry(url))

def get_row_to_print(self):
def get_print_array(self):
print_rows = []
if not self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ",".join(self.licenses), "", "",
"\n".join(self.copyright), "Exclude" if self.exclude else "", self.comment,
for item in self.oss_items:
print_rows.append([self.source_name_or_path, item.name, item.version, ",".join(item.license),
item.download_location, "",
item.copyright, "Exclude" if self.exclude else "", item.comment,
self.license_reference])
else:
for url in self.download_location:
print_rows.append([self.file, self.oss_name, self.oss_version, ",".join(self.licenses), url, "",
"\n".join(self.copyright), "Exclude" if self.exclude else "", self.comment,
self.license_reference])
return print_rows

def __eq__(self, other):
if type(other) == str:
return self.file == other
return self.source_name_or_path == other
else:
return self.file == other.file
return self.source_name_or_path == other.source_name_or_path


def is_exclude_dir(dir_path):
Expand Down
68 changes: 36 additions & 32 deletions src/fosslight_source/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import yaml
import argparse
from .run_spdx_extractor import get_spdx_downloads
from ._scan_item import ScanItem
from fosslight_util.cover import CoverItem
from ._scan_item import SourceItem
from fosslight_util.oss_item import ScannerItem

SRC_SHEET_NAME = 'SRC_FL_Source'
SCANOSS_HEADER = {SRC_SHEET_NAME: ['ID', 'Source Path', 'OSS Name',
Expand All @@ -35,7 +35,7 @@

logger = logging.getLogger(constant.LOGGER_NAME)
warnings.filterwarnings("ignore", category=FutureWarning)
_PKG_NAME = "fosslight_source"
PKG_NAME = "fosslight_source"
RESULT_KEY = "Scan Result"


Expand Down Expand Up @@ -75,7 +75,7 @@ def main():
if args.help:
print_help_msg_source_scanner()
if args.version:
print_version(_PKG_NAME)
print_version(PKG_NAME)
if not args.path:
path_to_scan = os.getcwd()
else:
Expand Down Expand Up @@ -171,24 +171,21 @@ def create_report_file(_start_time, merged_result, license_list, scanoss_result,

if not correct_filepath:
correct_filepath = path_to_scan
cover = CoverItem(tool_name=_PKG_NAME,
start_time=_start_time,
input_path=path_to_scan,
exclude_path=path_to_exclude)

scan_item = ScannerItem(PKG_NAME, _start_time)
scan_item.set_cover_pathinfo(path_to_scan, path_to_exclude)
files_count, removed_files_count = count_files(path_to_scan, path_to_exclude)
cover.comment = f"Total number of files / removed files: {files_count} / {removed_files_count}"
scan_item.set_cover_comment(f"Total number of files / removed files: {files_count} / {removed_files_count}")

if len(merged_result) == 0:
if not merged_result:
if files_count < 1:
cover.comment += "(No file detected.)"
scan_item.set_cover_comment("(No file detected.)")
else:
cover.comment += "(No OSS detected.)"
scan_item.set_cover_comment("(No OSS detected.)")

sheet_list[SRC_SHEET_NAME] = []
if merged_result:
for scan_item in merged_result:
for row in scan_item.get_row_to_print():
sheet_list[SRC_SHEET_NAME].append(row)
sheet_list = {}
scan_item.append_file_items(merged_result, PKG_NAME)

if selected_scanner == 'scanoss':
extended_header = SCANOSS_HEADER
Expand All @@ -203,37 +200,40 @@ def create_report_file(_start_time, merged_result, license_list, scanoss_result,
else:
sheet_list["scancode_reference"] = get_license_list_to_print(license_list)
sheet_list["scanoss_reference"] = get_scanoss_extra_info(scanoss_result)
if sheet_list:
scan_item.external_sheets = sheet_list

if correct_mode:
success, msg_correct, correct_list = correct_with_yaml(correct_filepath, path_to_scan, sheet_list)
success, msg_correct, correct_item = correct_with_yaml(correct_filepath, path_to_scan, scan_item)
if not success:
logger.info(f"No correction with yaml: {msg_correct}")
else:
sheet_list = correct_list
scan_item = correct_item
logger.info("Success to correct with yaml.")

combined_paths_and_files = [os.path.join(output_path, file) for file in output_files]
results = []
for combined_path_and_file, output_extension in zip(combined_paths_and_files, output_extensions):
if need_license and output_extension == _json_ext and "scanoss_reference" in sheet_list:
del sheet_list["scanoss_reference"]
results.append(write_output_file(combined_path_and_file, output_extension, sheet_list, extended_header, "", cover))
# if need_license and output_extension == _json_ext and "scanoss_reference" in sheet_list:
# del sheet_list["scanoss_reference"]
results.append(write_output_file(combined_path_and_file, output_extension, scan_item, extended_header, ""))
for success, msg, result_file in results:
if success:
logger.info(f"Output file: {result_file}")
if cover:
logger.info(f'{cover.comment}')
for row in scan_item.get_cover_comment():
logger.info(row)
else:
logger.error(f"Fail to generate result file {result_file}. msg:({msg})")
return scan_item


def merge_results(scancode_result=[], scanoss_result=[], spdx_downloads={}):
"""
Merge scanner results and spdx parsing result.
:param scancode_result: list of scancode results in ScanItem.
:param scanoss_result: list of scanoss results in ScanItem.
:param scancode_result: list of scancode results in SourceItem.
:param scanoss_result: list of scanoss results in SourceItem.
:param spdx_downloads: dictionary of spdx parsed results.
:return merged_result: list of merged result in ScanItem.
:return merged_result: list of merged result in SourceItem.
"""

# If anything that is found at SCANOSS only exist, add it to result.
Expand All @@ -247,9 +247,13 @@ def merge_results(scancode_result=[], scanoss_result=[], spdx_downloads={}):
merged_result_item = scancode_result[scancode_result.index(file_name)]
merged_result_item.download_location = download_location
else:
new_result_item = ScanItem(file_name)
new_result_item = SourceItem(file_name)
new_result_item.download_location = download_location
scancode_result.append(new_result_item)

for item in scancode_result:
item.set_oss_item()

return scancode_result


Expand Down Expand Up @@ -284,7 +288,7 @@ def run_scanners(path_to_scan, output_file_name="", write_json_file=False, num_c
success, msg, output_path, output_files, output_extensions = check_output_formats(output_file_name, formats)

logger, result_log = init_log(os.path.join(output_path, f"fosslight_log_src_{start_time}.txt"),
True, logging.INFO, logging.DEBUG, _PKG_NAME, path_to_scan, path_to_exclude)
True, logging.INFO, logging.DEBUG, PKG_NAME, path_to_scan, path_to_exclude)

if '.xlsx' not in output_extensions and print_matched_text:
logger.warning("-m option is only available for excel.")
Expand All @@ -302,17 +306,17 @@ def run_scanners(path_to_scan, output_file_name="", write_json_file=False, num_c
if selected_scanner in SCANNER_TYPE:
spdx_downloads = get_spdx_downloads(path_to_scan, path_to_exclude)
merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads)
create_report_file(start_time, merged_result, license_list, scanoss_result, selected_scanner,
print_matched_text, output_path, output_files, output_extensions, correct_mode,
correct_filepath, path_to_scan, path_to_exclude)
scan_item = create_report_file(start_time, merged_result, license_list, scanoss_result, selected_scanner,
print_matched_text, output_path, output_files, output_extensions, correct_mode,
correct_filepath, path_to_scan, path_to_exclude)
else:
print_help_msg_source_scanner()
result_log[RESULT_KEY] = "Unsupported scanner"
success = False
else:
result_log[RESULT_KEY] = f"Format error. {msg}"
success = False
return success, result_log.get(RESULT_KEY, ""), merged_result, license_list, scanoss_result
return success, result_log.get(RESULT_KEY, ""), scan_item, license_list, scanoss_result


if __name__ == '__main__':
Expand Down
11 changes: 1 addition & 10 deletions src/fosslight_source/run_scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from fosslight_util.set_log import init_log
from ._parsing_scancode_file_item import parsing_file_item
from ._parsing_scancode_file_item import get_error_from_header
from ._license_matched import get_license_list_to_print
from fosslight_util.output_format import check_output_formats
from fosslight_binary.binary_analysis import check_binary

Expand Down Expand Up @@ -100,13 +99,10 @@ def run_scan(path_to_scan, output_file_name="",
output_json_pp=output_json_file, only_findings=True,
license_text=True, url=True, timeout=time_out,
include=(), ignore=tuple(total_files_to_excluded))

if not rc:
msg = "Source code analysis failed."
success = False

if results:
sheet_list = {}
has_error = False
if "headers" in results:
has_error, error_msg = get_error_from_header(results["headers"])
Expand All @@ -125,13 +121,8 @@ def run_scan(path_to_scan, output_file_name="",
result_list, key=lambda row: (''.join(row.licenses)))

for scan_item in result_list:
if check_binary(os.path.join(path_to_scan, scan_item.file)):
if check_binary(os.path.join(path_to_scan, scan_item.source_name_or_path)):
scan_item.exclude = True

sheet_list["SRC_FL_Source"] = [scan_item.get_row_to_print() for scan_item in result_list]
if need_license:
sheet_list["matched_text"] = get_license_list_to_print(license_list)

except Exception as ex:
success = False
msg = str(ex)
Expand Down
4 changes: 2 additions & 2 deletions tests/cli_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ def main():
if len(ret) > 2:
try:
for scan_item in ret[2]:
logger.warning(scan_item.get_row_to_print())
logger.warning(scan_item.get_print_array())
except Exception as ex:
logger.error("Error:"+str(ex))
if ret_scanoss:
for scan_item in ret_scanoss:
logger.warning(scan_item.get_row_to_print())
logger.warning(scan_item.get_print_array())


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 53fe700

Please sign in to comment.