Skip to content

Commit

Permalink
feat: repair tools (#322)
Browse files Browse the repository at this point in the history
Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor authored Aug 22, 2023
1 parent 3ada6b4 commit 77a4196
Show file tree
Hide file tree
Showing 38 changed files with 1,792 additions and 40 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ docker-push-toolbox:
build-cmd:
$(GO_BUILD) -ldflags "${LD_FLAGS}" -o bin/vsctl ./vsctl/

build-repair-tool:
$(GO_BUILD) -ldflags "${LD_FLAGS}" -o bin/vsrepair ./vsrepair/

docker-push-root:
docker buildx build --platform ${DOCKER_PLATFORM} -t ${DOCKER_REPO}/root-controller:${IMAGE_TAG} -f build/images/root-controller/Dockerfile . --push
docker-build-root:
Expand Down
4 changes: 2 additions & 2 deletions internal/store/meta/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *AsyncStore) Load(key []byte) (interface{}, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
if v, ok := s.pending.GetValue(key); ok {
if v == deletedMark {
if v == DeletedMark {
return nil, false
}
return v, true
Expand All @@ -93,7 +93,7 @@ func (s *AsyncStore) BatchStore(_ context.Context, kvs Ranger) {
}

func (s *AsyncStore) Delete(key []byte) {
_ = s.set(KVRange(key, deletedMark))
_ = s.set(KVRange(key, DeletedMark))
}

func (s *AsyncStore) BatchDelete(keys [][]byte) {
Expand Down
12 changes: 7 additions & 5 deletions internal/store/meta/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

var (
deletedMark deletedMarkType
DeletedMark deletedMarkType
defaultCodec codec
)

Expand Down Expand Up @@ -65,8 +65,10 @@ const (
type codec struct{}

// Make sure codec implements Marshaler and Unmarshaler.
var _ Marshaler = (*codec)(nil)
var _ Unmarshaler = (*codec)(nil)
var (
_ Marshaler = (*codec)(nil)
_ Unmarshaler = (*codec)(nil)
)

func (codec) Marshal(data Ranger) ([]byte, error) {
buf := make([]byte, 0)
Expand Down Expand Up @@ -118,7 +120,7 @@ func encodeKey(key, last []byte) (int, []byte) {
}

func appendValue(buf []byte, value interface{}) ([]byte, error) {
if value == deletedMark {
if value == DeletedMark {
buf = protowire.AppendVarint(buf, uint64(Deleted))
return buf, nil
}
Expand Down Expand Up @@ -232,7 +234,7 @@ func consumeValue(buf []byte) (interface{}, int) {
kind := Kind(k)
switch kind {
case Deleted:
return deletedMark, n
return DeletedMark, n
case True:
return true, n
case False:
Expand Down
2 changes: 1 addition & 1 deletion internal/store/meta/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

func TestCodec_AppendValue(t *testing.T) {
Convey("append deletedMark", t, func() {
out, err := appendValue(nil, deletedMark)
out, err := appendValue(nil, DeletedMark)
So(err, ShouldBeNil)
So(bytes.Equal(out, []byte{byte(Deleted)}), ShouldBeTrue)
})
Expand Down
2 changes: 1 addition & 1 deletion internal/store/meta/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var _ Ranger = (*deleteRange)(nil)

func (r *deleteRange) Range(cb RangeCallback) error {
for _, key := range r.keys {
if err := cb(key, deletedMark); err != nil {
if err := cb(key, DeletedMark); err != nil {
return err
}
}
Expand Down
103 changes: 103 additions & 0 deletions internal/store/meta/review.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 Linkall Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package meta

import (
// standard libraries.
"bytes"
"context"
"errors"
"os"
"path/filepath"
"strconv"

// this project.
walog "github.com/vanus-labs/vanus/internal/store/wal"
)

var errFound = errors.New("found")

type ReviewWatcher = func(value interface{}, version int64)

func ReviewSyncStore(ctx context.Context, dir string, key []byte, watcher ReviewWatcher, opts ...walog.Option) error {
snapshot, err := reviewLatestSnapshot(ctx, dir, defaultCodec, key, watcher)
if err != nil {
return err
}

opts = append([]walog.Option{
walog.FromPosition(snapshot),
walog.WithRecoveryCallback(func(data []byte, r walog.Range) error {
err2 := defaultCodec.Unmarshal(data, func(k []byte, v interface{}) error {
if bytes.Equal(k, key) {
watcher(v, r.EO)
}
return nil
})
if err2 != nil {
return err2
}
return nil
}),
}, opts...)
wal, err := walog.Open(ctx, dir, opts...)
if err != nil {
return err
}

wal.Close()

return nil
}

func reviewLatestSnapshot(
_ context.Context, dir string, unmarshaler Unmarshaler, key []byte, watcher ReviewWatcher,
) (int64, error) {
files, err := os.ReadDir(dir)
if err != nil {
return 0, err
}
latest, _ := filterLatestSnapshot(files)

if latest == nil {
return 0, nil
}

filename := latest.Name()
snapshot, err := strconv.ParseInt(filename[:len(filename)-len(snapshotExt)], 10, 64)
if err != nil {
return 0, err
}

path := filepath.Join(dir, filename)
data, err := os.ReadFile(path)
if err != nil {
return 0, err
}

err = unmarshaler.Unmarshal(data, func(k []byte, v interface{}) error {
if bytes.Equal(k, key) {
watcher(v, -1)
// TODO(james.yin): don't skip remaind data?
return errFound
}
return nil
})
if err != nil && err != errFound { //nolint:errorlint // compare to errFound is ok.
return 0, err
}

return snapshot, nil
}
2 changes: 1 addition & 1 deletion internal/store/meta/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *store) load(key []byte) (interface{}, bool) {
// }

func set(m *skiplist.SkipList, key []byte, value interface{}) {
if value == deletedMark {
if value == DeletedMark {
m.Remove(key)
} else {
m.Set(key, value)
Expand Down
4 changes: 2 additions & 2 deletions internal/store/meta/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *SyncStore) BatchStore(ctx context.Context, kvs Ranger, cb StoreCallback
}

func (s *SyncStore) Delete(ctx context.Context, key []byte, cb StoreCallback) {
s.set(ctx, KVRange(key, deletedMark), cb)
s.set(ctx, KVRange(key, DeletedMark), cb)
}

func (s *SyncStore) BatchDelete(ctx context.Context, keys [][]byte, cb StoreCallback) {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *SyncStore) set(ctx context.Context, kvs Ranger, cb StoreCallback) {
// Update state.
s.mu.Lock()
_ = kvs.Range(func(key []byte, value interface{}) error {
if value == deletedMark {
if value == DeletedMark {
s.committed.Remove(key)
} else {
s.committed.Set(key, value)
Expand Down
82 changes: 57 additions & 25 deletions internal/store/vsb/block_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
"hash/crc32"
"os"

// this project.
"github.com/vanus-labs/vanus/internal/store/block/raw"
Expand Down Expand Up @@ -46,6 +47,54 @@ var (
emptyHeader = make([]byte, headerBlockSize)
)

type Header struct {
Magic uint32
Crc uint32
Flags uint32
BreakFlags uint32
DataOffset uint32
State uint8
_pad uint8 //nolint:unused // padding
IndexSize uint16
Capacity uint64
EntryLength uint64
EntryNum uint32
IndexOffset uint16
}

func LoadHeader(f *os.File) (hdr Header, err error) {
var buf [headerSize]byte
if _, err = f.ReadAt(buf[:], 0); err != nil {
return
}

magic := binary.LittleEndian.Uint32(buf[magicOffset:])
if magic != FormatMagic {
return hdr, raw.ErrInvalidFormat
}

breakFlags := binary.LittleEndian.Uint32(buf[breakFlagsOffset:])
if breakFlags != 0 {
return hdr, errIncomplete
}

hdr.DataOffset = binary.LittleEndian.Uint32(buf[dataOffsetOffset:])
hdr.State = buf[stateOffset]
hdr.IndexSize = binary.LittleEndian.Uint16(buf[indexSizeOffset:])
hdr.Capacity = binary.LittleEndian.Uint64(buf[capacityOffset:])
hdr.EntryLength = binary.LittleEndian.Uint64(buf[entryLengthOffset:])
hdr.EntryNum = binary.LittleEndian.Uint32(buf[entryNumOffset:])

origin := binary.LittleEndian.Uint32(buf[crcOffset:])
crc := crc32.Checksum(buf[flagsOffset:], crc32q)
crc = crc32.Update(crc, crc32q, emptyHeader[headerSize:])
if origin != crc {
return hdr, errCorrupted
}

return hdr, nil
}

func (b *vsBlock) persistHeader(_ context.Context, m meta) error {
var buf [headerSize]byte
binary.LittleEndian.PutUint32(buf[magicOffset:], FormatMagic) // magic
Expand Down Expand Up @@ -79,34 +128,17 @@ func (b *vsBlock) persistHeader(_ context.Context, m meta) error {
}

func (b *vsBlock) loadHeader(_ context.Context) error {
var buf [headerSize]byte
if _, err := b.f.ReadAt(buf[:], 0); err != nil {
hdr, err := LoadHeader(b.f)
if err != nil {
return err
}

magic := binary.LittleEndian.Uint32(buf[magicOffset:])
if magic != FormatMagic {
return raw.ErrInvalidFormat
}

breakFlags := binary.LittleEndian.Uint32(buf[breakFlagsOffset:])
if breakFlags != 0 {
return errIncomplete
}

b.dataOffset = int64(binary.LittleEndian.Uint32(buf[dataOffsetOffset:])) // data offset
b.fm.archived = buf[stateOffset] != 0 // state
b.indexSize = binary.LittleEndian.Uint16(buf[indexSizeOffset:]) // index size
b.capacity = int64(binary.LittleEndian.Uint64(buf[capacityOffset:])) // capacity
b.fm.entryLength = int64(binary.LittleEndian.Uint64(buf[entryLengthOffset:])) // entry length
b.fm.entryNum = int64(binary.LittleEndian.Uint32(buf[entryNumOffset:])) // entry number

origin := binary.LittleEndian.Uint32(buf[crcOffset:])
crc := crc32.Checksum(buf[flagsOffset:], crc32q)
crc = crc32.Update(crc, crc32q, emptyHeader[headerSize:])
if origin != crc {
return errCorrupted
}
b.dataOffset = int64(hdr.DataOffset)
b.fm.archived = hdr.State != 0
b.indexSize = hdr.IndexSize
b.capacity = int64(hdr.Capacity)
b.fm.entryLength = int64(hdr.EntryLength)
b.fm.entryNum = int64(hdr.EntryNum)

return nil
}
6 changes: 5 additions & 1 deletion internal/store/vsb/engine_open.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,9 @@ func (e *engine) Open(ctx context.Context, id vanus.ID) (block.Raw, error) {
}

func (e *engine) resolvePath(id vanus.ID) string {
return filepath.Join(e.dir, fmt.Sprintf("%s%s", id.String(), vsbExt))
return BlockPath(e.dir, id)
}

func BlockPath(dir string, id vanus.ID) string {
return filepath.Join(dir, fmt.Sprintf("%s%s", id.String(), vsbExt))
}
7 changes: 7 additions & 0 deletions internal/store/wal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type config struct {
fileSize int64
flushDelayTime time.Duration // default: 3 * time.Millisecond
engine ioengine.Interface
readOnly bool
}

func (cfg *config) segmentedFileOptions() []segmentedfile.Option {
Expand Down Expand Up @@ -115,3 +116,9 @@ func WithIOEngine(engine ioengine.Interface) Option {
cfg.engine = engine
}
}

func WithReadOnly() Option {
return func(cfg *config) {
cfg.readOnly = true
}
}
8 changes: 6 additions & 2 deletions internal/store/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,12 @@ func open(ctx context.Context, dir string, cfg config) (*WAL, error) {
doneC: make(chan struct{}),
}

w.appendQ.Init(false)
go w.runAppend()
if !cfg.readOnly {
w.appendQ.Init(false)
go w.runAppend()
} else {
close(w.doneC)
}

return w, nil
}
Expand Down
Loading

0 comments on commit 77a4196

Please sign in to comment.