goose/tests/clickhouse/clickhouse_test.go

210 lines
6.0 KiB
Go

package clickhouse_test
import (
"database/sql"
"errors"
"path/filepath"
"testing"
"time"
"github.com/avast/retry-go/v4"
"github.com/pressly/goose/v3"
"github.com/pressly/goose/v3/internal/check"
"github.com/pressly/goose/v3/internal/testdb"
)
// TestClickUpDownAll applies all up migrations, asserts the database reaches
// the highest collected migration version, then applies all down migrations
// and asserts zero migrations remain applied.
//
// ClickHouse performs UPDATEs and DELETEs asynchronously (as "mutations"), but
// we can best-effort check mutations and their progress. This is especially
// important for down migrations, where rows are deleted from the versions
// table. For the sake of testing, there might be a way to modify the server
// (or queries) so that all operations run synchronously.
//
// Ref: https://clickhouse.com/docs/en/operations/system-tables/mutations/
// Ref: https://clickhouse.com/docs/en/sql-reference/statements/alter/#mutations
// Ref: https://clickhouse.com/blog/how-to-update-data-in-click-house/
func TestClickUpDownAll(t *testing.T) {
	t.Parallel()

	migrationDir := filepath.Join("testdata", "migrations")
	db, cleanup, err := testdb.NewClickHouse()
	check.NoError(t, err)
	t.Cleanup(cleanup)

	goose.SetDialect("clickhouse")

	// retryCheckTableMutation adapts checkTableMutation to the func() error
	// shape expected by retry.Do: a non-nil error means "not done yet, retry".
	retryCheckTableMutation := func(table string) func() error {
		return func() error {
			if !checkTableMutation(t, db, table) {
				return errors.New("mutation not done for table: " + table)
			}
			return nil
		}
	}

	// Collect migrations so the expected version is derived from testdata
	// rather than hard-coded in an assertion later in the test.
	migrations, err := goose.CollectMigrations(migrationDir, 0, goose.MaxVersion)
	check.NoError(t, err)
	if len(migrations) == 0 {
		t.Fatalf("no migrations found in %s", migrationDir)
	}
	// Compare against the last migration's version instead of the migration
	// count: the two only coincide when versions are numbered 1..n.
	// NOTE(review): assumes CollectMigrations returns migrations ordered by
	// version, so the last entry is the highest version — confirm.
	wantVersion := migrations[len(migrations)-1].Version

	currentVersion, err := goose.GetDBVersion(db)
	check.NoError(t, err)
	check.Number(t, currentVersion, 0)

	err = goose.Up(db, migrationDir)
	check.NoError(t, err)
	currentVersion, err = goose.GetDBVersion(db)
	check.NoError(t, err)
	check.Number(t, currentVersion, wantVersion)

	err = goose.DownTo(db, migrationDir, 0)
	check.NoError(t, err)
	// Down migrations delete rows from the versions table asynchronously, so
	// wait for those mutations to finish before reading the version back.
	err = retry.Do(
		retryCheckTableMutation(goose.TableName()),
		retry.Delay(1*time.Second),
	)
	check.NoError(t, err)

	currentVersion, err = goose.GetDBVersion(db)
	check.NoError(t, err)
	check.Number(t, currentVersion, 0)
}
// TestClickHouseFirstThree applies all migrations in testdata/migrations,
// expects the database to end at version 3, and verifies the rows present in
// the clickstream table, ordered by customer_id.
func TestClickHouseFirstThree(t *testing.T) {
	t.Parallel()

	migrationDir := filepath.Join("testdata", "migrations")
	db, cleanup, err := testdb.NewClickHouse()
	check.NoError(t, err)
	t.Cleanup(cleanup)

	goose.SetDialect("clickhouse")
	err = goose.Up(db, migrationDir)
	check.NoError(t, err)
	currentVersion, err := goose.GetDBVersion(db)
	check.NoError(t, err)
	check.Number(t, currentVersion, 3)

	// clickstreamRow holds one scanned row; fields are listed in the same
	// positional order the Scan call below reads the SELECT * columns.
	type clickstreamRow struct {
		customerID     string
		timestamp      time.Time
		clickEventType string
		countryCode    string
		sourceID       int64
	}

	parseTime := func(s string) time.Time {
		t.Helper()
		tm, err := time.Parse("2006-01-02", s)
		check.NoError(t, err)
		return tm
	}
	want := []clickstreamRow{
		{"customer1", parseTime("2021-10-02"), "add_to_cart", "US", 568239},
		{"customer2", parseTime("2021-10-30"), "remove_from_cart", "", 0},
		{"customer3", parseTime("2021-11-07"), "checkout", "", 307493},
	}

	rows, err := db.Query(`SELECT * FROM clickstream ORDER BY customer_id`)
	check.NoError(t, err)
	// Deferred so the rows are released even if a check aborts the test
	// mid-iteration; the explicit Close below still reports its error.
	defer rows.Close()

	var got []clickstreamRow
	for rows.Next() {
		var r clickstreamRow
		check.NoError(t, rows.Scan(&r.customerID, &r.timestamp, &r.clickEventType, &r.countryCode, &r.sourceID))
		got = append(got, r)
	}
	// Surface iteration errors before asserting on the row count, so a
	// truncated result set reports its real cause.
	check.NoError(t, rows.Err())
	check.NoError(t, rows.Close())
	check.Number(t, len(got), len(want))

	for i, g := range got {
		w := want[i]
		check.Equal(t, g.customerID, w.customerID)
		check.Equal(t, g.timestamp, w.timestamp)
		check.Equal(t, g.clickEventType, w.clickEventType)
		// country_code is only compared when both sides are non-empty.
		if g.countryCode != "" && w.countryCode != "" {
			check.Equal(t, g.countryCode, w.countryCode)
		}
		check.Number(t, g.sourceID, w.sourceID)
	}
}
// TestRemoteImportMigration runs the migrations in testdata/migrations-remote,
// which load a remote dataset from an S3 bucket:
//
//	https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv
//
// (from the tutorial at https://clickhouse.com/docs/en/tutorial/), and then
// checks that taxi_zone_dictionary holds 265 rows.
//
// A backup copy of the CSV lives in
// tests/clickhouse/testdata/backup-files/taxi_zone_lookup.csv. We may want to
// host it ourselves, or drop SOURCE(HTTP(URL..)) in favor of one long INSERT.
func TestRemoteImportMigration(t *testing.T) {
	t.Parallel()

	// The gate is deliberately inverted relative to the usual convention:
	// this test is skipped by default and only runs when -short is passed.
	// TODO(mf): use TestMain and create a proper "long" or "remote" flag.
	if !testing.Short() {
		t.Skip("skipping test")
	}

	dir := filepath.Join("testdata", "migrations-remote")
	db, cleanup, err := testdb.NewClickHouse(testdb.WithBindPort(9000))
	check.NoError(t, err)
	t.Cleanup(cleanup)

	goose.SetDialect("clickhouse")
	check.NoError(t, goose.Up(db, dir))

	// Only confirm the version can be read back; its value is not asserted.
	_, err = goose.GetDBVersion(db)
	check.NoError(t, err)

	var rowCount int
	check.NoError(t, db.QueryRow(`SELECT count(*) FROM taxi_zone_dictionary`).Scan(&rowCount))
	check.Number(t, rowCount, 265)
}
// checkTableMutation reports whether every mutation listed in
// system.mutations for tableName has is_done = 1. A table with no listed
// mutations is treated as fully done.
//
// ClickHouse applies ALTER ... UPDATE/DELETE asynchronously, so callers are
// expected to poll this (for example via retry.Do) until it returns true.
// Query, scan, and iteration errors are passed to check.NoError, which
// presumably fails the test immediately.
//
// NOTE(review): the query filters only on table name, not database, so
// same-named tables in other databases would also be considered.
func checkTableMutation(t *testing.T, db *sql.DB, tableName string) bool {
	t.Helper()
	rows, err := db.Query(
		`select mutation_id, command, is_done, create_time from system.mutations where table=$1`,
		tableName,
	)
	check.NoError(t, err)
	// Deferred so the rows (and their connection) are released even when a
	// check below aborts the test; deferred calls still run on t.Fatal.
	defer rows.Close()

	allDone := true
	for rows.Next() {
		var (
			mutationID string
			command    string
			isDone     int64
			createTime time.Time
		)
		check.NoError(t, rows.Scan(&mutationID, &command, &isDone, &createTime))
		if isDone != 1 {
			// Keep scanning rather than returning early so that later scan
			// or iteration errors still surface; log for retry diagnostics.
			allDone = false
			t.Logf("mutation %s on table %s not done (created %s): %s",
				mutationID, tableName, createTime, command)
		}
	}
	check.NoError(t, rows.Err())
	check.NoError(t, rows.Close())
	return allDone
}