feat: Update token source impersonation and add expiration check (#815)

rkapoor10-patch-1
Akhilesh Pandey 2023-11-27 14:41:39 +00:00 committed by Harness
parent 7110eecb7d
commit 4fc38bca65
6 changed files with 85 additions and 31 deletions

View File

@ -38,7 +38,7 @@ func (c *Controller) Download(
fileBucketPath := getFileBucketPath(repo.ID, filePath) fileBucketPath := getFileBucketPath(repo.ID, filePath)
signedURL, err := c.blobStore.GetSignedURL(fileBucketPath) signedURL, err := c.blobStore.GetSignedURL(ctx, fileBucketPath)
if err != nil && !errors.Is(err, blob.ErrNotSupported) { if err != nil && !errors.Is(err, blob.ErrNotSupported) {
return "", nil, fmt.Errorf("failed to get signed URL: %w", err) return "", nil, fmt.Errorf("failed to get signed URL: %w", err)
} }

View File

@ -76,7 +76,7 @@ func (c FileSystemStore) Upload(ctx context.Context,
return nil return nil
} }
func (c FileSystemStore) GetSignedURL(_ string) (string, error) { func (c FileSystemStore) GetSignedURL(_ context.Context, _ string) (string, error) {
return "", ErrNotSupported return "", ErrNotSupported
} }

View File

@ -23,6 +23,7 @@ import (
"cloud.google.com/go/storage" "cloud.google.com/go/storage"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"golang.org/x/oauth2"
"google.golang.org/api/impersonate" "google.golang.org/api/impersonate"
"google.golang.org/api/option" "google.golang.org/api/option"
) )
@ -32,67 +33,72 @@ const defaultScope = "https://www.googleapis.com/auth/cloud-platform"
type GCSStore struct { type GCSStore struct {
// Bucket is the name of the GCS bucket to use. // Bucket is the name of the GCS bucket to use.
bucket string cachedClient *storage.Client
client *storage.Client config Config
tokenExpirationTime time.Time
} }
func NewGCSStore(cfg Config) (Store, error) { func NewGCSStore(ctx context.Context, cfg Config) (Store, error) {
// Use service account [Development and Non-GCP environments] // Use service account [Development and Non-GCP environments]
if cfg.KeyPath != "" { if cfg.KeyPath != "" {
client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(cfg.KeyPath)) client, err := storage.NewClient(ctx, option.WithCredentialsFile(cfg.KeyPath))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create GCS client with service account key: %w", err) return nil, fmt.Errorf("failed to create GCS client with service account key: %w", err)
} }
return &GCSStore{ return &GCSStore{
bucket: cfg.Bucket, config: cfg,
client: client, cachedClient: client,
tokenExpirationTime: time.Now().Add(cfg.ImpersonationLifetime),
}, nil }, nil
} }
// Use workload identity impersonation default credentials (GKE environment) client, err := createNewImpersonatedClient(ctx, cfg)
ts, err := impersonate.CredentialsTokenSource(context.Background(), impersonate.CredentialsConfig{
TargetPrincipal: cfg.TargetPrincipal,
Scopes: []string{defaultScope}, // Required field
Lifetime: cfg.ImpersonationLifetime,
})
if err != nil {
return nil, fmt.Errorf("failed to impersonate the client service account %s : %w", cfg.TargetPrincipal, err)
}
client, err := storage.NewClient(context.Background(), option.WithTokenSource(ts))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create GCS client with workload identity impersonation: %w", err) return nil, fmt.Errorf("failed to create GCS client with workload identity impersonation: %w", err)
} }
return &GCSStore{ return &GCSStore{
bucket: cfg.Bucket, config: cfg,
client: client, cachedClient: client,
tokenExpirationTime: time.Now().Add(cfg.ImpersonationLifetime),
}, nil }, nil
} }
func (c *GCSStore) Upload(ctx context.Context, file io.Reader, filePath string) error { func (c *GCSStore) Upload(ctx context.Context, file io.Reader, filePath string) error {
wc := c.client.Bucket(c.bucket).Object(filePath).NewWriter(ctx) gcsClient, err := c.getLatestClient(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve latest client: %w", err)
}
bkt := gcsClient.Bucket(c.config.Bucket)
wc := bkt.Object(filePath).NewWriter(ctx)
defer func() { defer func() {
cErr := wc.Close() cErr := wc.Close()
if cErr != nil { if cErr != nil {
log.Ctx(ctx).Err(cErr). log.Ctx(ctx).Err(cErr).
Msgf("failed to close gcs blob writer for file '%s' in bucket '%s'", filePath, c.bucket) Msgf("failed to close gcs blob writer for file '%s' in bucket '%s'", filePath, c.config.Bucket)
} }
}() }()
if _, err := io.Copy(wc, file); err != nil { if _, err := io.Copy(wc, file); err != nil {
// Remove the file if it was created. // Remove the file if it was created.
deleteErr := c.client.Bucket(c.bucket).Object(filePath).Delete(ctx) deleteErr := gcsClient.Bucket(c.config.Bucket).Object(filePath).Delete(ctx)
if deleteErr != nil { if deleteErr != nil {
return fmt.Errorf("failed to delete file: %s from bucket: %s %w", filePath, c.bucket, deleteErr) return fmt.Errorf("failed to delete file: %s from bucket: %s %w", filePath, c.config.Bucket, deleteErr)
} }
return fmt.Errorf("failed to write file to GCS: %w", err) return fmt.Errorf("failed to write file to GCS: %w", err)
} }
return nil return nil
} }
func (c *GCSStore) GetSignedURL(filePath string) (string, error) { func (c *GCSStore) GetSignedURL(ctx context.Context, filePath string) (string, error) {
signedURL, err := c.client.Bucket(c.bucket).SignedURL(filePath, &storage.SignedURLOptions{ gcsClient, err := c.getLatestClient(ctx)
if err != nil {
return "", fmt.Errorf("failed to retrieve latest client: %w", err)
}
bkt := gcsClient.Bucket(c.config.Bucket)
signedURL, err := bkt.SignedURL(filePath, &storage.SignedURLOptions{
Method: http.MethodGet, Method: http.MethodGet,
Expires: time.Now().Add(1 * time.Hour), Expires: time.Now().Add(1 * time.Hour),
}) })
@ -101,6 +107,53 @@ func (c *GCSStore) GetSignedURL(filePath string) (string, error) {
} }
return signedURL, nil return signedURL, nil
} }
func (c *GCSStore) Download(_ context.Context, _ string) (io.ReadCloser, error) { func (c *GCSStore) Download(_ context.Context, _ string) (io.ReadCloser, error) {
return nil, fmt.Errorf("not implemented") return nil, fmt.Errorf("not implemented")
} }
func createNewImpersonatedClient(ctx context.Context, cfg Config) (*storage.Client, error) {
// Use workload identity impersonation default credentials (GKE environment)
ts, err := impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
TargetPrincipal: cfg.TargetPrincipal,
Scopes: []string{defaultScope}, // Required field
Lifetime: cfg.ImpersonationLifetime,
})
if err != nil {
return nil, fmt.Errorf("failed to impersonate the client service account %s : %w", cfg.TargetPrincipal, err)
}
// Generate a new token
token, err := ts.Token()
if err != nil {
return nil, fmt.Errorf("failed to refresh token from impersonated credentials: %w", err)
}
client, err := storage.NewClient(ctx, option.WithTokenSource(oauth2.StaticTokenSource(token)))
if err != nil {
return nil, fmt.Errorf("failed to create GCS client with workload identity impersonation: %w", err)
}
return client, nil
}
func (c *GCSStore) getLatestClient(ctx context.Context) (*storage.Client, error) {
err := c.checkAndRefreshToken(ctx)
if err != nil {
return nil, fmt.Errorf("failed to refresh token: %w", err)
}
return c.cachedClient, nil
}
func (c *GCSStore) checkAndRefreshToken(ctx context.Context) error {
if time.Now().Before(c.tokenExpirationTime) {
return nil
}
now := time.Now()
client, err := createNewImpersonatedClient(ctx, c.config)
if err != nil {
return fmt.Errorf("failed to create GCS client with workload identity impersonation after expiration: %w", err)
}
c.cachedClient = client
c.tokenExpirationTime = now.Add(c.config.ImpersonationLifetime)
return nil
}

View File

@ -30,7 +30,7 @@ type Store interface {
Upload(ctx context.Context, file io.Reader, filePath string) error Upload(ctx context.Context, file io.Reader, filePath string) error
// GetSignedURL returns the URL for a file in the blob store. // GetSignedURL returns the URL for a file in the blob store.
GetSignedURL(filePath string) (string, error) GetSignedURL(ctx context.Context, filePath string) (string, error)
// Download returns a reader for a file in the blob store. // Download returns a reader for a file in the blob store.
Download(ctx context.Context, filePath string) (io.ReadCloser, error) Download(ctx context.Context, filePath string) (io.ReadCloser, error)

View File

@ -15,6 +15,7 @@
package blob package blob
import ( import (
"context"
"fmt" "fmt"
"github.com/google/wire" "github.com/google/wire"
@ -24,12 +25,12 @@ var WireSet = wire.NewSet(
ProvideStore, ProvideStore,
) )
func ProvideStore(config Config) (Store, error) { func ProvideStore(ctx context.Context, config Config) (Store, error) {
switch config.Provider { switch config.Provider {
case ProviderFileSystem: case ProviderFileSystem:
return NewFileSystemStore(config) return NewFileSystemStore(config)
case ProviderGCS: case ProviderGCS:
return NewGCSStore(config) return NewGCSStore(ctx, config)
default: default:
return nil, fmt.Errorf("invalid blob store provider: %s", config.Provider) return nil, fmt.Errorf("invalid blob store provider: %s", config.Provider)
} }

View File

@ -246,7 +246,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
if err != nil { if err != nil {
return nil, err return nil, err
} }
blobStore, err := blob.ProvideStore(blobConfig) blobStore, err := blob.ProvideStore(ctx, blobConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }