Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/fosslight_source/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
--no_correction Skip OSS information correction with sbom-info.yaml
--correct_fpath <path> Path to custom sbom-info.yaml file
--hide_progress Hide the progress bar during scanning
--kb_url <url> KB API URL (priority: parameter > KB_URL env > default)
--kb_token <token> KB bearer token (priority: parameter > KB_TOKEN env)

💡 Examples
────────────────────────────────────────────────────────────────────
Expand Down
28 changes: 23 additions & 5 deletions src/fosslight_source/_scan_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,17 @@
MAX_LICENSE_LENGTH = 200
MAX_LICENSE_TOTAL_LENGTH = 600
SUBSTRING_LICENSE_COMMENT = "Maximum character limit (License)"
KB_URL = "http://fosslight-kb.lge.com/"
DEFAULT_KB_URL = "http://fosslight-kb.lge.com/"


def resolve_kb_config(kb_url: str = "", kb_token: str = "") -> tuple[str, str]:
    """Resolve the KB API base URL and bearer token to use.

    Each value is looked up in priority order, skipping candidates that are
    empty or whitespace-only:

    * URL:   ``kb_url`` argument > ``KB_URL`` environment variable > ``DEFAULT_KB_URL``
    * token: ``kb_token`` argument > ``KB_TOKEN`` environment variable > ``""``

    :param kb_url: explicitly requested KB API URL (may be empty).
    :param kb_token: explicitly requested bearer token (may be empty).
    :return: ``(url, token)`` where ``url`` always ends with exactly one ``/``
             and ``token`` is stripped (empty string when none is configured).
    """
    # Strip every candidate *before* falling through, so a blank argument
    # defers to the environment variable instead of jumping to the default.
    url = (
        (kb_url or "").strip()
        or (os.environ.get("KB_URL") or "").strip()
        or DEFAULT_KB_URL
    )
    token = (kb_token or "").strip() or (os.environ.get("KB_TOKEN") or "").strip()

    # Normalize to a single trailing slash so callers can append endpoint names.
    return f"{url.rstrip('/')}/", token


class SourceItem(FileItem):
Expand Down Expand Up @@ -114,15 +124,21 @@ def _get_hash(self, path_to_scan: str = "") -> tuple:
logger.debug(f"Failed to compute MD5 for {self.source_name_or_path}: {e}")
return md5_hex, wfp

def _get_origin_url_from_md5_hash(self, md5_hash: str, wfp: str = "") -> str:
def _get_origin_url_from_md5_hash(
self, md5_hash: str, wfp: str = "", kb_url: str = DEFAULT_KB_URL, kb_token: str = ""
) -> str:
"""Return origin_url from KB API."""
try:
payload = {"file_hash": md5_hash}
if wfp and wfp.strip():
payload["wfp_base64"] = base64.b64encode(wfp.strip().encode("utf-8")).decode("ascii")
request = urllib.request.Request(f"{KB_URL}query", data=json.dumps(payload).encode('utf-8'), method='POST')
request = urllib.request.Request(
f"{kb_url}query", data=json.dumps(payload).encode('utf-8'), method='POST'
)
request.add_header('Accept', 'application/json')
request.add_header('Content-Type', 'application/json')
if kb_token:
request.add_header('Authorization', f'Bearer {kb_token}')

with urllib.request.urlopen(request, timeout=10) as response:
data = json.loads(response.read().decode())
Expand Down Expand Up @@ -179,7 +195,9 @@ def _extract_oss_info_from_url(self, url: str) -> tuple:
logger.debug(f"Failed to extract OSS info from URL {url}: {e}")
return "", "", ""

def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None:
def set_oss_item(
self, path_to_scan: str = "", run_kb: bool = False, kb_url: str = DEFAULT_KB_URL, kb_token: str = ""
) -> None:
self.oss_items = []
if self.download_location:
for url in self.download_location:
Expand All @@ -192,7 +210,7 @@ def set_oss_item(self, path_to_scan: str = "", run_kb: bool = False) -> None:
if run_kb and not self.is_license_text:
md5_hash, wfp = self._get_hash(path_to_scan)
if md5_hash:
origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp)
origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp, kb_url, kb_token)
if origin_url:
self.kb_origin_url = origin_url
self.kb_evidence = "exact_match"
Expand Down
37 changes: 25 additions & 12 deletions src/fosslight_source/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import argparse
from .run_spdx_extractor import get_spdx_downloads
from .run_manifest_extractor import get_manifest_licenses
from ._scan_item import SourceItem, KB_URL
from ._scan_item import SourceItem, resolve_kb_config
from fosslight_util.oss_item import ScannerItem
from typing import Tuple
from ._scan_item import is_manifest_file
Expand Down Expand Up @@ -84,6 +84,8 @@ def main() -> None:
parser.add_argument('--no_correction', action='store_true', required=False)
parser.add_argument('--correct_fpath', nargs=1, type=str, required=False)
parser.add_argument('--hide_progress', action='store_true', required=False)
parser.add_argument('--kb_url', type=str, required=False, default="")
parser.add_argument('--kb_token', type=str, required=False, default="")

args = parser.parse_args()

Expand Down Expand Up @@ -112,6 +114,8 @@ def main() -> None:
if args.correct_fpath:
correct_filepath = ''.join(args.correct_fpath)
hide_progress = args.hide_progress
kb_url = args.kb_url
kb_token = args.kb_token

time_out = args.timeout
core = args.cores
Expand All @@ -120,7 +124,8 @@ def main() -> None:
result = []
result = run_scanners(path_to_scan, output_file_name, write_json_file, core, True,
print_matched_text, formats, time_out, correct_mode, correct_filepath,
selected_scanner, path_to_exclude, hide_progress=hide_progress)
selected_scanner, path_to_exclude, hide_progress=hide_progress,
kb_url=kb_url, kb_token=kb_token)

_result_log["Scan Result"] = result[1]

Expand Down Expand Up @@ -268,10 +273,12 @@ def create_report_file(
return scan_item


def check_kb_server_reachable() -> bool:
def check_kb_server_reachable(kb_url: str, kb_token: str = "") -> bool:
for attempt in range(3):
try:
request = urllib.request.Request(f"{KB_URL}health", method='GET')
request = urllib.request.Request(f"{kb_url}health", method='GET')
if kb_token:
request.add_header('Authorization', f'Bearer {kb_token}')
with urllib.request.urlopen(request, timeout=10) as response:
logger.debug(f"KB server is reachable. Response status: {response.status}")
return True
Expand Down Expand Up @@ -326,7 +333,7 @@ def mark_oss_info_correction_files_as_excluded(scan_results: list) -> None:
def merge_results(
scancode_result: list = [], scanoss_result: list = [], spdx_downloads: dict = {},
path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {},
excluded_files: set = None, hide_progress: bool = False
excluded_files: set = None, hide_progress: bool = False, kb_url: str = "", kb_token: str = ""
) -> list:

"""
Expand All @@ -337,6 +344,8 @@ def merge_results(
:param path_to_scan: path to the scanned directory for constructing absolute file paths.
:param run_kb: if True, load kb result.
:param excluded_files: set of relative paths to exclude from KB-only file discovery.
:param kb_url: KB API base URL.
:param kb_token: KB API bearer token.
:return merged_result: list of merged result in SourceItem.
"""
if excluded_files is None:
Expand Down Expand Up @@ -373,7 +382,7 @@ def merge_results(
scancode_result.append(new_result_item)

for item in scancode_result:
item.set_oss_item(path_to_scan, run_kb)
item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token)

# Add OSSItem for files in path_to_scan that are not in scancode_result
# when KB returns an origin URL for their MD5 hash (skip excluded_files)
Expand All @@ -392,7 +401,7 @@ def merge_results(
if rel_path in scancode_paths or rel_path in excluded_files:
continue
extra_item = SourceItem(rel_path)
extra_item.set_oss_item(path_to_scan, run_kb)
extra_item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token)
if extra_item.download_location:
scancode_result.append(extra_item)
scancode_paths.add(rel_path)
Expand All @@ -407,7 +416,7 @@ def run_scanners(
formats: list = [], time_out: int = 120,
correct_mode: bool = True, correct_filepath: str = "",
selected_scanner: str = ALL_MODE, path_to_exclude: list = [],
all_exclude_mode: tuple = (), hide_progress: bool = False
all_exclude_mode: tuple = (), hide_progress: bool = False, kb_url: str = "", kb_token: str = ""
) -> Tuple[bool, str, 'ScannerItem', list, list]:
"""
Run Scancode and scanoss.py for the given path.
Expand All @@ -419,6 +428,8 @@ def run_scanners(
:param called_by_cli: if not called by cli, initialize logger.
:param print_matched_text: if requested, output matched text (only for scancode).
:param format: output format (excel, csv, opossum).
:param kb_url: KB API base URL. If empty, read KB_URL environment variable, then use default.
:param kb_token: KB API bearer token. If empty, read KB_TOKEN environment variable.
:return success: success or failure of scancode.
:return result_log["Scan Result"]:
:return merged_result: merged scan result of scancode and scanoss.
Expand All @@ -435,6 +446,7 @@ def run_scanners(
result_log = {}
scan_item = []
api_limit_exceed = False
kb_url, kb_token = resolve_kb_config(kb_url, kb_token)

success, msg, output_path, output_files, output_extensions, formats = check_output_formats_v2(output_file_name, formats)

Expand Down Expand Up @@ -485,15 +497,16 @@ def run_scanners(
if selected_scanner in SCANNER_TYPE:
run_kb = True if selected_scanner in ['kb', ALL_MODE] else False
if run_kb:
if not check_kb_server_reachable():
if not check_kb_server_reachable(kb_url, kb_token):
run_kb = False
run_kb_msg = "KB Unreachable"
run_kb_msg = f"KB({kb_url}) Unreachable"
else:
run_kb_msg = "KB Enabled"
run_kb_msg = f"KB({kb_url}) Enabled"

spdx_downloads, manifest_licenses = metadata_collector(path_to_scan, excluded_files)
merged_result = merge_results(scancode_result, scanoss_result, spdx_downloads,
path_to_scan, run_kb, manifest_licenses, excluded_files, hide_progress)
path_to_scan, run_kb, manifest_licenses, excluded_files,
hide_progress, kb_url, kb_token)
mark_oss_info_correction_files_as_excluded(merged_result)
scan_item = create_report_file(start_time, merged_result, license_list, scanoss_result, selected_scanner,
print_matched_text, output_path, output_files, output_extensions, correct_mode,
Expand Down
Loading