diff --git a/src/macaron/artifact/maven.py b/src/macaron/artifact/maven.py new file mode 100644 index 000000000..3dcd05c25 --- /dev/null +++ b/src/macaron/artifact/maven.py @@ -0,0 +1,124 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. + +"""This module declares types and utilities for Maven artifacts.""" + +from packageurl import PackageURL + +from macaron.slsa_analyzer.provenance.intoto import InTotoPayload +from macaron.slsa_analyzer.provenance.intoto.v01 import InTotoV01Subject +from macaron.slsa_analyzer.provenance.intoto.v1 import InTotoV1ResourceDescriptor +from macaron.slsa_analyzer.provenance.witness import ( + extract_build_artifacts_from_witness_subjects, + is_witness_provenance_payload, + load_witness_verifier_config, +) + + +class MavenSubjectPURLMatcher: + """A matcher matching a PURL identifying a Maven artifact to a provenance subject.""" + + @staticmethod + def get_subject_in_provenance_matching_purl( + provenance_payload: InTotoPayload, purl: PackageURL + ) -> InTotoV01Subject | InTotoV1ResourceDescriptor | None: + """Get the subject in the provenance matching the PURL. + + In this case where the provenance is assumed to be built from a Java project, + the subject must be a Maven artifact. + + Parameters + ---------- + provenance_payload : InTotoPayload + The provenance payload. + purl : PackageURL + The PackageURL identifying the matching subject. + + Returns + ------- + InTotoV01Subject | InTotoV1ResourceDescriptor | None + The subject in the provenance matching the given PURL. 
+ """ + if not purl.namespace: + return None + if not purl.version: + return None + if purl.type != "maven": + return None + + if not is_witness_provenance_payload( + payload=provenance_payload, + predicate_types=load_witness_verifier_config().predicate_types, + ): + return None + artifact_subjects = extract_build_artifacts_from_witness_subjects(provenance_payload) + + for subject in artifact_subjects: + _, _, artifact_filename = subject["name"].rpartition("/") + subject_purl = create_maven_purl_from_artifact_filename( + artifact_filename=artifact_filename, + group_id=purl.namespace, + version=purl.version, + ) + if subject_purl == purl: + return subject + + return None + + +def create_maven_purl_from_artifact_filename( + artifact_filename: str, + group_id: str, + version: str, +) -> PackageURL | None: + """Create a Maven PackageURL given an artifact filename, a group id, and a version. + + For reference, see: + - https://maven.apache.org/ref/3.9.6/maven-core/artifact-handlers.html + - https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst#maven + Notes: + - For the time being, we are only supporting the ``"type"`` qualifier, although the + Maven section in the PackageURL docs also mentions the ``"classifier"`` qualifier. + This is because not all artifact types have a unique value of ``"classifier"`` + according to the Artifact Handlers table in the Maven Core reference. In addition, + not supporting the ``"classifier"`` qualifier at the moment simplifies the + implementation for PURL decoding and generation until there is a concrete use + case for this additional qualifier. + - We are supporting only 4 artifact types: jar, pom, javadoc, and java-source. + + Parameters + ---------- + artifact_filename : str + The filename of the artifact. + group_id : str + The group id of the artifact. + version : str + The version of the artifact. 
+ + Returns + ------- + PackageURL | None + A Maven artifact PackageURL, or `None` if the filename does not follow any + of the supported artifact name patterns. + """ + # Each artifact name should follow the pattern "<artifact-id>-<suffix>" + # where "<suffix>" is one of the following. + suffix_to_purl_qualifiers = { + f"-{version}.jar": {"type": "jar"}, + f"-{version}.pom": {"type": "pom"}, + f"-{version}-javadoc.jar": {"type": "javadoc"}, + f"-{version}-sources.jar": {"type": "java-source"}, + } + + for suffix, purl_qualifiers in suffix_to_purl_qualifiers.items(): + if artifact_filename.endswith(suffix): + artifact_id = artifact_filename[: -len(suffix)] + return PackageURL( + type="maven", + namespace=group_id, + name=artifact_id, + version=version, + qualifiers=purl_qualifiers, + ) + + return None diff --git a/src/macaron/database/table_definitions.py b/src/macaron/database/table_definitions.py index 3cab09cb9..542dd5679 100644 --- a/src/macaron/database/table_definitions.py +++ b/src/macaron/database/table_definitions.py @@ -15,7 +15,7 @@ import string from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, Self from packageurl import PackageURL from sqlalchemy import ( @@ -32,9 +32,11 @@ ) from sqlalchemy.orm import Mapped, mapped_column, relationship +from macaron.artifact.maven import MavenSubjectPURLMatcher from macaron.database.database_manager import ORMBase from macaron.database.rfc3339_datetime import RFC3339DateTime from macaron.errors import InvalidPURLError +from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, ProvenanceSubjectPURLMatcher from macaron.slsa_analyzer.slsa_req import ReqName logger: logging.Logger = logging.getLogger(__name__) @@ -168,6 +170,13 @@ class Component(PackageURLMixin, ORMBase): secondaryjoin=components_association_table.c.child_component == id, ) + #: The optional one-to-one relationship with a provenance subject in case this + #: component represents a subject in a provenance. 
+ provenance_subject: Mapped["ProvenanceSubject | None"] = relationship( + back_populates="component", + lazy="immediate", + ) + def __init__(self, purl: str, analysis: Analysis, repository: "Repository | None"): """ Instantiate the software component using PURL identifier. @@ -528,3 +537,71 @@ class HashDigest(ORMBase): #: The many-to-one relationship with artifacts. artifact: Mapped["ReleaseArtifact"] = relationship(back_populates="digests", lazy="immediate") + + +class ProvenanceSubject(ORMBase): + """A subject in a provenance that matches the user-provided PackageURL. + + This subject may be later populated in VSAs during policy verification. + """ + + __tablename__ = "_provenance_subject" + + #: The primary key. + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) # noqa: A003 + + #: The component id of the provenance subject. + component_id: Mapped[int] = mapped_column( + Integer, + ForeignKey("_component.id"), + nullable=False, + ) + + #: The required one-to-one relationship with a component. + component: Mapped[Component] = relationship( + back_populates="provenance_subject", + lazy="immediate", + ) + + #: The SHA256 hash of the subject. + sha256: Mapped[str] = mapped_column(String, nullable=False) + + @classmethod + def from_purl_and_provenance( + cls, + purl: PackageURL, + provenance_payload: InTotoPayload, + ) -> Self | None: + """Create a ``ProvenanceSubject`` entry if there is a provenance subject matching the PURL. + + Parameters + ---------- + purl : PackageURL + The PackageURL identifying the software component being analyzed. + provenance_payload : InTotoPayload + The provenance payload. + + Returns + ------- + Self | None + A ``ProvenanceSubject`` entry with the SHA256 digest of the provenance subject + matching the given PURL. 
+ """ + subject_artifact_types: list[ProvenanceSubjectPURLMatcher] = [MavenSubjectPURLMatcher] + + for subject_artifact_type in subject_artifact_types: + subject = subject_artifact_type.get_subject_in_provenance_matching_purl( + provenance_payload, + purl, + ) + if subject is None: + return None + digest = subject["digest"] + if digest is None: + return None + sha256 = digest.get("sha256") + if not sha256: + return None + return cls(sha256=sha256) + + return None diff --git a/src/macaron/slsa_analyzer/analyzer.py b/src/macaron/slsa_analyzer/analyzer.py index 0496f29cd..f905e978e 100644 --- a/src/macaron/slsa_analyzer/analyzer.py +++ b/src/macaron/slsa_analyzer/analyzer.py @@ -19,7 +19,7 @@ from macaron.config.global_config import global_config from macaron.config.target_config import Configuration from macaron.database.database_manager import DatabaseManager, get_db_manager, get_db_session -from macaron.database.table_definitions import Analysis, Component, Repository +from macaron.database.table_definitions import Analysis, Component, ProvenanceSubject, Repository from macaron.dependency_analyzer import DependencyAnalyzer, DependencyInfo from macaron.errors import ( CloneError, @@ -332,7 +332,12 @@ def run_single( # Create the component. component = None try: - component = self.add_component(analysis, analysis_target, existing_records) + component = self.add_component( + analysis, + analysis_target, + existing_records, + provenance_payload, + ) except PURLNotFoundError as error: logger.error(error) return Record( @@ -484,6 +489,7 @@ def add_component( analysis: Analysis, analysis_target: AnalysisTarget, existing_records: dict[str, Record] | None = None, + provenance_payload: InTotoPayload | None = None, ) -> Component: """Add a software component if it does not exist in the DB already. @@ -547,18 +553,30 @@ def add_component( raise PURLNotFoundError( f"The repository {analysis_target.repo_path} is not available and no PURL is provided from the user." 
) - - repo_snapshot_purl = PackageURL( + purl = PackageURL( type=repository.type, namespace=repository.owner, name=repository.name, version=repository.commit_sha, ) - return Component(purl=str(repo_snapshot_purl), analysis=analysis, repository=repository) + else: + # If the PURL is available, we always create the software component with it whether the repository is + # available or not. + purl = analysis_target.parsed_purl + + component = Component( + purl=str(purl), + analysis=analysis, + repository=repository, + ) + + if provenance_payload: + component.provenance_subject = ProvenanceSubject.from_purl_and_provenance( + purl=purl, + provenance_payload=provenance_payload, + ) - # If the PURL is available, we always create the software component with it whether the repository is - # available or not. - return Component(purl=str(analysis_target.parsed_purl), analysis=analysis, repository=repository) + return component @staticmethod def parse_purl(config: Configuration) -> PackageURL | None: diff --git a/src/macaron/slsa_analyzer/checks/provenance_available_check.py b/src/macaron/slsa_analyzer/checks/provenance_available_check.py index 1462a85af..205326011 100644 --- a/src/macaron/slsa_analyzer/checks/provenance_available_check.py +++ b/src/macaron/slsa_analyzer/checks/provenance_available_check.py @@ -57,7 +57,7 @@ class ProvenanceAvailableFacts(CheckFacts): id: Mapped[int] = mapped_column(ForeignKey("_check_facts.id"), primary_key=True) # noqa: A003 #: The provenance asset name. - asset_name: Mapped[str] = mapped_column(String, nullable=False, info={"justification": JustificationType.TEXT}) + asset_name: Mapped[str] = mapped_column(String, nullable=True, info={"justification": JustificationType.TEXT}) #: The URL for the provenance asset. asset_url: Mapped[str] = mapped_column(String, nullable=True, info={"justification": JustificationType.HREF}) @@ -504,6 +504,12 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: CheckResultData The result of the check. 
""" + if ctx.dynamic_data["provenance"]: + return CheckResultData( + result_tables=[ProvenanceAvailableFacts(confidence=Confidence.HIGH)], + result_type=CheckResultType.PASSED, + ) + provenance_extensions = defaults.get_list( "slsa.verifier", "provenance_extensions", diff --git a/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py b/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py index a66194335..16f621a5a 100644 --- a/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py +++ b/src/macaron/slsa_analyzer/checks/provenance_l3_content_check.py @@ -58,6 +58,17 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: logger.info("%s check was unable to find any expectations.", self.check_info.check_id) return CheckResultData(result_tables=[], result_type=CheckResultType.UNKNOWN) + if ctx.dynamic_data["provenance"]: + if expectation.validate(ctx.dynamic_data["provenance"]): + return CheckResultData( + result_tables=[expectation], + result_type=CheckResultType.PASSED, + ) + return CheckResultData( + result_tables=[expectation], + result_type=CheckResultType.FAILED, + ) + package_registry_info_entries = ctx.dynamic_data["package_registries"] ci_services = ctx.dynamic_data["ci_services"] diff --git a/src/macaron/slsa_analyzer/checks/provenance_witness_l1_check.py b/src/macaron/slsa_analyzer/checks/provenance_witness_l1_check.py index 539855b4e..8b274d8ef 100644 --- a/src/macaron/slsa_analyzer/checks/provenance_witness_l1_check.py +++ b/src/macaron/slsa_analyzer/checks/provenance_witness_l1_check.py @@ -14,9 +14,9 @@ from macaron.slsa_analyzer.checks.check_result import CheckResultData, CheckResultType, Confidence, JustificationType from macaron.slsa_analyzer.package_registry import JFrogMavenRegistry from macaron.slsa_analyzer.package_registry.jfrog_maven_registry import JFrogMavenAsset +from macaron.slsa_analyzer.provenance.intoto.v01 import InTotoV01Subject from macaron.slsa_analyzer.provenance.witness import ( - 
WitnessProvenanceSubject, - extract_witness_provenance_subjects, + extract_build_artifacts_from_witness_subjects, is_witness_provenance_payload, load_witness_verifier_config, ) @@ -51,7 +51,7 @@ class WitnessProvenanceAvailableFacts(CheckFacts): def verify_artifact_assets( artifact_assets: list[JFrogMavenAsset], - subjects: set[WitnessProvenanceSubject], + subjects: list[InTotoV01Subject], ) -> bool: """Verify artifact assets against subjects in the witness provenance payload. @@ -59,7 +59,7 @@ def verify_artifact_assets( ---------- artifact_assets : list[JFrogMavenAsset] List of artifact assets to verify. - subjects : list[WitnessProvenanceSubject] + subjects : list[InTotoV01Subject] List of subjects extracted from the in the witness provenance. Returns @@ -70,12 +70,12 @@ def verify_artifact_assets( # A look-up table to verify: # 1. if the name of the artifact appears in any subject of the witness provenance, then # 2. if the digest of the artifact could be found - look_up: dict[str, dict[str, WitnessProvenanceSubject]] = {} + look_up: dict[str, dict[str, InTotoV01Subject]] = {} for subject in subjects: - if subject.artifact_name not in look_up: - look_up[subject.artifact_name] = {} - look_up[subject.artifact_name][subject.sha256_digest] = subject + if subject["name"] not in look_up: + look_up[subject["name"]] = {} + look_up[subject["name"]][subject["digest"]["sha256"]] = subject for asset in artifact_assets: if asset.name not in look_up: @@ -93,7 +93,7 @@ def verify_artifact_assets( logger.info( "Successfully verified asset '%s' against the subject '%s' in the provenance.", asset.name, - subject.subject_name, + subject["name"], ) return True @@ -167,7 +167,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: version=provenance.asset.version, extensions=witness_verifier_config.artifact_extensions, ) - subjects = extract_witness_provenance_subjects(provenance.payload) + subjects = extract_build_artifacts_from_witness_subjects(provenance.payload) if 
not verify_artifact_assets(artifact_assets, subjects): return CheckResultData( diff --git a/src/macaron/slsa_analyzer/provenance/intoto/__init__.py b/src/macaron/slsa_analyzer/provenance/intoto/__init__.py index 1babc31e9..c82a590fc 100644 --- a/src/macaron/slsa_analyzer/provenance/intoto/__init__.py +++ b/src/macaron/slsa_analyzer/provenance/intoto/__init__.py @@ -6,10 +6,14 @@ from __future__ import annotations from collections.abc import Mapping -from typing import NamedTuple, TypeVar +from typing import NamedTuple, Protocol, TypeVar + +from packageurl import PackageURL from macaron.slsa_analyzer.provenance.intoto import v01, v1 from macaron.slsa_analyzer.provenance.intoto.errors import ValidateInTotoPayloadError +from macaron.slsa_analyzer.provenance.intoto.v01 import InTotoV01Subject +from macaron.slsa_analyzer.provenance.intoto.v1 import InTotoV1ResourceDescriptor from macaron.util import JsonType # Type of an in-toto statement. @@ -119,3 +123,31 @@ def validate_intoto_payload(payload: dict[str, JsonType]) -> InTotoPayload: raise error raise ValidateInTotoPayloadError("Invalid value for the attribute '_type' of the provenance payload.") + + +class ProvenanceSubjectPURLMatcher(Protocol): + """Interface for a matcher that matches a PURL to a subject in the provenance.""" + + @staticmethod + def get_subject_in_provenance_matching_purl( + provenance_payload: InTotoPayload, + purl: PackageURL, + ) -> InTotoV01Subject | InTotoV1ResourceDescriptor | None: + """Obtain the subject in the provenance payload matching the given PackageURL. + + This function assumes there is only one such subject. If there are multiple + such subjects, the first matching subject is returned. However, this should not + happen since the PackageURL should be specific enough to identify a single subject. + + Parameters + ---------- + provenance_payload : InTotoPayload + The provenance payload. + purl : PackageURL + The PackageURL identifying the matching subject. 
+ + Returns + ------- + InTotoV01Subject | InTotoV1ResourceDescriptor | None + The subject in the provenance matching the given PURL. + """ diff --git a/src/macaron/slsa_analyzer/provenance/witness/__init__.py b/src/macaron/slsa_analyzer/provenance/witness/__init__.py index 408fb31ca..cbd1c7b1b 100644 --- a/src/macaron/slsa_analyzer/provenance/witness/__init__.py +++ b/src/macaron/slsa_analyzer/provenance/witness/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. """Witness provenance (https://github.com/testifysec/witness).""" @@ -9,6 +9,7 @@ from macaron.config.defaults import defaults from macaron.slsa_analyzer.asset import AssetLocator from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, InTotoV01Payload +from macaron.slsa_analyzer.provenance.intoto.v01 import InTotoV01Subject from macaron.slsa_analyzer.provenance.witness.attestor import GitLabWitnessAttestor, RepoAttestor logger: logging.Logger = logging.getLogger(__name__) @@ -120,41 +121,38 @@ def extract_repo_url(witness_payload: InTotoPayload) -> str | None: return None -def extract_witness_provenance_subjects(witness_payload: InTotoPayload) -> set[WitnessProvenanceSubject]: - """Read the ``"subjects"`` field of the provenance to obtain the hash digests of each subject. +def extract_build_artifacts_from_witness_subjects(witness_payload: InTotoPayload) -> list[InTotoV01Subject]: + """Extract subjects that are build artifacts from the ``"subject"`` field of the provenance. + + Each artifact subject is assumed to have a sha256 digest. If a sha256 digest is not present for + a subject, that subject is ignored. Parameters ---------- witness_payload : InTotoPayload The witness provenance payload. 
- extensions : list[str] - The allowed extensions of the subjects. - All subjects with names not ending in these extensions are ignored. Returns ------- - dict[str, str] - A dictionary in which each key is a subject name and each value is the corresponding SHA256 digest. + list[InTotoV01Subject] + A list of subjects in the ``"subject"`` field of the provenance that are build artifacts. """ - if isinstance(witness_payload, InTotoV01Payload): - subjects = witness_payload.statement["subject"] - subject_digests = set() - - for subject in subjects: - name = subject["name"] - digest = subject["digest"] - - sha256 = digest.get("sha256") - if not sha256 or not isinstance(sha256, str): - continue - - subject_digests.add( - WitnessProvenanceSubject( - subject_name=name, - sha256_digest=sha256, - ) - ) - - return subject_digests - - return set() + if not isinstance(witness_payload, InTotoV01Payload): + return [] + + subjects = witness_payload.statement["subject"] + artifact_subjects = [] + for subject in subjects: + # Keep only subjects attested by the product attestor, which records all changed and + # created files in the build process. 
+ # Documentation: https://github.com/in-toto/witness/blob/main/docs/attestors/product.md + if not subject["name"].startswith("https://witness.dev/attestations/product/v0.1/file:"): + continue + + digest = subject["digest"] + sha256 = digest.get("sha256") + if not sha256 or not isinstance(sha256, str): + continue + artifact_subjects.append(subject) + + return artifact_subjects diff --git a/src/macaron/vsa/vsa.py b/src/macaron/vsa/vsa.py index 60c704e21..f06a948f9 100644 --- a/src/macaron/vsa/vsa.py +++ b/src/macaron/vsa/vsa.py @@ -9,10 +9,19 @@ import datetime import json import logging +from collections.abc import Iterable from enum import StrEnum from importlib import metadata as importlib_metadata from typing import TypedDict +import sqlalchemy +from packageurl import PackageURL +from sqlalchemy.orm import Session + +from macaron.database.database_manager import get_db_manager +from macaron.database.table_definitions import ProvenanceSubject +from macaron.util import JsonType + logger: logging.Logger = logging.getLogger(__name__) # Note: The lint error "N815:mixedCase variable in class scope" is disabled for @@ -135,11 +144,45 @@ class VerificationResult(StrEnum): PASSED = "PASSED" +def get_common_purl_from_artifact_purls(purl_strs: Iterable[str]) -> str | None: + """Get a single common PackageURL given some artifact PackageURLs. + + Assumption: A package may have more than one artifact. If each artifact is identified + by a PackageURL, these PackageURLs still share the type, namespace, name, and + version values. The common PackageURL contains these values. 
+ """ + try: + purls = [PackageURL.from_string(purl_str) for purl_str in purl_strs] + except ValueError: + return None + + if len(purls) == 0: + return None + + purl_type = purls[0].type + namespace = purls[0].namespace + name = purls[0].name + version = purls[0].version + + for purl in purls: + if any( + [ + purl_type != purl.type, + namespace != purl.namespace, + name != purl.name, + version != purl.version, + ] + ): + return None + + common_purl = PackageURL(type=purl_type, namespace=namespace, name=name, version=version) + return str(common_purl) + + def create_vsa_statement( - subject_purl: str, + passed_components: dict[str, int], policy_content: str, - verification_result: VerificationResult, -) -> VsaStatement: +) -> VsaStatement | None: """Construct the Statement layer of the VSA. Parameters @@ -157,13 +200,49 @@ def create_vsa_statement( VsaStatement A Statement layer of the VSA. """ + subjects = [] + + try: + with Session(get_db_manager().engine) as session, session.begin(): + for purl, component_id in passed_components.items(): + try: + provenance_subject = ( + session.execute( + sqlalchemy.select(ProvenanceSubject).where(ProvenanceSubject.component_id == component_id) + ) + .scalars() + .one() + ) + sha256 = provenance_subject.sha256 + except sqlalchemy.orm.exc.NoResultFound: + sha256 = None + logger.debug("No digest stored for software component '%s'.", purl) + except sqlalchemy.orm.exc.MultipleResultsFound as e: + logger.debug( + "Unexpected database query result. " + "Expected no more than one result when retrieving SHA256 of a provenance subject. 
" + "Error: %s", + e, + ) + continue + + subject: dict[str, JsonType] = { + "uri": purl, + } + if sha256: + subject["digest"] = { + "sha256": sha256, + } + + subjects.append(subject) + + except sqlalchemy.exc.SQLAlchemyError as error: + logger.debug("Cannot retrieve hash digest of software components: %s.", error) + return None + return VsaStatement( _type="https://in-toto.io/Statement/v1", - subject=[ - { - "uri": subject_purl, - } - ], + subject=subjects, predicateType="https://slsa.dev/verification_summary/v1", predicate=VsaPredicate( verifier=Verifier( @@ -173,34 +252,33 @@ def create_vsa_statement( }, ), timeVerified=datetime.datetime.now(tz=datetime.UTC).isoformat(), - resourceUri=subject_purl, + resourceUri=get_common_purl_from_artifact_purls(passed_components.keys()) or "", policy={ "content": policy_content, }, - verificationResult=verification_result, + verificationResult=VerificationResult.PASSED, verifiedLevels=[], ), ) -def get_subject_verification_result(policy_result: dict) -> tuple[str, VerificationResult] | None: - """Get the PURL (string) and verification result of the single software component the policy applies to. +def get_components_passing_policy(policy_result: dict) -> dict[str, int] | None: + """Get the verification result in the form of PURLs and component ids of software artifacts passing the policy. This is currently done by reading the facts of two relations: ``component_violates_policy``, and ``component_satisfies_policy`` from the result of the policy engine. - We define two PURLs to be different if the two PURL strings are different. + The result of this function depends on the policy engine result. - The result of this function depends on the policy engine result: + If there exist any software component failing the policy, this function returns ``None``. - - If there exist multiple different PURLs, this function returns ``None``. 
- - If there exist multiple occurrences of the same PURL and it is the only unique - PURL in the policy engine result, this function returns the latest occurrence, - which is the PURL that goes with the highest component ID, taking advantage of - component IDs being auto-incremented. - - If there is no PURL in the result, i.e. the policy applies to no software component - in the database, this function also returns ``None``. + When all software components in the result pass the policy, if there exist multiple occurrences + of the same PURL, this function returns the latest occurrence, which is the one with the highest + component id, taking advantage of component ids being auto-incremented. + + If there is no PURL in the result, i.e. the policy applies to no software component in the database, + this function also returns ``None``. Parameters ---------- @@ -210,53 +288,44 @@ def get_subject_verification_result(policy_result: dict) -> tuple[str, Verificat Returns ------- - tuple[str, VerificationResult] | None - A pair of PURL and verification result of the only software component that - the policy applies to, or ``None`` according to the aforementioned conditions. + dict[str, int] | None + A dictionary of software components passing the policy, or ``None`` if there is any + component failing the policy or if there is no software component in the policy engine result. + Each key is a PackageURL of the software component, and each value is the corresponding + component id of that component. """ component_violates_policy_facts = policy_result.get("component_violates_policy", []) component_satisfies_policy_facts = policy_result.get("component_satisfies_policy", []) - # key: PURL; value: result with the highest component id - component_results: dict[str, tuple[int, VerificationResult]] = {} + if len(component_violates_policy_facts) > 0: + logger.info("Encountered software component failing the policy. 
No VSA is generated.") + return None + + # This dictionary deduplicates multiple occurrences of the same PURL in the + # ``component_satisfies_policy_facts`` result, which may occur because the same PURL + # may appear multiple times in the ``_component`` table of the database. + # Here, we are only taking the latest result into consideration. + # Each key is a PURL and each value is the highest component id of the + # corresponding PURL, taking advantage of the component id column being auto-incremented. + passed_components: dict[str, int] = {} - for component_id_string, purl, _ in component_violates_policy_facts: - try: - component_id = int(component_id_string) - except ValueError: - logger.error("Expected component id %s to be an integer.", component_id_string) - return None - if purl not in component_results: - component_results[purl] = (component_id, VerificationResult.FAILED) - else: - current_component_id, _ = component_results[purl] - if component_id > current_component_id: - component_results[purl] = (component_id, VerificationResult.FAILED) for component_id_string, purl, _ in component_satisfies_policy_facts: try: component_id = int(component_id_string) except ValueError: logger.error("Expected component id %s to be an integer.", component_id_string) return None - if purl not in component_results: - component_results[purl] = (component_id, VerificationResult.PASSED) + if purl not in passed_components: + passed_components[purl] = component_id else: - current_component_id, _ = component_results[purl] + current_component_id = passed_components[purl] if component_id > current_component_id: - component_results[purl] = (component_id, VerificationResult.PASSED) - - if len(component_results) != 1: - if len(component_results) == 0: - logger.info("The policy applies to no software components.") - if len(component_results) > 1: - logger.info("The policy applies to more than one software components.") - logger.info("No VSA will be generated.") - return None + 
passed_components[purl] = component_id - subject_purl = next(iter(component_results.keys())) - _, verification_result = component_results[subject_purl] + if len(passed_components) == 0: + return None - return subject_purl, verification_result + return passed_components def generate_vsa(policy_content: str, policy_result: dict) -> Vsa | None: @@ -275,17 +344,14 @@ def generate_vsa(policy_content: str, policy_result: dict) -> Vsa | None: The VSA, or ``None`` if generating a VSA is not appropriate according to the policy engine result. """ - subject_verification_result = get_subject_verification_result(policy_result) + passed_components = get_components_passing_policy(policy_result) - if subject_verification_result is None: + if passed_components is None: return None - subject_purl, verification_result = subject_verification_result - unencoded_payload = create_vsa_statement( - subject_purl=subject_purl, + passed_components, policy_content=policy_content, - verification_result=verification_result, ) try: diff --git a/tests/artifact/__init__.py b/tests/artifact/__init__.py new file mode 100644 index 000000000..c8a50abb7 --- /dev/null +++ b/tests/artifact/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. diff --git a/tests/artifact/test_maven.py b/tests/artifact/test_maven.py new file mode 100644 index 000000000..31e95ba53 --- /dev/null +++ b/tests/artifact/test_maven.py @@ -0,0 +1,88 @@ +# Copyright (c) 2024 - 2024, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
+ +"""Tests for types and utilities for Maven artifacts.""" + +import pytest +from packageurl import PackageURL + +from macaron.artifact.maven import MavenSubjectPURLMatcher +from macaron.slsa_analyzer.provenance.intoto import InTotoPayload, validate_intoto_payload + + +@pytest.mark.parametrize( + ("purl_str", "subject_index"), + [ + pytest.param( + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=jar", + 0, + id="purl for jar artifact", + ), + pytest.param( + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=javadoc", + 1, + id="purl for javadoc artifact", + ), + pytest.param( + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=java-source", + 2, + id="purl for java source artifact", + ), + pytest.param( + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=pom", + 3, + id="purl for pom artifact", + ), + ], +) +def test_to_maven_artifact_subject( + purl_str: str, + subject_index: int, +) -> None: + """Test matching a Maven artifact PURL to the corresponding subject in a witness provenance payload.""" + purl = PackageURL.from_string(purl_str) + provenance_payload: InTotoPayload = validate_intoto_payload( + { + "_type": "https://in-toto.io/Statement/v0.1", + "subject": [ + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + }, + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9-javadoc.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + }, + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9-sources.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + }, + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.pom", + "digest": { + "sha256":
"6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + }, + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/foobar.txt", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + }, + ], + "predicateType": "https://witness.testifysec.com/attestation-collection/v0.1", + } + ) + assert ( + MavenSubjectPURLMatcher.get_subject_in_provenance_matching_purl( + provenance_payload=provenance_payload, + purl=purl, + ) + == provenance_payload.statement["subject"][subject_index] + ) diff --git a/tests/slsa_analyzer/provenance/test_witness_provenance.py b/tests/slsa_analyzer/provenance/test_witness_provenance.py index 901c8f1db..576787aed 100644 --- a/tests/slsa_analyzer/provenance/test_witness_provenance.py +++ b/tests/slsa_analyzer/provenance/test_witness_provenance.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 - 2023, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2023 - 2024, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.
"""Tests for witness provenance.""" @@ -12,9 +12,8 @@ from macaron.config.defaults import load_defaults from macaron.slsa_analyzer.provenance.intoto import InTotoV01Payload, v01 from macaron.slsa_analyzer.provenance.witness import ( - WitnessProvenanceSubject, WitnessVerifierConfig, - extract_witness_provenance_subjects, + extract_build_artifacts_from_witness_subjects, is_witness_provenance_payload, load_witness_verifier_config, ) @@ -124,18 +123,20 @@ def test_is_witness_provenance_payload( } """ ), - { - WitnessProvenanceSubject( - subject_name=( - "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar" - ), - sha256_digest="6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", - ), - WitnessProvenanceSubject( - subject_name="https://witness.dev/attestations/product/v0.1/file:foo/bar/baz.txt", - sha256_digest="cbc8f554dbfa17e5c5873c425a09cb1488c2f784ac52340747a92b7ec0aaefba", - ), - }, + [ + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + }, + { + "name": "https://witness.dev/attestations/product/v0.1/file:foo/bar/baz.txt", + "digest": { + "sha256": "cbc8f554dbfa17e5c5873c425a09cb1488c2f784ac52340747a92b7ec0aaefba", + }, + }, + ], id="Valid payload", ), pytest.param( @@ -159,22 +160,53 @@ def test_is_witness_provenance_payload( } """ ), - { - WitnessProvenanceSubject( - subject_name=( - "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar" - ), - sha256_digest="6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", - ), - }, + [ + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + } + ], id="Missing sha256", ), + pytest.param( + json.loads( + """ +{ + "subject": [ + 
{ + "name": "https://witness.dev/attestations/git/v0.1/authoremail:foo.bar@oracle.com", + "digest": { + "sha256": "923e32b55b983525acfd0df3ad18bbb016623bdf33ba7706c7ab8318ff1284a1" + } + }, + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e" + } + } + ] +} +""" + ), + [ + { + "name": "https://witness.dev/attestations/product/v0.1/file:target/jackson-annotations-2.9.9.jar", + "digest": { + "sha256": "6f97fe2094bd50435d6fbb7a2f6c2638fe44e6af17cfff98ce111d0abfffe17e", + }, + } + ], + id="Not a subject attested by the product attestor", + ), ], ) -def test_extract_witness_provenances_subjects( +def test_extract_build_artifacts_from_witness_subjects( payload_json: v01.InTotoV01Statement, - expected_subjects: set[WitnessProvenanceSubject], + expected_subjects: list[v01.InTotoV01Subject], ) -> None: - """Test the ``extract_witness_provenance_subjects`` function.""" + """Test the ``extract_build_artifacts_from_witness_subjects`` function.""" payload = InTotoV01Payload(statement=payload_json) - assert extract_witness_provenance_subjects(payload) == expected_subjects + assert extract_build_artifacts_from_witness_subjects(payload) == expected_subjects diff --git a/tests/vsa/test_vsa.py b/tests/vsa/test_vsa.py index f96a28861..dbe8b768c 100644 --- a/tests/vsa/test_vsa.py +++ b/tests/vsa/test_vsa.py @@ -6,7 +6,7 @@ import pytest -from macaron.vsa.vsa import VerificationResult, get_subject_verification_result +from macaron.vsa.vsa import get_common_purl_from_artifact_purls, get_components_passing_policy @pytest.mark.parametrize( @@ -23,23 +23,9 @@ ], "component_violates_policy": [], }, - ("pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", VerificationResult.PASSED), + {"pkg:github.com/slsa-framework/slsa-verifier@v2.0.0": 1}, id="A single PURL satisfying policy", ), - pytest.param( - { - "component_satisfies_policy": [], - "component_violates_policy": [ - [ - "1", -
"pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", - "slsa_verifier_policy", - ], - ], - }, - ("pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", VerificationResult.FAILED), - id="A single PURL violating policy", - ), pytest.param( { "component_satisfies_policy": [ @@ -56,13 +42,12 @@ ], "component_violates_policy": [], }, - ("pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", VerificationResult.PASSED), + {"pkg:github.com/slsa-framework/slsa-verifier@v2.0.0": 2}, id="Two occurrences of the same PURL both satisfying a policy", ), pytest.param( { - "component_satisfies_policy": [], - "component_violates_policy": [ + "component_satisfies_policy": [ [ "1", "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", @@ -70,33 +55,43 @@ ], [ "2", - "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", + "pkg:github.com/slsa-framework/slsa-github-generator@v1.0.0", "slsa_verifier_policy", ], ], + "component_violates_policy": [], + }, + { + "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0": 1, + "pkg:github.com/slsa-framework/slsa-github-generator@v1.0.0": 2, }, - ("pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", VerificationResult.FAILED), - id="Two occurrences of the same PURL both violating a policy", + id="Two different PURLs both satisfying a policy", ), + ], +) +def test_valid_subject_verification_result( + policy_result: dict, + expected: dict[str, int], +) -> None: + """Test the ``get_components_passing_policy`` in cases where there is a result.""" + assert get_components_passing_policy(policy_result) == expected + + +@pytest.mark.parametrize( + ("policy_result"), + [ pytest.param( { - "component_satisfies_policy": [ - [ - "1000", - "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", - "slsa_verifier_policy", - ], - ], + "component_satisfies_policy": [], "component_violates_policy": [ [ - "9", + "1", "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", "slsa_verifier_policy", ], ], }, - ("pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", 
VerificationResult.PASSED), - id="Two occurrences of the same PURL, the one satisfying the policy is latest", + id="A single PURL violating policy", ), pytest.param( { @@ -115,39 +110,26 @@ ], ], }, - ("pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", VerificationResult.FAILED), id="Two occurrences of the same PURL, the one violating the policy is latest", ), - ], -) -def test_valid_subject_verification_result( - policy_result: dict, - expected: tuple[str, VerificationResult], -) -> None: - """Test the ``get_subject_verification_result`` in cases where there is a result.""" - assert get_subject_verification_result(policy_result) == expected - - -@pytest.mark.parametrize( - ("policy_result"), - [ pytest.param( { "component_satisfies_policy": [ [ - "1", + "1000", "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", "slsa_verifier_policy", ], + ], + "component_violates_policy": [ [ - "2", - "pkg:github.com/slsa-framework/slsa-github-generator@v1.0.0", + "9", + "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", "slsa_verifier_policy", ], ], - "component_violates_policy": [], }, - id="Two different PURLs both satisfying a policy", + id="Two occurrences of the same PURL, the one satisfying the policy is latest", ), pytest.param( { @@ -203,23 +185,46 @@ def test_valid_subject_verification_result( }, id="Component id is not an auto-incremented number 1", ), - pytest.param( - { - "component_satisfies_policy": [], - "component_violates_policy": [ - [ - "foo", - "pkg:github.com/slsa-framework/slsa-verifier@v2.0.0", - "slsa_verifier_policy", - ], - ], - }, - id="Component id is not an auto-incremented number 2", - ), ], ) def test_invalid_subject_verification_result( policy_result: dict, ) -> None: - """Test the ``get_subject_verification_result`` in cases where the result should be ``None``.""" - assert get_subject_verification_result(policy_result) is None + """Test the ``get_components_passing_policy`` in cases where the result should be ``None``.""" + assert 
get_components_passing_policy(policy_result) is None + + +@pytest.mark.parametrize( + ("purl_strs", "expected_purl"), + [ + pytest.param( + [ + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=jar", + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=javadoc", + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=java-source", + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=pom", + ], + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9", + id="Common PURL exists", + ), + pytest.param( + [ + "pkg:maven/com.fasterxml.jackson/jackson-annotations@2.9.9?type=jar", + "pkg:maven/com.fasterxml.jackson/jackson-databind@2.9.9?type=jar", + ], + None, + id="Common PURL does not exist", + ), + pytest.param( + [], + None, + id="Common PURL does not exist", + ), + ], +) +def test_get_common_purl_from_artifact_purl( + purl_strs: list[str], + expected_purl: str | None, +) -> None: + """Test that ``get_common_purl_from_artifact_purls`` returns the common PURL, or ``None`` if there is none.""" + assert get_common_purl_from_artifact_purls(purl_strs) == expected_purl