Skip to content

Commit 43ee67f

Browse files
fetchArtifact function
Signed-off-by: Ilya Dmitrichenko <[email protected]>
1 parent 053d33d commit 43ee67f

File tree

2 files changed

+124
-46
lines changed

2 files changed

+124
-46
lines changed

api/v1beta2/ocirepository_types.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ type OCILayerSelector struct {
183183
// +kubebuilder:validation:Minimum=0
184184
// +optional
185185
Offset *int `json:"offset,omitempty"`
186+
187+
// TODO: next API version should probably use artifact media types
188+
// at the top level and make layer selector optional
189+
ArtifactMediaType string `json:"artifactMediaType,omitempty"`
186190
}
187191

188192
// OCIRepositoryVerification verifies the authenticity of an OCI Artifact

internal/controller/ocirepository_controller.go

Lines changed: 120 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
corev1 "k8s.io/api/core/v1"
4040
"k8s.io/apimachinery/pkg/runtime"
4141
"k8s.io/apimachinery/pkg/types"
42+
kerrors "k8s.io/apimachinery/pkg/util/errors"
4243
"k8s.io/apimachinery/pkg/util/sets"
4344
kuberecorder "k8s.io/client-go/tools/record"
4445
"k8s.io/utils/ptr"
@@ -392,7 +393,7 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, sp *patch
392393

393394
// Get the upstream revision from the artifact digest
394395
// TODO: getRevision resolves the digest, which may change before image is fetched, so it should probaly update ref
395-
revision, err := r.getRevision(ref, opts)
396+
revision, ref, desc, err := r.getRevision(ref, opts)
396397
if err != nil {
397398
e := serror.NewGeneric(
398399
fmt.Errorf("failed to determine artifact digest: %w", err),
@@ -454,35 +455,10 @@ func (r *OCIRepositoryReconciler) reconcileSource(ctx context.Context, sp *patch
454455
return sreconcile.ResultSuccess, nil
455456
}
456457

457-
// Pull artifact from the remote container registry
458-
img, err := remote.Image(ref, opts...)
459-
if err != nil {
460-
e := serror.NewGeneric(
461-
fmt.Errorf("failed to pull artifact from '%s': %w", obj.Spec.URL, err),
462-
ociv1.OCIPullFailedReason,
463-
)
464-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
465-
return sreconcile.ResultEmpty, e
466-
}
467-
468-
// Copy the OCI annotations to the internal artifact metadata
469-
manifest, err := img.Manifest()
470-
if err != nil {
471-
e := serror.NewGeneric(
472-
fmt.Errorf("failed to parse artifact manifest: %w", err),
473-
ociv1.OCILayerOperationFailedReason,
474-
)
475-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
476-
return sreconcile.ResultEmpty, e
477-
}
478-
metadata.Metadata = manifest.Annotations
479-
480-
// Extract the compressed content from the selected layer
481-
blob, err := r.selectLayer(obj, img)
482-
if err != nil {
483-
e := serror.NewGeneric(err, ociv1.OCILayerOperationFailedReason)
484-
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Err.Error())
485-
return sreconcile.ResultEmpty, e
458+
blob, serr := r.fetchArtifact(obj, metadata, ref, desc, opts)
459+
if serr != nil {
460+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, serr.Reason, serr.Err.Error())
461+
return sreconcile.ResultEmpty, serr
486462
}
487463

488464
// Persist layer content to storage using the specified operation
@@ -583,32 +559,130 @@ func (r *OCIRepositoryReconciler) selectLayer(obj *ociv1.OCIRepository, image gc
583559
return blob, nil
584560
}
585561

562+
func newPullErr(format string, a ...any) *serror.Generic {
563+
return serror.NewGeneric(fmt.Errorf(format, a...), ociv1.OCIPullFailedReason)
564+
}
565+
566+
func newLayerOperationErr(format string, a ...any) *serror.Generic {
567+
return serror.NewGeneric(fmt.Errorf(format, a...), ociv1.OCILayerOperationFailedReason)
568+
}
569+
570+
func (r *OCIRepositoryReconciler) fetchArtifact(obj *ociv1.OCIRepository, metadata *sourcev1.Artifact, ref name.Reference, desc *remote.Descriptor, options remoteOptions) (io.ReadCloser, *serror.Generic) {
571+
switch mt := desc.MediaType; {
572+
case mt.IsImage():
573+
// Pull artifact from the remote container registry
574+
img, err := desc.Image()
575+
if err != nil {
576+
return nil, newPullErr("failed to parse artifact image from '%s': %w", obj.Spec.URL, err)
577+
}
578+
579+
// Copy the OCI annotations to the internal artifact metadata
580+
manifest, err := img.Manifest()
581+
if err != nil {
582+
return nil, newLayerOperationErr("failed to parse artifact image manifest: %w", err)
583+
}
584+
metadata.Metadata = manifest.Annotations
585+
586+
// Extract the compressed content from the selected layer
587+
blob, err := r.selectLayer(obj, img)
588+
if err != nil {
589+
e := serror.NewGeneric(err, ociv1.OCILayerOperationFailedReason)
590+
return nil, e
591+
}
592+
return blob, nil
593+
case mt.IsIndex():
594+
idx, err := desc.ImageIndex()
595+
if err != nil {
596+
return nil, newPullErr("failed to parse artifact index from '%s': %w", obj.Spec.URL, err)
597+
}
598+
599+
manifest, err := idx.IndexManifest()
600+
if err != nil {
601+
return nil, newPullErr("failed to parse artifact index manifest: %w", err)
602+
}
603+
604+
if len(manifest.Manifests) == 0 {
605+
return nil, newLayerOperationErr("empty index")
606+
}
607+
608+
images := make([]gcrv1.Image, 0, len(manifest.Manifests))
609+
610+
for i := range manifest.Manifests {
611+
manifest := manifest.Manifests[i]
612+
if manifest.MediaType.IsIndex() {
613+
r.Eventf(obj, corev1.EventTypeWarning, "OCINestedIndexUnsupported", "skipping nested index manifest '%s' in '%s'", manifest.Digest.String(), desc.Digest.String())
614+
continue
615+
}
616+
if !manifest.MediaType.IsImage() {
617+
r.Eventf(obj, corev1.EventTypeNormal, "OCIImageUnsupported", "skipping runnable image '%s' in '%s'", manifest.Digest.String(), ref)
618+
continue
619+
}
620+
if manifest.ArtifactType != "" {
621+
img, err := idx.Image(manifest.Digest)
622+
if err != nil {
623+
return nil, newPullErr("failed to pull artifact image '%s' from '%s': %w", manifest.Digest.String(), ref, err)
624+
}
625+
images = append(images, img)
626+
}
627+
}
628+
629+
if len(images) == 0 {
630+
return nil, newPullErr("no suitable artifacts found in index '%s': %w", desc.Digest.String(), err)
631+
}
632+
633+
var errs []error
634+
for i := range images {
635+
blob, err := r.selectLayer(obj, images[i])
636+
if err != nil {
637+
errs = append(errs, err)
638+
continue
639+
}
640+
return blob, nil
641+
}
642+
if len(errs) > 0 {
643+
return nil, newLayerOperationErr("%w", kerrors.NewAggregate(errs))
644+
}
645+
return nil, newPullErr("no suitable layers found in index '%s': %w", desc.Digest.String(), err)
646+
default:
647+
return nil, newLayerOperationErr("media type '%s' of '%s' is not index or image", mt, ref)
648+
}
649+
}
650+
651+
func (r *OCIRepositoryReconciler) getDescriptor(ref name.Reference, options remoteOptions) (*remote.Descriptor, error) {
652+
// NB: there is no good enought reason to use remote.Head first,
653+
// as it's only in error case that remote.Get won't have to be
654+
// done afterwards anyway
655+
desc, err := remote.Get(ref, options...)
656+
if err != nil {
657+
return nil, fmt.Errorf("failed to fetch %w", err)
658+
}
659+
return desc, nil
660+
661+
}
662+
586663
// getRevision fetches the upstream digest, returning the revision in the
587664
// format '<tag>@<digest>'.
588-
func (r *OCIRepositoryReconciler) getRevision(ref name.Reference, options []remote.Option) (string, error) {
665+
func (r *OCIRepositoryReconciler) getRevision(ref name.Reference, options remoteOptions) (string, name.Reference, *remote.Descriptor, error) {
589666
switch ref := ref.(type) {
590667
case name.Digest:
591668
digest, err := gcrv1.NewHash(ref.DigestStr())
592669
if err != nil {
593-
return "", err
670+
return "", nil, nil, err
671+
}
672+
desc, err := r.getDescriptor(ref, options)
673+
if err != nil {
674+
return "", nil, nil, fmt.Errorf("unable to check digest in registry: %w", err)
594675
}
595-
return digest.String(), nil
676+
return digest.String(), ref, desc, nil
596677
case name.Tag:
597-
var digest gcrv1.Hash
598-
599-
desc, err := remote.Head(ref, options...)
600-
if err == nil {
601-
digest = desc.Digest
602-
} else {
603-
rdesc, err := remote.Get(ref, options...)
604-
if err != nil {
605-
return "", err
606-
}
607-
digest = rdesc.Descriptor.Digest
678+
desc, err := r.getDescriptor(ref, options)
679+
if err != nil {
680+
return "", nil, nil, err
608681
}
609-
return fmt.Sprintf("%s@%s", ref.TagStr(), digest.String()), nil
682+
digest := desc.Digest.String()
683+
return fmt.Sprintf("%s@%s", ref.TagStr(), digest), ref.Digest(digest), desc, nil
610684
default:
611-
return "", fmt.Errorf("unsupported reference type: %T", ref)
685+
return "", nil, nil, fmt.Errorf("unsupported reference type: %T", ref)
612686
}
613687
}
614688

0 commit comments

Comments
 (0)