[fix]: [AH-388]: Move tracking of Bandwidth and Download Stats to Middleware (#2800)

* [fix]: [AH-388]: fix lint
* [fix]: [AH-388]: fix lint
* [fix]: [AH-388]: fix lint
* Merge branch 'main' of https://git0.harness.io/l7B_kbSEQD2wjrM7PShm5w/PROD/Harness_Commons/gitness into fix-stats
* [fix]: [AH-388]: fix merge conflict
* Merge branch 'main' of https://git0.harness.io/l7B_kbSEQD2wjrM7PShm5w/PROD/Harness_Commons/gitness into fix-stats
* fixed upstream proxy issues
* Merge branch 'main' of https://git0.harness.io/l7B_kbSEQD2wjrM7PShm5w/PROD/Harness_Commons/gitness into fix-stats
* [fix]: [AH-388]: fix image and artifact upsert
* [fix]: [AH-388]: fix dependency
* [fix]: [AH-388]: Fix Download Stats
* Merge branch 'main' of https://git0.harness.io/l7B_kbSEQD2wjrM7PShm5w/PROD/Harness_Commons/gitness into fix-stats
* [fix]: [AH-388]: Fix Download Stats
* [fix]: [AH-388]: Fix Bandwidth Stats
devcontainer-setup
Pragyesh Mishra 2024-10-28 13:52:24 +00:00 committed by Harness
parent 0d84fa85ff
commit e5d5ce6a04
25 changed files with 468 additions and 243 deletions

View File

@ -456,7 +456,8 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
proxyController := docker.ProvideProxyController(localRegistry, manifestService, secretService, spacePathStore)
remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spacePathStore, secretService, proxyController)
coreController := pkg.CoreControllerProvider(registryRepository)
dockerController := docker.ControllerProvider(localRegistry, remoteRegistry, coreController, spaceStore, authorizer)
dbStore := docker.DBStoreProvider(blobRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository)
dockerController := docker.ControllerProvider(localRegistry, remoteRegistry, coreController, spaceStore, authorizer, dbStore)
handler := api2.NewHandlerProvider(dockerController, spaceStore, tokenStore, controller, authenticator, provider, authorizer)
registryOCIHandler := router.OCIHandlerProvider(handler)
cleanupPolicyRepository := database2.ProvideCleanupPolicyDao(db, transactor)

View File

@ -176,7 +176,7 @@ func handleErrors(ctx context.Context, errors errcode.Errors, w http.ResponseWri
}
}
func (h *Handler) getRegistryInfo(r *http.Request, remoteSupport bool) (pkg.RegistryInfo, error) {
func (h *Handler) GetRegistryInfo(r *http.Request, remoteSupport bool) (pkg.RegistryInfo, error) {
ctx := r.Context()
queryParams := r.URL.Query()
path := r.URL.Path

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) DeleteBlob(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) CancelBlobUpload(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -25,7 +25,7 @@ import (
// PutManifest validates and stores a manifest in the registry.
func (h *Handler) DeleteManifest(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -30,7 +30,7 @@ import (
func (h *Handler) GetBlob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
info, err := h.getRegistryInfo(r, true)
info, err := h.GetRegistryInfo(r, true)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) GetUploadBlobStatus(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -27,7 +27,7 @@ import (
// GetManifest fetches the image manifest from the storage backend, if it exists.
func (h *Handler) GetManifest(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
info, err := h.getRegistryInfo(r, true)
info, err := h.GetRegistryInfo(r, true)
if err != nil {
handleErrors(ctx, errcode.Errors{err}, w)
return

View File

@ -22,7 +22,7 @@ import (
)
func (h *Handler) GetReferrers(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -27,7 +27,7 @@ import (
func (h *Handler) GetTags(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(ctx, []error{err}, w)
return

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) HeadBlob(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -24,7 +24,7 @@ import (
// HeadManifest fetches the image manifest from the storage backend, if it exists.
func (h *Handler) HeadManifest(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, true)
info, err := h.GetRegistryInfo(r, true)
if err != nil {
handleErrors(r.Context(), errcode.Errors{err}, w)
return

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) PatchBlobUpload(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) InitiateUploadBlob(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -21,7 +21,7 @@ import (
)
func (h *Handler) CompleteBlobUpload(w http.ResponseWriter, r *http.Request) {
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -29,7 +29,7 @@ const (
// PutManifest validates and stores a manifest in the registry.
func (h *Handler) PutManifest(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
info, err := h.getRegistryInfo(r, false)
info, err := h.GetRegistryInfo(r, false)
if err != nil {
handleErrors(r.Context(), []error{err}, w)
return

View File

@ -0,0 +1,152 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package middleware
import (
"context"
"errors"
"net/http"
"github.com/harness/gitness/registry/app/api/handler/oci"
"github.com/harness/gitness/registry/app/api/router/utils"
"github.com/harness/gitness/registry/app/pkg"
"github.com/harness/gitness/registry/app/pkg/docker"
"github.com/harness/gitness/registry/types"
"github.com/harness/gitness/store"
"github.com/opencontainers/go-digest"
"github.com/rs/zerolog/log"
)
type StatusWriter struct {
http.ResponseWriter
StatusCode int
}
func (w *StatusWriter) WriteHeader(code int) {
w.StatusCode = code
w.ResponseWriter.WriteHeader(code)
}
func (w *StatusWriter) Write(p []byte) (n int, err error) {
n, err = w.ResponseWriter.Write(p)
if w.StatusCode == 0 {
w.StatusCode = http.StatusOK
}
return
}
func TrackBandwidthStat(h *oci.Handler) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
methodType := r.Method
requestType := utils.GetRouteTypeV2(path)
sw := &StatusWriter{ResponseWriter: w}
bandwidthType := types.BandwidthTypeUPLOAD
//nolint:gocritic
if utils.Blobs == requestType && http.MethodGet == methodType {
next.ServeHTTP(sw, r)
bandwidthType = types.BandwidthTypeDOWNLOAD
} else if utils.BlobsUploadsSession == requestType && http.MethodPut == methodType {
next.ServeHTTP(sw, r)
} else {
next.ServeHTTP(w, r)
return
}
if types.BandwidthTypeUPLOAD == bandwidthType && sw.StatusCode != http.StatusCreated {
return
} else if types.BandwidthTypeDOWNLOAD == bandwidthType && sw.StatusCode != http.StatusOK {
return
}
ctx := r.Context()
info, err := h.GetRegistryInfo(r, true)
if err != nil {
log.Ctx(ctx).Error().Stack().Str("middleware",
"TrackBandwidthStat").Err(err).Msgf("error while putting bandwidth stat of artifact, %v",
err)
return
}
err = dbBandwidthStat(ctx, h.Controller, info, bandwidthType)
if err != nil {
log.Ctx(ctx).Error().Stack().Str("middleware",
"TrackBandwidthStat").Err(err).Msgf("error while putting bandwidth stat of artifact, %v",
err)
return
}
},
)
}
}
func dbBandwidthStat(
ctx context.Context,
c *docker.Controller,
info pkg.RegistryInfo,
bandwidthType types.BandwidthType,
) error {
dgst := digest.Digest(info.Digest)
registry, err := c.RegistryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier)
if err != nil {
return err
}
blob, err := c.DBStore.BlobRepo.FindByDigestAndRootParentID(ctx, dgst, info.RootParentID)
if err != nil {
return err
}
image, err := c.DBStore.ImageDao.GetByName(ctx, registry.ID, info.Image)
if errors.Is(err, store.ErrResourceNotFound) {
image, err = getImageFromUpstreamProxy(ctx, c, info)
}
if err != nil {
return err
}
bandwidthStat := &types.BandwidthStat{
ImageID: image.ID,
Type: bandwidthType,
Bytes: blob.Size,
}
if err := c.DBStore.BandwidthStatDao.Create(ctx, bandwidthStat); err != nil {
return err
}
return nil
}
func getImageFromUpstreamProxy(ctx context.Context, c *docker.Controller, info pkg.RegistryInfo) (*types.Image, error) {
repos, err := c.GetOrderedRepos(ctx, info.RegIdentifier, info)
if err != nil {
return nil, err
}
for _, registry := range repos {
log.Ctx(ctx).Info().Msgf("Using Repository: %s, Type: %s", registry.Name, registry.Type)
image, err := c.DBStore.ImageDao.GetByName(ctx, registry.ID, info.Image)
if err == nil && image != nil {
return image, nil
}
}
//nolint:nilnil
return nil, nil
}

View File

@ -0,0 +1,112 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package middleware
import (
"context"
"errors"
"net/http"
"github.com/harness/gitness/registry/app/api/handler/oci"
"github.com/harness/gitness/registry/app/api/router/utils"
"github.com/harness/gitness/registry/app/pkg"
"github.com/harness/gitness/registry/app/pkg/docker"
"github.com/harness/gitness/registry/types"
"github.com/harness/gitness/store"
"github.com/opencontainers/go-digest"
"github.com/rs/zerolog/log"
)
func TrackDownloadStat(h *oci.Handler) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
methodType := r.Method
requestType := utils.GetRouteTypeV2(path)
sw := &StatusWriter{ResponseWriter: w}
if utils.Manifests == requestType && http.MethodGet == methodType {
next.ServeHTTP(sw, r)
} else {
next.ServeHTTP(w, r)
return
}
if sw.StatusCode != http.StatusOK {
return
}
ctx := r.Context()
info, err := h.GetRegistryInfo(r, true)
if err != nil {
log.Ctx(ctx).Error().Stack().Str("middleware",
"TrackDownloadStat").Err(err).Msgf("error while putting download stat of artifact, %v",
err)
return
}
err = dbDownloadStat(ctx, h.Controller, info)
if err != nil {
log.Ctx(ctx).Error().Stack().Str("middleware",
"TrackDownloadStat").Err(err).Msgf("error while putting download stat of artifact, %v",
err)
return
}
},
)
}
}
func dbDownloadStat(
ctx context.Context,
c *docker.Controller,
info pkg.RegistryInfo,
) error {
registry, err := c.RegistryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier)
if err != nil {
return err
}
image, err := c.DBStore.ImageDao.GetByName(ctx, registry.ID, info.Image)
if errors.Is(err, store.ErrResourceNotFound) {
image, err = getImageFromUpstreamProxy(ctx, c, info)
}
if err != nil {
return err
}
dgst, err := types.NewDigest(digest.Digest(info.Digest))
if err != nil {
return err
}
artifact, err := c.DBStore.ArtifactDao.GetByName(ctx, image.ID, dgst.String())
if err != nil {
return err
}
downloadStat := &types.DownloadStat{
ArtifactID: artifact.ID,
}
if err := c.DBStore.DownloadStatDao.Create(ctx, downloadStat); err != nil {
return err
}
return nil
}

View File

@ -16,56 +16,17 @@ package oci
import (
"net/http"
"strings"
middlewareauthn "github.com/harness/gitness/app/api/middleware/authn"
"github.com/harness/gitness/registry/app/api/handler/oci"
"github.com/harness/gitness/registry/app/api/middleware"
"github.com/harness/gitness/registry/app/api/router/utils"
"github.com/harness/gitness/registry/app/common"
"github.com/go-chi/chi/v5"
"github.com/rs/zerolog/log"
)
type RouteType string
const (
Manifests RouteType = "manifests" // /v2/:registry/:image/manifests/:reference.
Blobs RouteType = "blobs" // /v2/:registry/:image/blobs/:digest.
BlobsUploadsSession RouteType = "blob-uploads-session" // /v2/:registry/:image/blobs/uploads/:session_id.
Tags RouteType = "tags" // /v2/:registry/:image/tags/list.
Referrers RouteType = "referrers" // /v2/:registry/:image/referrers/:digest.
Invalid RouteType = "invalid" // Invalid route.
// Add other route types here.
)
func GetRouteTypeV2(url string) RouteType {
url = strings.Trim(url, "/")
segments := strings.Split(url, "/")
if len(segments) < 4 {
return Invalid
}
typ := segments[len(segments)-2]
switch typ {
case "manifests":
return Manifests
case "blobs":
if segments[len(segments)-1] == "uploads" {
return BlobsUploadsSession
}
return Blobs
case "uploads":
return BlobsUploadsSession
case "tags":
return Tags
case "referrers":
return Referrers
}
return Invalid
}
type HandlerBlock struct {
Handler2 http.HandlerFunc
RemoteSupport bool
@ -85,29 +46,29 @@ type RegistryOCIHandler interface {
func NewOCIHandler(handlerV2 *oci.Handler) RegistryOCIHandler {
r := chi.NewRouter()
var routeHandlers = map[RouteType]map[string]HandlerBlock{
Manifests: {
var routeHandlers = map[utils.RouteType]map[string]HandlerBlock{
utils.Manifests: {
http.MethodGet: NewHandlerBlock2(handlerV2.GetManifest, true),
http.MethodHead: NewHandlerBlock2(handlerV2.HeadManifest, true),
http.MethodPut: NewHandlerBlock2(handlerV2.PutManifest, false),
http.MethodDelete: NewHandlerBlock2(handlerV2.DeleteManifest, false),
},
Blobs: {
utils.Blobs: {
http.MethodGet: NewHandlerBlock2(handlerV2.GetBlob, true),
http.MethodHead: NewHandlerBlock2(handlerV2.HeadBlob, false),
http.MethodDelete: NewHandlerBlock2(handlerV2.DeleteBlob, false),
},
BlobsUploadsSession: {
utils.BlobsUploadsSession: {
http.MethodGet: NewHandlerBlock2(handlerV2.GetUploadBlobStatus, false),
http.MethodPatch: NewHandlerBlock2(handlerV2.PatchBlobUpload, false),
http.MethodPut: NewHandlerBlock2(handlerV2.CompleteBlobUpload, false),
http.MethodDelete: NewHandlerBlock2(handlerV2.CancelBlobUpload, false),
http.MethodPost: NewHandlerBlock2(handlerV2.InitiateUploadBlob, false),
},
Tags: {
utils.Tags: {
http.MethodGet: NewHandlerBlock2(handlerV2.GetTags, false),
},
Referrers: {
utils.Referrers: {
http.MethodGet: NewHandlerBlock2(handlerV2.GetReferrers, false),
},
}
@ -121,14 +82,18 @@ func NewOCIHandler(handlerV2 *oci.Handler) RegistryOCIHandler {
Get("/", func(w http.ResponseWriter, req *http.Request) {
handlerV2.APIBase(w, req)
})
r.Route("/{registryIdentifier}", func(r chi.Router) {
r.Use(middleware.OciCheckAuth(common.GenerateOciTokenURL(handlerV2.URLProvider.RegistryURL())))
r.Use(middleware.BlockNonOciSourceToken(handlerV2.URLProvider.RegistryURL()))
r.Use(middleware.TrackDownloadStat(handlerV2))
r.Use(middleware.TrackBandwidthStat(handlerV2))
r.Handle("/*", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
path := req.URL.Path
methodType := req.Method
requestType := GetRouteTypeV2(path)
requestType := utils.GetRouteTypeV2(path)
if _, ok := routeHandlers[requestType]; ok {
if h, ok2 := routeHandlers[requestType][methodType]; ok2 {

View File

@ -0,0 +1,56 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import "strings"
type RouteType string
const (
Manifests RouteType = "manifests" // /v2/:registry/:image/manifests/:reference.
Blobs RouteType = "blobs" // /v2/:registry/:image/blobs/:digest.
BlobsUploadsSession RouteType = "blob-uploads-session" // /v2/:registry/:image/blobs/uploads/:session_id.
Tags RouteType = "tags" // /v2/:registry/:image/tags/list.
Referrers RouteType = "referrers" // /v2/:registry/:image/referrers/:digest.
Invalid RouteType = "invalid" // Invalid route.
// Add other route types here.
)
func GetRouteTypeV2(url string) RouteType {
url = strings.Trim(url, "/")
segments := strings.Split(url, "/")
if len(segments) < 4 {
return Invalid
}
typ := segments[len(segments)-2]
switch typ {
case "manifests":
return Manifests
case "blobs":
if segments[len(segments)-1] == "uploads" {
return BlobsUploadsSession
}
return Blobs
case "uploads":
return BlobsUploadsSession
case "tags":
return Tags
case "referrers":
return Referrers
}
return Invalid
}

View File

@ -34,6 +34,7 @@ import (
"github.com/harness/gitness/registry/app/pkg"
"github.com/harness/gitness/registry/app/pkg/commons"
"github.com/harness/gitness/registry/app/storage"
"github.com/harness/gitness/registry/app/store"
registrytypes "github.com/harness/gitness/registry/types"
"github.com/harness/gitness/types/enum"
@ -47,6 +48,15 @@ type Controller struct {
remote *RemoteRegistry
spaceStore corestore.SpaceStore
authorizer authz.Authorizer
DBStore *DBStore
}
type DBStore struct {
BlobRepo store.BlobRepository
ImageDao store.ImageRepository
ArtifactDao store.ArtifactRepository
BandwidthStatDao store.BandwidthStatRepository
DownloadStatDao store.DownloadStatRepository
}
type TagsAPIResponse struct {
@ -63,6 +73,7 @@ func NewController(
coreController *pkg.CoreController,
spaceStore corestore.SpaceStore,
authorizer authz.Authorizer,
dBStore *DBStore,
) *Controller {
c := &Controller{
CoreController: coreController,
@ -70,6 +81,7 @@ func NewController(
remote: remote,
spaceStore: spaceStore,
authorizer: authorizer,
DBStore: dBStore,
}
pkg.TypeRegistry[pkg.LocalRegistry] = local
@ -77,6 +89,22 @@ func NewController(
return c
}
func NewDBStore(
blobRepo store.BlobRepository,
imageDao store.ImageRepository,
artifactDao store.ArtifactRepository,
bandwidthStatDao store.BandwidthStatRepository,
downloadStatDao store.DownloadStatRepository,
) *DBStore {
return &DBStore{
BlobRepo: blobRepo,
ImageDao: imageDao,
ArtifactDao: artifactDao,
BandwidthStatDao: bandwidthStatDao,
DownloadStatDao: downloadStatDao,
}
}
func isEmpty(slice interface{}) bool {
if slice == nil {
return true

View File

@ -34,7 +34,6 @@ import (
"strings"
"time"
"github.com/harness/gitness/app/api/request"
"github.com/harness/gitness/app/paths"
"github.com/harness/gitness/registry/app/dist_temp/dcontext"
"github.com/harness/gitness/registry/app/dist_temp/errcode"
@ -403,25 +402,6 @@ func (r *LocalRegistry) fetchBlobInternal(
}
return responseHeaders, nil, -1, nil, "", errs
}
if http.MethodGet == method {
// This GoRoutine is used to update the bandwidth stat of the artifact
go func(art pkg.RegistryInfo, dgst digest.Digest) {
// Cloning Context.
session, _ := request.AuthSessionFrom(ctx)
ctx3 := request.WithAuthSession(context.Background(), session)
err := r.dbBlobDownloadComplete(ctx3, dgst, info)
if err != nil {
log.Ctx(ctx3).Error().Stack().Str("goRoutine",
"UpdateBandwidth").Err(err).Msgf("error while putting bandwidth stat of artifact, %v",
err)
return
}
log.Ctx(ctx3).Info().Str("goRoutine",
"UpdateBandwidth").Msgf("Successfully updated the bandwidth stat metrics %s", art.Digest)
}(info, dgst)
}
if redirectURL != "" {
return responseHeaders, nil, -1, nil, redirectURL, errs
}
@ -440,24 +420,6 @@ func (r *LocalRegistry) PullManifest(
ifNoneMatchHeader []string,
) (responseHeaders *commons.ResponseHeaders, descriptor manifest.Descriptor, manifest manifest.Manifest, errs []error) {
responseHeaders, descriptor, manifest, errs = r.ManifestExist(ctx, artInfo, acceptHeaders, ifNoneMatchHeader)
// This GoRoutine is used to update the download stat of the artifact when manifest is pulled
go func(art pkg.RegistryInfo) {
// Cloning Context.
session, _ := request.AuthSessionFrom(ctx)
ctx2 := request.WithAuthSession(context.Background(), session)
ctx2 = log.Ctx(ctx2).With().
Str("goRoutine", "UpdateDownload").
Logger().WithContext(ctx2)
err := r.dbGetManifestComplete(ctx2, artInfo)
if err != nil {
log.Ctx(ctx2).Error().Str("goRoutine",
"UpdateDownload").Stack().Err(err).Msgf("error while putting download stat of artifact, %v", err)
return
}
log.Ctx(ctx2).Info().Str("goRoutine",
"UpdateDownload").Msgf("Successfully updated the download stat metrics %s", art.Digest)
}(artInfo)
return responseHeaders, descriptor, manifest, errs
}
@ -1616,98 +1578,6 @@ func (r *LocalRegistry) dbBlobLinkExists(
return nil
}
func (r *LocalRegistry) dbBlobDownloadComplete(
ctx context.Context,
dgst digest.Digest,
info pkg.RegistryInfo,
) error {
err := r.tx.WithTx(
ctx, func(ctx context.Context) error {
registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier)
if err != nil {
return err
}
blob, err := r.blobRepo.FindByDigestAndRootParentID(ctx, dgst, info.RootParentID)
if err != nil {
return err
}
image, err := r.imageDao.GetByName(ctx, registry.ID, info.Image)
if err != nil {
return err
}
bandwidthStat := &types.BandwidthStat{
ImageID: image.ID,
Type: types.BandwidthTypeDOWNLOAD,
Bytes: blob.Size,
}
if err := r.bandwidthStatDao.Create(ctx, bandwidthStat); err != nil {
return err
}
return nil
}, dbtx.TxDefault,
)
if err != nil {
log.Error().Msgf("failed to put download bandwidth stat in database: %v", err)
return fmt.Errorf("committing database transaction: %w", err)
}
return nil
}
func (r *LocalRegistry) dbGetManifestComplete(
ctx context.Context,
info pkg.RegistryInfo,
) error {
// FIXME: Update logic incase requests are internal. Currently, we are updating the stats for all requests.
if info.Digest == "" {
return nil
}
err := r.tx.WithTx(
ctx, func(ctx context.Context) error {
registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, info.RegIdentifier)
if err != nil {
return err
}
image, err := r.imageDao.GetByName(ctx, registry.ID, info.Image)
if err != nil {
return err
}
newDigest, err := types.NewDigest(digest.Digest(info.Digest))
if err != nil {
log.Ctx(ctx).Error().Stack().Err(err).Msgf("error parsing digest: %s %v", info.Digest, err)
}
artifact, err := r.artifactDao.GetByName(ctx, image.ID, newDigest.String())
if err != nil {
return err
}
downloadStat := &types.DownloadStat{
ArtifactID: artifact.ID,
}
if err := r.downloadStatDao.Create(ctx, downloadStat); err != nil {
return err
}
return nil
}, dbtx.TxDefault,
)
if err != nil {
log.Error().Msgf("failed to put download stat in database: %v", err)
return fmt.Errorf("committing database transaction: %w", err)
}
return nil
}
func (r *LocalRegistry) dbPutBlobUploadComplete(
ctx context.Context,
repoName string,
@ -1740,23 +1610,8 @@ func (r *LocalRegistry) dbPutBlobUploadComplete(
return err
}
image := &types.Image{
Name: info.Image,
RegistryID: registry.ID,
Enabled: false,
}
if err := r.imageDao.CreateOrUpdate(ctx, image); err != nil {
return err
}
bandwidthStat := &types.BandwidthStat{
ImageID: image.ID,
Type: types.BandwidthTypeUPLOAD,
Bytes: int64(size),
}
if err := r.bandwidthStatDao.Create(ctx, bandwidthStat); err != nil {
err = r.ms.UpsertImage(ctx, repoName, info)
if err != nil {
return err
}

View File

@ -115,6 +115,7 @@ type ManifestService interface {
DBFindRepositoryBlob(
ctx context.Context, desc manifest.Descriptor, repoID int64, imageName string,
) (*types.Blob, error)
UpsertImage(ctx context.Context, repoKey string, info pkg.RegistryInfo) error
}
func (l *manifestService) DBTag(
@ -208,8 +209,7 @@ func (l *manifestService) dbTagManifest(
}
// Create or update artifact and tag records
if err := l.upsertArtifactAndTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName,
dgst); err != nil {
if err := l.upsertTag(ctx, dbRegistry.ID, dbManifest.ID, imageName, tagName); err != nil {
return formatFailedToTagErr(err)
}
@ -257,37 +257,13 @@ func (l *manifestService) lockManifestForGC(ctx context.Context, repoID, manifes
}
// Creates or updates artifact and tag records.
func (l *manifestService) upsertArtifactAndTag(
func (l *manifestService) upsertTag(
ctx context.Context,
registryID,
manifestID int64,
imageName,
tagName string,
dgst digest.Digest,
) error {
image := &types.Image{
Name: imageName,
RegistryID: registryID,
Enabled: true,
}
if err := l.imageDao.CreateOrUpdate(ctx, image); err != nil {
return err
}
digest, err := types.NewDigest(dgst)
if err != nil {
return err
}
artifact := &types.Artifact{
ImageID: image.ID,
Version: digest.String(),
}
if err := l.artifactDao.CreateOrUpdate(ctx, artifact); err != nil {
return err
}
tag := &types.Tag{
Name: tagName,
ImageName: imageName,
@ -368,9 +344,15 @@ func (l *manifestService) dbPutManifest(
) error {
switch reqManifest := manifest.(type) {
case *schema2.DeserializedManifest:
return l.dbPutManifestSchema2(ctx, reqManifest, payload, d, repoKey, headers, info)
if err := l.dbPutManifestSchema2(ctx, reqManifest, payload, d, repoKey, headers, info); err != nil {
return err
}
return l.upsertImageAndArtifact(ctx, d, repoKey, info)
case *ocischema.DeserializedManifest:
return l.dbPutManifestOCI(ctx, reqManifest, payload, d, repoKey, headers, info)
if err := l.dbPutManifestOCI(ctx, reqManifest, payload, d, repoKey, headers, info); err != nil {
return err
}
return l.upsertImageAndArtifact(ctx, d, repoKey, info)
case *manifestlist.DeserializedManifestList:
return l.dbPutManifestList(ctx, reqManifest, payload, d, repoKey, headers, info)
case *ocischema.DeserializedImageIndex:
@ -380,6 +362,68 @@ func (l *manifestService) dbPutManifest(
}
}
func (l *manifestService) upsertImageAndArtifact(
ctx context.Context,
d digest.Digest,
repoKey string,
info pkg.RegistryInfo) error {
dbRepo, err := l.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoKey)
if err != nil {
return err
}
dbImage := &types.Image{
Name: info.Image,
RegistryID: dbRepo.ID,
Enabled: true,
}
if err := l.imageDao.CreateOrUpdate(ctx, dbImage); err != nil {
return err
}
dgst, err := types.NewDigest(d)
if err != nil {
return err
}
dbArtifact := &types.Artifact{
ImageID: dbImage.ID,
Version: dgst.String(),
}
if err := l.artifactDao.CreateOrUpdate(ctx, dbArtifact); err != nil {
return err
}
return nil
}
func (l *manifestService) UpsertImage(
ctx context.Context,
repoKey string,
info pkg.RegistryInfo) error {
dbRepo, err := l.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoKey)
if err != nil {
return err
}
image, err := l.imageDao.GetByName(ctx, dbRepo.ID, info.Image)
if err != nil && !errors.Is(err, gitnessstore.ErrResourceNotFound) {
return err
} else if image != nil {
return nil
}
dbImage := &types.Image{
Name: info.Image,
RegistryID: dbRepo.ID,
Enabled: false,
}
if err := l.imageDao.CreateOrUpdate(ctx, dbImage); err != nil {
return err
}
return nil
}
func (l *manifestService) dbPutManifestSchema2(
ctx context.Context,
manifest *schema2.DeserializedManifest,

View File

@ -78,8 +78,19 @@ func ControllerProvider(
controller *pkg.CoreController,
spaceStore gitnessstore.SpaceStore,
authorizer authz.Authorizer,
dBStore *DBStore,
) *Controller {
return NewController(local, remote, controller, spaceStore, authorizer)
return NewController(local, remote, controller, spaceStore, authorizer, dBStore)
}
func DBStoreProvider(
blobRepo store.BlobRepository,
imageDao store.ImageRepository,
artifactDao store.ArtifactRepository,
bandwidthStatDao store.BandwidthStatRepository,
downloadStatDao store.DownloadStatRepository,
) *DBStore {
return NewDBStore(blobRepo, imageDao, artifactDao, bandwidthStatDao, downloadStatDao)
}
func StorageServiceProvider(cfg *types.Config, driver storagedriver.StorageDriver) *storage.Service {
@ -113,8 +124,9 @@ func getManifestCacheHandler(
}
var ControllerSet = wire.NewSet(ControllerProvider)
var DBStoreSet = wire.NewSet(DBStoreProvider)
var RegistrySet = wire.NewSet(LocalRegistryProvider, ManifestServiceProvider, RemoteRegistryProvider)
var ProxySet = wire.NewSet(ProvideProxyController)
var StorageServiceSet = wire.NewSet(StorageServiceProvider)
var AppSet = wire.NewSet(NewApp)
var WireSet = wire.NewSet(ControllerSet, RegistrySet, StorageServiceSet, AppSet, ProxySet)
var WireSet = wire.NewSet(ControllerSet, DBStoreSet, RegistrySet, StorageServiceSet, AppSet, ProxySet)

View File

@ -417,9 +417,9 @@ type ImageRepository interface {
ctx context.Context, parentID int64,
repo string, name string,
) (*types.Image, error)
// Create an Artifact
// Create an Image
CreateOrUpdate(ctx context.Context, image *types.Image) error
// Update an Artifact
// Update an Image
Update(ctx context.Context, artifact *types.Image) (err error)
UpdateStatus(ctx context.Context, artifact *types.Image) (err error)