feat: [code-2912]: db layer for usage metrics (#3159)

* db layer for usage metrics
BT-10437
Enver Biševac 2024-12-13 21:17:28 +00:00 committed by Harness
parent 0af4f33a8b
commit 48f07f0f0c
10 changed files with 501 additions and 2 deletions

1
.gitignore vendored
View File

@ -30,3 +30,4 @@ node_modules
/registry/logs/*
/distribution-spec
/registry/distribution-spec
/app/store/database/test.db

View File

@ -1273,4 +1273,14 @@ type (
Delete(ctx context.Context, id int64) error
Update(ctx context.Context, infraProvisioned *types.InfraProvisioned) error
}
UsageMetricStore interface {
Upsert(ctx context.Context, in *types.UsageMetric) error
GetMetrics(
ctx context.Context,
rootSpaceID int64,
startDate int64,
endDate int64,
) (*types.UsageMetric, error)
}
)

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS usage_metrics;

View File

@ -0,0 +1,17 @@
CREATE TABLE usage_metrics
(
usage_metric_space_id BIGINT NOT NULL,
usage_metric_date BIGINT NOT NULL,
usage_metric_created BIGINT NOT NULL,
usage_metric_updated BIGINT,
usage_metric_bandwidth BIGINT NOT NULL,
usage_metric_storage BIGINT NOT NULL,
usage_metric_version BIGINT NOT NULL,
PRIMARY KEY (usage_metric_space_id, usage_metric_date),
CONSTRAINT fk_usagemetric_space_id FOREIGN KEY (usage_metric_space_id)
REFERENCES spaces (space_id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
);

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS usage_metrics;

View File

@ -0,0 +1,17 @@
CREATE TABLE usage_metrics
(
usage_metric_space_id BIGINT NOT NULL,
usage_metric_date BIGINT NOT NULL,
usage_metric_created BIGINT NOT NULL,
usage_metric_updated BIGINT,
usage_metric_bandwidth BIGINT NOT NULL,
usage_metric_storage BIGINT NOT NULL,
usage_metric_version BIGINT NOT NULL,
PRIMARY KEY (usage_metric_space_id, usage_metric_date),
CONSTRAINT fk_usagemetric_space_id FOREIGN KEY (usage_metric_space_id)
REFERENCES spaces (space_id) MATCH SIMPLE
ON UPDATE NO ACTION
ON DELETE CASCADE
);

View File

@ -17,6 +17,7 @@ package database_test
import (
"context"
"fmt"
"os"
"strconv"
"testing"
@ -49,11 +50,15 @@ func New(dsn string) (*sqlx.DB, error) {
func setupDB(t *testing.T) (*sqlx.DB, func()) {
t.Helper()
db, err := New(":memory:")
// must use file as db because in memory have only basic features
// file is anyway removed on every test. SQLite is fast
// so it will not affect too much performance.
_ = os.Remove("test.db")
db, err := New("test.db")
if err != nil {
t.Fatalf("Error opening db, err: %v", err)
}
_, _ = db.Exec("PRAGMA busy_timeout = 5000;")
if err = migrate.Migrate(context.Background(), db); err != nil {
t.Fatalf("Error migrating db, err: %v", err)
}

View File

@ -0,0 +1,235 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database
import (
"context"
"errors"
"time"
"github.com/harness/gitness/app/store"
gitness_store "github.com/harness/gitness/store"
"github.com/harness/gitness/store/database"
"github.com/harness/gitness/store/database/dbtx"
"github.com/harness/gitness/types"
"github.com/jmoiron/sqlx"
)
var _ store.UsageMetricStore = (*UsageMetricsStore)(nil)
// NewUsageMetricsStore returns a new UsageMetricsStore.
func NewUsageMetricsStore(db *sqlx.DB) *UsageMetricsStore {
return &UsageMetricsStore{
db: db,
}
}
// UsageMetricsStore implements store.UsageMetrics backed by a relational database.
type UsageMetricsStore struct {
db *sqlx.DB
}
func (s *UsageMetricsStore) getVersion(
ctx context.Context,
rootSpaceID int64,
date int64,
) int64 {
const sqlQuery = `
SELECT
usage_metric_version
FROM usage_metrics
WHERE usage_metric_space_id = ? AND usage_metric_date = ?
`
var version int64
err := s.db.QueryRowContext(ctx, sqlQuery, rootSpaceID, date).Scan(&version)
if err != nil {
return 0
}
return version
}
func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) error {
const sqlQuery = `
INSERT INTO usage_metrics (
usage_metric_space_id
,usage_metric_date
,usage_metric_created
,usage_metric_updated
,usage_metric_bandwidth
,usage_metric_storage
,usage_metric_version
) VALUES (
:usage_metric_space_id
,:usage_metric_date
,:usage_metric_created
,:usage_metric_updated
,:usage_metric_bandwidth
,:usage_metric_storage
,:usage_metric_version
)
ON CONFLICT (usage_metric_space_id, usage_metric_date)
DO UPDATE
SET
usage_metric_version = EXCLUDED.usage_metric_version
,usage_metric_updated = EXCLUDED.usage_metric_updated
,usage_metric_bandwidth = usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth
,usage_metric_storage = usage_metric_storage + EXCLUDED.usage_metric_storage
WHERE usage_metric_version = EXCLUDED.usage_metric_version - 1`
db := dbtx.GetAccessor(ctx, s.db)
today := s.Date(time.Now())
query, args, err := db.BindNamed(sqlQuery, usageMetric{
RootSpaceID: in.RootSpaceID,
Date: today,
Created: time.Now().UnixMilli(),
Updated: time.Now().UnixMilli(),
Bandwidth: in.Bandwidth,
Storage: in.Storage,
Version: s.getVersion(ctx, in.RootSpaceID, today) + 1,
})
if err != nil {
return database.ProcessSQLErrorf(ctx, err, "failed to bind query")
}
result, err := db.ExecContext(ctx, query, args...)
if err != nil {
return database.ProcessSQLErrorf(ctx, err, "failed to upsert usage_metric")
}
n, err := result.RowsAffected()
if err != nil {
return database.ProcessSQLErrorf(ctx, err, "failed to fetch number of rows affected")
}
if n == 0 {
return gitness_store.ErrVersionConflict
}
return nil
}
// UpsertOptimistic upsert the usage metric details using the optimistic locking mechanism.
func (s *UsageMetricsStore) UpsertOptimistic(
ctx context.Context,
in *types.UsageMetric,
) error {
for {
err := s.Upsert(ctx, in)
if err == nil {
return nil
}
if !errors.Is(err, gitness_store.ErrVersionConflict) {
return err
}
}
}
func (s *UsageMetricsStore) GetMetrics(
ctx context.Context,
rootSpaceID int64,
start int64,
end int64,
) (*types.UsageMetric, error) {
const sqlQuery = `
SELECT
COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth,
COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage
FROM usage_metrics
WHERE
usage_metric_space_id = ? AND
usage_metric_date BETWEEN ? AND ?`
result := &types.UsageMetric{
RootSpaceID: rootSpaceID,
}
startTime := time.UnixMilli(start)
endTime := time.UnixMilli(end)
err := s.db.QueryRowContext(
ctx,
sqlQuery,
rootSpaceID,
s.Date(startTime),
s.Date(endTime),
).Scan(
&result.Bandwidth,
&result.Storage,
)
if err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to get metric")
}
return result, nil
}
func (s *UsageMetricsStore) List(
ctx context.Context,
start int64,
end int64,
) ([]types.UsageMetric, error) {
const sqlQuery = `
SELECT
usage_metric_space_id,
COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth,
COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage
FROM usage_metrics
WHERE
usage_metric_date BETWEEN ? AND ?
GROUP BY usage_metric_space_id
ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC`
startTime := time.UnixMilli(start)
endTime := time.UnixMilli(end)
db := dbtx.GetAccessor(ctx, s.db)
rows, err := db.QueryContext(ctx, sqlQuery, s.Date(startTime), s.Date(endTime))
if err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage_metrics")
}
defer rows.Close()
results := make([]types.UsageMetric, 0, 16)
for rows.Next() {
metric := types.UsageMetric{}
err = rows.Scan(
&metric.RootSpaceID,
&metric.Bandwidth,
&metric.Storage,
)
if err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to scan usage_metrics")
}
results = append(results, metric)
}
if err = rows.Err(); err != nil {
return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage_metrics")
}
return results, nil
}
func (s *UsageMetricsStore) Date(t time.Time) int64 {
year, month, day := t.Date()
return time.Date(year, month, day, 0, 0, 0, 0, time.UTC).UnixMilli()
}
type usageMetric struct {
RootSpaceID int64 `db:"usage_metric_space_id"`
Date int64 `db:"usage_metric_date"`
Created int64 `db:"usage_metric_created"`
Updated int64 `db:"usage_metric_updated"`
Bandwidth int64 `db:"usage_metric_bandwidth"`
Storage int64 `db:"usage_metric_storage"`
Version int64 `db:"usage_metric_version"`
}

View File

@ -0,0 +1,191 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package database_test
import (
"context"
"testing"
"time"
"github.com/harness/gitness/app/store/database"
"github.com/harness/gitness/types"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
func TestUsageMetricsStore_Upsert(t *testing.T) {
db, teardown := setupDB(t)
defer teardown()
principalStore, spaceStore, spacePathStore, _ := setupStores(t, db)
ctx := context.Background()
createUser(ctx, t, principalStore)
createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0)
metricsStore := database.NewUsageMetricsStore(db)
// First write will set bandwidth and storage to 100
err := metricsStore.Upsert(ctx, &types.UsageMetric{
RootSpaceID: 1,
Bandwidth: 100,
Storage: 100,
})
require.NoError(t, err)
// second write will increase bandwidth for 100 and storage remains the same
err = metricsStore.Upsert(ctx, &types.UsageMetric{
RootSpaceID: 1,
Bandwidth: 100,
Storage: 0,
})
require.NoError(t, err)
row := db.QueryRowContext(
ctx,
`SELECT
usage_metric_space_id,
usage_metric_date,
usage_metric_bandwidth,
usage_metric_storage
FROM usage_metrics
WHERE usage_metric_space_id = ?
LIMIT 1`,
1,
)
metric := types.UsageMetric{}
var date int64
err = row.Scan(
&metric.RootSpaceID,
&date,
&metric.Bandwidth,
&metric.Storage,
)
require.NoError(t, err)
require.Equal(t, int64(1), metric.RootSpaceID)
require.Equal(t, metricsStore.Date(time.Now()), date)
require.Equal(t, int64(200), metric.Bandwidth)
require.Equal(t, int64(100), metric.Storage)
}
func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) {
db, teardown := setupDB(t)
defer teardown()
principalStore, spaceStore, spacePathStore, _ := setupStores(t, db)
ctx := context.Background()
createUser(ctx, t, principalStore)
createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0)
metricsStore := database.NewUsageMetricsStore(db)
g, _ := errgroup.WithContext(ctx)
for range 100 {
g.Go(func() error {
return metricsStore.UpsertOptimistic(ctx, &types.UsageMetric{
RootSpaceID: 1,
Bandwidth: 100,
Storage: 100,
})
})
}
err := g.Wait()
require.NoError(t, err)
now := time.Now().UnixMilli()
metric, err := metricsStore.GetMetrics(ctx, 1, now, now)
require.NoError(t, err)
require.Equal(t, int64(100*100), metric.Bandwidth)
require.Equal(t, int64(100*100), metric.Storage)
}
func TestUsageMetricsStore_GetMetrics(t *testing.T) {
db, teardown := setupDB(t)
defer teardown()
principalStore, spaceStore, spacePathStore, _ := setupStores(t, db)
ctx := context.Background()
createUser(ctx, t, principalStore)
createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0)
metricsStore := database.NewUsageMetricsStore(db)
// First write will set bandwidth and storage to 100
err := metricsStore.Upsert(ctx, &types.UsageMetric{
RootSpaceID: 1,
Bandwidth: 100,
Storage: 100,
})
require.NoError(t, err)
now := time.Now().UnixMilli()
metric, err := metricsStore.GetMetrics(ctx, 1, now, now)
require.NoError(t, err)
require.Equal(t, int64(1), metric.RootSpaceID, "expected spaceID = %d, got %d", 1, metric.RootSpaceID)
require.Equal(t, int64(100), metric.Bandwidth, "expected bandwidth = %d, got %d", 100, metric.Bandwidth)
require.Equal(t, int64(100), metric.Storage, "expected storage = %d, got %d", 100, metric.Storage)
}
func TestUsageMetricsStore_List(t *testing.T) {
db, teardown := setupDB(t)
defer teardown()
principalStore, spaceStore, spacePathStore, _ := setupStores(t, db)
ctx := context.Background()
createUser(ctx, t, principalStore)
createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0)
createSpace(ctx, t, spaceStore, spacePathStore, userID, 2, 0)
metricsStore := database.NewUsageMetricsStore(db)
err := metricsStore.Upsert(ctx, &types.UsageMetric{
RootSpaceID: 1,
Bandwidth: 100,
Storage: 100,
})
require.NoError(t, err)
err = metricsStore.Upsert(ctx, &types.UsageMetric{
RootSpaceID: 1,
Bandwidth: 50,
Storage: 50,
})
require.NoError(t, err)
err = metricsStore.Upsert(ctx, &types.UsageMetric{
RootSpaceID: 2,
Bandwidth: 200,
Storage: 200,
})
require.NoError(t, err)
now := time.Now().UnixMilli()
metrics, err := metricsStore.List(ctx, now, now)
require.NoError(t, err)
require.Equal(t, 2, len(metrics))
// list use desc order so first row should be spaceID = 2
require.Equal(t, int64(2), metrics[0].RootSpaceID)
}

21
types/usage_metric.go Normal file
View File

@ -0,0 +1,21 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types
type UsageMetric struct {
RootSpaceID int64 `json:"root_space_id"`
Bandwidth int64 `json:"bandwidth"`
Storage int64 `json:"storage"`
}