Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions src/pkgcheck/checks/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,124 @@ def schedule(self, pkg, executor, futures):

for filename, url in self._get_urls(pkg):
self._schedule_check(filename, url, executor, futures, pkg=pkg)


class DetachedSignatureAvailable(results.VersionResult, results.Info):
"""Detached signature available for a distfile in the package."""

def __init__(self, filename, url, **kwargs):
super().__init__(**kwargs)
self.filename = filename
self.url = url

@property
def desc(self):
return f"Detached signature for distfile {self.filename} is available at {self.url}."


class DetachedSignatureAvailableCheck(NetworkCheck):
"""Check for available detached signatures."""

required_addons = (addons.UseAddon,)

_source = sources.LatestVersionRepoSource

known_results = frozenset(
{
DetachedSignatureAvailable,
SSLCertificateError,
}
)

detached_signature_extensions = [".asc", ".minisig", ".sig", ".sign", ".sigstore"]

def __init__(self, *args, use_addon, **kwargs):
super().__init__(*args, **kwargs)
self.fetch_filter = use_addon.get_filter("fetchables")

def _verifysig_check(self, filename, url, *, pkg):
"""Check for typical verify sig URLS."""
result = None
try:
# Need redirects to deal with the variance of file servers and urls
response = self.session.head(url, allow_redirects=True)
except RequestError:
pass
except SSLError as e:
result = SSLCertificateError("SRC_URI", url, str(e), pkg=pkg)
else:
content_type = response.headers.get("Content-Type")

# Filtering out text/html matches is useful due to possible false matches with authentication
if (
response.ok
and content_type is not None
and not content_type.startswith("text/html")
):
result = DetachedSignatureAvailable(filename, url, pkg=pkg)
return result

def task_done(self, pkg, filename, future):
"""Determine the result of a given URL verification task."""
exc = future.exception()
if exc is not None:
# traceback can't be pickled so serialize it
tb = traceback.format_exc()
# return exceptions that occurred in threads
self.results_q.put(tb)
return

result = future.result()
if result is not None:
if pkg is not None:
# recreate result object with different pkg target and attr
attrs = result._attrs.copy()
attrs["filename"] = filename
result = result._create(**attrs, pkg=pkg)
self.results_q.put([result])

def _schedule_check(self, filename, url, executor, futures, **kwargs):
"""Schedule verification method to run in a separate thread against a given URL.

Note that this tries to avoid hitting the network for the same URL
twice using a mapping from requested URLs to future objects, adding
result-checking callbacks to the futures of existing URLs.
"""
future = futures.get(url)
if future is None:
future = executor.submit(self._verifysig_check, filename, url, **kwargs)
future.add_done_callback(partial(self.task_done, None, None))
futures[url] = future
else:
future.add_done_callback(partial(self.task_done, kwargs["pkg"], filename))

def _get_urls(self, pkg):
# ignore conditionals
fetchables, _ = self.fetch_filter(
(fetchable,),
pkg,
pkg.generate_fetchables(
allow_missing_checksums=True, ignore_unknown_mirrors=True, skip_default_mirrors=True
),
)

filenames = [f.filename for f in fetchables.keys()]

for f in fetchables.keys():
# Don't check for detached signatures if any detached signature is already present for the filename.
if any(
(f.filename.endswith(extension) or f"{f.filename}{extension}" in filenames)
for extension in self.detached_signature_extensions
):
continue

for url in f.uri:
for extension in self.detached_signature_extensions:
yield (f.filename, f"{url}{extension}")
return []

def schedule(self, pkg, executor, futures):
"""Schedule verification methods to run in separate threads for all flagged URLs."""

for filename, url in self._get_urls(pkg):
self._schedule_check(filename, url, executor, futures, pkg=pkg)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"__class__": "DetachedSignatureAvailable", "category": "DetachedSignatureAvailableCheck", "package": "DetachedSignatureAvailable", "version": "0", "filename": "foo.tar.gz", "url": "https://github.com/pkgcore/pkgcheck/foo.tar.gz.minisig"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DESCRIPTION="Ebuild with an available detached signature"
HOMEPAGE="https://github.com/pkgcore/pkgcheck"
SRC_URI="https://github.com/pkgcore/pkgcheck/foo.tar.gz"
LICENSE="BSD"
SLOT="0"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DIST foo.tar.gz 153310 BLAKE2B b7484cd9bebe912f9c8877c0f09df059130c2dc5c4da8c926f8df7945bcb7b255ccf810ce8cd16a957fb5bca3d1e71c088cd894968641db5dfae1c4c059df836 SHA512 86ff9e1c4b9353b1fbb475c7bb9d2a97bd9db8421ea5190b5a84832930b34cb5b79f8c3da68a5eb8db334f06851ec129cc6611a371e47b7c5de7a615feec5e05
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from contextlib import contextmanager

from requests.models import Response


@contextmanager
def responses(req, **kwargs):
possible_responses = {
# success
"minisig": {
"status_code": 200,
"reason": "OK",
"headers": {"Content-Type": "application/pgp-signature"},
},
# false success (like 404 behind authentication redirect)
"sign": {
"status_code": 200,
"reason": "OK",
"headers": {"Content-Type": "text/html"},
},
}

r = Response()
r.status_code = 404
r.reason = "Not Found"

possible_response = possible_responses.get(req.url.split(".")[-1])
if possible_response is not None:
for key, value in possible_response.items():
setattr(r, key, value)
yield r
1 change: 1 addition & 0 deletions testdata/repos/network/profiles/categories
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
DetachedSignatureAvailableCheck
FetchablesUrlCheck
HomepageUrlCheck
MetadataUrlCheck
Expand Down
Loading