diff --git a/app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql new file mode 100644 index 000000000..5d82269d7 --- /dev/null +++ b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS oci_image_index_mappings; \ No newline at end of file diff --git a/app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql new file mode 100644 index 000000000..b4db8b53a --- /dev/null +++ b/app/store/database/migrate/postgres/0076_oci_image_index_mapping.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS oci_image_index_mappings +( + oci_mapping_id SERIAL PRIMARY KEY, + oci_mapping_parent_manifest_id BIGINT NOT NULL, + oci_mapping_child_digest bytea NOT NULL, + oci_mapping_created_at BIGINT NOT NULL, + oci_mapping_updated_at BIGINT NOT NULL, + oci_mapping_created_by INTEGER NOT NULL, + oci_mapping_updated_by INTEGER NOT NULL, + CONSTRAINT unique_oci_mapping_digests + UNIQUE (oci_mapping_parent_manifest_id, oci_mapping_child_digest), + CONSTRAINT fk_oci_mapping_registry_id + FOREIGN KEY (oci_mapping_parent_manifest_id) + REFERENCES manifests(manifest_id) + ON DELETE CASCADE +) \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql new file mode 100644 index 000000000..5d82269d7 --- /dev/null +++ b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS oci_image_index_mappings; \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql new file mode 100644 index 000000000..f95ba9815 --- /dev/null +++ b/app/store/database/migrate/sqlite/0076_oci_image_index_mapping.up.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS oci_image_index_mappings +( + oci_mapping_id INTEGER PRIMARY KEY AUTOINCREMENT, + oci_mapping_parent_manifest_id BIGINT NOT NULL, + oci_mapping_child_digest bytea NOT NULL, + oci_mapping_created_at BIGINT NOT NULL, + oci_mapping_updated_at BIGINT NOT NULL, + oci_mapping_created_by INTEGER NOT NULL, + oci_mapping_updated_by INTEGER NOT NULL, + CONSTRAINT unique_oci_mapping_digests + UNIQUE (oci_mapping_parent_manifest_id, oci_mapping_child_digest), + CONSTRAINT fk_oci_mapping_registry_id + FOREIGN KEY (oci_mapping_parent_manifest_id) + REFERENCES manifests(manifest_id) + ON DELETE CASCADE +) \ No newline at end of file diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index 10fcbb7d0..240824c23 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -442,14 +442,16 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro artifactRepository := database2.ProvideArtifactDao(db) layerRepository := database2.ProvideLayerDao(db, mediaTypesRepository) eventReporter := docker.ProvideReporter() - manifestService := docker.ManifestServiceProvider(registryRepository, manifestRepository, blobRepository, mediaTypesRepository, manifestReferenceRepository, tagRepository, imageRepository, artifactRepository, layerRepository, gcService, transactor, eventReporter, spacePathStore) + ociImageIndexMappingRepository := database2.ProvideOCIImageIndexMappingDao(db) + manifestService := docker.ManifestServiceProvider(registryRepository, manifestRepository, blobRepository, mediaTypesRepository, manifestReferenceRepository, tagRepository, imageRepository, artifactRepository, layerRepository, gcService, transactor, eventReporter, spacePathStore, ociImageIndexMappingRepository) registryBlobRepository := database2.ProvideRegistryBlobDao(db) bandwidthStatRepository := database2.ProvideBandwidthStatDao(db) downloadStatRepository := database2.ProvideDownloadStatDao(db) localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor) upstreamProxyConfigRepository := database2.ProvideUpstreamDao(db, registryRepository, spacePathStore) secretService := secret3.ProvideSecretService(secretStore, encrypter, spacePathStore) - remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService) + proxyController := docker.ProvideProxyController(localRegistry, manifestService, secretService, spacePathStore) + remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService, proxyController) coreController := pkg.CoreControllerProvider(registryRepository) dockerController := docker.ControllerProvider(localRegistry, remoteRegistry, coreController, spaceStore, authorizer) handler := api2.NewHandlerProvider(dockerController, spaceStore, tokenStore, controller, authenticator, provider, authorizer) diff --git a/registry/app/api/controller/metadata/create_registry.go b/registry/app/api/controller/metadata/create_registry.go index 254ff07b8..732044c3a 100644 --- a/registry/app/api/controller/metadata/create_registry.go +++ b/registry/app/api/controller/metadata/create_registry.go @@ -320,6 +320,7 @@ func (c *APIController) CreateUpstreamProxyEntity( } upstreamProxyConfigEntity.SecretSpaceID = *res.SecretSpaceId + upstreamProxyConfigEntity.SecretIdentifier = *res.SecretIdentifier } return repoEntity, upstreamProxyConfigEntity, nil } diff --git a/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go b/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go index 8854aacfa..bce5adf36 100644 --- a/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go +++ b/registry/app/api/controller/metadata/get_artifacts_docker_manifests.go @@ -102,28 +102,9 @@ func (c *APIController) GetDockerArtifactManifests( } manifestDetailsList = append(manifestDetailsList, getManifestDetails(m, mConfig)) case *ml.DeserializedManifestList: - for _, manifestEntry := range reqManifest.Manifests { - dgst, err := types.NewDigest(manifestEntry.Digest) - if err != nil { - return artifactManifestsErrorRs(err), nil - } - referencedManifest, err := c.ManifestStore.FindManifestByDigest(ctx, registry.ID, image, dgst) - if err != nil { - if errors.Is(err, store2.ErrResourceNotFound) { - return artifactManifestsErrorRs( - fmt.Errorf("manifest not found"), - ), nil - } - return artifactManifestsErrorRs(err), nil - } - mConfig, err := getManifestConfig( - ctx, referencedManifest.Configuration.Digest, - regInfo.RootIdentifier, c.StorageDriver, - ) - if err != nil { - return artifactManifestsErrorRs(err), nil - } - manifestDetailsList = append(manifestDetailsList, getManifestDetails(referencedManifest, mConfig)) + manifestDetailsList, err = c.getManifestList(ctx, reqManifest, registry, image, regInfo) + if err != nil { + return artifactManifestsErrorRs(err), nil } default: log.Ctx(ctx).Error().Stack().Err(err).Msgf("Unknown manifest type: %T", manifest) @@ -141,6 +122,38 @@ func (c *APIController) GetDockerArtifactManifests( }, nil } +func (c *APIController) getManifestList( + ctx context.Context, reqManifest *ml.DeserializedManifestList, registry *types.Registry, image string, + regInfo *RegistryRequestBaseInfo, +) ([]artifact.DockerManifestDetails, error) { + manifestDetailsList := []artifact.DockerManifestDetails{} + for _, manifestEntry := range reqManifest.Manifests { + dgst, err := types.NewDigest(manifestEntry.Digest) + if err != nil { + return nil, err + } + referencedManifest, err := c.ManifestStore.FindManifestByDigest(ctx, registry.ID, image, dgst) + if err != nil { + if errors.Is(err, store2.ErrResourceNotFound) { + if registry.Type == artifact.RegistryTypeUPSTREAM { + continue + } + return nil, fmt.Errorf("manifest: %s not found", dgst.String()) + } + return nil, err + } + mConfig, err := getManifestConfig( + ctx, referencedManifest.Configuration.Digest, + regInfo.RootIdentifier, c.StorageDriver, + ) + if err != nil { + return nil, err + } + manifestDetailsList = append(manifestDetailsList, getManifestDetails(referencedManifest, mConfig)) + } + return manifestDetailsList, nil +} + func artifactManifestsErrorRs(err error) artifact.GetDockerArtifactManifestsResponseObject { return artifact.GetDockerArtifactManifests500JSONResponse{ InternalServerErrorJSONResponse: artifact.InternalServerErrorJSONResponse( diff --git a/registry/app/api/controller/metadata/update_registry.go b/registry/app/api/controller/metadata/update_registry.go index 7c08d04f5..fe917a94c 100644 --- a/registry/app/api/controller/metadata/update_registry.go +++ b/registry/app/api/controller/metadata/update_registry.go @@ -394,6 +394,7 @@ func (c *APIController) UpdateUpstreamProxyEntity( return nil, nil, err } upstreamProxyConfigEntity.SecretSpaceID = *res.SecretSpaceId + upstreamProxyConfigEntity.SecretIdentifier = *res.SecretIdentifier } else { upstreamProxyConfigEntity.UserName = "" upstreamProxyConfigEntity.SecretIdentifier = "" diff --git a/registry/app/api/handler/oci/get_blob.go b/registry/app/api/handler/oci/get_blob.go index 37f734bf8..e55c2086c 100644 --- a/registry/app/api/handler/oci/get_blob.go +++ b/registry/app/api/handler/oci/get_blob.go @@ -45,10 +45,16 @@ func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) { } defer func() { if response.Body != nil { - response.Body.Close() + err := response.Body.Close() + if err != nil { + log.Ctx(ctx).Error().Msgf("Failed to close body: %v", err) + } } if response.ReadCloser != nil { - response.ReadCloser.Close() + err := response.ReadCloser.Close() + if err != nil { + log.Ctx(ctx).Error().Msgf("Failed to close readCloser: %v", err) + } } }() @@ -57,6 +63,11 @@ func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, response.RedirectURL, http.StatusTemporaryRedirect) return } + + if response.ResponseHeaders != nil && response.ResponseHeaders.Code == http.StatusMovedPermanently { + response.ResponseHeaders.WriteToResponse(w) + return + } response.ResponseHeaders.WriteHeadersToResponse(w) if r.Method == http.MethodHead { return diff --git a/registry/app/api/handler/oci/get_manifest.go b/registry/app/api/handler/oci/get_manifest.go index cf65e8c9a..be7f5f430 100644 --- a/registry/app/api/handler/oci/get_manifest.go +++ b/registry/app/api/handler/oci/get_manifest.go @@ -46,6 +46,9 @@ func (h *Handler) GetManifest(w http.ResponseWriter, r *http.Request) { return } response.ResponseHeaders.WriteToResponse(w) + if response.ResponseHeaders.Code == http.StatusMovedPermanently { + return + } _, bytes, _ := response.Manifest.Payload() if _, err := w.Write(bytes); err != nil { log.Ctx(ctx).Error().Err(err).Msg("Failed to write response") diff --git a/registry/app/pkg/docker/local.go b/registry/app/pkg/docker/local.go index 8183e7f9a..af67dd4cf 100644 --- a/registry/app/pkg/docker/local.go +++ b/registry/app/pkg/docker/local.go @@ -404,16 +404,20 @@ func (r *LocalRegistry) fetchBlobInternal( } if http.MethodGet == method { + // This GoRoutine is used to update the bandwidth stat of the artifact go func(art pkg.RegistryInfo, dgst digest.Digest) { // Cloning Context. session, _ := request.AuthSessionFrom(ctx) ctx3 := request.WithAuthSession(context.Background(), session) err := r.dbBlobDownloadComplete(ctx3, dgst, info) if err != nil { - log.Error().Stack().Err(err).Msgf("error while putting bandwidth stat of artifact, %v", err) + log.Ctx(ctx3).Error().Stack().Str("goRoutine", + "UpdateBandwidth").Err(err).Msgf("error while putting bandwidth stat of artifact, %v", + err) return } - log.Info().Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest) + log.Ctx(ctx3).Info().Str("goRoutine", + "UpdateBandwidth").Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest) }(info, dgst) } @@ -435,16 +439,23 @@ func (r *LocalRegistry) PullManifest( ifNoneMatchHeader []string, ) (responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifest manifest.Manifest, errs []error) { responseHeaders, descriptor, manifest, errs = r.ManifestExist(ctx, artInfo, acceptHeaders, ifNoneMatchHeader) + + // This GoRoutine is used to update the download stat of the artifact when manifest is pulled go func(art pkg.RegistryInfo) { // Cloning Context. session, _ := request.AuthSessionFrom(ctx) ctx2 := request.WithAuthSession(context.Background(), session) + ctx2 = log.Ctx(ctx2).With(). + Str("goRoutine", "UpdateDownload"). + Logger().WithContext(ctx2) err := r.dbGetManifestComplete(ctx2, artInfo) if err != nil { - log.Error().Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err) + log.Ctx(ctx2).Error().Str("goRoutine", + "UpdateDownload").Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err) return } - log.Info().Msgf("Successfully updated the download stat metrics %s", art.Digest) + log.Ctx(ctx2).Info().Str("goRoutine", + "UpdateDownload").Msgf("Successfully updated the download stat metrics %s", art.Digest) }(artInfo) return responseHeaders, descriptor, manifest, errs } @@ -821,7 +832,7 @@ func (r *LocalRegistry) PutManifest( responseHeaders.Headers["Docker-Content-Digest"] = d.String() responseHeaders.Code = http.StatusCreated - log.Debug().Msg("Succeeded in putting manifest!") + log.Ctx(ctx).Debug().Msgf("Succeeded in putting manifest: %s", d.String()) return responseHeaders, errs } @@ -1651,6 +1662,11 @@ func (r *LocalRegistry) dbGetManifestComplete( ctx context.Context, info pkg.RegistryInfo, ) error { + // FIXME: Update logic incase requests are internal. Currently, we are updating the stats for all requests. + if info.Digest == "" { + return nil + } + err := r.tx.WithTx( ctx, func(ctx context.Context) error { registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier) @@ -1663,7 +1679,11 @@ func (r *LocalRegistry) dbGetManifestComplete( return err } - artifact, err := r.artifactDao.GetByName(ctx, image.ID, info.Digest) + newDigest, err := types.NewDigest(digest.Digest(info.Digest)) + if err != nil { + log.Ctx(ctx).Error().Stack().Err(err).Msgf("error parsing digest: %s %v", info.Digest, err) + } + artifact, err := r.artifactDao.GetByName(ctx, image.ID, newDigest.String()) if err != nil { return err } diff --git a/registry/app/pkg/docker/manifest_service.go b/registry/app/pkg/docker/manifest_service.go index 48fb6beed..33dfaa6dc 100644 --- a/registry/app/pkg/docker/manifest_service.go +++ b/registry/app/pkg/docker/manifest_service.go @@ -23,7 +23,8 @@ import ( "fmt" "time" - gitnessappstore "github.com/harness/gitness/app/store" + gas "github.com/harness/gitness/app/store" + "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" "github.com/harness/gitness/registry/app/event" "github.com/harness/gitness/registry/app/manifest" "github.com/harness/gitness/registry/app/manifest/manifestlist" @@ -47,19 +48,20 @@ import ( ) type manifestService struct { - registryDao store.RegistryRepository - manifestDao store.ManifestRepository - layerDao store.LayerRepository - blobRepo store.BlobRepository - mtRepository store.MediaTypesRepository - tagDao store.TagRepository - imageDao store.ImageRepository - artifactDao store.ArtifactRepository - manifestRefDao store.ManifestReferenceRepository - spacePathStore gitnessappstore.SpacePathStore - gcService gc.Service - tx dbtx.Transactor - reporter event.Reporter + registryDao store.RegistryRepository + manifestDao store.ManifestRepository + layerDao store.LayerRepository + blobRepo store.BlobRepository + mtRepository store.MediaTypesRepository + tagDao store.TagRepository + imageDao store.ImageRepository + artifactDao store.ArtifactRepository + manifestRefDao store.ManifestReferenceRepository + ociImageIndexMappingDao store.OCIImageIndexMappingRepository + spacePathStore gas.SpacePathStore + gcService gc.Service + tx dbtx.Transactor + reporter event.Reporter } func NewManifestService( @@ -67,22 +69,24 @@ func NewManifestService( blobRepo store.BlobRepository, mtRepository store.MediaTypesRepository, tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository, layerDao store.LayerRepository, manifestRefDao store.ManifestReferenceRepository, - tx dbtx.Transactor, gcService gc.Service, reporter event.Reporter, spacePathStore gitnessappstore.SpacePathStore, + tx dbtx.Transactor, gcService gc.Service, reporter event.Reporter, spacePathStore gas.SpacePathStore, + ociImageIndexMappingDao store.OCIImageIndexMappingRepository, ) ManifestService { return &manifestService{ - registryDao: registryDao, - manifestDao: manifestDao, - layerDao: layerDao, - blobRepo: blobRepo, - mtRepository: mtRepository, - tagDao: tagDao, - artifactDao: artifactDao, - imageDao: imageDao, - manifestRefDao: manifestRefDao, - gcService: gcService, - tx: tx, - reporter: reporter, - spacePathStore: spacePathStore, + registryDao: registryDao, + manifestDao: manifestDao, + layerDao: layerDao, + blobRepo: blobRepo, + mtRepository: mtRepository, + tagDao: tagDao, + artifactDao: artifactDao, + imageDao: imageDao, + manifestRefDao: manifestRefDao, + gcService: gcService, + tx: tx, + reporter: reporter, + spacePathStore: spacePathStore, + ociImageIndexMappingDao: ociImageIndexMappingDao, } } @@ -107,6 +111,7 @@ type ManifestService interface { ) error DeleteTag(ctx context.Context, repoKey string, tag string, info pkg.RegistryInfo) (bool, error) DeleteManifest(ctx context.Context, repoKey string, d digest.Digest, info pkg.RegistryInfo) error + AddManifestAssociation(ctx context.Context, repoKey string, digest digest.Digest, info pkg.RegistryInfo) error DBFindRepositoryBlob( ctx context.Context, desc manifest.Descriptor, repoID int64, info pkg.RegistryInfo, @@ -204,7 +209,8 @@ func (l *manifestService) dbTagManifest( } // Create or update artifact and tag records - if err := l.createOrUpdateArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName, dgst); err != nil { + if err := l.upsertArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName, + dgst); err != nil { return formatFailedToTagErr(err) } @@ -246,7 +252,7 @@ func (l *manifestService) lockManifestForGC(ctx context.Context, repoID, manifes } // Creates or updates artifact and tag records. -func (l *manifestService) createOrUpdateArtifactAndTag( +func (l *manifestService) upsertArtifactAndTag( ctx context.Context, registryID, manifestID int64, @@ -418,7 +424,7 @@ func (l *manifestService) dbPutManifestV2( return nil } - log.Debug().Msgf("manifest not found in database") + log.Debug().Msgf("manifest %s not found in database", dgst.String()) cfg := &types.Configuration{ MediaType: mfst.Config().MediaType, @@ -524,6 +530,56 @@ func (l *manifestService) DBFindRepositoryBlob( return b, nil } +// AddManifestAssociation This updates the manifestRefs for all new childDigests to their already existing parent +// manifests. This is used when a manifest from a manifest list is pulled from the remote and manifest list already +// exists in the database. +func (l *manifestService) AddManifestAssociation( + ctx context.Context, repoKey string, childDigest digest.Digest, info pkg.RegistryInfo, +) error { + newDigest, err2 := types.NewDigest(childDigest) + if err2 != nil { + return fmt.Errorf("failed to create digest: %s %w", childDigest, err2) + } + r, err := l.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoKey) + if err != nil { + return fmt.Errorf("failed to get registry: %s %w", repoKey, err) + } + childManifest, err2 := l.manifestDao.FindManifestByDigest(ctx, r.ID, info.Image, newDigest) + if err2 != nil { + return fmt.Errorf("failed to find manifest by digest. Repo: %d Image: %s %w", r.ID, info.Image, err2) + } + mappings, err := l.ociImageIndexMappingDao.GetAllByChildDigest(ctx, r.ID, childManifest.ImageName, newDigest) + if err != nil { + return fmt.Errorf("failed to get oci image index mappings. Repo: %d Image: %s %w", + r.ID, + childManifest.ImageName, + err) + } + for _, mapping := range mappings { + parentManifest, err := l.manifestDao.Get(ctx, mapping.ParentManifestID) + if err != nil { + return fmt.Errorf("failed to get manifest with ID: %d %w", mapping.ParentManifestID, err) + } + if err := l.manifestRefDao.AssociateManifest(ctx, parentManifest, childManifest); err != nil { + if errors.Is(err, util.ErrRefManifestNotFound) { + // This can only happen if the online GC deleted one + // of the referenced manifests (because they were + // untagged/unreferenced) between the call to + // `FindAndLockNBefore` and `AssociateManifest`. For now + // we need to return this error to mimic the behaviour + // of the corresponding filesystem validation. + log.Error(). + Msgf("Failed to associate manifest Ref Manifest not found. parentDigest:%s childDigest:%s %v", + parentManifest.Digest.String(), + childManifest.Digest.String(), + err) + return err + } + } + } + return nil +} + func (l *manifestService) handleSubject( ctx context.Context, subject manifest.Descriptor, artifactType string, annotations map[string]string, dbRepo *types.Registry, @@ -620,15 +676,9 @@ func (l *manifestService) dbPutManifestList( ImageName: info.Image, } - mm := make([]*types.Manifest, 0, len(manifestList.Manifests)) - ids := make([]int64, 0, len(mm)) - for _, desc := range manifestList.Manifests { - m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) - if err != nil { - return err - } - mm = append(mm, m) - ids = append(ids, m.ID) + mm, ids, err2 := l.validateManifestList(ctx, manifestList, r, info) + if err2 != nil { + return err2 } err = l.tx.WithTx( @@ -675,14 +725,102 @@ func (l *manifestService) dbPutManifestList( return err } } + + err = l.mapManifestList(ctx, ml.ID, manifestList, r) + if err != nil { + return fmt.Errorf("failed to map manifest list: %w", err) + } + return nil }, ) if err != nil { - log.Ctx(ctx).Error().Err(err).Msgf("failed to create manifest list in database") + log.Ctx(ctx).Error().Err(err).Msgf("failed to create manifest list in database: %v", err) + return fmt.Errorf("failed to create manifest list in database: %w", err) } - return err + return nil +} + +func (l *manifestService) validateManifestIndex( + ctx context.Context, manifestList *ocischema.DeserializedImageIndex, r *types.Registry, info pkg.RegistryInfo, +) ([]*types.Manifest, []int64, error) { + mm := make([]*types.Manifest, 0, len(manifestList.Manifests)) + ids := make([]int64, 0, len(manifestList.Manifests)) + for _, desc := range manifestList.Manifests { + m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) + if errors.Is(err, gitnessstore.ErrResourceNotFound) && r.Type == artifact.RegistryTypeUPSTREAM { + continue + } + if err != nil { + return nil, nil, err + } + mm = append(mm, m) + ids = append(ids, m.ID) + } + log.Ctx(ctx).Debug().Msgf("validated %d / %d manifests in index", len(mm), len(manifestList.Manifests)) + return mm, ids, nil +} + +func (l *manifestService) mapManifestIndex( + ctx context.Context, mi int64, manifestList *ocischema.DeserializedImageIndex, r *types.Registry, +) error { + if r.Type != artifact.RegistryTypeUPSTREAM { + return nil + } + for _, desc := range manifestList.Manifests { + err := l.ociImageIndexMappingDao.Create(ctx, &types.OCIImageIndexMapping{ + ParentManifestID: mi, + ChildManifestDigest: desc.Digest, + }) + if err != nil { + log.Ctx(ctx).Error().Err(err). + Msgf("failed to create oci image index manifest for digest %s", desc.Digest) + return fmt.Errorf("failed to create oci image index manifest: %w", err) + } + } + log.Ctx(ctx).Debug().Msgf("successfully mapped manifest index %d with its manifests", mi) + return nil +} + +func (l *manifestService) validateManifestList( + ctx context.Context, manifestList *manifestlist.DeserializedManifestList, r *types.Registry, info pkg.RegistryInfo, +) ([]*types.Manifest, []int64, error) { + mm := make([]*types.Manifest, 0, len(manifestList.Manifests)) + ids := make([]int64, 0, len(manifestList.Manifests)) + for _, desc := range manifestList.Manifests { + m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) + if errors.Is(err, gitnessstore.ErrResourceNotFound) && r.Type == artifact.RegistryTypeUPSTREAM { + continue + } + if err != nil { + return nil, nil, err + } + mm = append(mm, m) + ids = append(ids, m.ID) + } + log.Ctx(ctx).Debug().Msgf("validated %d / %d manifests in list", len(mm), len(manifestList.Manifests)) + return mm, ids, nil +} + +func (l *manifestService) mapManifestList( + ctx context.Context, mi int64, manifestList *manifestlist.DeserializedManifestList, r *types.Registry, +) error { + if r.Type != artifact.RegistryTypeUPSTREAM { + return nil + } + for _, desc := range manifestList.Manifests { + err := l.ociImageIndexMappingDao.Create(ctx, &types.OCIImageIndexMapping{ + ParentManifestID: mi, + ChildManifestDigest: desc.Digest, + }) + if err != nil { + log.Ctx(ctx).Error().Err(err).Msgf("failed to create oci image index manifest for digest %s", desc.Digest) + return fmt.Errorf("failed to create oci image index manifest: %w", err) + } + } + log.Ctx(ctx).Debug().Msgf("successfully mapped manifest list %d with its manifests", mi) + return nil } func (l *manifestService) dbPutImageIndex( @@ -738,15 +876,9 @@ func (l *manifestService) dbPutImageIndex( return subjectHandlingError } - mm := make([]*types.Manifest, 0, len(imageIndex.Manifests)) - ids := make([]int64, 0, len(mm)) - for _, desc := range imageIndex.Manifests { - m, err := l.dbFindManifestListManifest(ctx, r, info.Image, desc.Digest) - if err != nil { - return err - } - mm = append(mm, m) - ids = append(ids, m.ID) + mm, ids, err := l.validateManifestIndex(ctx, imageIndex, r, info) + if err != nil { + return fmt.Errorf("failed to map manifest index: %w", err) } err = l.tx.WithTx( @@ -792,6 +924,11 @@ func (l *manifestService) dbPutImageIndex( return err } } + + err = l.mapManifestIndex(ctx, mi.ID, imageIndex, r) + if err != nil { + return fmt.Errorf("failed to map manifest index: %w", err) + } return nil }, ) @@ -843,8 +980,8 @@ func (l *manifestService) dbFindManifestListManifest( if err != nil { if errors.Is(err, gitnessstore.ErrResourceNotFound) { return nil, fmt.Errorf( - "manifest %s not found for %s/%s", digest.String(), - repository.Name, imageName, + "manifest %s not found for %s/%s: %w", digest.String(), + repository.Name, imageName, err, ) } return nil, err diff --git a/registry/app/pkg/docker/remote.go b/registry/app/pkg/docker/remote.go index b7e37c544..21fb34489 100644 --- a/registry/app/pkg/docker/remote.go +++ b/registry/app/pkg/docker/remote.go @@ -21,12 +21,16 @@ import ( "fmt" "io" "net/http" + "strings" "time" "github.com/harness/gitness/app/api/request" store2 "github.com/harness/gitness/app/store" + "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact" "github.com/harness/gitness/registry/app/common/lib/errors" "github.com/harness/gitness/registry/app/manifest" + "github.com/harness/gitness/registry/app/manifest/manifestlist" + "github.com/harness/gitness/registry/app/manifest/schema2" "github.com/harness/gitness/registry/app/pkg" "github.com/harness/gitness/registry/app/pkg/commons" proxy2 "github.com/harness/gitness/registry/app/remote/controller/proxy" @@ -49,14 +53,26 @@ const ( func NewRemoteRegistry( local *LocalRegistry, app *App, upstreamProxyConfigRepo store.UpstreamProxyConfigRepository, - spacePathStore store2.SpacePathStore, secretService secret.Service, + spacePathStore store2.SpacePathStore, secretService secret.Service, proxyCtl proxy2.Controller, ) Registry { + cache := proxy2.GetManifestCache(local, local.ms) + listCache := proxy2.GetManifestListCache(local) + + registry := map[string]proxy2.ManifestCacheHandler{ + manifestlist.MediaTypeManifestList: listCache, + v1.MediaTypeImageIndex: listCache, + schema2.MediaTypeManifest: cache, + proxy2.DefaultHandler: cache, + } + return &RemoteRegistry{ local: local, App: app, upstreamProxyConfigRepo: upstreamProxyConfigRepo, spacePathStore: spacePathStore, secretService: secretService, + manifestCacheHandlerMap: registry, + proxyCtl: proxyCtl, } } @@ -70,21 +86,42 @@ type RemoteRegistry struct { upstreamProxyConfigRepo store.UpstreamProxyConfigRepository spacePathStore store2.SpacePathStore secretService secret.Service + proxyCtl proxy2.Controller + manifestCacheHandlerMap map[string]proxy2.ManifestCacheHandler } func (r *RemoteRegistry) Base() error { panic("Not implemented yet, will be done during Replication flows") } -func defaultLibrary() (bool, string, error) { - // get upstream Repository and check if the path contains library prefix. If yes, redirect to the correct path without - // library prefix. - return false, "", nil +// defaultLibrary checks if we need to append "library/" to dockerhub images. For example, if the image is +// "alpine" then we need to append "library/alpine" to the image. +func (r *RemoteRegistry) defaultLibrary(ctx context.Context, artInfo pkg.RegistryInfo) (bool, error) { + upstreamProxy, err := r.upstreamProxyConfigRepo.GetByRegistryIdentifier( + ctx, artInfo.ParentID, artInfo.RegIdentifier, + ) + if err != nil { + return false, err + } + if upstreamProxy.Source != string(artifact.UpstreamConfigSourceDockerhub) { + log.Ctx(ctx).Debug().Msg("upstream proxy source is not Dockerhub") + return false, nil + } + if strings.Contains(artInfo.Image, "/") { + log.Ctx(ctx).Debug().Msgf("image name %s contains a slash", artInfo.Image) + return false, nil + } + return true, nil } // defaultManifestURL return the real url for request with default project. -func defaultManifestURL(regIdentifier string, name string, a pkg.RegistryInfo) string { - return fmt.Sprintf("/v2/%s/library/%s/manifests/%s", regIdentifier, name, a.Reference) +func defaultManifestURL(rootIdentifier string, regIdentifier string, name string, a pkg.RegistryInfo) string { + return fmt.Sprintf("/v2/%s/%s/library/%s/manifests/%s", rootIdentifier, regIdentifier, name, a.Reference) +} + +// defaultBlobURL return the real url for request with default project. +func defaultBlobURL(rootIdentifier string, regIdentifier string, name string, digest string) string { + return fmt.Sprintf("/v2/%s/%s/library/%s/blobs/%s", rootIdentifier, regIdentifier, name, digest) } func proxyManifestHead( @@ -106,6 +143,7 @@ func proxyManifestHead( return errors.NotFoundError(fmt.Errorf("the tag %v:%v is not found", art.Image, art.Tag)) } + // This goRoutine is to update the tag of recently pulled manifest if required if len(art.Tag) > 0 { go func(art pkg.RegistryInfo) { // Write function to update local storage. @@ -119,12 +157,17 @@ func proxyManifestHead( for i := 0; i < ensureTagMaxRetry; i++ { time.Sleep(ensureTagInterval) count++ - log.Ctx(ctx).Info().Msgf("Ensure tag: %s for image: %s, retry: %d", tag, info.Image, count) + log.Ctx(ctx2).Info().Str("goRoutine", "EnsureTag").Msgf("Tag %s for image: %s, retry: %d", tag, + info.Image, + count) e := ctl.EnsureTag(ctx2, responseHeaders, art, acceptHeaders, ifNoneMatchHeader) if e != nil { - log.Ctx(ctx).Warn().Err(e).Msgf("Failed to update tag: ") + log.Ctx(ctx2).Warn().Str("goRoutine", + "EnsureTag").Err(e).Msgf("Failed to update tag: %s for image: %s", + tag, info.Image) } else { - log.Ctx(ctx).Info().Msgf("Tag updated: %s for image: %s", tag, info.Image) + log.Ctx(ctx2).Info().Str("goRoutine", "EnsureTag").Msgf("Tag updated: %s for image: %s", tag, + info.Image) return } } @@ -147,20 +190,19 @@ func (r *RemoteRegistry) ManifestExist( responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifestResult manifest.Manifest, errs []error, ) { - proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore) responseHeaders = &commons.ResponseHeaders{ Headers: make(map[string]string), } - defaultProj, name, err := defaultLibrary() + isDefault, err := r.defaultLibrary(ctx, artInfo) if err != nil { errs = append(errs, err) return responseHeaders, descriptor, manifestResult, errs } registryInfo := artInfo - if defaultProj { + if isDefault { responseHeaders.Code = http.StatusMovedPermanently responseHeaders.Headers = map[string]string{ - "Location": defaultManifestURL(artInfo.RegIdentifier, name, registryInfo), + "Location": defaultManifestURL(artInfo.RootIdentifier, artInfo.RegIdentifier, artInfo.Image, registryInfo), } return responseHeaders, descriptor, manifestResult, errs } @@ -183,7 +225,7 @@ func (r *RemoteRegistry) ManifestExist( errs = append(errs, errors.New("Proxy is down")) return responseHeaders, descriptor, manifestResult, errs } - useLocal, man, err := proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) + useLocal, man, err := r.proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) if err != nil { errs = append(errs, err) @@ -210,7 +252,7 @@ func (r *RemoteRegistry) ManifestExist( err = proxyManifestHead( ctx, responseHeaders, - proxyCtl, + r.proxyCtl, registryInfo, remoteHelper, artInfo, @@ -237,20 +279,19 @@ func (r *RemoteRegistry) PullManifest( responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifestResult manifest.Manifest, errs []error, ) { - proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore) responseHeaders = &commons.ResponseHeaders{ Headers: make(map[string]string), } - defaultProj, name, err := defaultLibrary() + isDefault, err := r.defaultLibrary(ctx, artInfo) if err != nil { errs = append(errs, err) return responseHeaders, descriptor, manifestResult, errs } registryInfo := artInfo - if defaultProj { + if isDefault { responseHeaders.Code = http.StatusMovedPermanently responseHeaders.Headers = map[string]string{ - "Location": defaultManifestURL(artInfo.RegIdentifier, name, registryInfo), + "Location": defaultManifestURL(artInfo.RootIdentifier, artInfo.RegIdentifier, artInfo.Image, registryInfo), } return responseHeaders, descriptor, manifestResult, errs } @@ -272,7 +313,7 @@ func (r *RemoteRegistry) PullManifest( errs = append(errs, errors.New("Proxy is down")) return responseHeaders, descriptor, manifestResult, errs } - useLocal, man, err := proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) + useLocal, man, err := r.proxyCtl.UseLocalManifest(ctx, registryInfo, remoteHelper, acceptHeaders, ifNoneMatchHeader) if err != nil { errs = append(errs, err) @@ -304,7 +345,7 @@ func (r *RemoteRegistry) PullManifest( manifestResult, err = proxyManifestGet( ctx, responseHeaders, - proxyCtl, + r.proxyCtl, registryInfo, remoteHelper, artInfo.RegIdentifier, @@ -352,7 +393,6 @@ func (r *RemoteRegistry) fetchBlobInternal( responseHeaders *commons.ResponseHeaders, fr *storage.FileReader, size int64, readCloser io.ReadCloser, redirectURL string, errs []error, ) { - proxyCtl := proxy2.ControllerInstance(r.local, r.local.ms, r.secretService, r.spacePathStore) responseHeaders = &commons.ResponseHeaders{ Headers: make(map[string]string), } @@ -360,7 +400,7 @@ func (r *RemoteRegistry) fetchBlobInternal( log.Ctx(ctx).Info().Msgf("Proxy: %s", repoKey) // Handle dockerhub request without library prefix. - isDefault, name, err := defaultLibrary() + isDefault, err := r.defaultLibrary(ctx, info) if err != nil { errs = append(errs, err) return responseHeaders, fr, size, readCloser, redirectURL, errs @@ -369,7 +409,7 @@ func (r *RemoteRegistry) fetchBlobInternal( if isDefault { responseHeaders.Code = http.StatusMovedPermanently responseHeaders.Headers = map[string]string{ - "Location": defaultManifestURL(repoKey, name, registryInfo), + "Location": defaultBlobURL(info.RootIdentifier, repoKey, info.Image, info.Digest), } return responseHeaders, fr, size, readCloser, redirectURL, errs } @@ -378,7 +418,7 @@ func (r *RemoteRegistry) fetchBlobInternal( errs = append(errs, errors.New("Blob not found")) } - if proxyCtl.UseLocalBlob(ctx, registryInfo) { + if r.proxyCtl.UseLocalBlob(ctx, registryInfo) { switch method { case http.MethodGet: headers, reader, s, closer, url, e := r.local.GetBlob(ctx, info) @@ -398,7 +438,7 @@ func (r *RemoteRegistry) fetchBlobInternal( } // This is start of proxy Code. - size, readCloser, err = proxyCtl.ProxyBlob(ctx, registryInfo, repoKey, *upstreamProxy) + size, readCloser, err = r.proxyCtl.ProxyBlob(ctx, registryInfo, repoKey, *upstreamProxy) if err != nil { errs = append(errs, err) return responseHeaders, fr, size, readCloser, redirectURL, errs diff --git a/registry/app/pkg/docker/wire.go b/registry/app/pkg/docker/wire.go index 96ebdd8a8..8a24cc016 100644 --- a/registry/app/pkg/docker/wire.go +++ b/registry/app/pkg/docker/wire.go @@ -19,7 +19,10 @@ import ( gitnessstore "github.com/harness/gitness/app/store" storagedriver "github.com/harness/gitness/registry/app/driver" "github.com/harness/gitness/registry/app/event" + "github.com/harness/gitness/registry/app/manifest/manifestlist" + "github.com/harness/gitness/registry/app/manifest/schema2" "github.com/harness/gitness/registry/app/pkg" + proxy2 "github.com/harness/gitness/registry/app/remote/controller/proxy" "github.com/harness/gitness/registry/app/storage" "github.com/harness/gitness/registry/app/store" "github.com/harness/gitness/registry/gc" @@ -28,6 +31,7 @@ import ( "github.com/harness/gitness/types" "github.com/google/wire" + v1 "github.com/opencontainers/image-spec/specs-go/v1" ) func LocalRegistryProvider( @@ -51,18 +55,21 @@ func ManifestServiceProvider( manifestRefDao store.ManifestReferenceRepository, tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository, layerDao store.LayerRepository, gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter, spacePathStore gitnessstore.SpacePathStore, + ociImageIndexMappingDao store.OCIImageIndexMappingRepository, ) ManifestService { return NewManifestService( registryDao, manifestDao, blobRepo, mtRepository, tagDao, imageDao, artifactDao, layerDao, manifestRefDao, tx, gcService, reporter, spacePathStore, + ociImageIndexMappingDao, ) } func RemoteRegistryProvider( local *LocalRegistry, app *App, upstreamProxyConfigRepo store.UpstreamProxyConfigRepository, - spacePathStore gitnessstore.SpacePathStore, secretService secret.Service, + spacePathStore gitnessstore.SpacePathStore, secretService secret.Service, proxyCtrl proxy2.Controller, ) *RemoteRegistry { - return NewRemoteRegistry(local, app, upstreamProxyConfigRepo, spacePathStore, secretService).(*RemoteRegistry) + return NewRemoteRegistry(local, app, upstreamProxyConfigRepo, spacePathStore, secretService, + proxyCtrl).(*RemoteRegistry) } func ControllerProvider( @@ -83,8 +90,31 @@ func ProvideReporter() event.Reporter { return &event.Noop{} } +func ProvideProxyController( + registry *LocalRegistry, ms ManifestService, secretService secret.Service, + spacePathStore gitnessstore.SpacePathStore, +) proxy2.Controller { + manifestCacheHandler := getManifestCacheHandler(registry, ms) + return proxy2.NewProxyController(registry, ms, secretService, spacePathStore, manifestCacheHandler) +} + +func getManifestCacheHandler( + registry *LocalRegistry, ms ManifestService, +) map[string]proxy2.ManifestCacheHandler { + cache := proxy2.GetManifestCache(registry, ms) + listCache := proxy2.GetManifestListCache(registry) + + return map[string]proxy2.ManifestCacheHandler{ + manifestlist.MediaTypeManifestList: listCache, + v1.MediaTypeImageIndex: listCache, + schema2.MediaTypeManifest: cache, + proxy2.DefaultHandler: cache, + } +} + var ControllerSet = wire.NewSet(ControllerProvider) var RegistrySet = wire.NewSet(LocalRegistryProvider, ManifestServiceProvider, RemoteRegistryProvider) +var ProxySet = wire.NewSet(ProvideProxyController) var StorageServiceSet = wire.NewSet(StorageServiceProvider) var AppSet = wire.NewSet(NewApp) -var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet) +var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet, ProxySet) diff --git a/registry/app/remote/controller/proxy/controller.go b/registry/app/remote/controller/proxy/controller.go index c5dc4aa2e..54e53f60b 100644 --- a/registry/app/remote/controller/proxy/controller.go +++ b/registry/app/remote/controller/proxy/controller.go @@ -22,7 +22,6 @@ import ( "fmt" "io" "net/url" - "sync" "time" "github.com/harness/gitness/app/api/request" @@ -41,18 +40,13 @@ import ( const ( // wait more time than manifest (maxManifestWait) because manifest list depends on manifest ready. - maxManifestListWait = 20 - maxManifestWait = 10 - sleepIntervalSec = 20 + maxManifestWait = 10 + maxManifestMappingWait = 10 + maxManifestMappingIntervalSec = 10 + sleepIntervalSec = 20 // keep manifest list in cache for one week. ) -var ( - // Ctl is a global proxy controller instance. - ctl Controller - once sync.Once -) - // Controller defines the operations related with pull through proxy. type Controller interface { // UseLocalBlob check if the blob should use localRegistry copy. @@ -94,29 +88,25 @@ type Controller interface { } type controller struct { - localRegistry registryInterface - localManifestRegistry registryManifestInterface - secretService secret.Service - spacePathStore store.SpacePathStore + localRegistry registryInterface + localManifestRegistry registryManifestInterface + secretService secret.Service + spacePathStore store.SpacePathStore + manifestCacheHandlerMap map[string]ManifestCacheHandler } -// ControllerInstance -- get the proxy controller instance. -func ControllerInstance( +// NewProxyController -- get the proxy controller instance. +func NewProxyController( l registryInterface, lm registryManifestInterface, secretService secret.Service, - spacePathStore store.SpacePathStore, + spacePathStore store.SpacePathStore, manifestCacheHandlerMap map[string]ManifestCacheHandler, ) Controller { - once.Do( - func() { - ctl = &controller{ - localRegistry: l, - localManifestRegistry: lm, - secretService: secretService, - spacePathStore: spacePathStore, - } - }, - ) - - return ctl + return &controller{ + localRegistry: l, + localManifestRegistry: lm, + secretService: secretService, + spacePathStore: spacePathStore, + manifestCacheHandlerMap: manifestCacheHandlerMap, + } } func (c *controller) EnsureTag( @@ -133,7 +123,7 @@ func (c *controller) EnsureTag( return err[0] } - //Fixme: Need to properly pick tag. + // Fixme: Need to properly pick tag. e := c.localManifestRegistry.DBTag(ctx, mfst, desc.Digest, info.Reference, info.RegIdentifier, rsHeaders, info) if e != nil { log.Error().Err(e).Msgf("Error in ensuring tag: %s", e) @@ -147,7 +137,7 @@ func (c *controller) UseLocalBlob(ctx context.Context, art pkg.RegistryInfo) boo } // TODO: Get from Local storage. _, _, _, _, _, e := c.localRegistry.GetBlob(ctx, art) - return e == nil + return len(e) == 0 } // ManifestList ... @@ -177,16 +167,18 @@ func (c *controller) UseLocalManifest( remoteRepo := getRemoteRepo(art) exist, desc, err := remote.ManifestExist(remoteRepo, getReference(art)) // HEAD. - log.Info().Msgf("Manifest exist: %t %s %d %s", exist, desc.Digest.String(), desc.Size, desc.MediaType) // TODO: Check for rate limit error. if err != nil { if errors.IsRateLimitError(err) { // if rate limit, use localRegistry if it exists, otherwise return error. + log.Ctx(ctx).Warn().Msgf("Rate limit error: %v", err) return true, nil, nil } + log.Ctx(ctx).Warn().Msgf("Error in checking remote manifest exist: %v", err) return false, nil, err } + log.Info().Msgf("Manifest exist: %t %s %d %s", exist, desc.Digest.String(), desc.Size, desc.MediaType) - // TODO: Delete if does not exist on remote. + // TODO: Delete if does not exist on remote. Validate this if !exist || desc == nil { go func() { c.localRegistry.DeleteManifest(ctx, art) @@ -194,7 +186,7 @@ func (c *controller) UseLocalManifest( return false, nil, errors.NotFoundError(fmt.Errorf("registry %v, tag %v not found", art.RegIdentifier, art.Tag)) } - log.Info().Msgf("Manifest: %s %s", man, getReference(art)) + log.Info().Msgf("Manifest: %s", getReference(art)) mediaType, payload, _ := man.Payload() return true, &ManifestList{payload, d.Digest.String(), mediaType}, nil @@ -228,13 +220,13 @@ func (c *controller) ProxyManifest( } return man, err } - ct, payload, err := man.Payload() + ct, _, err := man.Payload() log.Info().Msgf("Content type: %s", ct) if err != nil { return man, err } - // Push manifest in background. + // This GoRoutine is to push the manifest from Remote to Local registry. go func(_, ct string) { session, _ := request.AuthSessionFrom(ctx) ctx2 := request.WithAuthSession(context.Background(), session) @@ -242,10 +234,13 @@ func (c *controller) ProxyManifest( for n := 0; n < maxManifestWait; n++ { time.Sleep(sleepIntervalSec * time.Second) count++ - log.Info().Msgf("Current retry=%v artifact: %v:%v", count, repoKey, imageName) + log.Ctx(ctx2).Info().Str("goRoutine", "UpdateManifest").Msgf("Current retry=%v artifact: %v:%v, digest: %s", + count, repoKey, imageName, + art.Digest) _, des, _, e := c.localRegistry.PullManifest(ctx2, art, acceptHeader, ifNoneMatchHeader) - if e != nil { - log.Info().Stack().Err(err).Msgf("failed to get manifest during remote cache update, error %v", err) + if len(e) > 0 { + log.Ctx(ctx2).Info().Str("goRoutine", + "UpdateManifest").Stack().Err(err).Msgf("Local manifest doesn't exist, error %v", e[0]) } // Push manifest to localRegistry when pull with digest, or artifact not found, or digest mismatch. errs := []error{} @@ -254,17 +249,21 @@ func (c *controller) ProxyManifest( if len(artInfo.Digest) == 0 { artInfo.Digest = dig } - // Push manifest to localRegistry. - _, errs = c.localRegistry.PutManifest(ctx2, art, ct, ByteToReadCloser(payload), int64(len(payload))) + + err = c.waitAndPushManifest(ctx2, art, ct, man) + if err != nil { + continue + } } // Query artifact after push. if e == nil || commons.IsEmpty(errs) { _, _, _, err := c.localRegistry.PullManifest(ctx2, art, acceptHeader, ifNoneMatchHeader) if err != nil { - log.Error().Stack().Msgf("failed to get manifest, error %v", err) + log.Ctx(ctx2).Error().Str("goRoutine", + "UpdateManifest").Stack().Msgf("failed to get manifest, error %v", err) } else { - log.Info().Msgf( + log.Ctx(ctx2).Info().Str("goRoutine", "UpdateManifest").Msgf( "Completed manifest push to localRegistry registry. Image: %s, Tag: %s, Digest: %s", art.Image, art.Tag, art.Digest, ) @@ -308,15 +307,27 @@ func (c *controller) ProxyBlob( return 0, nil, errcode.ErrorCodeBlobUnknown.WithDetail(art.Digest) } desc := manifest.Descriptor{Size: size, Digest: digest.Digest(art.Digest)} + + // This GoRoutine is to push the blob from Remote to Local registry. No retry logic is defined here. go func(art pkg.RegistryInfo) { // Cloning Context. - session, _ := request.AuthSessionFrom(ctx) + session, ok := request.AuthSessionFrom(ctx) + if !ok { + log.Error().Stack().Err(err).Msg("failed to get auth session from context") + return + } ctx2 := request.WithAuthSession(context.Background(), session) + ctx2 = log.Ctx(ctx2).With(). + Str("goRoutine", "AddBlob"). + Logger().WithContext(ctx2) err := c.putBlobToLocal(ctx2, art, remoteImage, repoKey, desc, rHelper) if err != nil { - log.Error().Stack().Err(err).Msgf("error while putting blob to localRegistry registry, %v", err) + log.Ctx(ctx2).Error().Str("goRoutine", + "AddBlob").Stack().Err(err).Msgf("error while putting blob to localRegistry registry, %v", err) + return } - log.Info().Msgf("Successfully updated the cache for digest %s", art.Digest) + log.Ctx(ctx2).Info().Str("goRoutine", "AddBlob").Msgf("Successfully updated the cache for digest %s", + art.Digest) }(art) return size, bReader, nil } @@ -358,6 +369,24 @@ func (c *controller) putBlobToLocal( return err } +func (c *controller) waitAndPushManifest( + ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest, +) error { + h, ok := c.manifestCacheHandlerMap[contentType] + if !ok { + h, ok = c.manifestCacheHandlerMap[DefaultHandler] + if !ok { + return fmt.Errorf("failed to get default manifest cache handler") + } + } + err := h.CacheContent(ctx, art, contentType, man) + if err != nil { + log.Error().Stack().Err(err).Msgf("Error in caching manifest: %s", err) + return err + } + return nil +} + func getRemoteRepo(art pkg.RegistryInfo) string { return art.Image } @@ -368,3 +397,112 @@ func getReference(art pkg.RegistryInfo) string { } return art.Tag } + +const DefaultHandler = "default" + +// ManifestCache default Manifest handler. +type ManifestCache struct { + localRegistry registryInterface + localManifestRegistry registryManifestInterface +} + +func GetManifestCache(localRegistry registryInterface, localManifestRegistry registryManifestInterface) *ManifestCache { + return &ManifestCache{ + localRegistry: localRegistry, + localManifestRegistry: localManifestRegistry, + } +} + +// ManifestListCache handle Manifest list type and index type. +type ManifestListCache struct { + localRegistry registryInterface +} + +func GetManifestListCache(localRegistry registryInterface) *ManifestListCache { + return &ManifestListCache{localRegistry: localRegistry} +} + +// ManifestCacheHandler define how to cache manifest content. +type ManifestCacheHandler interface { + // CacheContent - cache the content of the manifest + CacheContent(ctx context.Context, art pkg.RegistryInfo, contentType string, m manifest.Manifest) error +} + +func (m *ManifestCache) CacheContent( + ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest, +) error { + _, payload, err := man.Payload() + if err != nil { + return err + } + // Push manifest to localRegistry. + _, errs := m.localRegistry.PutManifest(ctx, art, contentType, ByteToReadCloser(payload), int64(len(payload))) + if len(errs) > 0 { + return errs[0] + } + + for n := 0; n < maxManifestMappingWait; n++ { + time.Sleep(maxManifestMappingIntervalSec * time.Second) + err = m.localManifestRegistry.AddManifestAssociation(ctx, art.RegIdentifier, digest.Digest(art.Digest), art) + if err != nil { + log.Error().Stack().Err(err).Msgf("failed to add manifest association, error %v", err) + continue + } + return nil + } + log.Ctx(ctx).Info().Msgf("Successfully cached manifest for image: %s, tag: %s, digest: %s", + art.Image, art.Tag, art.Digest) + return err +} + +func (m *ManifestListCache) CacheContent( + ctx context.Context, art pkg.RegistryInfo, contentType string, man manifest.Manifest, +) error { + _, payload, err := man.Payload() + if err != nil { + log.Error().Msg("failed to get payload") + return err + } + if len(getReference(art)) == 0 { + log.Error().Msg("failed to get reference, reference is empty, skip to cache manifest list") + return fmt.Errorf("failed to get reference, reference is empty, skip to cache manifest list") + } + // cache key should contain digest if digest exist + if len(art.Digest) == 0 { + art.Digest = string(digest.FromBytes(payload)) + } + + if err = m.push(ctx, art, man, contentType); err != nil { + log.Error().Msgf("error when push manifest list to local :%v", err) + return err + } + log.Ctx(ctx).Info().Msgf("Successfully cached manifest list for image: %s, tag: %s, digest: %s", + art.Image, art.Tag, art.Digest) + return nil +} + +func (m *ManifestListCache) push( + ctx context.Context, art pkg.RegistryInfo, man manifest.Manifest, contentType string, +) error { + if len(man.References()) == 0 { + return errors.New("manifest list doesn't contain any pushed manifest") + } + _, pl, err := man.Payload() + if err != nil { + log.Error().Msgf("failed to get payload, error %v", err) + return err + } + log.Debug().Msgf("The manifest list payload: %v", string(pl)) + newDig := digest.FromBytes(pl) + // Because the manifest list maybe updated, need to recheck if it is exist in local + _, descriptor, manifest2, _ := m.localRegistry.PullManifest(ctx, art, nil, nil) + if manifest2 != nil && descriptor.Digest == newDig { + return nil + } + + _, errs := m.localRegistry.PutManifest(ctx, art, contentType, ByteToReadCloser(pl), int64(len(pl))) + if len(errs) > 0 { + return errs[0] + } + return nil +} diff --git a/registry/app/remote/controller/proxy/local.go b/registry/app/remote/controller/proxy/local.go index c4702576f..64e6aa6d1 100644 --- a/registry/app/remote/controller/proxy/local.go +++ b/registry/app/remote/controller/proxy/local.go @@ -86,4 +86,5 @@ type registryManifestInterface interface { headers *commons.ResponseHeaders, info pkg.RegistryInfo, ) error + AddManifestAssociation(ctx context.Context, repoKey string, digest digest.Digest, info pkg.RegistryInfo) error } diff --git a/registry/app/store/database.go b/registry/app/store/database.go index b26885a5c..a991feb54 100644 --- a/registry/app/store/database.go +++ b/registry/app/store/database.go @@ -144,6 +144,13 @@ type ManifestReferenceRepository interface { ) error } +type OCIImageIndexMappingRepository interface { + Create(ctx context.Context, ociManifest *types.OCIImageIndexMapping) error + GetAllByChildDigest(ctx context.Context, registryID int64, imageName string, childDigest types.Digest) ( + []*types.OCIImageIndexMapping, error, + ) +} + type LayerRepository interface { AssociateLayerBlob(ctx context.Context, m *types.Manifest, b *types.Blob) error } diff --git a/registry/app/store/database/oci_image_index_mapping.go b/registry/app/store/database/oci_image_index_mapping.go new file mode 100644 index 000000000..badf779d5 --- /dev/null +++ b/registry/app/store/database/oci_image_index_mapping.go @@ -0,0 +1,179 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package database + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/harness/gitness/app/api/request" + "github.com/harness/gitness/registry/app/store" + "github.com/harness/gitness/registry/app/store/database/util" + "github.com/harness/gitness/registry/types" + store2 "github.com/harness/gitness/store" + databaseg "github.com/harness/gitness/store/database" + "github.com/harness/gitness/store/database/dbtx" + + "github.com/jmoiron/sqlx" + "github.com/rs/zerolog/log" +) + +type ociImageIndexMappingDao struct { + db *sqlx.DB +} + +func NewOCIImageIndexMappingDao(db *sqlx.DB) store.OCIImageIndexMappingRepository { + return &ociImageIndexMappingDao{ + db: db, + } +} + +type ociImageIndexMappingDB struct { + ID int64 `db:"oci_mapping_id"` + ParentManifestID int64 `db:"oci_mapping_parent_manifest_id"` + ChildDigest []byte `db:"oci_mapping_child_digest"` + CreatedAt int64 `db:"oci_mapping_created_at"` + UpdatedAt int64 `db:"oci_mapping_updated_at"` + CreatedBy int64 `db:"oci_mapping_created_by"` + UpdatedBy int64 `db:"oci_mapping_updated_by"` +} + +func (dao *ociImageIndexMappingDao) Create( + ctx context.Context, + ociManifest *types.OCIImageIndexMapping, +) error { + const sqlQuery = ` + INSERT INTO oci_image_index_mappings ( + oci_mapping_parent_manifest_id, + oci_mapping_child_digest, + oci_mapping_created_at, + oci_mapping_updated_at, + oci_mapping_created_by, + oci_mapping_updated_by + ) VALUES ( + :oci_mapping_parent_manifest_id, + :oci_mapping_child_digest, + :oci_mapping_created_at, + :oci_mapping_updated_at, + :oci_mapping_created_by, + :oci_mapping_updated_by + ) ON CONFLICT (oci_mapping_parent_manifest_id, oci_mapping_child_digest) + DO NOTHING + RETURNING oci_mapping_id` + + db := dbtx.GetAccessor(ctx, dao.db) + internalManifest := mapToInternalOCIMapping(ctx, ociManifest) + query, args, err := db.BindNamed(sqlQuery, internalManifest) + if err != nil { + return databaseg.ProcessSQLErrorf(ctx, err, "Bind query failed") + } + + if err = db.QueryRowContext(ctx, query, args...).Scan(&ociManifest.ID); err != nil { + err = databaseg.ProcessSQLErrorf(ctx, err, "QueryRowContext failed") + if errors.Is(err, store2.ErrDuplicate) { + return nil + } + return fmt.Errorf("inserting OCI image index mapping: %w", err) + } + return nil +} + +func (dao *ociImageIndexMappingDao) GetAllByChildDigest( + ctx context.Context, registryID int64, imageName string, childDigest types.Digest, +) ([]*types.OCIImageIndexMapping, error) { + digestBytes, err := util.GetHexDecodedBytes(string(childDigest)) + if err != nil { + return nil, fmt.Errorf("failed to get digest bytes: %w", err) + } + const sqlQuery = ` + SELECT + oci_mapping_id, + oci_mapping_parent_manifest_id, + oci_mapping_child_digest, + oci_mapping_created_at, + oci_mapping_updated_at, + oci_mapping_created_by, + oci_mapping_updated_by + FROM + oci_image_index_mappings + JOIN manifests ON manifests.manifest_id = oci_image_index_mappings.oci_mapping_parent_manifest_id + WHERE + manifest_registry_id = $1 AND + manifest_image_name = $2 AND + oci_mapping_child_digest = $3` + + db := dbtx.GetAccessor(ctx, dao.db) + rows, err := db.QueryxContext(ctx, sqlQuery, registryID, imageName, digestBytes) + if err != nil || rows.Err() != nil { + return nil, databaseg.ProcessSQLErrorf(ctx, err, "QueryxContext failed") + } + defer rows.Close() + + var manifests []*types.OCIImageIndexMapping + for rows.Next() { + var dbManifest ociImageIndexMappingDB + if err := rows.StructScan(&dbManifest); err != nil { + return nil, databaseg.ProcessSQLErrorf(ctx, err, "StructScan failed") + } + manifests = append(manifests, mapToExternalOCIManifest(&dbManifest)) + } + return manifests, nil +} + +func mapToInternalOCIMapping(ctx context.Context, in *types.OCIImageIndexMapping) *ociImageIndexMappingDB { + if in.CreatedAt.IsZero() { + in.CreatedAt = time.Now() + } + in.UpdatedAt = time.Now() + session, _ := request.AuthSessionFrom(ctx) + if in.CreatedBy == 0 { + in.CreatedBy = session.Principal.ID + } + in.UpdatedBy = session.Principal.ID + childBytes, err := types.GetDigestBytes(in.ChildManifestDigest) + if err != nil { + log.Error().Msgf("failed to get digest bytes: %v", err) + } + + return &ociImageIndexMappingDB{ + ID: in.ID, + ParentManifestID: in.ParentManifestID, + ChildDigest: childBytes, + CreatedAt: in.CreatedAt.UnixMilli(), + UpdatedAt: in.UpdatedAt.UnixMilli(), + CreatedBy: in.CreatedBy, + UpdatedBy: in.UpdatedBy, + } +} + +func mapToExternalOCIManifest(in *ociImageIndexMappingDB) *types.OCIImageIndexMapping { + childDgst := types.Digest(util.GetHexEncodedString(in.ChildDigest)) + parsedChildDigest, err := childDgst.Parse() + if err != nil { + log.Error().Msgf("failed to child parse digest: %v", err) + } + + return &types.OCIImageIndexMapping{ + ID: in.ID, + ParentManifestID: in.ParentManifestID, + ChildManifestDigest: parsedChildDigest, + CreatedAt: time.UnixMilli(in.CreatedAt), + UpdatedAt: time.UnixMilli(in.UpdatedAt), + CreatedBy: in.CreatedBy, + UpdatedBy: in.UpdatedBy, + } +} diff --git a/registry/app/store/database/wire.go b/registry/app/store/database/wire.go index af13fffb6..293043c18 100644 --- a/registry/app/store/database/wire.go +++ b/registry/app/store/database/wire.go @@ -75,6 +75,10 @@ func ProvideManifestRefDao(db *sqlx.DB) store.ManifestReferenceRepository { return NewManifestReferenceDao(db) } +func ProvideOCIImageIndexMappingDao(db *sqlx.DB) store.OCIImageIndexMappingRepository { + return NewOCIImageIndexMappingDao(db) +} + func ProvideLayerDao(db *sqlx.DB, mtRepository store.MediaTypesRepository) store.LayerRepository { return NewLayersDao(db, mtRepository) } @@ -93,6 +97,7 @@ var WireSet = wire.NewSet( ProvideManifestDao, ProvideCleanupPolicyDao, ProvideManifestRefDao, + ProvideOCIImageIndexMappingDao, ProvideLayerDao, ProvideImageDao, ProvideArtifactDao, diff --git a/registry/types/oci_Image_index_mapping.go b/registry/types/oci_Image_index_mapping.go new file mode 100644 index 000000000..5e308690a --- /dev/null +++ b/registry/types/oci_Image_index_mapping.go @@ -0,0 +1,31 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +import ( + "time" + + "github.com/opencontainers/go-digest" +) + +type OCIImageIndexMapping struct { + ID int64 + ParentManifestID int64 + ChildManifestDigest digest.Digest + CreatedAt time.Time + UpdatedAt time.Time + CreatedBy int64 + UpdatedBy int64 +}