feat: [AH-900]: write event to Redis every time a blob is stored, for both OCI and generic blobs (#3548)

* feat: [AH-900]: lint error
* feat: [AH-900]: lint error
* feat: [AH-900]: implement: - add helper function to generate path
* feat: [AH-900]: rebase
* feat: [AH-900]: cleanup
* feat: [AH-900]: don't send event if the blob was updated. send only for created
* feat: [AH-900]: extract common method
* feat: [AH-900]: implement: - review comments
* feat: [AH-900]: implement: - review comments
* feat: [AH-900]: rebase
Branch: main
Commit: 4fc663ed04 (parent: ced5ce2f65)
Author: Shivakumar Ningappa, 2025-03-25 05:58:33 +00:00 (committed by Harness)
12 changed files with 216 additions and 56 deletions
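The diff below threads a new event.Reporter through the Docker (OCI) registry and the generic file manager, and emits a ReplicationDetails event whenever a blob row is first created. The Redis-facing side of the reporter is not part of this diff; purely as a minimal sketch, with the interface shape inferred from the call sites below and the Redis client, stream name, and package layout all assumed, a stream-backed implementation could look like:

package eventsketch

import (
	"context"
	"encoding/json"

	"github.com/redis/go-redis/v9"
	"github.com/rs/zerolog/log"
)

// Reporter mirrors the call sites in this diff:
// reporter.ReportEvent(ctx, &ReplicationDetails{...}, "").
// The real interface lives elsewhere in the codebase.
type Reporter interface {
	ReportEvent(ctx context.Context, payload interface{}, key string)
}

// redisReporter is hypothetical; the stream name is an assumption.
type redisReporter struct {
	client *redis.Client
	stream string // e.g. "registry:blob:events"
}

func (r *redisReporter) ReportEvent(ctx context.Context, payload interface{}, _ string) {
	data, err := json.Marshal(payload)
	if err != nil {
		log.Error().Err(err).Msg("failed to marshal replication event")
		return
	}
	// XADD appends the event to the stream; replicated regions consume it.
	err = r.client.XAdd(ctx, &redis.XAddArgs{
		Stream: r.stream,
		Values: map[string]interface{}{"payload": data},
	}).Err()
	if err != nil {
		log.Error().Err(err).Msg("failed to publish replication event to redis")
	}
}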


@@ -480,7 +480,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
registryBlobRepository := database2.ProvideRegistryBlobDao(db)
bandwidthStatRepository := database2.ProvideBandwidthStatDao(db)
downloadStatRepository := database2.ProvideDownloadStatDao(db)
- localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor)
+ localRegistry := docker.LocalRegistryProvider(app, manifestService, blobRepository, registryRepository, manifestRepository, registryBlobRepository, mediaTypesRepository, tagRepository, imageRepository, artifactRepository, bandwidthStatRepository, downloadStatRepository, gcService, transactor, eventReporter)
upstreamProxyConfigRepository := database2.ProvideUpstreamDao(db, registryRepository, spaceFinder)
proxyController := docker.ProvideProxyController(localRegistry, manifestService, secretService, spaceFinder)
remoteRegistry := docker.RemoteRegistryProvider(localRegistry, app, upstreamProxyConfigRepository, spaceFinder, secretService, proxyController)
@@ -492,7 +492,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
filemanagerApp := filemanager.NewApp(ctx, config, storageService)
genericBlobRepository := database2.ProvideGenericBlobDao(db)
nodesRepository := database2.ProvideNodeDao(db)
- fileManager := filemanager.Provider(filemanagerApp, registryRepository, genericBlobRepository, nodesRepository, transactor)
+ fileManager := filemanager.Provider(filemanagerApp, registryRepository, genericBlobRepository, nodesRepository, transactor, eventReporter)
cleanupPolicyRepository := database2.ProvideCleanupPolicyDao(db, transactor)
webhooksRepository := database2.ProvideWebhookDao(db)
webhooksExecutionRepository := database2.ProvideWebhookExecutionDao(db)


@@ -22,6 +22,8 @@ import (
)
type PackageType int32
+ type BlobAction int32
+ type Provider int32
type ArtifactDetails struct {
RegistryID int64 `json:"registry_id,omitempty"`
@@ -30,6 +32,19 @@ type ArtifactDetails struct {
PackageType PackageType `json:"package_type,omitempty"`
}
+ // ReplicationDetails represents the ReplicationDetails message from the proto file.
+ type ReplicationDetails struct {
+ AccountID string `json:"account_id,omitempty"`
+ Action BlobAction `json:"action,omitempty"`
+ BlobID int64 `json:"blob_id,omitempty"`
+ GenericBlobID string `json:"generic_blob_id,omitempty"`
+ Path string `json:"path,omitempty"`
+ Provider Provider `json:"provider,omitempty"`
+ Endpoint string `json:"endpoint,omitempty"`
+ Region string `json:"region,omitempty"`
+ Bucket string `json:"bucket,omitempty"`
+ }
// PackageType constants using iota.
const (
PackageTypeDOCKER = iota
@@ -38,6 +53,16 @@ const (
PackageTypeMAVEN
)
+ const (
+ BlobCreate BlobAction = 0
+ BlobDelete BlobAction = 1
+ )
+ const (
+ CLOUDFLARE Provider = 0
+ GCS Provider = 1
+ )
var PackageTypeValue = map[string]PackageType{
string(artifact.PackageTypeDOCKER): PackageTypeDOCKER,
string(artifact.PackageTypeGENERIC): PackageTypeGENERIC,
@@ -45,6 +70,16 @@ var PackageTypeValue = map[string]PackageType{
string(artifact.PackageTypeMAVEN): PackageTypeMAVEN,
}
+ var BlobActionValue = map[string]BlobAction{
+ "BlobCreate": BlobCreate,
+ "BlobDelete": BlobDelete,
+ }
+ var ProviderValue = map[string]Provider{
+ "CLOUDFLARE": CLOUDFLARE,
+ "GCS": GCS,
+ }
// GetPackageTypeFromString returns the PackageType constant corresponding to the given string value.
func GetPackageTypeFromString(value string) (PackageType, error) {
if val, ok := PackageTypeValue[value]; ok {
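For illustration, a hypothetical caller resolves the wire-format strings through the new lookup maps (import path as used elsewhere in this diff). Note that a missing key silently yields the map's zero value rather than an error:

package main

import (
	"fmt"
	"strings"

	"github.com/harness/gitness/registry/app/event"
)

func main() {
	fmt.Println(event.BlobActionValue["BlobCreate"])         // 0 (BlobCreate)
	fmt.Println(event.ProviderValue[strings.ToUpper("gcs")]) // 1 (GCS)

	// Caveat: an unknown provider string coerces to the zero value,
	// CLOUDFLARE, instead of failing.
	fmt.Println(event.ProviderValue["S3"]) // 0 (CLOUDFLARE)
}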


@@ -0,0 +1,71 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"context"
"errors"
"strings"
a "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact"
s "github.com/harness/gitness/registry/app/storage"
"github.com/harness/gitness/types"
"github.com/rs/zerolog/log"
)
func ReportEventAsync(
ctx context.Context,
accountID string,
reporter Reporter,
action BlobAction,
blobID int64,
genericBlobID string,
sha256 string,
conf *types.Config,
) {
var path string
var err error
switch {
case blobID != 0:
path, err = s.BlobPath(accountID, string(a.PackageTypeDOCKER), sha256)
case genericBlobID != "":
path, err = s.BlobPath(accountID, string(a.PackageTypeGENERIC), sha256)
default:
err = errors.New("blobID or genericBlobID must be set")
}
if err != nil {
log.Error().
Err(err).
Int64("blobID", blobID).
Str("genericBlobID", genericBlobID).
Str("action", string(action)).
Msg("Failed to determine blob path for event reporting")
return
}
go reporter.ReportEvent(ctx, &ReplicationDetails{
AccountID: accountID,
Action: action,
BlobID: blobID,
GenericBlobID: genericBlobID,
Path: path,
Provider: ProviderValue[strings.ToUpper(conf.Registry.Storage.S3Storage.Provider)],
Endpoint: conf.Registry.Storage.S3Storage.RegionEndpoint,
Region: conf.Registry.Storage.S3Storage.Region,
Bucket: conf.Registry.Storage.S3Storage.Bucket,
}, "")
}
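For reference, the two call sites added later in this diff exercise both branches of the switch above; a caller passes either a numeric blob ID (OCI) or a generic blob ID, never both. The wrapper names here are illustrative:

package example

import (
	"context"

	"github.com/harness/gitness/registry/app/event"
	"github.com/harness/gitness/types"
)

// OCI blob: blobID set, genericBlobID empty (as in dbPutBlobUploadComplete).
func reportOCI(ctx context.Context, acct string, r event.Reporter, blobID int64, dgst string, c *types.Config) {
	event.ReportEventAsync(ctx, acct, r, event.BlobCreate, blobID, "", dgst, c)
}

// Generic blob: blobID zero, genericBlobID set (as in FileManager.UploadFile).
func reportGeneric(ctx context.Context, acct string, r event.Reporter, gbID, sha string, c *types.Config) {
	event.ReportEventAsync(ctx, acct, r, event.BlobCreate, 0, gbID, sha, c)
}

One caveat worth flagging: ReportEvent runs on a goroutine but inherits the caller's request-scoped context, so an implementation that honors ctx cancellation could drop events once the upload request completes. If that matters, handing the goroutine a detached context (context.WithoutCancel, Go 1.21+) is one option; whether this codebase wants that behavior is an assumption.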


@@ -38,6 +38,7 @@ import (
"github.com/harness/gitness/registry/app/api/openapi/contracts/artifact"
"github.com/harness/gitness/registry/app/dist_temp/dcontext"
"github.com/harness/gitness/registry/app/dist_temp/errcode"
+ "github.com/harness/gitness/registry/app/event"
"github.com/harness/gitness/registry/app/manifest"
"github.com/harness/gitness/registry/app/manifest/manifestlist"
"github.com/harness/gitness/registry/app/manifest/ocischema"
@@ -112,7 +113,7 @@ func NewLocalRegistry(
blobRepo store.BlobRepository, mtRepository store.MediaTypesRepository,
tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository,
bandwidthStatDao store.BandwidthStatRepository, downloadStatDao store.DownloadStatRepository,
- gcService gc.Service, tx dbtx.Transactor,
+ gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter,
) Registry {
return &LocalRegistry{
App: app,
@@ -129,6 +130,7 @@
downloadStatDao: downloadStatDao,
gcService: gcService,
tx: tx,
+ reporter: reporter,
}
}
@@ -147,6 +149,7 @@ type LocalRegistry struct {
downloadStatDao store.DownloadStatRepository
gcService gc.Service
tx dbtx.Transactor
+ reporter event.Reporter
}
func (r *LocalRegistry) Base() error {
@@ -1590,7 +1593,7 @@ func (r *LocalRegistry) dbBlobLinkExists(
}
func (r *LocalRegistry) dbPutBlobUploadComplete(
- ctx context.Context,
+ ctx *Context,
repoName string,
mediaType string,
digestVal string,
@@ -1604,14 +1607,16 @@ func (r *LocalRegistry) dbPutBlobUploadComplete(
Size: int64(size),
}
var storedBlob *types.Blob
+ created := false
err := r.tx.WithTx(
- ctx, func(ctx context.Context) error {
+ ctx.Context, func(ctx context.Context) error {
registry, err := r.registryDao.GetByParentIDAndName(ctx, info.ParentID, repoName)
if err != nil {
return err
}
- storedBlob, err := r.blobRepo.CreateOrFind(ctx, blob)
+ storedBlob, created, err = r.blobRepo.CreateOrFind(ctx, blob)
if err != nil && !errors.Is(err, store2.ErrResourceNotFound) {
return err
}
@@ -1634,6 +1639,12 @@ func (r *LocalRegistry) dbPutBlobUploadComplete(
return fmt.Errorf("committing database transaction: %w", err)
}
+ // Emit blob create event
+ if created {
+ event.ReportEventAsync(ctx.Context, ctx.OciBlobStore.Path(),
+ r.reporter, event.BlobCreate, storedBlob.ID,
+ "", digestVal, r.App.Config)
+ }
return nil
}
@@ -1674,6 +1685,7 @@ func (r *LocalRegistry) dbDeleteBlob(
return storage.ErrBlobUnknown
}
+ // No need to emit blob delete event here. The GC will take care of it even in the replicated regions.
return nil
}
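The event is gated on the new created flag: CreateOrFind is an upsert keyed on (digest, root parent), so the flag is true exactly once per unique blob and deduplicated re-uploads do not re-trigger replication. An illustrative, hypothetical caller:

package example

import (
	"context"
	"fmt"

	"github.com/harness/gitness/registry/app/store"
	"github.com/harness/gitness/registry/types"
)

func demoDedup(ctx context.Context, repo store.BlobRepository, b *types.Blob) {
	_, created, err := repo.CreateOrFind(ctx, b) // first upload: created == true, event fires
	if err != nil {
		return
	}
	_, again, _ := repo.CreateOrFind(ctx, b) // same digest again: false, no event
	fmt.Println(created, again)              // true false (assuming the blob was new)
}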


@@ -44,11 +44,12 @@ func LocalRegistryProvider(
mtRepository store.MediaTypesRepository,
tagDao store.TagRepository, imageDao store.ImageRepository, artifactDao store.ArtifactRepository,
bandwidthStatDao store.BandwidthStatRepository, downloadStatDao store.DownloadStatRepository,
- gcService gc.Service, tx dbtx.Transactor,
+ gcService gc.Service, tx dbtx.Transactor, reporter event.Reporter,
) *LocalRegistry {
registry, ok := NewLocalRegistry(
app, ms, manifestDao, registryDao, registryBlobDao, blobRepo,
- mtRepository, tagDao, imageDao, artifactDao, bandwidthStatDao, downloadStatDao, gcService, tx,
+ mtRepository, tagDao, imageDao, artifactDao, bandwidthStatDao, downloadStatDao,
+ gcService, tx, reporter,
).(*LocalRegistry)
if !ok {
return nil


@@ -22,6 +22,7 @@ import (
"path"
"strings"
+ "github.com/harness/gitness/registry/app/event"
"github.com/harness/gitness/registry/app/storage"
"github.com/harness/gitness/registry/app/store"
"github.com/harness/gitness/registry/types"
@@ -42,7 +43,8 @@ const (
func NewFileManager(
app *App, registryDao store.RegistryRepository, genericBlobDao store.GenericBlobRepository,
nodesDao store.NodesRepository,
- tx dbtx.Transactor,
+ tx dbtx.Transactor, reporter event.Reporter,
) FileManager {
return FileManager{
App: app,
@@ -50,6 +52,7 @@
genericBlobDao: genericBlobDao,
nodesDao: nodesDao,
tx: tx,
+ reporter: reporter,
}
}
@@ -59,6 +62,7 @@ type FileManager struct {
genericBlobDao store.GenericBlobRepository
nodesDao store.NodesRepository
tx dbtx.Transactor
+ reporter event.Reporter
}
func (f *FileManager) UploadFile(
@@ -116,7 +120,8 @@ func (f *FileManager) UploadFile(
MD5: fileInfo.MD5,
Size: fileInfo.Size,
}
- err = f.genericBlobDao.Create(ctx, gb)
+ var created bool
+ created, err = f.genericBlobDao.Create(ctx, gb)
if err != nil {
log.Error().Msgf("failed to save generic blob in db with "+
"sha256 : %s, err: %s", fileInfo.Sha256, err.Error())
@@ -138,6 +143,12 @@ func (f *FileManager) UploadFile(
return types.FileInfo{}, fmt.Errorf("failed to save nodes for"+
" file : %s, with path : %s, err: %w", filename, filePath, err)
}
+ // Emit blob create event
+ if created {
+ event.ReportEventAsync(ctx, rootIdentifier, f.reporter, event.BlobCreate, 0, blobID, fileInfo.Sha256,
+ f.App.Config)
+ }
return fileInfo, nil
}


@@ -15,6 +15,7 @@
package filemanager
import (
+ "github.com/harness/gitness/registry/app/event"
"github.com/harness/gitness/registry/app/store"
"github.com/harness/gitness/store/database/dbtx"
@@ -24,8 +25,9 @@ import (
func Provider(app *App, registryDao store.RegistryRepository, genericBlobDao store.GenericBlobRepository,
nodesDao store.NodesRepository,
tx dbtx.Transactor,
+ reporter event.Reporter,
) FileManager {
- return NewFileManager(app, registryDao, genericBlobDao, nodesDao, tx)
+ return NewFileManager(app, registryDao, genericBlobDao, nodesDao, tx, reporter)
}
var AppSet = wire.NewSet(NewApp)


@@ -21,6 +21,8 @@ import (
"path"
"strings"
+ a "github.com/harness/gitness/registry/app/api/openapi/contracts/artifact"
"github.com/opencontainers/go-digest"
)
@@ -30,6 +32,11 @@ const (
blobs = "blobs"
)
+ // PackageType constants using iota.
+ const (
+ PackageTypeDOCKER = iota
+ )
func pathFor(spec pathSpec) (string, error) {
rootPrefix := []string{storagePathRoot}
switch v := spec.(type) {
@@ -182,3 +189,19 @@ func digestPathComponents(dgst digest.Digest, multilevel bool) ([]string, error)
return append(prefix, suffix...), nil
}
+ // BlobPath returns the path for a blob based on the package type.
+ func BlobPath(acctID string, packageType string, sha256 string) (string, error) {
+ acctID = strings.ToLower(acctID)
+ // sample = sha256:50f564aff30aeb53eb88b0eb2c2ba59878e9854681989faa5ff7396bdfaf509b
+ sha256 = strings.TrimPrefix(sha256, "sha256:")
+ sha256Prefix := sha256[:2]
+ switch packageType {
+ case string(a.PackageTypeDOCKER):
+ // format: /accountId(lowercase)/docker/blobs/sha256/(2 character prefix of sha)/sha/data
+ return fmt.Sprintf("/%s/docker/blobs/sha256/%s/%s/data", acctID, sha256Prefix, sha256), nil
+ default:
+ return fmt.Sprintf("/%s/files/%s", acctID, sha256), nil
+ }
+ }
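A worked example of the layout (package-type strings assumed to match the artifact contract constants, i.e. "DOCKER" and "GENERIC"):

package main

import (
	"fmt"

	"github.com/harness/gitness/registry/app/storage"
)

func main() {
	sha := "sha256:50f564aff30aeb53eb88b0eb2c2ba59878e9854681989faa5ff7396bdfaf509b"

	// Docker: /<account, lowercased>/docker/blobs/sha256/<2-char prefix>/<sha>/data
	p, _ := storage.BlobPath("AcctID", "DOCKER", sha)
	fmt.Println(p)
	// /acctid/docker/blobs/sha256/50/50f564aff30aeb53eb88b0eb2c2ba59878e9854681989faa5ff7396bdfaf509b/data

	// Any other package type falls through to the flat files layout.
	p, _ = storage.BlobPath("AcctID", "GENERIC", sha)
	fmt.Println(p)
	// /acctid/files/50f564aff30aeb53eb88b0eb2c2ba59878e9854681989faa5ff7396bdfaf509b
}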


@@ -41,7 +41,7 @@ type BlobRepository interface {
ctx context.Context, d digest.Digest, repoID int64,
imageName string,
) (*types.Blob, error)
- CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, error)
+ CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, bool, error)
DeleteByID(ctx context.Context, id int64) error
ExistsBlob(
ctx context.Context, repoID int64, d digest.Digest,
@@ -574,7 +574,7 @@ type GenericBlobRepository interface {
ctx context.Context, sha256 string,
rootParentID int64,
) (*types.GenericBlob, error)
- Create(ctx context.Context, gb *types.GenericBlob) error
+ Create(ctx context.Context, gb *types.GenericBlob) (bool, error)
DeleteByID(ctx context.Context, id string) error
TotalSizeByRootParentID(ctx context.Context, id int64) (int64, error)
}


@@ -24,7 +24,6 @@ import (
"github.com/harness/gitness/registry/app/store"
"github.com/harness/gitness/registry/app/store/database/util"
"github.com/harness/gitness/registry/types"
- store2 "github.com/harness/gitness/store"
"github.com/harness/gitness/store/database"
"github.com/harness/gitness/store/database/dbtx"
@@ -173,16 +172,16 @@ func (bd blobDao) FindByDigestAndRepoID(ctx context.Context, d digest.Digest, re
return bd.mapToBlob(dst)
}
- func (bd blobDao) CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, error) {
+ func (bd blobDao) CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob, bool, error) {
sqlQuery := `INSERT INTO blobs (
- blob_digest,
- blob_root_parent_id,
- blob_media_type_id,
+ blob_digest,
+ blob_root_parent_id,
+ blob_media_type_id,
blob_size,
blob_created_at,
blob_created_by
) VALUES (
- :blob_digest,
+ :blob_digest,
:blob_root_parent_id,
:blob_media_type_id,
:blob_size,
@@ -190,33 +189,38 @@ func (bd blobDao) CreateOrFind(ctx context.Context, b *types.Blob) (*types.Blob,
:blob_created_by
) ON CONFLICT (
blob_digest, blob_root_parent_id
- ) DO NOTHING
+ ) DO NOTHING
RETURNING blob_id`
mediaTypeID, err := bd.mtRepository.MapMediaType(ctx, b.MediaType)
if err != nil {
- return nil, err
+ return nil, false, err
}
b.MediaTypeID = mediaTypeID
db := dbtx.GetAccessor(ctx, bd.db)
blob, err := mapToInternalBlob(ctx, b)
if err != nil {
- return nil, err
+ return nil, false, err
}
query, arg, err := db.BindNamed(sqlQuery, blob)
if err != nil {
- return nil, database.ProcessSQLErrorf(ctx, err, "Failed to bind repo object")
+ return nil, false, database.ProcessSQLErrorf(ctx, err, "Failed to bind repo object")
}
+ var created bool
if err = db.QueryRowContext(ctx, query, arg...).Scan(&b.ID); err != nil {
- err := database.ProcessSQLErrorf(ctx, err, "Insert query failed")
- if !errors2.Is(err, store2.ErrResourceNotFound) {
- return nil, err
+ if errors2.Is(err, sql.ErrNoRows) {
+ created = false
+ } else {
+ return nil, false, database.ProcessSQLErrorf(ctx, err, "Insert query failed")
}
+ } else {
+ created = true
+ }
- return bd.FindByDigestAndRootParentID(ctx, b.Digest, b.RootParentID)
+ blob2, err := bd.FindByDigestAndRootParentID(ctx, b.Digest, b.RootParentID)
+ return blob2, created, err
}
func (bd blobDao) DeleteByID(ctx context.Context, id int64) error {
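The created flag leans on standard Postgres behavior: with ON CONFLICT ... DO NOTHING, a conflicting INSERT is suppressed and RETURNING yields no row, so the scan fails with sql.ErrNoRows. A minimal, hypothetical probe of those semantics (column list abridged from the real query above):

package example

import (
	"context"
	"database/sql"
	"errors"
)

func insertBlob(ctx context.Context, db *sql.DB, dgst string, rootParentID, size int64) (bool, error) {
	const q = `INSERT INTO blobs (blob_digest, blob_root_parent_id, blob_size)
VALUES ($1, $2, $3)
ON CONFLICT (blob_digest, blob_root_parent_id) DO NOTHING
RETURNING blob_id`

	var id int64
	err := db.QueryRowContext(ctx, q, dgst, rootParentID, size).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil // digest already stored: found, not created
	}
	return err == nil, err // true on a fresh insert; (false, err) on failure
}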


@@ -102,45 +102,45 @@ func (g GenericBlobDao) FindBySha256AndRootParentID(ctx context.Context,
return g.mapToGenericBlob(ctx, dst)
}
- func (g GenericBlobDao) Create(ctx context.Context, gb *types.GenericBlob) error {
+ func (g GenericBlobDao) Create(ctx context.Context, gb *types.GenericBlob) (bool, error) {
const sqlQuery = `
- INSERT INTO generic_blobs (
- generic_blob_id
- ,generic_blob_root_parent_id
- ,generic_blob_sha_1
- ,generic_blob_sha_256
- ,generic_blob_sha_512
- ,generic_blob_md5
- ,generic_blob_size
- ,generic_blob_created_at
- ,generic_blob_created_by
- ) VALUES (
- :generic_blob_id
- , :generic_blob_root_parent_id
- ,:generic_blob_sha_1
- ,:generic_blob_sha_256
- ,:generic_blob_sha_512
- ,:generic_blob_md5
- ,:generic_blob_size
- ,:generic_blob_created_at
- ,:generic_blob_created_by
- ) ON CONFLICT (generic_blob_root_parent_id, generic_blob_sha_256)
- DO UPDATE SET generic_blob_id = generic_blobs.generic_blob_id
- RETURNING generic_blob_id`
+ INSERT INTO generic_blobs (
+ generic_blob_id,
+ generic_blob_root_parent_id,
+ generic_blob_sha_1,
+ generic_blob_sha_256,
+ generic_blob_sha_512,
+ generic_blob_md5,
+ generic_blob_size,
+ generic_blob_created_at,
+ generic_blob_created_by
+ ) VALUES (
+ :generic_blob_id,
+ :generic_blob_root_parent_id,
+ :generic_blob_sha_1,
+ :generic_blob_sha_256,
+ :generic_blob_sha_512,
+ :generic_blob_md5,
+ :generic_blob_size,
+ :generic_blob_created_at,
+ :generic_blob_created_by
+ ) ON CONFLICT (generic_blob_root_parent_id, generic_blob_sha_256)
+ DO UPDATE SET generic_blob_id = generic_blobs.generic_blob_id
+ RETURNING generic_blob_id`
db := dbtx.GetAccessor(ctx, g.sqlDB)
query, arg, err := db.BindNamed(sqlQuery, g.mapToInternalGenericBlob(ctx, gb))
if err != nil {
return databaseg.ProcessSQLErrorf(ctx, err, "Failed to bind generic blob object")
return false, databaseg.ProcessSQLErrorf(ctx, err, "Failed to bind generic blob object")
}
if err = db.QueryRowContext(ctx, query, arg...).Scan(&gb.ID); err != nil {
if errors.Is(err, sql.ErrNoRows) || errors.Is(err, store2.ErrDuplicate) {
- return nil
+ return false, nil
}
return databaseg.ProcessSQLErrorf(ctx, err, "Insert query failed")
return false, databaseg.ProcessSQLErrorf(ctx, err, "Insert query failed")
}
- return nil
+ return true, nil
}
func (g GenericBlobDao) DeleteByID(_ context.Context, _ string) error {
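Worth noting for the created flag here: unlike blobDao's DO NOTHING variant, this query uses ON CONFLICT ... DO UPDATE ... RETURNING, and in Postgres that form returns a row even on conflict (the pre-existing id), so the scan succeeds on both paths and sql.ErrNoRows is not the usual conflict signal. An abridged, hypothetical illustration of that semantics:

package example

import (
	"context"
	"database/sql"
)

func upsertGenericBlob(ctx context.Context, db *sql.DB, id, sha string) (string, error) {
	const q = `INSERT INTO generic_blobs (generic_blob_id, generic_blob_sha_256)
VALUES ($1, $2)
ON CONFLICT (generic_blob_sha_256)
DO UPDATE SET generic_blob_id = generic_blobs.generic_blob_id
RETURNING generic_blob_id`

	var got string
	err := db.QueryRowContext(ctx, q, id, sha).Scan(&got)
	return got, err // on conflict, got is the pre-existing id and err is nil
}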


@@ -184,7 +184,7 @@ type Config struct {
// BlobStore defines the blob storage configuration parameters.
BlobStore struct {
- // Provider is a name of blob storage service like filesystem or gcs
+ // Provider is a name of blob storage service like filesystem or gcs or cloudflare
Provider blob.Provider `envconfig:"GITNESS_BLOBSTORE_PROVIDER" default:"filesystem"`
// Bucket is a path to the directory where the files will be stored when using filesystem blob storage,
// in case of gcs provider this will be the actual bucket where the images are stored.
@@ -520,6 +520,7 @@ type Config struct {
LogLevel string `envconfig:"GITNESS_REGISTRY_S3_LOG_LEVEL" default:"info"`
Delete bool `envconfig:"GITNESS_REGISTRY_S3_DELETE_ENABLED" default:"true"`
Redirect bool `envconfig:"GITNESS_REGISTRY_S3_STORAGE_REDIRECT" default:"false"`
+ Provider string `envconfig:"GITNESS_REGISTRY_S3_PROVIDER" default:"cloudflare"`
}
}
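For context, the new knob flows into the event payload exactly as ReportEventAsync reads it; a small sketch (config paths from this diff, values illustrative):

package example

import (
	"strings"

	"github.com/harness/gitness/registry/app/event"
	"github.com/harness/gitness/types"
)

// GITNESS_REGISTRY_S3_PROVIDER=gcs maps "gcs" to event.GCS; the default
// "cloudflare" maps to event.CLOUDFLARE. Unknown values coerce to the map's
// zero value (CLOUDFLARE) rather than erroring.
func providerFromConfig(conf *types.Config) event.Provider {
	return event.ProviderValue[strings.ToUpper(conf.Registry.Storage.S3Storage.Provider)]
}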