From 4fc38bca65ab1ef221975e60a33059a4849357dc Mon Sep 17 00:00:00 2001 From: Akhilesh Pandey Date: Mon, 27 Nov 2023 14:41:39 +0000 Subject: [PATCH] feat: Update token source impersonation and add expiration check (#815) --- app/api/controller/upload/download.go | 2 +- blob/filesystem.go | 2 +- blob/gcs.go | 103 +++++++++++++++++++------- blob/interface.go | 2 +- blob/wire.go | 5 +- cmd/gitness/wire_gen.go | 2 +- 6 files changed, 85 insertions(+), 31 deletions(-) diff --git a/app/api/controller/upload/download.go b/app/api/controller/upload/download.go index e1b422515..6c6244917 100644 --- a/app/api/controller/upload/download.go +++ b/app/api/controller/upload/download.go @@ -38,7 +38,7 @@ func (c *Controller) Download( fileBucketPath := getFileBucketPath(repo.ID, filePath) - signedURL, err := c.blobStore.GetSignedURL(fileBucketPath) + signedURL, err := c.blobStore.GetSignedURL(ctx, fileBucketPath) if err != nil && !errors.Is(err, blob.ErrNotSupported) { return "", nil, fmt.Errorf("failed to get signed URL: %w", err) } diff --git a/blob/filesystem.go b/blob/filesystem.go index 1722f2789..52d94f818 100644 --- a/blob/filesystem.go +++ b/blob/filesystem.go @@ -76,7 +76,7 @@ func (c FileSystemStore) Upload(ctx context.Context, return nil } -func (c FileSystemStore) GetSignedURL(_ string) (string, error) { +func (c FileSystemStore) GetSignedURL(_ context.Context, _ string) (string, error) { return "", ErrNotSupported } diff --git a/blob/gcs.go b/blob/gcs.go index af2366f48..7c841131b 100644 --- a/blob/gcs.go +++ b/blob/gcs.go @@ -23,6 +23,7 @@ import ( "cloud.google.com/go/storage" "github.com/rs/zerolog/log" + "golang.org/x/oauth2" "google.golang.org/api/impersonate" "google.golang.org/api/option" ) @@ -32,67 +33,72 @@ const defaultScope = "https://www.googleapis.com/auth/cloud-platform" type GCSStore struct { // Bucket is the name of the GCS bucket to use. - bucket string - client *storage.Client + cachedClient *storage.Client + config Config + tokenExpirationTime time.Time } -func NewGCSStore(cfg Config) (Store, error) { +func NewGCSStore(ctx context.Context, cfg Config) (Store, error) { // Use service account [Development and Non-GCP environments] if cfg.KeyPath != "" { - client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(cfg.KeyPath)) + client, err := storage.NewClient(ctx, option.WithCredentialsFile(cfg.KeyPath)) if err != nil { return nil, fmt.Errorf("failed to create GCS client with service account key: %w", err) } return &GCSStore{ - bucket: cfg.Bucket, - client: client, + config: cfg, + cachedClient: client, + tokenExpirationTime: time.Now().Add(cfg.ImpersonationLifetime), }, nil } - // Use workload identity impersonation default credentials (GKE environment) - ts, err := impersonate.CredentialsTokenSource(context.Background(), impersonate.CredentialsConfig{ - TargetPrincipal: cfg.TargetPrincipal, - Scopes: []string{defaultScope}, // Required field - Lifetime: cfg.ImpersonationLifetime, - }) - if err != nil { - return nil, fmt.Errorf("failed to impersonate the client service account %s : %w", cfg.TargetPrincipal, err) - } - client, err := storage.NewClient(context.Background(), option.WithTokenSource(ts)) + client, err := createNewImpersonatedClient(ctx, cfg) if err != nil { return nil, fmt.Errorf("failed to create GCS client with workload identity impersonation: %w", err) } return &GCSStore{ - bucket: cfg.Bucket, - client: client, + config: cfg, + cachedClient: client, + tokenExpirationTime: time.Now().Add(cfg.ImpersonationLifetime), }, nil } func (c *GCSStore) Upload(ctx context.Context, file io.Reader, filePath string) error { - wc := c.client.Bucket(c.bucket).Object(filePath).NewWriter(ctx) + gcsClient, err := c.getLatestClient(ctx) + if err != nil { + return fmt.Errorf("failed to retrieve latest client: %w", err) + } + + bkt := gcsClient.Bucket(c.config.Bucket) + wc := bkt.Object(filePath).NewWriter(ctx) defer func() { cErr := wc.Close() if cErr != nil { log.Ctx(ctx).Err(cErr). - Msgf("failed to close gcs blob writer for file '%s' in bucket '%s'", filePath, c.bucket) + Msgf("failed to close gcs blob writer for file '%s' in bucket '%s'", filePath, c.config.Bucket) } }() if _, err := io.Copy(wc, file); err != nil { // Remove the file if it was created. - deleteErr := c.client.Bucket(c.bucket).Object(filePath).Delete(ctx) + deleteErr := gcsClient.Bucket(c.config.Bucket).Object(filePath).Delete(ctx) if deleteErr != nil { - return fmt.Errorf("failed to delete file: %s from bucket: %s %w", filePath, c.bucket, deleteErr) + return fmt.Errorf("failed to delete file: %s from bucket: %s %w", filePath, c.config.Bucket, deleteErr) } - return fmt.Errorf("failed to write file to GCS: %w", err) } return nil } -func (c *GCSStore) GetSignedURL(filePath string) (string, error) { - signedURL, err := c.client.Bucket(c.bucket).SignedURL(filePath, &storage.SignedURLOptions{ +func (c *GCSStore) GetSignedURL(ctx context.Context, filePath string) (string, error) { + gcsClient, err := c.getLatestClient(ctx) + if err != nil { + return "", fmt.Errorf("failed to retrieve latest client: %w", err) + } + + bkt := gcsClient.Bucket(c.config.Bucket) + signedURL, err := bkt.SignedURL(filePath, &storage.SignedURLOptions{ Method: http.MethodGet, Expires: time.Now().Add(1 * time.Hour), }) @@ -101,6 +107,53 @@ func (c *GCSStore) GetSignedURL(filePath string) (string, error) { } return signedURL, nil } + func (c *GCSStore) Download(_ context.Context, _ string) (io.ReadCloser, error) { return nil, fmt.Errorf("not implemented") } + +func createNewImpersonatedClient(ctx context.Context, cfg Config) (*storage.Client, error) { + // Use workload identity impersonation default credentials (GKE environment) + ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{ + TargetPrincipal: cfg.TargetPrincipal, + Scopes: []string{defaultScope}, // Required field + Lifetime: cfg.ImpersonationLifetime, + }) + if err != nil { + return nil, fmt.Errorf("failed to impersonate the client service account %s : %w", cfg.TargetPrincipal, err) + } + + // Generate a new token + token, err := ts.Token() + if err != nil { + return nil, fmt.Errorf("failed to refresh token from impersonated credentials: %w", err) + } + + client, err := storage.NewClient(ctx, option.WithTokenSource(oauth2.StaticTokenSource(token))) + if err != nil { + return nil, fmt.Errorf("failed to create GCS client with workload identity impersonation: %w", err) + } + return client, nil +} + +func (c *GCSStore) getLatestClient(ctx context.Context) (*storage.Client, error) { + err := c.checkAndRefreshToken(ctx) + if err != nil { + return nil, fmt.Errorf("failed to refresh token: %w", err) + } + return c.cachedClient, nil +} + +func (c *GCSStore) checkAndRefreshToken(ctx context.Context) error { + if time.Now().Before(c.tokenExpirationTime) { + return nil + } + now := time.Now() + client, err := createNewImpersonatedClient(ctx, c.config) + if err != nil { + return fmt.Errorf("failed to create GCS client with workload identity impersonation after expiration: %w", err) + } + c.cachedClient = client + c.tokenExpirationTime = now.Add(c.config.ImpersonationLifetime) + return nil +} diff --git a/blob/interface.go b/blob/interface.go index d93b036ee..f3ab4f592 100644 --- a/blob/interface.go +++ b/blob/interface.go @@ -30,7 +30,7 @@ type Store interface { Upload(ctx context.Context, file io.Reader, filePath string) error // GetSignedURL returns the URL for a file in the blob store. - GetSignedURL(filePath string) (string, error) + GetSignedURL(ctx context.Context, filePath string) (string, error) // Download returns a reader for a file in the blob store. Download(ctx context.Context, filePath string) (io.ReadCloser, error) diff --git a/blob/wire.go b/blob/wire.go index e621e3a1e..7fa01bc57 100644 --- a/blob/wire.go +++ b/blob/wire.go @@ -15,6 +15,7 @@ package blob import ( + "context" "fmt" "github.com/google/wire" @@ -24,12 +25,12 @@ var WireSet = wire.NewSet( ProvideStore, ) -func ProvideStore(config Config) (Store, error) { +func ProvideStore(ctx context.Context, config Config) (Store, error) { switch config.Provider { case ProviderFileSystem: return NewFileSystemStore(config) case ProviderGCS: - return NewGCSStore(config) + return NewGCSStore(ctx, config) default: return nil, fmt.Errorf("invalid blob store provider: %s", config.Provider) } diff --git a/cmd/gitness/wire_gen.go b/cmd/gitness/wire_gen.go index ef6b99685..a4a21e69f 100644 --- a/cmd/gitness/wire_gen.go +++ b/cmd/gitness/wire_gen.go @@ -246,7 +246,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro if err != nil { return nil, err } - blobStore, err := blob.ProvideStore(blobConfig) + blobStore, err := blob.ProvideStore(ctx, blobConfig) if err != nil { return nil, err }