diff --git a/.gitignore b/.gitignore index 57c4fa891..e2803bac1 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ node_modules /registry/logs/* /distribution-spec /registry/distribution-spec +/app/store/database/test.db diff --git a/app/store/database.go b/app/store/database.go index a6e35afed..73ffb0e3e 100644 --- a/app/store/database.go +++ b/app/store/database.go @@ -1273,4 +1273,14 @@ type ( Delete(ctx context.Context, id int64) error Update(ctx context.Context, infraProvisioned *types.InfraProvisioned) error } + + UsageMetricStore interface { + Upsert(ctx context.Context, in *types.UsageMetric) error + GetMetrics( + ctx context.Context, + rootSpaceID int64, + startDate int64, + endDate int64, + ) (*types.UsageMetric, error) + } ) diff --git a/app/store/database/migrate/postgres/0091_create_usage_table.down.sql b/app/store/database/migrate/postgres/0091_create_usage_table.down.sql new file mode 100644 index 000000000..2234f23a2 --- /dev/null +++ b/app/store/database/migrate/postgres/0091_create_usage_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS usage_metrics; \ No newline at end of file diff --git a/app/store/database/migrate/postgres/0091_create_usage_table.up.sql b/app/store/database/migrate/postgres/0091_create_usage_table.up.sql new file mode 100644 index 000000000..e24b99147 --- /dev/null +++ b/app/store/database/migrate/postgres/0091_create_usage_table.up.sql @@ -0,0 +1,17 @@ +CREATE TABLE usage_metrics +( + usage_metric_space_id BIGINT NOT NULL, + usage_metric_date BIGINT NOT NULL, + usage_metric_created BIGINT NOT NULL, + usage_metric_updated BIGINT, + usage_metric_bandwidth BIGINT NOT NULL, + usage_metric_storage BIGINT NOT NULL, + usage_metric_version BIGINT NOT NULL, + + PRIMARY KEY (usage_metric_space_id, usage_metric_date), + + CONSTRAINT fk_usagemetric_space_id FOREIGN KEY (usage_metric_space_id) + REFERENCES spaces (space_id) MATCH SIMPLE + ON UPDATE NO ACTION + ON DELETE CASCADE +); \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0091_create_usage_table.down.sql b/app/store/database/migrate/sqlite/0091_create_usage_table.down.sql new file mode 100644 index 000000000..2234f23a2 --- /dev/null +++ b/app/store/database/migrate/sqlite/0091_create_usage_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS usage_metrics; \ No newline at end of file diff --git a/app/store/database/migrate/sqlite/0091_create_usage_table.up.sql b/app/store/database/migrate/sqlite/0091_create_usage_table.up.sql new file mode 100644 index 000000000..e24b99147 --- /dev/null +++ b/app/store/database/migrate/sqlite/0091_create_usage_table.up.sql @@ -0,0 +1,17 @@ +CREATE TABLE usage_metrics +( + usage_metric_space_id BIGINT NOT NULL, + usage_metric_date BIGINT NOT NULL, + usage_metric_created BIGINT NOT NULL, + usage_metric_updated BIGINT, + usage_metric_bandwidth BIGINT NOT NULL, + usage_metric_storage BIGINT NOT NULL, + usage_metric_version BIGINT NOT NULL, + + PRIMARY KEY (usage_metric_space_id, usage_metric_date), + + CONSTRAINT fk_usagemetric_space_id FOREIGN KEY (usage_metric_space_id) + REFERENCES spaces (space_id) MATCH SIMPLE + ON UPDATE NO ACTION + ON DELETE CASCADE +); \ No newline at end of file diff --git a/app/store/database/setup_test.go b/app/store/database/setup_test.go index f9b450649..3c1baca8b 100644 --- a/app/store/database/setup_test.go +++ b/app/store/database/setup_test.go @@ -17,6 +17,7 @@ package database_test import ( "context" "fmt" + "os" "strconv" "testing" @@ -49,11 +50,15 @@ func New(dsn string) (*sqlx.DB, error) { func setupDB(t *testing.T) (*sqlx.DB, func()) { t.Helper() - db, err := New(":memory:") + // must use file as db because in memory have only basic features + // file is anyway removed on every test. SQLite is fast + // so it will not affect too much performance. + _ = os.Remove("test.db") + db, err := New("test.db") if err != nil { t.Fatalf("Error opening db, err: %v", err) } - + _, _ = db.Exec("PRAGMA busy_timeout = 5000;") if err = migrate.Migrate(context.Background(), db); err != nil { t.Fatalf("Error migrating db, err: %v", err) } diff --git a/app/store/database/usage_metrics.go b/app/store/database/usage_metrics.go new file mode 100644 index 000000000..3d1333b4a --- /dev/null +++ b/app/store/database/usage_metrics.go @@ -0,0 +1,235 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package database + +import ( + "context" + "errors" + "time" + + "github.com/harness/gitness/app/store" + gitness_store "github.com/harness/gitness/store" + "github.com/harness/gitness/store/database" + "github.com/harness/gitness/store/database/dbtx" + "github.com/harness/gitness/types" + + "github.com/jmoiron/sqlx" +) + +var _ store.UsageMetricStore = (*UsageMetricsStore)(nil) + +// NewUsageMetricsStore returns a new UsageMetricsStore. +func NewUsageMetricsStore(db *sqlx.DB) *UsageMetricsStore { + return &UsageMetricsStore{ + db: db, + } +} + +// UsageMetricsStore implements store.UsageMetrics backed by a relational database. +type UsageMetricsStore struct { + db *sqlx.DB +} + +func (s *UsageMetricsStore) getVersion( + ctx context.Context, + rootSpaceID int64, + date int64, +) int64 { + const sqlQuery = ` + SELECT + usage_metric_version + FROM usage_metrics + WHERE usage_metric_space_id = ? AND usage_metric_date = ? + ` + var version int64 + err := s.db.QueryRowContext(ctx, sqlQuery, rootSpaceID, date).Scan(&version) + if err != nil { + return 0 + } + return version +} + +func (s *UsageMetricsStore) Upsert(ctx context.Context, in *types.UsageMetric) error { + const sqlQuery = ` + INSERT INTO usage_metrics ( + usage_metric_space_id + ,usage_metric_date + ,usage_metric_created + ,usage_metric_updated + ,usage_metric_bandwidth + ,usage_metric_storage + ,usage_metric_version + ) VALUES ( + :usage_metric_space_id + ,:usage_metric_date + ,:usage_metric_created + ,:usage_metric_updated + ,:usage_metric_bandwidth + ,:usage_metric_storage + ,:usage_metric_version + ) + ON CONFLICT (usage_metric_space_id, usage_metric_date) + DO UPDATE + SET + usage_metric_version = EXCLUDED.usage_metric_version + ,usage_metric_updated = EXCLUDED.usage_metric_updated + ,usage_metric_bandwidth = usage_metric_bandwidth + EXCLUDED.usage_metric_bandwidth + ,usage_metric_storage = usage_metric_storage + EXCLUDED.usage_metric_storage + WHERE usage_metric_version = EXCLUDED.usage_metric_version - 1` + + db := dbtx.GetAccessor(ctx, s.db) + today := s.Date(time.Now()) + query, args, err := db.BindNamed(sqlQuery, usageMetric{ + RootSpaceID: in.RootSpaceID, + Date: today, + Created: time.Now().UnixMilli(), + Updated: time.Now().UnixMilli(), + Bandwidth: in.Bandwidth, + Storage: in.Storage, + Version: s.getVersion(ctx, in.RootSpaceID, today) + 1, + }) + if err != nil { + return database.ProcessSQLErrorf(ctx, err, "failed to bind query") + } + + result, err := db.ExecContext(ctx, query, args...) + if err != nil { + return database.ProcessSQLErrorf(ctx, err, "failed to upsert usage_metric") + } + n, err := result.RowsAffected() + if err != nil { + return database.ProcessSQLErrorf(ctx, err, "failed to fetch number of rows affected") + } + if n == 0 { + return gitness_store.ErrVersionConflict + } + return nil +} + +// UpsertOptimistic upsert the usage metric details using the optimistic locking mechanism. +func (s *UsageMetricsStore) UpsertOptimistic( + ctx context.Context, + in *types.UsageMetric, +) error { + for { + err := s.Upsert(ctx, in) + if err == nil { + return nil + } + if !errors.Is(err, gitness_store.ErrVersionConflict) { + return err + } + } +} + +func (s *UsageMetricsStore) GetMetrics( + ctx context.Context, + rootSpaceID int64, + start int64, + end int64, +) (*types.UsageMetric, error) { + const sqlQuery = ` + SELECT + COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth, + COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage + FROM usage_metrics + WHERE + usage_metric_space_id = ? AND + usage_metric_date BETWEEN ? AND ?` + + result := &types.UsageMetric{ + RootSpaceID: rootSpaceID, + } + + startTime := time.UnixMilli(start) + endTime := time.UnixMilli(end) + + err := s.db.QueryRowContext( + ctx, + sqlQuery, + rootSpaceID, + s.Date(startTime), + s.Date(endTime), + ).Scan( + &result.Bandwidth, + &result.Storage, + ) + if err != nil { + return nil, database.ProcessSQLErrorf(ctx, err, "failed to get metric") + } + + return result, nil +} + +func (s *UsageMetricsStore) List( + ctx context.Context, + start int64, + end int64, +) ([]types.UsageMetric, error) { + const sqlQuery = ` + SELECT + usage_metric_space_id, + COALESCE(SUM(usage_metric_bandwidth), 0) AS usage_metric_bandwidth, + COALESCE(SUM(usage_metric_storage), 0) AS usage_metric_storage + FROM usage_metrics + WHERE + usage_metric_date BETWEEN ? AND ? + GROUP BY usage_metric_space_id + ORDER BY usage_metric_bandwidth DESC, usage_metric_storage DESC` + + startTime := time.UnixMilli(start) + endTime := time.UnixMilli(end) + + db := dbtx.GetAccessor(ctx, s.db) + rows, err := db.QueryContext(ctx, sqlQuery, s.Date(startTime), s.Date(endTime)) + if err != nil { + return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage_metrics") + } + defer rows.Close() + + results := make([]types.UsageMetric, 0, 16) + for rows.Next() { + metric := types.UsageMetric{} + err = rows.Scan( + &metric.RootSpaceID, + &metric.Bandwidth, + &metric.Storage, + ) + if err != nil { + return nil, database.ProcessSQLErrorf(ctx, err, "failed to scan usage_metrics") + } + results = append(results, metric) + } + + if err = rows.Err(); err != nil { + return nil, database.ProcessSQLErrorf(ctx, err, "failed to list usage_metrics") + } + return results, nil +} + +func (s *UsageMetricsStore) Date(t time.Time) int64 { + year, month, day := t.Date() + return time.Date(year, month, day, 0, 0, 0, 0, time.UTC).UnixMilli() +} + +type usageMetric struct { + RootSpaceID int64 `db:"usage_metric_space_id"` + Date int64 `db:"usage_metric_date"` + Created int64 `db:"usage_metric_created"` + Updated int64 `db:"usage_metric_updated"` + Bandwidth int64 `db:"usage_metric_bandwidth"` + Storage int64 `db:"usage_metric_storage"` + Version int64 `db:"usage_metric_version"` +} diff --git a/app/store/database/usage_metrics_test.go b/app/store/database/usage_metrics_test.go new file mode 100644 index 000000000..2c428a20b --- /dev/null +++ b/app/store/database/usage_metrics_test.go @@ -0,0 +1,191 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package database_test + +import ( + "context" + "testing" + "time" + + "github.com/harness/gitness/app/store/database" + "github.com/harness/gitness/types" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestUsageMetricsStore_Upsert(t *testing.T) { + db, teardown := setupDB(t) + defer teardown() + + principalStore, spaceStore, spacePathStore, _ := setupStores(t, db) + + ctx := context.Background() + + createUser(ctx, t, principalStore) + createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0) + + metricsStore := database.NewUsageMetricsStore(db) + // First write will set bandwidth and storage to 100 + err := metricsStore.Upsert(ctx, &types.UsageMetric{ + RootSpaceID: 1, + Bandwidth: 100, + Storage: 100, + }) + require.NoError(t, err) + + // second write will increase bandwidth for 100 and storage remains the same + err = metricsStore.Upsert(ctx, &types.UsageMetric{ + RootSpaceID: 1, + Bandwidth: 100, + Storage: 0, + }) + require.NoError(t, err) + + row := db.QueryRowContext( + ctx, + `SELECT + usage_metric_space_id, + usage_metric_date, + usage_metric_bandwidth, + usage_metric_storage + FROM usage_metrics + WHERE usage_metric_space_id = ? + LIMIT 1`, + 1, + ) + metric := types.UsageMetric{} + var date int64 + err = row.Scan( + &metric.RootSpaceID, + &date, + &metric.Bandwidth, + &metric.Storage, + ) + require.NoError(t, err) + require.Equal(t, int64(1), metric.RootSpaceID) + require.Equal(t, metricsStore.Date(time.Now()), date) + require.Equal(t, int64(200), metric.Bandwidth) + require.Equal(t, int64(100), metric.Storage) +} + +func TestUsageMetricsStore_UpsertOptimistic(t *testing.T) { + db, teardown := setupDB(t) + defer teardown() + + principalStore, spaceStore, spacePathStore, _ := setupStores(t, db) + + ctx := context.Background() + + createUser(ctx, t, principalStore) + createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0) + + metricsStore := database.NewUsageMetricsStore(db) + + g, _ := errgroup.WithContext(ctx) + + for range 100 { + g.Go(func() error { + return metricsStore.UpsertOptimistic(ctx, &types.UsageMetric{ + RootSpaceID: 1, + Bandwidth: 100, + Storage: 100, + }) + }) + } + + err := g.Wait() + require.NoError(t, err) + + now := time.Now().UnixMilli() + metric, err := metricsStore.GetMetrics(ctx, 1, now, now) + require.NoError(t, err) + + require.Equal(t, int64(100*100), metric.Bandwidth) + require.Equal(t, int64(100*100), metric.Storage) +} + +func TestUsageMetricsStore_GetMetrics(t *testing.T) { + db, teardown := setupDB(t) + defer teardown() + + principalStore, spaceStore, spacePathStore, _ := setupStores(t, db) + + ctx := context.Background() + + createUser(ctx, t, principalStore) + createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0) + + metricsStore := database.NewUsageMetricsStore(db) + // First write will set bandwidth and storage to 100 + err := metricsStore.Upsert(ctx, &types.UsageMetric{ + RootSpaceID: 1, + Bandwidth: 100, + Storage: 100, + }) + require.NoError(t, err) + + now := time.Now().UnixMilli() + + metric, err := metricsStore.GetMetrics(ctx, 1, now, now) + require.NoError(t, err) + + require.Equal(t, int64(1), metric.RootSpaceID, "expected spaceID = %d, got %d", 1, metric.RootSpaceID) + require.Equal(t, int64(100), metric.Bandwidth, "expected bandwidth = %d, got %d", 100, metric.Bandwidth) + require.Equal(t, int64(100), metric.Storage, "expected storage = %d, got %d", 100, metric.Storage) +} + +func TestUsageMetricsStore_List(t *testing.T) { + db, teardown := setupDB(t) + defer teardown() + + principalStore, spaceStore, spacePathStore, _ := setupStores(t, db) + + ctx := context.Background() + + createUser(ctx, t, principalStore) + createSpace(ctx, t, spaceStore, spacePathStore, userID, 1, 0) + createSpace(ctx, t, spaceStore, spacePathStore, userID, 2, 0) + + metricsStore := database.NewUsageMetricsStore(db) + err := metricsStore.Upsert(ctx, &types.UsageMetric{ + RootSpaceID: 1, + Bandwidth: 100, + Storage: 100, + }) + require.NoError(t, err) + + err = metricsStore.Upsert(ctx, &types.UsageMetric{ + RootSpaceID: 1, + Bandwidth: 50, + Storage: 50, + }) + require.NoError(t, err) + + err = metricsStore.Upsert(ctx, &types.UsageMetric{ + RootSpaceID: 2, + Bandwidth: 200, + Storage: 200, + }) + require.NoError(t, err) + + now := time.Now().UnixMilli() + metrics, err := metricsStore.List(ctx, now, now) + require.NoError(t, err) + require.Equal(t, 2, len(metrics)) + + // list use desc order so first row should be spaceID = 2 + require.Equal(t, int64(2), metrics[0].RootSpaceID) +} diff --git a/types/usage_metric.go b/types/usage_metric.go new file mode 100644 index 000000000..530cc5319 --- /dev/null +++ b/types/usage_metric.go @@ -0,0 +1,21 @@ +// Copyright 2023 Harness, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +type UsageMetric struct { + RootSpaceID int64 `json:"root_space_id"` + Bandwidth int64 `json:"bandwidth"` + Storage int64 `json:"storage"` +}