From 702add006813ddad85c08caab011e1d1909eae36 Mon Sep 17 00:00:00 2001 From: yse Date: Tue, 17 Dec 2024 23:23:00 +0100 Subject: [PATCH] fix: update proto to new definition --- proto/sync.pb.go | 30 ++++++------- proto/sync.proto | 8 ++-- .../migrations/1_initial_schema.up.sql | 4 +- store/postgres/pg.go | 43 ++++++++++++++----- .../sqlite/migrations/1_initial_schema.up.sql | 4 +- store/sqlite/sqlite.go | 14 +++--- store/sync_storage.go | 11 ++--- store/test_utils.go | 26 +++++------ syncer_server.go | 9 ++-- 9 files changed, 87 insertions(+), 62 deletions(-) diff --git a/proto/sync.pb.go b/proto/sync.pb.go index e555107..68f8a9a 100644 --- a/proto/sync.pb.go +++ b/proto/sync.pb.go @@ -71,10 +71,10 @@ type Record struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Revision int64 `protobuf:"varint,2,opt,name=revision,proto3" json:"revision,omitempty"` - SchemaVersion float32 `protobuf:"fixed32,3,opt,name=schema_version,json=schemaVersion,proto3" json:"schema_version,omitempty"` - Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Revision uint64 `protobuf:"varint,2,opt,name=revision,proto3" json:"revision,omitempty"` + SchemaVersion string `protobuf:"bytes,3,opt,name=schema_version,json=schemaVersion,proto3" json:"schema_version,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` } func (x *Record) Reset() { @@ -116,18 +116,18 @@ func (x *Record) GetId() string { return "" } -func (x *Record) GetRevision() int64 { +func (x *Record) GetRevision() uint64 { if x != nil { return x.Revision } return 0 } -func (x *Record) GetSchemaVersion() float32 { +func (x *Record) GetSchemaVersion() string { if x != nil { return x.SchemaVersion } - return 0 + return "" } func (x *Record) GetData() []byte { @@ -206,7 +206,7 @@ type SetRecordReply struct { unknownFields protoimpl.UnknownFields Status SetRecordStatus `protobuf:"varint,1,opt,name=status,proto3,enum=sync.SetRecordStatus" json:"status,omitempty"` - NewRevision int64 `protobuf:"varint,2,opt,name=new_revision,json=newRevision,proto3" json:"new_revision,omitempty"` + NewRevision uint64 `protobuf:"varint,2,opt,name=new_revision,json=newRevision,proto3" json:"new_revision,omitempty"` } func (x *SetRecordReply) Reset() { @@ -248,7 +248,7 @@ func (x *SetRecordReply) GetStatus() SetRecordStatus { return SetRecordStatus_SUCCESS } -func (x *SetRecordReply) GetNewRevision() int64 { +func (x *SetRecordReply) GetNewRevision() uint64 { if x != nil { return x.NewRevision } @@ -260,7 +260,7 @@ type ListChangesRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - SinceRevision int64 `protobuf:"varint,1,opt,name=since_revision,json=sinceRevision,proto3" json:"since_revision,omitempty"` + SinceRevision uint64 `protobuf:"varint,1,opt,name=since_revision,json=sinceRevision,proto3" json:"since_revision,omitempty"` RequestTime uint32 `protobuf:"varint,2,opt,name=request_time,json=requestTime,proto3" json:"request_time,omitempty"` Signature string `protobuf:"bytes,3,opt,name=signature,proto3" json:"signature,omitempty"` } @@ -297,7 +297,7 @@ func (*ListChangesRequest) Descriptor() ([]byte, []int) { return file_sync_proto_rawDescGZIP(), []int{3} } -func (x *ListChangesRequest) GetSinceRevision() int64 { +func (x *ListChangesRequest) GetSinceRevision() uint64 { if x != nil { return x.SinceRevision } @@ -426,9 +426,9 @@ var file_sync_proto_rawDesc = []byte{ 0x0a, 0x0a, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x73, 0x79, 0x6e, 0x63, 0x22, 0x6f, 0x0a, 0x06, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1a, 0x0a, 0x08, - 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, + 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x63, 0x68, 0x65, - 0x6d, 0x61, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x02, + 0x6d, 0x61, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x79, 0x0a, 0x10, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, @@ -444,10 +444,10 @@ var file_sync_proto_rawDesc = []byte{ 0x32, 0x15, 0x2e, 0x73, 0x79, 0x6e, 0x63, 0x2e, 0x53, 0x65, 0x74, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x6e, 0x65, 0x77, 0x5f, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x6e, 0x65, 0x77, 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, + 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6e, 0x65, 0x77, 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x7c, 0x0a, 0x12, 0x4c, 0x69, 0x73, 0x74, 0x43, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x69, 0x6e, 0x63, - 0x65, 0x5f, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, + 0x65, 0x5f, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0d, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x52, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x54, 0x69, diff --git a/proto/sync.proto b/proto/sync.proto index b3826a6..51e345f 100644 --- a/proto/sync.proto +++ b/proto/sync.proto @@ -11,8 +11,8 @@ service Syncer { message Record { string id = 1; - int64 revision = 2; - float schema_version = 3; + uint64 revision = 2; + string schema_version = 3; bytes data = 4; } @@ -27,11 +27,11 @@ enum SetRecordStatus { } message SetRecordReply { SetRecordStatus status = 1; - int64 new_revision = 2; + uint64 new_revision = 2; } message ListChangesRequest { - int64 since_revision = 1; + uint64 since_revision = 1; uint32 request_time = 2; string signature = 3; } diff --git a/store/postgres/migrations/1_initial_schema.up.sql b/store/postgres/migrations/1_initial_schema.up.sql index 29db357..1d3d350 100644 --- a/store/postgres/migrations/1_initial_schema.up.sql +++ b/store/postgres/migrations/1_initial_schema.up.sql @@ -1,2 +1,2 @@ -CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data bytea NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, id)); -CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision)); \ No newline at end of file +CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data bytea NOT NULL, revision INTEGER NOT NULL, schema_version TEXT NOT NULL, PRIMARY KEY (user_id, id)); +CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision)); diff --git a/store/postgres/pg.go b/store/postgres/pg.go index 8c0acf7..c2a0d05 100644 --- a/store/postgres/pg.go +++ b/store/postgres/pg.go @@ -57,7 +57,7 @@ func NewPGSyncStorage(databaseURL string) (*PgSyncStorage, error) { return &PgSyncStorage{db: pgxPool}, nil } -func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision int64) (int64, error) { +func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision uint64, schemaVersion string) (uint64, error) { tx, err := s.db.BeginTx(ctx, pgx.TxOptions{ IsoLevel: pgx.Serializable, }) @@ -67,7 +67,7 @@ func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data [ defer tx.Rollback(context.Background()) // check that the existing revision is the same as the one we expect - var revision int64 + var revision uint64 err = tx.QueryRow(ctx, "SELECT revision FROM records WHERE user_id = $1 AND id = $2", userID, id).Scan(&revision) if err != pgx.ErrNoRows { if err != nil { @@ -79,25 +79,48 @@ func (s *PgSyncStorage) SetRecord(ctx context.Context, userID, id string, data [ } // get the store's last revision - var newRevision int64 - err = tx.QueryRow(ctx, "INSERT INTO user_revisions (user_id, revision) VALUES ($1, 1) ON CONFLICT(user_id, revision) DO UPDATE SET revision=user_revisions.revision + 1 RETURNING revision", userID).Scan(&newRevision) - if err != nil { - return 0, fmt.Errorf("failed to set store's latest revision: %w", err) + var newRevision uint64 = 0 + err = tx.QueryRow(ctx, "SELECT revision FROM user_revisions WHERE user_id = $1", userID).Scan(&newRevision) + if err != pgx.ErrNoRows { + if err != nil { + return 0, fmt.Errorf("failed to get store's latest revision: %w", err) + } + } + if newRevision == 0 { + err := tx.QueryRow(ctx, "INSERT INTO user_revisions (user_id, revision) VALUES ($1, $2) RETURNING revision", userID, 1).Scan(&newRevision) + if err != nil { + return 0, fmt.Errorf("failed to insert store's revision: %w", err) + } + } else { + err := tx.QueryRow(ctx, "UPDATE user_revisions SET revision = revision + 1 WHERE user_id = $1 RETURNING revision", userID).Scan(&newRevision) + if err != nil { + return 0, fmt.Errorf("failed to update store's revision: %w", err) + } } - _, err = tx.Exec(ctx, "INSERT INTO records (user_id, id, data, revision) VALUES ($1, $2, $3, $4) ON CONFLICT (user_id, id) DO UPDATE SET data=EXCLUDED.data, revision=EXCLUDED.revision RETURNING revision", userID, id, data, newRevision) + _, err = tx.Exec(ctx, ` + INSERT INTO records (user_id, id, data, revision, schema_version) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (user_id, id) DO UPDATE SET + data=EXCLUDED.data, + revision=EXCLUDED.revision, + schema_version=EXCLUDED.schema_version + RETURNING revision`, + userID, id, data, newRevision, schemaVersion, + ) if err != nil { return 0, fmt.Errorf("failed to insert record: %w", err) } + if err := tx.Commit(ctx); err != nil { return 0, fmt.Errorf("failed to commit transaction: %w", err) } return newRevision, nil } -func (s *PgSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision int64) ([]store.StoredRecord, error) { +func (s *PgSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision uint64) ([]store.StoredRecord, error) { - rows, err := s.db.Query(ctx, "SELECT id, data, revision FROM records WHERE user_id = $1 AND revision > $2", userID, sinceRevision) + rows, err := s.db.Query(ctx, "SELECT id, data, revision, schema_version FROM records WHERE user_id = $1 AND revision > $2", userID, sinceRevision) if err != nil { return nil, fmt.Errorf("failed to query records: %w", err) } @@ -106,7 +129,7 @@ func (s *PgSyncStorage) ListChanges(ctx context.Context, userID string, sinceRev records := make([]store.StoredRecord, 0) for rows.Next() { record := store.StoredRecord{} - err = rows.Scan(&record.Id, &record.Data, &record.Revision) + err = rows.Scan(&record.Id, &record.Data, &record.Revision, &record.SchemaVersion) if err != nil { return nil, fmt.Errorf("failed to scan record: %w", err) } diff --git a/store/sqlite/migrations/1_initial_schema.up.sql b/store/sqlite/migrations/1_initial_schema.up.sql index 44f117a..779050c 100644 --- a/store/sqlite/migrations/1_initial_schema.up.sql +++ b/store/sqlite/migrations/1_initial_schema.up.sql @@ -1,2 +1,2 @@ -CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data BLOB NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, id)); -CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision)); \ No newline at end of file +CREATE TABLE records (user_id TEXT NOT NULL, id TEXT NOT NULL, data BLOB NOT NULL, revision INTEGER NOT NULL, schema_version TEXT NOT NULL, PRIMARY KEY (user_id, id)); +CREATE TABLE user_revisions (user_id TEXT NOT NULL, revision INTEGER NOT NULL, PRIMARY KEY (user_id, revision)); diff --git a/store/sqlite/sqlite.go b/store/sqlite/sqlite.go index 6e4d47d..3bed32e 100644 --- a/store/sqlite/sqlite.go +++ b/store/sqlite/sqlite.go @@ -59,7 +59,7 @@ func NewSQLiteSyncStorage(file string) (*SQLiteSyncStorage, error) { return &SQLiteSyncStorage{db: db}, nil } -func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision int64) (int64, error) { +func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision uint64, schemaVersion string) (uint64, error) { tx, err := s.db.BeginTx(context.Background(), &sql.TxOptions{ Isolation: sql.LevelSerializable, @@ -70,7 +70,7 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da defer tx.Rollback() // check that the existing revision is the same as the one we expect - var revision int64 + var revision uint64 err = tx.QueryRow("SELECT revision FROM records WHERE user_id = ? AND id = ?", userID, id).Scan(&revision) if err != sql.ErrNoRows { if err != nil { @@ -82,7 +82,7 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da } // get the store's last revision - var newRevision int64 + var newRevision uint64 err = tx.QueryRow("SELECT revision FROM user_revisions WHERE user_id = ?", userID).Scan(&newRevision) if err != sql.ErrNoRows { if err != nil { @@ -101,7 +101,7 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da } } - _, err = tx.Exec("INSERT OR REPLACE INTO records (user_id, id, data, revision) VALUES (?, ?, ?, ?) returning revision", userID, id, data, newRevision) + _, err = tx.Exec("INSERT OR REPLACE INTO records (user_id, id, data, revision, schema_version) VALUES (?, ?, ?, ?, ?) returning revision", userID, id, data, newRevision, schemaVersion) if err != nil { return 0, fmt.Errorf("failed to insert record: %w", err) } @@ -111,9 +111,9 @@ func (s *SQLiteSyncStorage) SetRecord(ctx context.Context, userID, id string, da return newRevision, nil } -func (s *SQLiteSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision int64) ([]store.StoredRecord, error) { +func (s *SQLiteSyncStorage) ListChanges(ctx context.Context, userID string, sinceRevision uint64) ([]store.StoredRecord, error) { - rows, err := s.db.Query("SELECT id, data, revision FROM records WHERE user_id = ? AND revision > ?", userID, sinceRevision) + rows, err := s.db.Query("SELECT id, data, revision, schema_version FROM records WHERE user_id = ? AND revision > ?", userID, sinceRevision) if err != nil { return nil, fmt.Errorf("failed to query records: %w", err) } @@ -122,7 +122,7 @@ func (s *SQLiteSyncStorage) ListChanges(ctx context.Context, userID string, sinc records := make([]store.StoredRecord, 0) for rows.Next() { record := store.StoredRecord{} - err = rows.Scan(&record.Id, &record.Data, &record.Revision) + err = rows.Scan(&record.Id, &record.Data, &record.Revision, &record.SchemaVersion) if err != nil { return nil, fmt.Errorf("failed to scan record: %w", err) } diff --git a/store/sync_storage.go b/store/sync_storage.go index 686f089..9c6dbba 100644 --- a/store/sync_storage.go +++ b/store/sync_storage.go @@ -8,11 +8,12 @@ import ( var ErrSetConflict = errors.New("set conflict") type StoredRecord struct { - Id string - Data []byte - Revision int64 + Id string + Data []byte + Revision uint64 + SchemaVersion string } type SyncStorage interface { - SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision int64) (int64, error) - ListChanges(ctx context.Context, userID string, sinceRevision int64) ([]StoredRecord, error) + SetRecord(ctx context.Context, userID, id string, data []byte, existingRevision uint64, schemaVersion string) (uint64, error) + ListChanges(ctx context.Context, userID string, sinceRevision uint64) ([]StoredRecord, error) } diff --git a/store/test_utils.go b/store/test_utils.go index 965c6f2..c0f2694 100644 --- a/store/test_utils.go +++ b/store/test_utils.go @@ -13,13 +13,13 @@ type StoreTest struct{} func (s *StoreTest) TestAddRecords(t *testing.T, storage SyncStorage) { testStoreID := uuid.New().String() - newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0) + newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0, "0.0.1") require.NoError(t, err, "failed to call SetRecord a1") - require.Equal(t, newRevision, int64(1)) + require.Equal(t, newRevision, uint64(1)) - newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a2", []byte("data2"), 0) + newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a2", []byte("data2"), 0, "0.0.1") require.NoError(t, err, "failed to call SetRecord a2") - require.Equal(t, newRevision, int64(2)) + require.Equal(t, newRevision, uint64(2)) records, err := storage.ListChanges(context.Background(), testStoreID, 0) require.NoError(t, err, "failed to call list changes") @@ -30,20 +30,20 @@ func (s *StoreTest) TestAddRecords(t *testing.T, storage SyncStorage) { // Test different store with same id anotherStoreID := uuid.New().String() - newRev, err := storage.SetRecord(context.Background(), anotherStoreID, "a1", []byte("data1"), 0) + newRev, err := storage.SetRecord(context.Background(), anotherStoreID, "a1", []byte("data1"), 0, "0.0.1") require.NoError(t, err, "failed to call SetRecord a1") - require.Equal(t, newRev, int64(1)) + require.Equal(t, newRev, uint64(1)) } func (s *StoreTest) TestUpdateRecords(t *testing.T, storage SyncStorage) { testStoreID := uuid.New().String() - newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0) + newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0, "0.0.1") require.NoError(t, err, "failed to call SetRecord a1") - require.Equal(t, newRevision, int64(1)) + require.Equal(t, newRevision, uint64(1)) - newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 1) + newRevision, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 1, "0.0.1") require.NoError(t, err, "failed to call SetRecord a2") - require.Equal(t, newRevision, int64(2)) + require.Equal(t, newRevision, uint64(2)) records, err := storage.ListChanges(context.Background(), testStoreID, 0) require.NoError(t, err, "failed to call list changes") @@ -54,11 +54,11 @@ func (s *StoreTest) TestUpdateRecords(t *testing.T, storage SyncStorage) { func (s *StoreTest) TestConflict(t *testing.T, storage SyncStorage) { testStoreID := uuid.New().String() - newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0) + newRevision, err := storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data1"), 0, "0.0.1") require.NoError(t, err, "failed to call SetRecord a1") - require.Equal(t, newRevision, int64(1)) + require.Equal(t, newRevision, uint64(1)) - _, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 0) + _, err = storage.SetRecord(context.Background(), testStoreID, "a1", []byte("data2"), 0, "0.0.1") require.Error(t, err, "should have return with error") require.Equal(t, err, ErrSetConflict) } diff --git a/syncer_server.go b/syncer_server.go index a48711c..178948d 100644 --- a/syncer_server.go +++ b/syncer_server.go @@ -68,7 +68,7 @@ func (s *PersistentSyncerServer) SetRecord(ctx context.Context, msg *proto.SetRe return nil, err } pubkey := c.Value(middleware.USER_PUBKEY_CONTEXT_KEY).(string) - newRevision, err := s.storage.SetRecord(c, pubkey, msg.Record.Id, msg.Record.Data, msg.Record.Revision) + newRevision, err := s.storage.SetRecord(c, pubkey, msg.Record.Id, msg.Record.Data, msg.Record.Revision, msg.Record.SchemaVersion) if err != nil { if err == store.ErrSetConflict { @@ -100,9 +100,10 @@ func (s *PersistentSyncerServer) ListChanges(ctx context.Context, msg *proto.Lis records := make([]*proto.Record, len(changed)) for i, r := range changed { records[i] = &proto.Record{ - Id: r.Id, - Data: r.Data, - Revision: r.Revision, + Id: r.Id, + Data: r.Data, + Revision: r.Revision, + SchemaVersion: r.SchemaVersion, } } return &proto.ListChangesReply{