Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/2096.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed syncing of cosign signatures, attestations, and SBOMs (stored as companion tags) being silently skipped when `include_tags` was set on the remote.
74 changes: 53 additions & 21 deletions pulp_container/app/tasks/sync_stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

log = logging.getLogger(__name__)

COSIGN_TAG_SUFFIXES = (".sig", ".att", ".sbom")


class ContainerFirstStage(Stage):
"""
Expand All @@ -60,6 +62,8 @@ def __init__(self, remote, signed_only):
self.manifest_list_dcs = []
self.manifest_dcs = []
self.signature_dcs = []
self._synced_digests = set()
self._full_tag_list = []

async def _download_manifest_data(self, manifest_url):
downloader = self.remote.get_downloader(url=manifest_url)
Expand Down Expand Up @@ -92,24 +96,57 @@ async def run(self):
"""
ContainerFirstStage.
"""

to_download = []
BATCH_SIZE = 500

# it can be whether a separate sigstore location or registry with extended signatures API
signature_source = await self.get_signature_source()

async with ProgressReport(
message="Downloading tag list", code="sync.downloading.tag_list", total=1
) as pb:
repo_name = self.remote.namespaced_upstream_name
tag_list_url = "/v2/{name}/tags/list".format(name=repo_name)
tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name)
self._full_tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name)
tag_list = filter_resources(
tag_list, self.remote.include_tags, self.remote.exclude_tags
self._full_tag_list, self.remote.include_tags, self.remote.exclude_tags
)
await pb.aincrement()

await self._process_tags(tag_list, signature_source)

if self.remote.include_tags or self.remote.exclude_tags:
companion_tags = self._find_cosign_companion_tags(tag_list)
if companion_tags:
log.info(
"Syncing %d cosign companion tag(s) for filtered images",
len(companion_tags),
)
await self._process_tags(
companion_tags, signature_source, msg="Processing Cosign Companion Tags"
)

await self.resolve_flush()

def _find_cosign_companion_tags(self, filtered_tag_list):
"""Find cosign companion tags for synced digests that were excluded by tag filtering."""
rest_of_tags = filter_resources(self._full_tag_list, [], filtered_tag_list)
rest_of_tags = filter_resources(rest_of_tags, [], self.remote.exclude_tags)
companion_tags = []
for tag in rest_of_tags:
if not tag.startswith("sha256-"):
continue
if not any(tag.endswith(suffix) for suffix in COSIGN_TAG_SUFFIXES):
continue
# Derive the image digest from the cosign tag name:
# sha256-<hex>.<suffix> -> sha256:<hex>
tag_without_suffix = tag.rsplit(".", 1)[0]
digest = tag_without_suffix.replace("-", ":", 1)
if digest in self._synced_digests:
companion_tags.append(tag)
return companion_tags

async def _process_tags(self, tag_list, signature_source, msg="Processing Tags"):
"""Download and process a batch of tags, creating declarative content objects."""
BATCH_SIZE = 500
to_download = []

for tag_name in tag_list:
relative_url = "/v2/{name}/manifests/{tag}".format(
name=self.remote.namespaced_upstream_name, tag=tag_name
Expand All @@ -121,7 +158,7 @@ async def run(self):
)

async with ProgressReport(
message="Processing Tags",
message=msg,
code="sync.processing.tag",
total=len(tag_list),
) as pb_parsed_tags:
Expand All @@ -134,22 +171,23 @@ async def run(self):
content_data, raw_text_data, response = await artifact

digest = calculate_digest(raw_text_data)
self._synced_digests.add(digest)
tag_name = response.url.split("/")[-1]

# Look for cosign signatures
# cosign signature has a tag convention 'sha256-1234.sig'
if self.signed_only and not signature_source:
is_cosign_companion = tag_name.startswith("sha256-") and any(
tag_name.endswith(s) for s in COSIGN_TAG_SUFFIXES
)
if (
not (tag_name.endswith(".sig") and tag_name.startswith("sha256-"))
and f"sha256-{digest.removeprefix('sha256:')}.sig" not in tag_list
not is_cosign_companion
and f"sha256-{digest.removeprefix('sha256:')}.sig"
not in self._full_tag_list
):
# skip this tag, there is no corresponding signature
log.info(
"The unsigned image {digest} can't be synced "
"due to a requirement to sync signed content "
"only.".format(digest=digest)
)
# Count the skipped tagks as parsed too.
await pb_parsed_tags.aincrement()
continue

Expand All @@ -170,6 +208,7 @@ async def run(self):
):
listed_manifest = await listed_manifest_task
man_dc = listed_manifest["manifest_dc"]
self._synced_digests.add(man_dc.content.digest)
if signature_source is not None:
man_sig_dcs = await self.create_signatures(man_dc, signature_source)
if self.signed_only and not man_sig_dcs:
Expand All @@ -183,8 +222,6 @@ async def run(self):
tag=tag_name,
)
)
# do not pass down the pipeline a manifest list with unsigned
# manifests.
break
self.signature_dcs.extend(man_sig_dcs)
list_dc.extra_data["listed_manifests"].append(listed_manifest)
Expand All @@ -196,8 +233,6 @@ async def run(self):
list_sig_dcs = await self.create_signatures(list_dc, signature_source)
if list_sig_dcs:
self.signature_dcs.extend(list_sig_dcs)
# only pass the manifest list and tag down the pipeline if there were no
# issues with signatures (no `break` in the `for` loop)
tag_dc.extra_data["tagged_manifest_dc"] = list_dc
for listed_manifest in list_dc.extra_data["listed_manifests"]:
await self.handle_blobs(
Expand All @@ -215,7 +250,6 @@ async def run(self):
if signature_source is not None:
man_sig_dcs = await self.create_signatures(man_dc, signature_source)
if self.signed_only and not man_sig_dcs:
# do not pass down the pipeline unsigned manifests
continue
self.signature_dcs.extend(man_sig_dcs)
tag_dc.extra_data["tagged_manifest_dc"] = man_dc
Expand All @@ -237,8 +271,6 @@ async def run(self):
):
await self.resolve_flush()

await self.resolve_flush()

async def get_signature_source(self):
"""
Find out where signatures come from: sigstore, extension API or not available at all.
Expand Down
Loading