Skip to content

Commit

Permalink
fix: update proto to new definition
Browse files Browse the repository at this point in the history
  • Loading branch information
hydra-yse committed Dec 17, 2024
1 parent 1b1ab8e commit 702add0
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 62 deletions.
30 changes: 15 additions & 15 deletions proto/sync.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions proto/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ service Syncer {

message Record {
string id = 1;
int64 revision = 2;
float schema_version = 3;
uint64 revision = 2;
string schema_version = 3;
bytes data = 4;
}

Expand All @@ -27,11 +27,11 @@ enum SetRecordStatus {
}
message SetRecordReply {
SetRecordStatus status = 1;
int64 new_revision = 2;
uint64 new_revision = 2;
}

message ListChangesRequest {
int64 since_revision = 1;
uint64 since_revision = 1;
uint32 request_time = 2;
string signature = 3;
}
Expand Down
4 changes: 2 additions & 2 deletions store/postgres/migrations/1_initial_schema.up.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data bytea NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, id));
CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision));
CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data bytea NOT NULL, revision INTEGER NOT NULL, schema_version TEXT NOT NULL, PRIMARY KEY (user_id, id));
CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision));
43 changes: 33 additions & 10 deletions store/postgres/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewPGSyncStorage(databaseURL string) (*PgSyncStorage, error) {
return &PgSyncStorage{db: pgxPool}, nil
}

func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision int64) (int64, error) {
func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision uint64, schemaVersion string) (uint64, error) {
tx, err := s.db.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.Serializable,
})
Expand All @@ -67,7 +67,7 @@ func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data [
defer tx.Rollback(context.Background())

// check that the existing revision is the same as the one we expect
var revision int64
var revision uint64
err = tx.QueryRow(ctx, "SELECT revision FROM records WHERE user_id = $1 AND id = $2", userID, id).Scan(&revision)
if err != pgx.ErrNoRows {
if err != nil {
Expand All @@ -79,25 +79,48 @@ func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data [
}

// get the store's last revision
var newRevision int64
err = tx.QueryRow(ctx, "INSERT INTO user_revisions (user_id, revision) VALUES ($1, 1) ON CONFLICT(user_id, revision) DO UPDATE SET revision=user_revisions.revision + 1 RETURNING revision", userID).Scan(&newRevision)
if err != nil {
return 0, fmt.Errorf("failed to set store's latest revision: %w", err)
var newRevision uint64 = 0
err = tx.QueryRow(ctx, "SELECT revision FROM user_revisions WHERE user_id = $1", userID).Scan(&newRevision)
if err != pgx.ErrNoRows {
if err != nil {
return 0, fmt.Errorf("failed to get store's latest revision: %w", err)
}
}
if newRevision == 0 {
err := tx.QueryRow(ctx, "INSERT INTO user_revisions (user_id, revision) VALUES ($1, $2) RETURNING revision", userID, 1).Scan(&newRevision)
if err != nil {
return 0, fmt.Errorf("failed to insert store's revision: %w", err)
}
} else {
err := tx.QueryRow(ctx, "UPDATE user_revisions SET revision = revision + 1 WHERE user_id = $1 RETURNING revision", userID).Scan(&newRevision)
if err != nil {
return 0, fmt.Errorf("failed to update store's revision: %w", err)
}
}

_, err = tx.Exec(ctx, "INSERT INTO records (user_id, id, data, revision) VALUES ($1, $2, $3, $4) ON CONFLICT (user_id, id) DO UPDATE SET data=EXCLUDED.data, revision=EXCLUDED.revision RETURNING revision", userID, id, data, newRevision)
_, err = tx.Exec(ctx, `
INSERT INTO records (user_id, id, data, revision, schema_version)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (user_id, id) DO UPDATE SET
data=EXCLUDED.data,
revision=EXCLUDED.revision,
schema_version=EXCLUDED.schema_version
RETURNING revision`,
userID, id, data, newRevision, schemaVersion,
)
if err != nil {
return 0, fmt.Errorf("failed to insert record: %w", err)
}

if err := tx.Commit(ctx); err != nil {
return 0, fmt.Errorf("failed to commit transaction: %w", err)
}
return newRevision, nil
}

func (s *PgSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision int64) ([]store.StoredRecord, error) {
func (s *PgSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision uint64) ([]store.StoredRecord, error) {

rows, err := s.db.Query(ctx, "SELECT id, data, revision FROM records WHERE user_id = $1 AND revision > $2", userID, sinceRevision)
rows, err := s.db.Query(ctx, "SELECT id, data, revision, schema_version FROM records WHERE user_id = $1 AND revision > $2", userID, sinceRevision)
if err != nil {
return nil, fmt.Errorf("failed to query records: %w", err)
}
Expand All @@ -106,7 +129,7 @@ func (s *PgSyncStorage) ListChanges(ctx context.Context, userID string, sinceRev
records := make([]store.StoredRecord, 0)
for rows.Next() {
record := store.StoredRecord{}
err = rows.Scan(&record.Id, &record.Data, &record.Revision)
err = rows.Scan(&record.Id, &record.Data, &record.Revision, &record.SchemaVersion)
if err != nil {
return nil, fmt.Errorf("failed to scan record: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions store/sqlite/migrations/1_initial_schema.up.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data BLOB NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, id));
CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision));
CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data BLOB NOT NULL, revision INTEGER NOT NULL, schema_version TEXT NOT NULL, PRIMARY KEY (user_id, id));
CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision));
14 changes: 7 additions & 7 deletions store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewSQLiteSyncStorage(file string) (*SQLiteSyncStorage, error) {
return &SQLiteSyncStorage{db: db}, nil
}

func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision int64) (int64, error) {
func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision uint64, schemaVersion string) (uint64, error) {

tx, err := s.db.BeginTx(context.Background(), &sql.TxOptions{
Isolation: sql.LevelSerializable,
Expand All @@ -70,7 +70,7 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da
defer tx.Rollback()

// check that the existing revision is the same as the one we expect
var revision int64
var revision uint64
err = tx.QueryRow("SELECT revision FROM records WHERE user_id = ? AND id = ?", userID, id).Scan(&revision)
if err != sql.ErrNoRows {
if err != nil {
Expand All @@ -82,7 +82,7 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da
}

// get the store's last revision
var newRevision int64
var newRevision uint64
err = tx.QueryRow("SELECT revision FROM user_revisions WHERE user_id = ?", userID).Scan(&newRevision)
if err != sql.ErrNoRows {
if err != nil {
Expand All @@ -101,7 +101,7 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da
}
}

_, err = tx.Exec("INSERT OR REPLACE INTO records (user_id, id, data, revision) VALUES (?, ?, ?, ?) returning revision", userID, id, data, newRevision)
_, err = tx.Exec("INSERT OR REPLACE INTO records (user_id, id, data, revision, schema_version) VALUES (?, ?, ?, ?, ?) returning revision", userID, id, data, newRevision, schemaVersion)
if err != nil {
return 0, fmt.Errorf("failed to insert record: %w", err)
}
Expand All @@ -111,9 +111,9 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da
return newRevision, nil
}

func (s *SQLiteSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision int64) ([]store.StoredRecord, error) {
func (s *SQLiteSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision uint64) ([]store.StoredRecord, error) {

rows, err := s.db.Query("SELECT id, data, revision FROM records WHERE user_id = ? AND revision > ?", userID, sinceRevision)
rows, err := s.db.Query("SELECT id, data, revision, schema_version FROM records WHERE user_id = ? AND revision > ?", userID, sinceRevision)
if err != nil {
return nil, fmt.Errorf("failed to query records: %w", err)
}
Expand All @@ -122,7 +122,7 @@ func (s *SQLiteSyncStorage) ListChanges(ctx context.Context, userID string, sinc
records := make([]store.StoredRecord, 0)
for rows.Next() {
record := store.StoredRecord{}
err = rows.Scan(&record.Id, &record.Data, &record.Revision)
err = rows.Scan(&record.Id, &record.Data, &record.Revision, &record.SchemaVersion)
if err != nil {
return nil, fmt.Errorf("failed to scan record: %w", err)
}
Expand Down
11 changes: 6 additions & 5 deletions store/sync_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
var ErrSetConflict = errors.New("set conflict")

type StoredRecord struct {
Id string
Data []byte
Revision int64
Id string
Data []byte
Revision uint64
SchemaVersion string
}
type SyncStorage interface {
SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision int64) (int64, error)
ListChanges(ctx context.Context, userID string, sinceRevision int64) ([]StoredRecord, error)
SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision uint64, schemaVersion string) (uint64, error)
ListChanges(ctx context.Context, userID string, sinceRevision uint64) ([]StoredRecord, error)
}
26 changes: 13 additions & 13 deletions store/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ type StoreTest struct{}
func (s *StoreTest) TestAddRecords(t *testing.T, storage SyncStorage) {

testStoreID := uuid.New().String()
newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0)
newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0, "0.0.1")
require.NoError(t, err, "failed to call SetRecord a1")
require.Equal(t, newRevision, int64(1))
require.Equal(t, newRevision, uint64(1))

newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a2", []byte("data2"), 0)
newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a2", []byte("data2"), 0, "0.0.1")
require.NoError(t, err, "failed to call SetRecord a2")
require.Equal(t, newRevision, int64(2))
require.Equal(t, newRevision, uint64(2))

records, err := storage.ListChanges(context.Background(), testStoreID, 0)
require.NoError(t, err, "failed to call list changes")
Expand All @@ -30,20 +30,20 @@ func (s *StoreTest) TestAddRecords(t *testing.T, storage SyncStorage) {

// Test different store with same id
anotherStoreID := uuid.New().String()
newRev, err := storage.SetRecord(context.Background(), anotherStoreID, "a1", []byte("data1"), 0)
newRev, err := storage.SetRecord(context.Background(), anotherStoreID, "a1", []byte("data1"), 0, "0.0.1")
require.NoError(t, err, "failed to call SetRecord a1")
require.Equal(t, newRev, int64(1))
require.Equal(t, newRev, uint64(1))
}

func (s *StoreTest) TestUpdateRecords(t *testing.T, storage SyncStorage) {
testStoreID := uuid.New().String()
newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0)
newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0, "0.0.1")
require.NoError(t, err, "failed to call SetRecord a1")
require.Equal(t, newRevision, int64(1))
require.Equal(t, newRevision, uint64(1))

newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 1)
newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 1, "0.0.1")
require.NoError(t, err, "failed to call SetRecord a2")
require.Equal(t, newRevision, int64(2))
require.Equal(t, newRevision, uint64(2))

records, err := storage.ListChanges(context.Background(), testStoreID, 0)
require.NoError(t, err, "failed to call list changes")
Expand All @@ -54,11 +54,11 @@ func (s *StoreTest) TestUpdateRecords(t *testing.T, storage SyncStorage) {

func (s *StoreTest) TestConflict(t *testing.T, storage SyncStorage) {
testStoreID := uuid.New().String()
newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0)
newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0, "0.0.1")
require.NoError(t, err, "failed to call SetRecord a1")
require.Equal(t, newRevision, int64(1))
require.Equal(t, newRevision, uint64(1))

_, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 0)
_, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 0, "0.0.1")
require.Error(t, err, "should have return with error")
require.Equal(t, err, ErrSetConflict)
}
9 changes: 5 additions & 4 deletions syncer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *PersistentSyncerServer) SetRecord(ctx context.Context, msg *proto.SetRe
return nil, err
}
pubkey := c.Value(middleware.USER_PUBKEY_CONTEXT_KEY).(string)
newRevision, err := s.storage.SetRecord(c, pubkey, msg.Record.Id, msg.Record.Data, msg.Record.Revision)
newRevision, err := s.storage.SetRecord(c, pubkey, msg.Record.Id, msg.Record.Data, msg.Record.Revision, msg.Record.SchemaVersion)

if err != nil {
if err == store.ErrSetConflict {
Expand Down Expand Up @@ -100,9 +100,10 @@ func (s *PersistentSyncerServer) ListChanges(ctx context.Context, msg *proto.Lis
records := make([]*proto.Record, len(changed))
for i, r := range changed {
records[i] = &proto.Record{
Id: r.Id,
Data: r.Data,
Revision: r.Revision,
Id: r.Id,
Data: r.Data,
Revision: r.Revision,
SchemaVersion: r.SchemaVersion,
}
}
return &proto.ListChangesReply{
Expand Down

0 comments on commit 702add0

Please sign in to comment.