Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/+include_exclude_tags.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Renamed `include_tags` and `exclude_tags` to `includes` and `excludes` on the remote.
1 change: 1 addition & 0 deletions CHANGES/1909.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for syncing manifests by digest through new `includes` field on the remote.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import django.contrib.postgres.fields
from django.db import migrations, models


def migrate_to_includes_excludes(apps, schema_editor):
"""Copy include_tags -> includes, exclude_tags -> excludes."""
ContainerRemote = apps.get_model("container", "ContainerRemote")
for remote in ContainerRemote.objects.all():
remote.includes = remote.include_tags or None
remote.excludes = remote.exclude_tags or None
remote.save(update_fields=["includes", "excludes"])

def down_migrate_to_include_exclude_tags(apps, schema_editor):
"""Copy includes + excludes -> include_tags + exclude_tags."""
ContainerRemote = apps.get_model("container", "ContainerRemote")
for remote in ContainerRemote.objects.all():
remote.include_tags = remote.includes or None
remote.exclude_tags = remote.excludes or None
remote.save(update_fields=["include_tags", "exclude_tags"])


class Migration(migrations.Migration):

dependencies = [
("container", "0047_containernamespace_pulp_labels"),
]

operations = [
migrations.AddField(
model_name="containerremote",
name="includes",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.TextField(null=True),
null=True,
size=None,
),
),
migrations.AddField(
model_name="containerremote",
name="excludes",
field=django.contrib.postgres.fields.ArrayField(
base_field=models.TextField(null=True),
null=True,
size=None,
),
),
# 2. Copy existing data.
migrations.RunPython(
migrate_to_includes_excludes,
down_migrate_to_include_exclude_tags,
),
]
11 changes: 8 additions & 3 deletions pulp_container/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,17 +483,22 @@ class ContainerRemote(Remote, AutoAddObjPermsMixin):
upstream_name (models.TextField): The name of the image at the remote.
include_foreign_layers (models.BooleanField): Foreign layers in the remote
are included. They are not included by default.
include_tags (fields.ArrayField): List of tags to include during sync.
exclude_tags (fields.ArrayField): List of tags to exclude during sync.
includes (fields.ArrayField): List of tags (with optional wildcards) and/or
digests (sha256:<hex>) to sync.
excludes (fields.ArrayField): List of tag patterns to exclude from sync.
sigstore (models.TextField): The URL to a sigstore where signatures of container images
should be synced from.
"""

upstream_name = models.TextField(db_index=True)
include_foreign_layers = models.BooleanField(default=False)
includes = fields.ArrayField(models.TextField(null=True), null=True)
excludes = fields.ArrayField(models.TextField(null=True), null=True)
sigstore = models.TextField(null=True)

# Deprecated: kept for ZDT upgrades
include_tags = fields.ArrayField(models.TextField(null=True), null=True)
exclude_tags = fields.ArrayField(models.TextField(null=True), null=True)
sigstore = models.TextField(null=True)

TYPE = "container"

Expand Down
53 changes: 43 additions & 10 deletions pulp_container/app/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,27 +296,58 @@ class ContainerRemoteSerializer(RemoteSerializer):
upstream_name = serializers.CharField(
required=True, allow_blank=False, help_text=_("Name of the upstream repository")
)
includes = serializers.ListField(
child=serializers.CharField(max_length=255),
allow_null=True,
required=False,
help_text=_(
"A list of tags (wildcards *, ? are recognized) and/or digests "
"(format: 'sha256:<hex>') to include during sync. "
"'includes' is evaluated before 'excludes'."
),
)
excludes = serializers.ListField(
child=serializers.CharField(max_length=255),
allow_null=True,
required=False,
help_text=_(
"A list of tag patterns to exclude during sync. "
"Wildcards *, ? are recognized. "
"'excludes' is evaluated after 'includes'."
),
)

# ---------------------------------------------------------------------------
# Deprecated write-only fields kept for backwards compatibility.
# They are merged into `includes` / `excludes` during validation.
# ---------------------------------------------------------------------------
include_tags = serializers.ListField(
child=serializers.CharField(max_length=255),
allow_null=True,
required=False,
help_text=_("""
A list of tags to include during sync.
Wildcards *, ? are recognized.
'include_tags' is evaluated before 'exclude_tags'.
"""),
write_only=True,
help_text=_("Deprecated. Use 'includes' instead."),
)
exclude_tags = serializers.ListField(
child=serializers.CharField(max_length=255),
allow_null=True,
required=False,
help_text=_("""
A list of tags to exclude during sync.
Wildcards *, ? are recognized.
'exclude_tags' is evaluated after 'include_tags'.
"""),
write_only=True,
help_text=_("Deprecated. Use 'excludes' instead."),
)

def validate(self, data):
include_tags = data.pop("include_tags", None)
exclude_tags = data.pop("exclude_tags", None)

if include_tags:
data["includes"] = list(data.get("includes") or []) + list(include_tags)

if exclude_tags:
data["excludes"] = list(data.get("excludes") or []) + list(exclude_tags)

return data

policy = serializers.ChoiceField(
help_text="""
immediate - All manifests and blobs are downloaded and saved during a sync.
Expand All @@ -337,6 +368,8 @@ class ContainerRemoteSerializer(RemoteSerializer):
class Meta:
fields = RemoteSerializer.Meta.fields + (
"upstream_name",
"includes",
"excludes",
"include_tags",
"exclude_tags",
"sigstore",
Expand Down
104 changes: 58 additions & 46 deletions pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import hashlib
import json
import logging
from collections import defaultdict
from urllib.parse import urljoin, urlparse, urlunparse

import aiohttp
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, remote, signed_only):
self.manifest_list_dcs = []
self.manifest_dcs = []
self.signature_dcs = []
self._synced_digests = set()
self._synced_digests = defaultdict(list)
self._full_tag_list = []
self._cosign_tags = []

Expand All @@ -78,20 +79,26 @@ async def _download_manifest_data(self, manifest_url):

return content_data, raw_text_data, response

async def _check_for_existing_manifest(self, download_tag):
response = await download_tag
async def _check_for_existing_manifest(self, head_manifest_task):
response = await head_manifest_task

digest = response.headers.get("docker-content-digest")
url = response.url
original_reference = url.split("/")[-1]

if manifest := await Manifest.objects.filter(
digest=digest, pulp_domain=get_domain()
).afirst():
raw_text_data = manifest.data
content_data = json.loads(raw_text_data)
else:
content_data, raw_text_data, response = await self._download_manifest_data(response.url)
if not original_reference.startswith("sha256:") and digest:
# Fetch the tag with its digest
url = url.rsplit(original_reference, 1)[0] + digest

return content_data, raw_text_data, response
content_data, raw_text_data, response = await self._download_manifest_data(url)

return content_data, raw_text_data, response, original_reference

async def run(self):
"""
Expand All @@ -105,31 +112,33 @@ async def run(self):
repo_name = self.remote.namespaced_upstream_name
tag_list_url = "/v2/{name}/tags/list".format(name=repo_name)
self._full_tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name)
self._cosign_tags = filter_resources(
self._full_tag_list, ["sha256-*"], self.remote.exclude_tags
)
if self.remote.include_tags or self.remote.exclude_tags:
includes = self.remote.includes or []
excludes = self.remote.excludes or []

digest_includes = [i for i in includes if i.startswith("sha256:")]
self._cosign_tags = filter_resources(self._full_tag_list, ["sha256-*"], excludes)

if includes or excludes:
# Split sync into two parts, first all non-cosign tags, then cosign tags
exclude_tags_and_cosign = (self.remote.exclude_tags or []) + ["sha256-*"]
tag_list = filter_resources(
self._full_tag_list, self.remote.include_tags, exclude_tags_and_cosign
)
exclude_and_cosign = excludes + ["sha256-*"]
filtered_tags = filter_resources(self._full_tag_list, includes, exclude_and_cosign)
manifest_list = filtered_tags + digest_includes
else:
tag_list = self._full_tag_list
manifest_list = self._full_tag_list
await pb.aincrement()

await self._process_tags(tag_list, signature_source)
await self._process_manifests(manifest_list, signature_source, "Processing Manifests")

if self.remote.include_tags or self.remote.exclude_tags:
# Process cosign companion tags after all non-cosign tags are synced
if includes or excludes:
# Process cosign companion tags after all primary content is synced
companion_tags = self._find_cosign_companion_tags()
if companion_tags:
log.info(
"Syncing %d cosign companion tag(s) for filtered images",
len(companion_tags),
)
await self._process_tags(
companion_tags, signature_source, msg="Processing Cosign Companion Tags"
await self._process_manifests(
companion_tags, signature_source, "Processing Cosign Companion Tags"
)

def _find_cosign_companion_tags(self):
Expand All @@ -143,54 +152,53 @@ def _find_cosign_companion_tags(self):
companion_tags.append(tag)
return companion_tags

async def _process_tags(self, tag_list, signature_source, msg="Processing Tags"):
"""Download and process a batch of tags, creating declarative content objects."""
async def _process_manifests(self, manifests, signature_source, msg):
"""Download and process a batch of manifests, creating declarative content objects."""
BATCH_SIZE = 500
to_download = []

for tag_name in tag_list:
relative_url = "/v2/{name}/manifests/{tag}".format(
name=self.remote.namespaced_upstream_name, tag=tag_name
)
tag_url = urljoin(self.remote.url, relative_url)
downloader = self.remote.get_downloader(url=tag_url)
for reference in manifests:
relative_url = f"/v2/{self.remote.namespaced_upstream_name}/manifests/{reference}"
manifest_url = urljoin(self.remote.url, relative_url)
downloader = self.remote.get_downloader(url=manifest_url)
to_download.append(
downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"})
)

async with ProgressReport(
message=msg,
code="sync.processing.tag",
total=len(tag_list),
) as pb_parsed_tags:
code="sync.processing.manifest",
total=len(manifests),
) as pb_parsed_manifests:
to_download_artifact = [
self._check_for_existing_manifest(download_tag)
for download_tag in asyncio.as_completed(to_download)
self._check_for_existing_manifest(download_manifest)
for download_manifest in asyncio.as_completed(to_download)
]

for artifact in asyncio.as_completed(to_download_artifact):
content_data, raw_text_data, response = await artifact
content_data, raw_text_data, response, manifest_ref = await artifact

digest = calculate_digest(raw_text_data)
tag_name = response.url.split("/")[-1]
is_tag = not manifest_ref.startswith("sha256:")
media_type = determine_media_type(content_data, response)

if self.signed_only and not signature_source:
if not (
self._is_cosign_companion_tag(tag_name, media_type, content_data)
self._is_cosign_companion_tag(manifest_ref, media_type, content_data)
or await self._has_cosign_signature(digest)
):
log.info(
"The unsigned image {digest} can't be synced "
"due to a requirement to sync signed content "
"only.".format(digest=digest)
)
await pb_parsed_tags.aincrement()
await pb_parsed_manifests.aincrement()
continue

validate_manifest(content_data, media_type, digest)

tag_dc = DeclarativeContent(Tag(name=tag_name))
if is_tag:
tag_dc = DeclarativeContent(Tag(name=manifest_ref))

if media_type in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI):
list_dc = self.create_manifest_list(
Expand All @@ -214,7 +222,7 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
"The whole manifest list is skipped.".format(
img_digest=man_dc.content.digest,
ml_digest=list_dc.content.digest,
tag=tag_name,
tag=manifest_ref,
)
)
break
Expand All @@ -224,20 +232,23 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
else:
# Manifest indices can be signed too. It is not mandatory.
# If signature is available mirror it.
self._synced_digests.add(digest)
self._synced_digests[digest].append(manifest_ref)
if signature_source is not None:
list_sig_dcs = await self.create_signatures(list_dc, signature_source)
if list_sig_dcs:
self.signature_dcs.extend(list_sig_dcs)
tag_dc.extra_data["tagged_manifest_dc"] = list_dc
for listed_manifest in list_dc.extra_data["listed_manifests"]:
self._synced_digests.add(listed_manifest["manifest_dc"].content.digest)
self._synced_digests[
listed_manifest["manifest_dc"].content.digest
].append(manifest_ref)
await self.handle_blobs(
listed_manifest["manifest_dc"], listed_manifest["content_data"]
)
self.manifest_dcs.append(listed_manifest["manifest_dc"])
self.manifest_list_dcs.append(list_dc)
self.tag_dcs.append(tag_dc)
if is_tag:
tag_dc.extra_data["tagged_manifest_dc"] = list_dc
self.tag_dcs.append(tag_dc)

else:
# Simple tagged manifest
Expand All @@ -249,14 +260,15 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
if self.signed_only and not man_sig_dcs:
continue
self.signature_dcs.extend(man_sig_dcs)
self._synced_digests.add(digest)
tag_dc.extra_data["tagged_manifest_dc"] = man_dc
self._synced_digests[digest].append(manifest_ref)
await self.handle_blobs(man_dc, content_data)
self.tag_dcs.append(tag_dc)
if is_tag:
tag_dc.extra_data["tagged_manifest_dc"] = man_dc
self.tag_dcs.append(tag_dc)
self.manifest_dcs.append(man_dc)

# Count the skipped tasks as parsed too.
await pb_parsed_tags.aincrement()
await pb_parsed_manifests.aincrement()

# Flush the queues to prevent overly excessive memory usage.
# This will cap the number of in flight high level objects to about BATCH_SIZE.
Expand Down
Loading
Loading