Skip to content

Commit

Permalink
kvserver/rangefeed: move helper functions to registry_helpers_test
Browse files Browse the repository at this point in the history
This patch moves helper functions to registry_helpers_test.

Epic: none
Release note: none
  • Loading branch information
wenyihu6 committed Oct 21, 2024
1 parent 9cbb2d3 commit c2a0fb8
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 149 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ go_test(
"event_size_test.go",
"processor_helpers_test.go",
"processor_test.go",
"registry_helper_test.go",
"registry_test.go",
"resolved_timestamp_test.go",
"scheduler_test.go",
Expand Down
167 changes: 167 additions & 0 deletions pkg/kv/kvserver/rangefeed/registry_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package rangefeed

import (
"context"
"sync"
"testing"
"time"

_ "github.com/cockroachdb/cockroach/pkg/keys" // hook up pretty printer
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)

var (
keyA, keyB = roachpb.Key("a"), roachpb.Key("b")
keyC, keyD = roachpb.Key("c"), roachpb.Key("d")
keyX, keyY = roachpb.Key("x"), roachpb.Key("y")

spAB = roachpb.Span{Key: keyA, EndKey: keyB}
spBC = roachpb.Span{Key: keyB, EndKey: keyC}
spCD = roachpb.Span{Key: keyC, EndKey: keyD}
spAC = roachpb.Span{Key: keyA, EndKey: keyC}
spXY = roachpb.Span{Key: keyX, EndKey: keyY}
)

type testStream struct {
ctx context.Context
ctxDone func()
done chan *kvpb.Error
mu struct {
syncutil.Mutex
sendErr error
events []*kvpb.RangeFeedEvent
}
}

func newTestStream() *testStream {
ctx, done := context.WithCancel(context.Background())
return &testStream{ctx: ctx, ctxDone: done, done: make(chan *kvpb.Error, 1)}
}

func (s *testStream) Context() context.Context {
return s.ctx
}

func (s *testStream) Cancel() {
s.ctxDone()
}

func (s *testStream) SendUnbufferedIsThreadSafe() {}

func (s *testStream) SendUnbuffered(e *kvpb.RangeFeedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.sendErr != nil {
return s.mu.sendErr
}
s.mu.events = append(s.mu.events, e)
return nil
}

func (s *testStream) SetSendErr(err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.sendErr = err
}

func (s *testStream) Events() []*kvpb.RangeFeedEvent {
s.mu.Lock()
defer s.mu.Unlock()
es := s.mu.events
s.mu.events = nil
return es
}

func (s *testStream) BlockSend() func() {
s.mu.Lock()
var once sync.Once
return func() {
once.Do(s.mu.Unlock) // safe to call multiple times, e.g. defer and explicit
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *testStream) Disconnect(err *kvpb.Error) {
s.done <- err
}

// Error returns the error that was sent to the done channel. It returns nil if
// no error was sent yet.
func (s *testStream) Error() error {
select {
case err := <-s.done:
return err.GoError()
default:
return nil
}
}

// WaitForError waits for the rangefeed to complete and returns the error sent
// to the done channel. It fails the test if rangefeed cannot complete within 30
// seconds.
func (s *testStream) WaitForError(t *testing.T) error {
select {
case err := <-s.done:
return err.GoError()
case <-time.After(testutils.DefaultSucceedsSoonDuration):
t.Fatalf("time out waiting for rangefeed completion")
return nil
}
}

type testRegistration struct {
*bufferedRegistration
*testStream
}

func makeCatchUpIterator(
iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp,
) *CatchUpIterator {
if iter == nil {
return nil
}
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}
}

func newTestRegistration(
span roachpb.Span,
ts hlc.Timestamp,
catchup storage.SimpleMVCCIterator,
withDiff bool,
withFiltering bool,
withOmitRemote bool,
) *testRegistration {
s := newTestStream()
r := newBufferedRegistration(
span,
ts,
makeCatchUpIterator(catchup, span, ts),
withDiff,
withFiltering,
withOmitRemote,
5,
false, /* blockWhenFull */
NewMetrics(),
s,
func() {},
)
return &testRegistration{
bufferedRegistration: r,
testStream: s,
}
}
149 changes: 0 additions & 149 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ package rangefeed
import (
"context"
"fmt"
"sync"
"testing"
"time"

_ "github.com/cockroachdb/cockroach/pkg/keys" // hook up pretty printer
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand All @@ -20,157 +18,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

var (
keyA, keyB = roachpb.Key("a"), roachpb.Key("b")
keyC, keyD = roachpb.Key("c"), roachpb.Key("d")
keyX, keyY = roachpb.Key("x"), roachpb.Key("y")

spAB = roachpb.Span{Key: keyA, EndKey: keyB}
spBC = roachpb.Span{Key: keyB, EndKey: keyC}
spCD = roachpb.Span{Key: keyC, EndKey: keyD}
spAC = roachpb.Span{Key: keyA, EndKey: keyC}
spXY = roachpb.Span{Key: keyX, EndKey: keyY}
)

type testStream struct {
ctx context.Context
ctxDone func()
done chan *kvpb.Error
mu struct {
syncutil.Mutex
sendErr error
events []*kvpb.RangeFeedEvent
}
}

func newTestStream() *testStream {
ctx, done := context.WithCancel(context.Background())
return &testStream{ctx: ctx, ctxDone: done, done: make(chan *kvpb.Error, 1)}
}

func (s *testStream) Context() context.Context {
return s.ctx
}

func (s *testStream) Cancel() {
s.ctxDone()
}

func (s *testStream) SendUnbufferedIsThreadSafe() {}

func (s *testStream) SendUnbuffered(e *kvpb.RangeFeedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.sendErr != nil {
return s.mu.sendErr
}
s.mu.events = append(s.mu.events, e)
return nil
}

func (s *testStream) SetSendErr(err error) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.sendErr = err
}

func (s *testStream) Events() []*kvpb.RangeFeedEvent {
s.mu.Lock()
defer s.mu.Unlock()
es := s.mu.events
s.mu.events = nil
return es
}

func (s *testStream) BlockSend() func() {
s.mu.Lock()
var once sync.Once
return func() {
once.Do(s.mu.Unlock) // safe to call multiple times, e.g. defer and explicit
}
}

// Disconnect implements the Stream interface. It mocks the disconnect behavior
// by sending the error to the done channel.
func (s *testStream) Disconnect(err *kvpb.Error) {
s.done <- err
}

// Error returns the error that was sent to the done channel. It returns nil if
// no error was sent yet.
func (s *testStream) Error() error {
select {
case err := <-s.done:
return err.GoError()
default:
return nil
}
}

// WaitForError waits for the rangefeed to complete and returns the error sent
// to the done channel. It fails the test if rangefeed cannot complete within 30
// seconds.
func (s *testStream) WaitForError(t *testing.T) error {
select {
case err := <-s.done:
return err.GoError()
case <-time.After(testutils.DefaultSucceedsSoonDuration):
t.Fatalf("time out waiting for rangefeed completion")
return nil
}
}

type testRegistration struct {
*bufferedRegistration
*testStream
}

func makeCatchUpIterator(
iter storage.SimpleMVCCIterator, span roachpb.Span, startTime hlc.Timestamp,
) *CatchUpIterator {
if iter == nil {
return nil
}
return &CatchUpIterator{
simpleCatchupIter: simpleCatchupIterAdapter{iter},
span: span,
startTime: startTime,
}
}

func newTestRegistration(
span roachpb.Span,
ts hlc.Timestamp,
catchup storage.SimpleMVCCIterator,
withDiff bool,
withFiltering bool,
withOmitRemote bool,
) *testRegistration {
s := newTestStream()
r := newBufferedRegistration(
span,
ts,
makeCatchUpIterator(catchup, span, ts),
withDiff,
withFiltering,
withOmitRemote,
5,
false, /* blockWhenFull */
NewMetrics(),
s,
func() {},
)
return &testRegistration{
bufferedRegistration: r,
testStream: s,
}
}

func TestRegistrationBasic(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down

0 comments on commit c2a0fb8

Please sign in to comment.