Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a 'set replica identity' operation #201

Merged
merged 4 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [Drop table](#drop-table)
* [Raw SQL](#raw-sql)
* [Rename table](#rename-table)
* [Set replica identity](#set-replica-identity)

## Concepts

Expand Down Expand Up @@ -674,6 +675,7 @@ See the [examples](../examples) directory for examples of each kind of operation
* [Drop table](#drop-table)
* [Raw SQL](#raw-sql)
* [Rename table](#rename-table)
* [Set replica identity](#set-replica-identity)

### Add column

Expand Down Expand Up @@ -1048,3 +1050,27 @@ A rename table operation renames a table.
Example **rename table** migrations:

* [04_rename_table.json](../examples/04_rename_table.json)

### Set replica identity

A set replica identity operation sets the replica identity for a table.

**set replica identity** operations have this structure:

```json
{
"set_replica_identity": {
"table": "name of the table",
"identity": {
"type": "full | default | nothing | index"
"index": "name of the index, if type is 'index'"
}
}
}
```

:warning: A **set replica identity** operation is applied directly to the underlying table on migration start. This means that both versions of the table exposed in the old and new version schemas will have the new replica identity set. :warning:

Example **set replica identity** migrations:

* [29_set_replica_identity.json](../examples/29_set_replica_identity.json)
14 changes: 14 additions & 0 deletions examples/29_set_replica_identity.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "29_set_replica_identity",
"operations": [
{
"set_replica_identity": {
"table": "fruits",
"identity": {
"type": "index",
"index": "_pgroll_new_fruits_pkey"
}
}
}
]
}
9 changes: 9 additions & 0 deletions pkg/migrations/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,12 @@ type InvalidPrimaryKeyError struct {
func (e InvalidPrimaryKeyError) Error() string {
return fmt.Sprintf("primary key on table %q must be defined on exactly one column, found %d", e.Table, e.Fields)
}

type InvalidReplicaIdentityError struct {
Table string
Identity string
}

func (e InvalidReplicaIdentityError) Error() string {
return fmt.Sprintf("replica identity on table %q must be one of 'NOTHING', 'DEFAULT', 'INDEX' or 'FULL', found %q", e.Table, e.Identity)
}
27 changes: 17 additions & 10 deletions pkg/migrations/op_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
type OpName string

const (
OpNameCreateTable OpName = "create_table"
OpNameRenameTable OpName = "rename_table"
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameAlterColumn OpName = "alter_column"
OpNameCreateIndex OpName = "create_index"
OpNameDropIndex OpName = "drop_index"
OpNameDropConstraint OpName = "drop_constraint"
OpRawSQLName OpName = "sql"
OpNameCreateTable OpName = "create_table"
OpNameRenameTable OpName = "rename_table"
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameAlterColumn OpName = "alter_column"
OpNameCreateIndex OpName = "create_index"
OpNameDropIndex OpName = "drop_index"
OpNameDropConstraint OpName = "drop_constraint"
OpNameSetReplicaIdentity OpName = "set_replica_identity"
OpRawSQLName OpName = "sql"

// Internal operation types used by `alter_column`
OpNameRenameColumn OpName = "rename_column"
Expand Down Expand Up @@ -95,6 +96,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error {
case OpNameDropConstraint:
item = &OpDropConstraint{}

case OpNameSetReplicaIdentity:
item = &OpSetReplicaIdentity{}

case OpNameAlterColumn:
item = &OpAlterColumn{}

Expand Down Expand Up @@ -172,6 +176,9 @@ func OperationName(op Operation) OpName {
case *OpDropConstraint:
return OpNameDropConstraint

case *OpSetReplicaIdentity:
return OpNameSetReplicaIdentity

case *OpAlterColumn:
return OpNameAlterColumn

Expand Down
21 changes: 21 additions & 0 deletions pkg/migrations/op_common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,27 @@ func IndexMustNotExist(t *testing.T, db *sql.DB, schema, table, index string) {
}
}

func ReplicaIdentityMustBe(t *testing.T, db *sql.DB, schema, table, replicaIdentity string) {
t.Helper()

var actualReplicaIdentity string
err := db.QueryRow(`
SELECT c.relreplident
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r' -- regular table
AND n.nspname = $1
AND c.relname = $2;
`, schema, table).Scan(&actualReplicaIdentity)
if err != nil {
t.Fatal(err)
}

if replicaIdentity != actualReplicaIdentity {
t.Fatalf("Expected replica identity to be %q, got %q", replicaIdentity, actualReplicaIdentity)
}
}

func indexExists(t *testing.T, db *sql.DB, schema, table, index string) bool {
t.Helper()

Expand Down
70 changes: 70 additions & 0 deletions pkg/migrations/op_set_replica_identity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// SPDX-License-Identifier: Apache-2.0

package migrations

import (
"context"
"database/sql"
"fmt"
"slices"
"strings"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/schema"
)

type OpSetReplicaIdentity struct {
Table string `json:"table"`
Identity ReplicaIdentity `json:"identity"`
}

type ReplicaIdentity struct {
Type string `json:"type"`
Index string `json:"index"`
}

var _ Operation = (*OpSetReplicaIdentity)(nil)

func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
// build the correct form of the `SET REPLICA IDENTITY` statement based on the`identity type
identitySQL := strings.ToUpper(o.Identity.Type)
if identitySQL == "INDEX" {
identitySQL = fmt.Sprintf("USING INDEX %s", pq.QuoteIdentifier(o.Identity.Index))
}

// set the replica identity on the underlying table
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY %s", o.Table, identitySQL))
return err
}

func (o *OpSetReplicaIdentity) Complete(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}

func (o *OpSetReplicaIdentity) Rollback(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}

func (o *OpSetReplicaIdentity) Validate(ctx context.Context, s *schema.Schema) error {
identityType := strings.ToUpper(o.Identity.Type)

table := s.GetTable(o.Table)
if table == nil {
return TableDoesNotExistError{Name: o.Table}
}

identities := []string{"NOTHING", "DEFAULT", "INDEX", "FULL"}
if !slices.Contains(identities, identityType) {
return InvalidReplicaIdentityError{Table: o.Table, Identity: o.Identity.Type}
}

if identityType == "INDEX" {
if _, ok := table.Indexes[o.Identity.Index]; !ok {
return IndexDoesNotExistError{Name: o.Identity.Index}
}
}

return nil
}
Loading
Loading