Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhouXing19 committed Nov 14, 2024
1 parent 7502537 commit e0bfbbd
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 5 deletions.
1 change: 1 addition & 0 deletions internal/sinktest/mocks/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ var _ eventproc.Processor = &Collector{}
func NewCollector(capacity int) *Collector {
c := &Collector{}
c.mu.batch = make([]string, 0, capacity)
c.mu.batchChan = make(chan *types.BatchCursor)
return c
}

Expand Down
7 changes: 6 additions & 1 deletion internal/source/objstore/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,12 @@ func (c *Conn) processBatch(ctx *stopper.Context, resolved hlc.Time, files []str
err := c.retry(operation, fmt.Sprintf("processing %s", file))
processDuration.WithLabelValues(c.config.bucketName).
Observe(float64(time.Since(start).Seconds()))
return errors.Wrapf(err, "process %q failed", file)
if err != nil {
return errors.Wrapf(err, "process %q failed", file)
}
// log.Infof("successfully processed %q", file)
return nil

})
}
err := g.Wait()
Expand Down
23 changes: 22 additions & 1 deletion internal/source/objstore/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,31 @@ func TestApplyWithinRange(t *testing.T) {
timeRange := timeRange(low, high)
conn, tracker, err := buildTimestampVerifier(rootFS)
r.NoError(err)

// Configure the time range we are interested in.
conn.config.MinTimestamp = timeRange.Min()
conn.config.timeRange = timeRange
conn.config.prefix = baseDir

//stop.Go(func(ctx *stopper.Context) error {
// resChan, err := conn.Read(ctx)
// if err != nil {
// return err
// }
// for {
// select {
// case res, ok := <-resChan:
// if !ok {
// }
// t.Logf("received %d muts", res.Batch.Count())
// case <-ctx.Stopping():
// }
// }
//})

stop.Go(
func(ctx *stopper.Context) error {
// Calling state.setlast.
err := conn.apply(ctx, baseDir)
return err
})
Expand Down Expand Up @@ -172,6 +192,7 @@ func TestApplyRange(t *testing.T) {
r.NoError(err)
conn, _, err := buildConn(rootFS, defaultProbTransientError)
r.NoError(err)

for _, rg := range ranges {
conveyor := &timestampTracker{}
collector := mocks.NewCollector(defaultUpperLimit)
Expand Down Expand Up @@ -415,7 +436,7 @@ func buildTimestampVerifier(fs fs.FS) (*Conn, *timestampTracker, error) {
if err != nil {
return nil, nil, err
}
processor := eventproc.NewLocal(conveyor, bucket, parser, ident.MustSchema(ident.Public))
processor := eventproc.NewLocal(bucket, parser, ident.MustSchema(ident.Public))
return &Conn{
bucket: bucket,
config: &Config{
Expand Down
9 changes: 6 additions & 3 deletions internal/source/objstore/eventproc/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

// Acceptor receives batches for processing.
Expand All @@ -51,9 +52,10 @@ func (c *localProcessor) BatchChan() <-chan *types.BatchCursor {
// NewLocal creates a local processor.
func NewLocal(bucket bucket.Bucket, parser *cdcjson.NDJsonParser, schema ident.Schema) Processor {
return &localProcessor{
bucket: bucket,
parser: parser,
schema: schema,
bucket: bucket,
parser: parser,
schema: schema,
batchChan: make(chan *types.BatchCursor),
}
}

Expand All @@ -79,6 +81,7 @@ func (c *localProcessor) Process(
if err != nil {
return errors.Wrapf(err, "failed to parse %s", path)
}
log.Infof("pushing to cursor chan")

// Send the batch downstream to the target.
for _, tempBatch := range batch.Data {
Expand Down
1 change: 1 addition & 0 deletions internal/source/objstore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func TestWorkload(t *testing.T) {
}

func testWorkload(t *testing.T, fc *fixtureConfig) {
log.SetLevel(log.TraceLevel)
r := require.New(t)

fixture, err := all.NewFixture(t)
Expand Down

0 comments on commit e0bfbbd

Please sign in to comment.