Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update schema and add migrations for mysql and pgsql #273

Merged
merged 2 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .drone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,23 @@ steps:
event:
- pull_request

- name: test-schema-migration
image: rancher/dapper:v0.6.0
depends_on:
- build
- test-image
commands:
- >
docker run -i -e ARCH -e REPO -e TAG -e DRONE_TAG -e IMAGE_NAME
-v /var/run/docker.sock:/var/run/docker.sock -v kine-cache:/go/src/github.com/k3s-io/kine/.cache
--privileged kine:test-${DRONE_COMMIT} "./scripts/test schema-migration"
volumes:
- name: docker
path: /var/run/docker.sock
when:
event:
- pull_request

- name: test-nats
image: rancher/dapper:v0.6.0
depends_on:
Expand Down
40 changes: 29 additions & 11 deletions pkg/drivers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
cryptotls "crypto/tls"
"database/sql"
"fmt"
"os"
"strconv"

"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -27,12 +29,12 @@ var (
schema = []string{
`CREATE TABLE IF NOT EXISTS kine
(
id INTEGER AUTO_INCREMENT,
id BIGINT UNSIGNED AUTO_INCREMENT,
name VARCHAR(630) CHARACTER SET ascii,
created INTEGER,
deleted INTEGER,
create_revision INTEGER,
prev_revision INTEGER,
create_revision BIGINT UNSIGNED,
prev_revision BIGINT UNSIGNED,
lease INTEGER,
value MEDIUMBLOB,
old_value MEDIUMBLOB,
Expand All @@ -44,6 +46,9 @@ var (
`CREATE INDEX kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
schemaMigrations = []string{
`ALTER TABLE kine MODIFY COLUMN id BIGINT UNSIGNED AUTO_INCREMENT NOT NULL UNIQUE, MODIFY COLUMN create_revision BIGINT UNSIGNED, MODIFY COLUMN prev_revision BIGINT UNSIGNED`,
}
createDB = "CREATE DATABASE IF NOT EXISTS "
)

Expand Down Expand Up @@ -117,25 +122,40 @@ func New(ctx context.Context, dataSourceName string, tlsInfo tls.Config, connPoo
}

func setup(db *sql.DB) error {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
var exists bool
err := db.QueryRow("SELECT 1 FROM information_schema.TABLES WHERE table_schema = DATABASE() AND table_name = ?", "kine").Scan(&exists)
if err != nil && err != sql.ErrNoRows {
logrus.Warnf("failed to check existence of database table %s, going to attempt create: %v", "kine", err)
logrus.Warnf("Failed to check existence of database table %s, going to attempt create: %v", "kine", err)
}

if !exists {
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
if _, err := db.Exec(stmt); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
return err
}
}
}
}

// Run enabled schama migrations.
// Note that the schema created by the `schema` var is always the latest revision;
// migrations should handle deltas between prior schema versions.
schemaVersion, _ := strconv.ParseUint(os.Getenv("KINE_SCHEMA_MIGRATION"), 10, 64)
for i, stmt := range schemaMigrations {
if i >= int(schemaVersion) {
break
}
logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt))
if _, err := db.Exec(stmt); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1061 {
return err
}
}
}

logrus.Infof("Database tables and indexes are up to date")
return nil
}
Expand All @@ -159,8 +179,7 @@ func createDBIfNotExist(dataSourceName string) error {
}

if !exists {
_, err = db.Exec(createDB + dbName)
if err != nil {
if _, err = db.Exec(createDB + dbName); err != nil {
if mysqlError, ok := err.(*mysql.MySQLError); !ok || mysqlError.Number != 1049 {
return err
}
Expand All @@ -169,8 +188,7 @@ func createDBIfNotExist(dataSourceName string) error {
if err != nil {
return err
}
_, err = db.Exec(createDB + dbName)
if err != nil {
if _, err = db.Exec(createDB + dbName); err != nil {
return err
}
}
Expand Down
30 changes: 23 additions & 7 deletions pkg/drivers/pgsql/pgsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"net/url"
"os"
"regexp"
"strconv"
"strings"
Expand All @@ -30,12 +31,12 @@ var (
schema = []string{
`CREATE TABLE IF NOT EXISTS kine
(
id SERIAL PRIMARY KEY,
id BIGSERIAL PRIMARY KEY,
name VARCHAR(630),
created INTEGER,
deleted INTEGER,
create_revision INTEGER,
prev_revision INTEGER,
create_revision BIGINT,
prev_revision BIGINT,
lease INTEGER,
value bytea,
old_value bytea
Expand All @@ -46,6 +47,9 @@ var (
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
}
schemaMigrations = []string{
`ALTER TABLE kine ALTER COLUMN id SET DATA TYPE BIGINT, ALTER COLUMN create_revision SET DATA TYPE BIGINT, ALTER COLUMN prev_revision SET DATA TYPE BIGINT; ALTER SEQUENCE kine_id_seq AS BIGINT`,
}
createDB = "CREATE DATABASE "
)

Expand Down Expand Up @@ -117,8 +121,21 @@ func setup(db *sql.DB) error {

for _, stmt := range schema {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err := db.Exec(stmt)
if err != nil {
if _, err := db.Exec(stmt); err != nil {
return err
}
}

// Run enabled schama migrations.
// Note that the schema created by the `schema` var is always the latest revision;
// migrations should handle deltas between prior schema versions.
schemaVersion, _ := strconv.ParseUint(os.Getenv("KINE_SCHEMA_MIGRATION"), 10, 64)
for i, stmt := range schemaMigrations {
if i >= int(schemaVersion) {
break
}
logrus.Tracef("SETUP EXEC MIGRATION %d: %v", i, util.Stripped(stmt))
if _, err := db.Exec(stmt); err != nil {
return err
}
}
Expand Down Expand Up @@ -152,8 +169,7 @@ func createDBIfNotExist(dataSourceName string) error {

if !exists {
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
_, err = db.Exec(stmt)
if err != nil {
if _, err = db.Exec(stmt); err != nil {
logrus.Warnf("failed to create database %s: %v", dbName, err)
} else {
logrus.Tracef("created database: %s", dbName)
Expand Down
3 changes: 3 additions & 0 deletions scripts/test
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ echo "Did test-run-postgres $?"
. ./scripts/test-run-cockroachdb
echo "Did test-run-cockroachdb $?"

. ./scripts/test-run-schema-migration
echo "Did test-run-schema-migration $?"

. ./scripts/test-run-nats
echo "did test-nats $?"

Expand Down
1 change: 1 addition & 0 deletions scripts/test-helpers
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ provision-kine() {

docker container run \
-d --name $name \
$KINE_ENV \
$KINE_IMAGE --endpoint $KINE_ENDPOINT

local ip=$(docker container inspect --format '{{.NetworkSettings.IPAddress}}' $name | tee $TEST_DIR/kine/$count/metadata/ip)
Expand Down
62 changes: 62 additions & 0 deletions scripts/test-run-schema-migration
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/bin/bash

start-test() {
local ip=$(cat $TEST_DIR/databases/*/metadata/ip)
local port=$(cat $TEST_DIR/databases/*/metadata/port)
local pass=$(cat $TEST_DIR/databases/*/metadata/password)
local image=$(cat $TEST_DIR/databases/*/metadata/image)
DB_CONNECTION_TEST="
docker run --rm
--name connection-test
$image
mysql
--host=$ip
--port=$port
--user=root
--password=$pass
--execute=status" \
timeout --foreground 1m bash -c "wait-for-db-connection"
KINE_IMAGE=docker.io/rancher/kine:v0.11.4 KINE_ENDPOINT="mysql://root:$pass@tcp($ip:$port)/kine" provision-kine
sleep 10
for container in $(cat $TEST_DIR/kine/*/metadata/name); do
docker container rm -f -v $container
done
rm -rf $TEST_DIR/kine/*
KINE_IMAGE=$IMAGE KINE_ENDPOINT="mysql://root:$pass@tcp($ip:$port)/kine" KINE_ENV="-e KINE_SCHEMA_MIGRATION=1" provision-kine
local kine_url=$(cat $TEST_DIR/kine/*/metadata/url)
K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster
}
export -f start-test

LABEL=mariadb-10.11-migration DB_PASSWORD_ENV=MYSQL_ROOT_PASSWORD DB_IMAGE=docker.io/library/mariadb:10.11 run-test

start-test() {
local ip=$(cat $TEST_DIR/databases/*/metadata/ip)
local port=$(cat $TEST_DIR/databases/*/metadata/port)
local pass=$(cat $TEST_DIR/databases/*/metadata/password)
local image=$(cat $TEST_DIR/databases/*/metadata/image)
DB_CONNECTION_TEST="
docker run --rm
--name connection-test
-e PGPASSWORD=$pass
$image
psql
--host=$ip
--port=$port
--username=postgres
--command=\\conninfo" \
timeout --foreground 1m bash -c "wait-for-db-connection"
KINE_IMAGE=docker.io/rancher/kine:v0.11.4 KINE_ENDPOINT="postgres://postgres:$pass@$ip:$port/postgres?sslmode=disable" provision-kine
sleep 10
for container in $(cat $TEST_DIR/kine/*/metadata/name); do
docker container rm -f -v $container
done
rm -rf $TEST_DIR/kine/*
KINE_IMAGE=$IMAGE KINE_ENDPOINT="postgres://postgres:$pass@$ip:$port/postgres?sslmode=disable" KINE_ENV="-e KINE_SCHEMA_MIGRATION=1" provision-kine
local kine_url=$(cat $TEST_DIR/kine/*/metadata/url)
K3S_DATASTORE_ENDPOINT=$kine_url provision-cluster
}
export -f start-test

LABEL=postgres-15.4-migration DB_PASSWORD_ENV=POSTGRES_PASSWORD DB_IMAGE=docker.io/library/postgres:15.4 run-test