Skip to content

Commit

Permalink
feat: parsers can outbut more debug messages (#466)
Browse files Browse the repository at this point in the history

Signed-off-by: Jan Kowalleck <jan.kowalleck@gmail.com>
  • Loading branch information
jkowalleck authored Dec 13, 2022
1 parent 24c4163 commit 9eedb4f
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 79 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,15 @@ jobs:
- name: Build own SBoM (XML)
run: >
docker run --rm "$DOCKER_TAG"
-X
--environment
--format=xml
--output=-
> "$REPORTS_DIR/docker-image.bom.xml"
- name: Build own SBoM (JSON)
run: >
docker run --rm "$DOCKER_TAG"
-X
--environment
--format=json
--output=-
Expand Down
82 changes: 51 additions & 31 deletions cyclonedx_py/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import os
import sys
from datetime import datetime
from typing import Optional
from typing import Any, Optional

from cyclonedx.model import Tool
from cyclonedx.model.bom import Bom
Expand Down Expand Up @@ -74,19 +74,19 @@ def __init__(self, args: argparse.Namespace) -> None:
if self._arguments.debug_enabled:
self._DEBUG_ENABLED = True
self._debug_message('!!! DEBUG MODE ENABLED !!!')
self._debug_message('Parsed Arguments: {}'.format(self._arguments))
self._debug_message('Parsed Arguments: {}', self._arguments)

def _get_output_format(self) -> _CLI_OUTPUT_FORMAT:
return _CLI_OUTPUT_FORMAT(str(self._arguments.output_format).lower())

def get_output(self) -> BaseOutput:
try:
parser = self._get_input_parser()
except CycloneDxCmdNoInputFileSupplied as e:
print(f'ERROR: {str(e)}', file=sys.stderr)
except CycloneDxCmdNoInputFileSupplied as error:
print(f'ERROR: {str(error)}', file=sys.stderr)
exit(1)
except CycloneDxCmdException as e:
print(f'ERROR: {str(e)}', file=sys.stderr)
except CycloneDxCmdException as error:
print(f'ERROR: {str(error)}', file=sys.stderr)
exit(1)

if parser and parser.has_warnings():
Expand Down Expand Up @@ -134,13 +134,13 @@ def get_output(self) -> BaseOutput:

def execute(self) -> None:
output_format = self._get_output_format()
self._debug_message(f'output_format: {output_format}')
self._debug_message('output_format: {}', output_format)

# Quick check for JSON && SchemaVersion <= 1.1
if output_format == OutputFormat.JSON and \
str(self._arguments.output_schema_version) in ['1.0', '1.1']:
self._error_and_exit(
message='CycloneDX schema does not support JSON output in Schema Versions < 1.2',
'CycloneDX schema does not support JSON output in Schema Versions < 1.2',
exit_code=2
)

Expand All @@ -154,7 +154,7 @@ def execute(self) -> None:
output_file = self._arguments.output_file
output_filename = os.path.realpath(
output_file if isinstance(output_file, str) else _output_default_filenames[output_format])
self._debug_message('Will be outputting SBOM to file at: {}'.format(output_filename))
self._debug_message('Will be outputting SBOM to file at: {}', output_filename)
output.output_to_file(filename=output_filename, allow_overwrite=self._arguments.output_file_overwrite)

@staticmethod
Expand Down Expand Up @@ -240,30 +240,35 @@ def get_arg_parser(*, prog: Optional[str] = None) -> argparse.ArgumentParser:

return arg_parser

def _debug_message(self, message: str) -> None:
def _debug_message(self, message: str, *args: Any, **kwargs: Any) -> None:
if self._DEBUG_ENABLED:
print('[DEBUG] - {} - {}'.format(datetime.now(), message), file=sys.stderr)
print(f'[DEBUG] - {{__t}} - {message}'.format(*args, **kwargs, __t=datetime.now()),
file=sys.stderr)

@staticmethod
def _error_and_exit(message: str, exit_code: int = 1) -> None:
print('[ERROR] - {} - {}'.format(datetime.now(), message), file=sys.stderr)
def _error_and_exit(message: str, *args: Any, exit_code: int = 1, **kwargs: Any) -> None:
print(f'[ERROR] - {{__t}} - {message}'.format(*args, **kwargs, __t=datetime.now()),
file=sys.stderr)
exit(exit_code)

def _get_input_parser(self) -> BaseParser:
if self._arguments.input_from_environment:
return EnvironmentParser(use_purl_bom_ref=self._arguments.use_purl_bom_ref)
return EnvironmentParser(
use_purl_bom_ref=self._arguments.use_purl_bom_ref,
debug_message=lambda m, *a, **k: self._debug_message(f'EnvironmentParser {m}', *a, **k)
)

# All other Parsers will require some input - grab it now!
if not self._arguments.input_source:
# Nothing passed via STDIN, and no FILENAME supplied, let's assume a default by input type for ease
current_directory = os.getcwd()
try:
if self._arguments.input_from_conda_explicit:
raise CycloneDxCmdNoInputFileSupplied('When using input from Conda Explicit, you need to pipe input'
'via STDIN')
raise CycloneDxCmdNoInputFileSupplied(
'When using input from Conda Explicit, you need to pipe input via STDIN')
elif self._arguments.input_from_conda_json:
raise CycloneDxCmdNoInputFileSupplied('When using input from Conda JSON, you need to pipe input'
'via STDIN')
raise CycloneDxCmdNoInputFileSupplied(
'When using input from Conda JSON, you need to pipe input via STDIN')
elif self._arguments.input_from_pip:
self._arguments.input_source = open(os.path.join(current_directory, 'Pipfile.lock'), 'r')
elif self._arguments.input_from_poetry:
Expand All @@ -272,31 +277,46 @@ def _get_input_parser(self) -> BaseParser:
self._arguments.input_source = open(os.path.join(current_directory, 'requirements.txt'), 'r')
else:
raise CycloneDxCmdException('Parser type could not be determined.')
except FileNotFoundError as e:
except FileNotFoundError as error:
raise CycloneDxCmdNoInputFileSupplied(
f'No input file was supplied and no input was provided on STDIN:\n{str(e)}'
)
f'No input file was supplied and no input was provided on STDIN:\n{str(error)}'
) from error

input_data_fh = self._arguments.input_source
with input_data_fh:
input_data = input_data_fh.read()
input_data_fh.close()

if self._arguments.input_from_conda_explicit:
return CondaListExplicitParser(conda_data=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref)
return CondaListExplicitParser(
conda_data=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref,
debug_message=lambda m, *a, **k: self._debug_message(f'CondaListExplicitParser {m}', *a, **k)
)
elif self._arguments.input_from_conda_json:
return CondaListJsonParser(conda_data=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref)
return CondaListJsonParser(
conda_data=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref,
debug_message=lambda m, *a, **k: self._debug_message(f'CondaListJsonParser {m}', *a, **k)
)
elif self._arguments.input_from_pip:
return PipEnvParser(pipenv_contents=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref)
return PipEnvParser(
pipenv_contents=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref,
debug_message=lambda m, *a, **k: self._debug_message(f'PipEnvParser {m}', *a, **k)
)
elif self._arguments.input_from_poetry:
return PoetryParser(poetry_lock_contents=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref)
return PoetryParser(
poetry_lock_contents=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref,
debug_message=lambda m, *a, **k: self._debug_message(f'PoetryParser {m}', *a, **k)
)
elif self._arguments.input_from_requirements:
return RequirementsParser(requirements_content=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref)
return RequirementsParser(
requirements_content=input_data,
use_purl_bom_ref=self._arguments.use_purl_bom_ref,
debug_message=lambda m, *a, **k: self._debug_message(f'RequirementsParser {m}', *a, **k)
)
else:
raise CycloneDxCmdException('Parser type could not be determined.')

Expand Down
41 changes: 41 additions & 0 deletions cyclonedx_py/parser/_debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# encoding: utf-8

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) OWASP Foundation. All Rights Reserved.

"""The following structures are internal helpers."""

from typing import TYPE_CHECKING, Any, Callable

if TYPE_CHECKING:
from mypy_extensions import Arg, KwArg, VarArg

DebugMessageCallback = Callable[[Arg(str, 'message'), VarArg(Any), KwArg(Any)], None] # noqa: F821
"""Callback for debug messaging.
:parameter message: the format string.
:Other Parameters: the *args: to :func:`str.forma()`.
:Keyword Arguments: the **kwargs to :func:`str.format()`.
"""
else:
DebugMessageCallback = Callable[..., None]


def quiet(message: str, *_: Any, **__: Any) -> None:
"""Do not print anything.
Must be compatible to :py:data:`DebugMessageCallback`.
"""
pass
20 changes: 18 additions & 2 deletions cyclonedx_py/parser/conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,21 @@
parse_conda_json_to_conda_package,
parse_conda_list_str_to_conda_package,
)
from ._debug import DebugMessageCallback, quiet


class _BaseCondaParser(BaseParser, metaclass=ABCMeta):
"""Internal abstract parser - not for programmatic use.
"""

def __init__(self, conda_data: str, use_purl_bom_ref: bool = False) -> None:
def __init__(
self, conda_data: str, use_purl_bom_ref: bool = False,
*,
debug_message: DebugMessageCallback = quiet
) -> None:
super().__init__()
debug_message('init {}', self.__class__.__name__)
self._debug_message = debug_message
self._conda_packages: List[CondaPackage] = []
self._parse_to_conda_packages(data_str=conda_data)
self._conda_packages_to_components(use_purl_bom_ref=use_purl_bom_ref)
Expand All @@ -61,7 +68,9 @@ def _conda_packages_to_components(self, use_purl_bom_ref: bool) -> None:
Converts the parsed `CondaPackage` instances into `Component` instances.
"""
self._debug_message('processing conda_packages')
for conda_package in self._conda_packages:
self._debug_message('processing conda_package: {!r}', conda_package)
purl = conda_package_to_purl(conda_package)
bom_ref = purl.to_string() if use_purl_bom_ref else None
c = Component(
Expand Down Expand Up @@ -89,11 +98,14 @@ class CondaListJsonParser(_BaseCondaParser):

def _parse_to_conda_packages(self, data_str: str) -> None:
conda_list_content = json.loads(data_str)

self._debug_message('processing conda_list_content')
for package in conda_list_content:
self._debug_message('processing package: {!r}', package)
conda_package = parse_conda_json_to_conda_package(conda_json_str=json.dumps(package))
if conda_package:
self._conda_packages.append(conda_package)
else:
self._debug_message('no conda_package -> skip')


class CondaListExplicitParser(_BaseCondaParser):
Expand All @@ -103,8 +115,12 @@ class CondaListExplicitParser(_BaseCondaParser):
"""

def _parse_to_conda_packages(self, data_str: str) -> None:
self._debug_message('processing data_str')
for line in data_str.replace('\r\n', '\n').split('\n'):
line = line.strip()
self._debug_message('processing line: {}', line)
conda_package = parse_conda_list_str_to_conda_package(conda_list_str=line)
if conda_package:
self._conda_packages.append(conda_package)
else:
self._debug_message('no conda_package -> skip')
39 changes: 27 additions & 12 deletions cyclonedx_py/parser/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
from cyclonedx.model.component import Component
from cyclonedx.parser import BaseParser

from ._debug import DebugMessageCallback, quiet


class EnvironmentParser(BaseParser):
"""
Expand All @@ -57,45 +59,58 @@ class EnvironmentParser(BaseParser):
Best used when you have virtual Python environments per project.
"""

def __init__(self, use_purl_bom_ref: bool = False) -> None:
def __init__(
self, use_purl_bom_ref: bool = False,
*,
debug_message: DebugMessageCallback = quiet
) -> None:
super().__init__()
debug_message('init {}', self.__class__.__name__)

debug_message('late import pkg_resources')
import pkg_resources

debug_message('processing pkg_resources.working_set')
i: DistInfoDistribution
for i in iter(pkg_resources.working_set):
debug_message('processing working_set item: {!r}', i)
purl = PackageURL(type='pypi', name=i.project_name, version=i.version)
bom_ref = purl.to_string() if use_purl_bom_ref else None
c = Component(name=i.project_name, bom_ref=bom_ref, version=i.version, purl=purl)

i_metadata = self._get_metadata_for_package(i.project_name)
debug_message('processing i_metadata')
if 'Author' in i_metadata:
c.author = i_metadata['Author']

if 'License' in i_metadata and i_metadata['License'] and i_metadata['License'] != 'UNKNOWN':
# Values might be ala `MIT` (SPDX id), `Apache-2.0 license` (arbitrary string), ...
# Therefore, just go with a named license.
try:
c.licenses.add(LicenseChoice(license_=License(license_name=i_metadata['License'])))
except CycloneDxModelException:
# write a debug message?
pass
except CycloneDxModelException as error:
# @todo traceback and details to the output?
debug_message('Warning: suppressed {!r}', error)
del error

debug_message('processing classifiers')
for classifier in i_metadata.get_all("Classifier", []):
debug_message('processing classifier: {!r}', classifier)
classifier = str(classifier)
# Trove classifiers - https://packaging.python.org/specifications/core-metadata/#metadata-classifier
# Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers
if str(classifier).startswith('License :: OSI Approved :: '):
license_name = str(classifier).replace('License :: OSI Approved :: ', '').strip()
elif str(classifier).startswith('License :: '):
license_name = str(classifier).replace('License :: ', '').strip()
if classifier.startswith('License :: OSI Approved :: '):
license_name = classifier.replace('License :: OSI Approved :: ', '').strip()
elif classifier.startswith('License :: '):
license_name = classifier.replace('License :: ', '').strip()
else:
license_name = ''
if license_name:
try:
c.licenses.add(LicenseChoice(license_=License(license_name=license_name)))
except CycloneDxModelException:
# write a debug message?
pass
except CycloneDxModelException as error:
# @todo traceback and details to the output?
debug_message('Warning: suppressed {!r}', error)
del error

self._components.append(c)

Expand Down
Loading

0 comments on commit 9eedb4f

Please sign in to comment.