Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 98 additions & 2 deletions sigstore/_internal/key_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,108 @@
# limitations under the License.

"""
Utilities for getting PublicKeyDetails.
Utilities for PublicKeyDetails and the algorithm registry.
"""

from __future__ import annotations

import hashlib
from collections.abc import Callable
from dataclasses import dataclass

from cryptography.hazmat.primitives.asymmetric import ec, ed25519, padding, rsa
from cryptography.x509 import Certificate
from sigstore_models.common.v1 import PublicKeyDetails
from sigstore_models.common.v1 import HashAlgorithm, PublicKeyDetails


@dataclass(frozen=True)
class AlgorithmDetails:
"""Details for a single entry in the algorithm registry."""

key_details: PublicKeyDetails
hash_algorithm: HashAlgorithm | None
hash_func: Callable[[bytes], hashlib._Hash] | None


# Algorithm registry table.
# See https://github.com/sigstore/architecture-docs/blob/main/algorithm-registry.md
_ALGORITHM_REGISTRY: list[AlgorithmDetails] = [
# RSA PKCS1v15
AlgorithmDetails(
PublicKeyDetails.PKIX_RSA_PKCS1V15_2048_SHA256,
HashAlgorithm.SHA2_256,
hashlib.sha256,
),
AlgorithmDetails(
PublicKeyDetails.PKIX_RSA_PKCS1V15_3072_SHA256,
HashAlgorithm.SHA2_256,
hashlib.sha256,
),
AlgorithmDetails(
PublicKeyDetails.PKIX_RSA_PKCS1V15_4096_SHA256,
HashAlgorithm.SHA2_256,
hashlib.sha256,
),
# ECDSA
AlgorithmDetails(
PublicKeyDetails.PKIX_ECDSA_P256_SHA_256,
HashAlgorithm.SHA2_256,
hashlib.sha256,
),
AlgorithmDetails(
PublicKeyDetails.PKIX_ECDSA_P384_SHA_384,
HashAlgorithm.SHA2_384,
hashlib.sha384,
),
AlgorithmDetails(
PublicKeyDetails.PKIX_ECDSA_P521_SHA_512,
HashAlgorithm.SHA2_512,
hashlib.sha512,
),
# Ed25519
AlgorithmDetails(
PublicKeyDetails.PKIX_ED25519,
None,
None,
),
AlgorithmDetails(
PublicKeyDetails.PKIX_ED25519_PH,
HashAlgorithm.SHA2_512,
hashlib.sha512,
),
]

_DETAILS_BY_KEY: dict[PublicKeyDetails, AlgorithmDetails] = {
entry.key_details: entry for entry in _ALGORITHM_REGISTRY
}


def _get_algorithm_details(key_details: PublicKeyDetails) -> AlgorithmDetails:
"""
Look up algorithm details by ``PublicKeyDetails`` enum value.
"""
details = _DETAILS_BY_KEY.get(key_details)
if details is None:
raise ValueError(f"unknown signature algorithm: {key_details}")
return details


def _get_prehash(
key_details: PublicKeyDetails,
) -> tuple[HashAlgorithm, Callable[[bytes], hashlib._Hash]]:
"""
Return the externalized hash function for a signing algorithm.

Only algorithms with an externalized prehash can be used in hashedrekord
entries. Pure ed25519 (no prehash) raises ``ValueError``.
"""
details = _get_algorithm_details(key_details)
if details.hash_algorithm is None or details.hash_func is None:
raise ValueError(
f"signing algorithm {key_details} has no externalized prehash; "
"cannot be used for a hashedrekord entry (rekor-v2-spec §6.1.4)"
)
return details.hash_algorithm, details.hash_func


def _get_key_details(certificate: Certificate) -> PublicKeyDetails:
Expand Down
27 changes: 18 additions & 9 deletions sigstore/_internal/rekor/client_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from sigstore_models.rekor.v1 import TransparencyLogEntry as _TransparencyLogEntry

from sigstore._internal import USER_AGENT
from sigstore._internal.key_details import _get_key_details
from sigstore._internal.key_details import _get_key_details, _get_prehash
from sigstore._internal.rekor import (
EntryRequestBody,
RekorClientError,
Expand Down Expand Up @@ -137,23 +137,32 @@ def _build_dsse_request(
cls, envelope: Envelope, certificate: Certificate
) -> EntryRequestBody:
"""
Construct a dsse request to submit to Rekor.
Construct a hashedrekord request for a DSSE envelope.

Rekor v2 only supports the hashedrekord entry type; DSSE envelopes are
uploaded as a hashedrekord whose digest is `Hash(envelope.pae())` and
whose `signature.content` equals `envelope.signatures[0].sig`. See
rekor-v2-spec §6.1.4.
"""
key_details = _get_key_details(certificate)
_, hash_func = _get_prehash(key_details)
digest = hash_func(envelope.pae()).digest()
req = rekor_v2.entry.CreateEntryRequest(
dsse_request_v002=rekor_v2.dsse.DSSERequestV002(
envelope=envelope._inner,
verifiers=[
rekor_v2.verifier.Verifier(
hashed_rekord_request_v002=rekor_v2.hashedrekord.HashedRekordRequestV002(
digest=base64.b64encode(digest),
signature=rekor_v2.verifier.Signature(
content=base64.b64encode(envelope.signature),
verifier=rekor_v2.verifier.Verifier(
x509_certificate=common_v1.X509Certificate(
raw_bytes=base64.b64encode(
certificate.public_bytes(
encoding=serialization.Encoding.DER
)
)
),
key_details=_get_key_details(certificate),
)
],
key_details=key_details,
),
),
)
)
return EntryRequestBody(req.to_dict())
6 changes: 6 additions & 0 deletions sigstore/dsse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ def signature(self) -> bytes:
"""Return the decoded bytes of the Envelope signature."""
return self._signature_bytes

def pae(self) -> bytes:
"""
Return the PAE encoding of this envelope's `payloadType` and `payload`.
"""
return _pae(self._inner.payload_type, self._inner.payload)


def _pae(type_: str, body: bytes) -> bytes:
"""
Expand Down
138 changes: 68 additions & 70 deletions sigstore/verify/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from sigstore_models.rekor import v2

from sigstore import dsse
from sigstore._internal.key_details import _get_key_details, _get_prehash
from sigstore._internal.rekor import _hashedrekord_from_parts
from sigstore._internal.rekor.client import RekorClient
from sigstore._internal.sct import (
Expand Down Expand Up @@ -425,24 +426,24 @@ def verify_dsse(

# (8): verify the consistency of the log entry's body against
# the other bundle materials.
# NOTE: This is very slightly weaker than the consistency check
# for hashedrekord entries, due to how inclusion is recorded for DSSE:
# the included entry for DSSE includes an envelope hash that we
# *cannot* verify, since the envelope is uncanonicalized JSON.
# Instead, we manually pick apart the entry body below and verify
# the parts we can (namely the payload hash and signature list).
# Rekor v2 records DSSE envelopes as hashedrekord/0.0.2 entries whose
# digest covers PAE(payloadType, payload) and whose signature.content
# equals envelope.signatures[0].sig (rekor-v2-spec §6.1.4). Rekor v1
# used a dsse/0.0.1 entry, which is slightly weaker than the
# hashedrekord consistency check: dsse entries record an envelope
# hash that we *cannot* verify (the envelope is uncanonicalized JSON),
# so we manually pick apart the entry body and verify the parts we
# can (payload hash and signature list).
entry = bundle.log_entry
if entry._inner.kind_version.kind != "dsse":
raise VerificationError(
f"Expected entry type dsse, got {entry._inner.kind_version.kind}"
)
if entry._inner.kind_version.version == "0.0.2":
_validate_dsse_v002_entry_body(bundle)
elif entry._inner.kind_version.version == "0.0.1":
kind = entry._inner.kind_version.kind
version = entry._inner.kind_version.version
if kind == "hashedrekord" and version == "0.0.2":
_validate_hashedrekord_v002_dsse_entry_body(bundle)
elif kind == "dsse" and version == "0.0.1":
_validate_dsse_v001_entry_body(bundle)
else:
raise VerificationError(
f"Unsupported dsse version {entry._inner.kind_version.version}"
f"Unsupported DSSE log entry type: {kind}/{version}"
)

return (envelope._inner.payload_type, envelope._inner.payload)
Expand Down Expand Up @@ -554,42 +555,6 @@ def _validate_dsse_v001_entry_body(bundle: Bundle) -> None:
raise VerificationError("log entry signatures do not match bundle")


def _validate_dsse_v002_entry_body(bundle: Bundle) -> None:
"""
Validate Entry body for dsse v002.
"""
entry = bundle.log_entry
envelope = bundle._dsse_envelope
if envelope is None:
raise VerificationError(
"cannot perform DSSE verification on a bundle without a DSSE envelope"
)
try:
v2_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body)
except ValidationError as exc:
raise VerificationError(f"invalid DSSE log entry: {exc}")

if v2_body.spec.dsse_v002 is None:
raise VerificationError("invalid DSSE log entry: missing dsse_v002 field")

if v2_body.spec.dsse_v002.payload_hash.algorithm != v1.HashAlgorithm.SHA2_256:
raise VerificationError("expected SHA256 hash in DSSE entry")

digest = sha256_digest(envelope._inner.payload).digest
if v2_body.spec.dsse_v002.payload_hash.digest != digest:
raise VerificationError("DSSE entry payload hash does not match bundle")

v2_signatures = [
v2.verifier.Signature(
content=base64.b64encode(signature.sig),
verifier=_v2_verifier_from_certificate(bundle.signing_certificate),
)
for signature in envelope._inner.signatures
]
if v2_signatures != v2_body.spec.dsse_v002.signatures:
raise VerificationError("log entry signatures do not match bundle")


def _validate_hashedrekord_v001_entry_body(
bundle: Bundle, hashed_input: Hashed
) -> None:
Expand All @@ -611,6 +576,56 @@ def _validate_hashedrekord_v001_entry_body(
)


def _validate_hashedrekord_v002_dsse_entry_body(bundle: Bundle) -> None:
"""
Validate Entry body for a Rekor v2 DSSE envelope encoded as a
hashedrekord/0.0.2 entry (rekor-v2-spec §6.1.4).

The expected entry body has:
- data.digest = Hash(PAE(payloadType, payload)), where Hash is the
externalized hash function of the entry's signing algorithm.
- data.algorithm = the matching HashAlgorithm.
- signature.content = envelope.signatures[0].sig.
- signature.verifier = the bundle's signing certificate.
"""
entry = bundle.log_entry
envelope = bundle._dsse_envelope
if envelope is None:
raise VerificationError(
"cannot perform DSSE verification on a bundle without a DSSE envelope"
)
if len(envelope._inner.signatures) != 1:
raise VerificationError(
"DSSE envelope must have exactly one signature for hashedrekord encoding"
)

expected_verifier = _v2_verifier_from_certificate(bundle.signing_certificate)
algorithm, hash_func = _get_prehash(expected_verifier.key_details)
pae_digest = hash_func(envelope.pae()).digest()

expected_body = v2.entry.Entry(
kind=entry._inner.kind_version.kind,
api_version=entry._inner.kind_version.version,
spec=v2.entry.Spec(
hashed_rekord_v002=v2.hashedrekord.HashedRekordLogEntryV002(
data=v1.HashOutput(
algorithm=algorithm,
digest=base64.b64encode(pae_digest),
),
signature=v2.verifier.Signature(
content=base64.b64encode(envelope.signature),
verifier=expected_verifier,
),
)
),
)
actual_body = v2.entry.Entry.from_json(entry._inner.canonicalized_body)
if expected_body != actual_body:
raise VerificationError(
"transparency log entry is inconsistent with other materials"
)


def _validate_hashedrekord_v002_entry_body(
bundle: Bundle, hashed_input: Hashed
) -> None:
Expand Down Expand Up @@ -649,31 +664,14 @@ def _v2_verifier_from_certificate(certificate: Certificate) -> v2.verifier.Verif
"""
Return a Rekor v2 Verifier for the signing certificate.

This method decides which signature algorithms are supported for verification
(in a rekor v2 entry), see
https://github.com/sigstore/architecture-docs/blob/main/algorithm-registry.md.
Note that actual signature verification happens in verify_artifact() and
verify_dsse(): New keytypes need to be added here and in those methods.
Key-to-algorithm mapping is handled by the algorithm registry via
`_get_key_details`.
"""
public_key = certificate.public_key()

if isinstance(public_key, ec.EllipticCurvePublicKey):
if isinstance(public_key.curve, ec.SECP256R1):
key_details = v1.PublicKeyDetails.PKIX_ECDSA_P256_SHA_256
elif isinstance(public_key.curve, ec.SECP384R1):
key_details = v1.PublicKeyDetails.PKIX_ECDSA_P384_SHA_384
elif isinstance(public_key.curve, ec.SECP521R1):
key_details = v1.PublicKeyDetails.PKIX_ECDSA_P521_SHA_512
else:
raise ValueError(f"Unsupported EC curve: {public_key.curve.name}")
else:
raise ValueError(f"Unsupported public key type: {type(public_key)}")

return v2.verifier.Verifier(
x509_certificate=v1.X509Certificate(
raw_bytes=base64.b64encode(
certificate.public_bytes(encoding=serialization.Encoding.DER)
)
),
key_details=key_details,
key_details=_get_key_details(certificate),
)
Loading
Loading