diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 3b1799c2d137..569c7f329870 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1013,7 +1013,8 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool { return true } -const maxMutations = 100000 +// Overridden in tests +var maxMutations = 100000 // Apply mutates a row atomically. A mutation must contain at least one // operation and at most 100000 operations. @@ -1224,9 +1225,14 @@ func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb. type entryErr struct { Entry *btpb.MutateRowsRequest_Entry Err error + + // TopLevelErr is the error received either from + // 1. client.MutateRows + // 2. stream.Recv + TopLevelErr error } -// ApplyBulk applies multiple Mutations, up to a maximum of 100,000. +// ApplyBulk applies multiple Mutations. // Each mutation is individually applied atomically, // but the set of mutations may be applied in any order. // @@ -1254,17 +1260,31 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}} } - for _, group := range groupEntries(origEntries, maxMutations) { + var firstGroupErr error + numFailed := 0 + groups := groupEntries(origEntries, maxMutations) + for _, group := range groups { err := t.applyGroup(ctx, group, opts...) if err != nil { - return nil, err + if firstGroupErr == nil { + firstGroupErr = err + } + numFailed++ } } + if numFailed == len(groups) { + return nil, firstGroupErr + } + // All the errors are accumulated into an array and returned, interspersed with nils for successful // entries. The absence of any errors means we should return nil. 
var foundErr bool for _, entry := range origEntries { + if entry.Err == nil && entry.TopLevelErr != nil { + // Populate per mutation error if top level error is not nil + entry.Err = entry.TopLevelErr + } if entry.Err != nil { foundErr = true } @@ -1289,6 +1309,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply // We want to retry the entire request with the current group return err } + // Get the entries that need to be retried group = t.getApplyBulkRetries(group) if len(group) > 0 && len(idempotentRetryCodes) > 0 { // We have at least one mutation that needs to be retried. @@ -1324,6 +1345,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD } } + var topLevelErr error + defer func() { + populateTopLevelError(entryErrs, topLevelErr) + }() + entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) for i, entryErr := range entryErrs { entries[i] = entryErr.Entry @@ -1340,6 +1366,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD stream, err := t.c.client.MutateRows(ctx, req) if err != nil { + _, topLevelErr = convertToGrpcStatusErr(err) return err } @@ -1354,6 +1381,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD } if err != nil { *trailerMD = stream.Trailer() + _, topLevelErr = convertToGrpcStatusErr(err) return err } @@ -1370,6 +1398,12 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD return nil } +func populateTopLevelError(entries []*entryErr, topLevelErr error) { + for _, entry := range entries { + entry.TopLevelErr = topLevelErr + } +} + // groupEntries groups entries into groups of a specified size without breaking up // individual entries. 
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 6664670ea6b0..472def5c4b38 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -28,6 +28,8 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}} @@ -875,3 +877,139 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) { t.Error() } } + +type rowKeyCheckingInterceptor struct { + grpc.ClientStream + failRow string + failErr error // error to use while sending failed response for fail row + requestCounter *int } + +func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error { + *i.requestCounter = *i.requestCounter + 1 + if req, ok := m.(*btpb.MutateRowsRequest); ok { + for _, entry := range req.Entries { + if string(entry.RowKey) == i.failRow { + return i.failErr + } + } + } + return i.ClientStream.SendMsg(m) +} + +func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error { + return i.ClientStream.RecvMsg(m) +} + +// Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service +// This test validates that even if one of the groups receives an error, requests are sent for further groups +func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { + testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{}) + if gotErr != nil { + t.Fatalf("NewEmulatedEnv failed: %v", gotErr) + } + + // Add interceptor to fail rows + failedRow := "row2" + failErr := status.Error(codes.InvalidArgument, "Invalid row key") + reqCount := 0 + conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), + grpc.WithStreamInterceptor(func(ctx 
context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + clientStream, err := streamer(ctx, desc, cc, method, opts...) + return &rowKeyCheckingInterceptor{ + ClientStream: clientStream, + failRow: failedRow, + requestCounter: &reqCount, + failErr: failErr, + }, err + }), + ) + if gotErr != nil { + t.Fatalf("grpc.Dial failed: %v", gotErr) + } + + // Create client and table + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if gotErr != nil { + t.Fatalf("NewClient failed: %v", gotErr) + } + defer adminClient.Close() + tableConf := &TableConf{ + TableID: testEnv.config.Table, + ColumnFamilies: map[string]Family{ + "f": { + ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: SumAggregator{}, + }, + }, + }, + } + if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { + t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) + } + client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) + if gotErr != nil { + t.Fatalf("NewClientWithConfig failed: %v", gotErr) + } + defer client.Close() + table := client.Open(testEnv.config.Table) + + // Override maxMutations to break mutations into smaller groups + origMaxMutations := maxMutations + t.Cleanup(func() { + maxMutations = origMaxMutations + }) + maxMutations = 2 + + // Create mutations + m1 := NewMutation() + m1.AddIntToCell("f", "q", 0, 1000) + m2 := NewMutation() + m2.AddIntToCell("f", "q", 0, 2000) + + // Perform ApplyBulk operation and compare errors + rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"} + var wantErr error + wantErrs := []error{nil, nil, failErr, failErr, nil, nil} + gotErrs, gotErr := table.ApplyBulk(ctx, 
rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2}) + + // Assert overall error + if !equalErrs(gotErr, wantErr) { + t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr) + } + + // Assert individual mutation errors + if len(gotErrs) != len(wantErrs) { + t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs) + } + for i := range gotErrs { + if !equalErrs(gotErrs[i], wantErrs[i]) { + t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i]) + } + } + + // Assert number of requests sent + wantReqCount := len(rowKeys) / maxMutations + if reqCount != wantReqCount { + t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount) + } + + // Assert individual mutation apply success/failure by reading rows + gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool { + rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) + if rowMutated && row.Key() == failedRow { + t.Error("Expected mutation to fail for row " + row.Key()) + } + if !rowMutated && row.Key() != failedRow { + t.Error("Expected mutation to succeed for row " + row.Key()) + } + return true + }) + if gotErr != nil { + t.Fatalf("ReadRows failed: %v", gotErr) + } +} diff --git a/datastore/datastore.go b/datastore/datastore.go index 4ee71cc18b04..64aef7a1aa65 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -435,7 +435,7 @@ func (c *Client) Get(ctx context.Context, key *Key, dst interface{}) (err error) } var opts *pb.ReadOptions - if !c.readSettings.readTime.IsZero() { + if c.readSettings.readTimeExists() { opts = &pb.ReadOptions{ ConsistencyType: &pb.ReadOptions_ReadTime{ // Timestamp cannot be less than microseconds accuracy. 
See #6938 @@ -470,7 +470,7 @@ func (c *Client) GetMulti(ctx context.Context, keys []*Key, dst interface{}) (er defer func() { trace.EndSpan(ctx, err) }() var opts *pb.ReadOptions - if c.readSettings != nil && !c.readSettings.readTime.IsZero() { + if c.readSettings.readTimeExists() { opts = &pb.ReadOptions{ ConsistencyType: &pb.ReadOptions_ReadTime{ // Timestamp cannot be less than microseconds accuracy. See #6938 @@ -875,6 +875,10 @@ type readSettings struct { readTime time.Time } +func (rs *readSettings) readTimeExists() bool { + return rs != nil && !rs.readTime.IsZero() +} + // WithReadOptions specifies constraints for accessing documents from the database, // e.g. at what time snapshot to read the documents. // The client uses this value for subsequent reads, unless additional ReadOptions diff --git a/datastore/integration_test.go b/datastore/integration_test.go index d2ebd836b82e..b93d7f3dc8e9 100644 --- a/datastore/integration_test.go +++ b/datastore/integration_test.go @@ -435,6 +435,52 @@ func TestIntegration_GetWithReadTime(t *testing.T) { _ = client.Delete(ctx, k) } +func TestIntegration_RunWithReadTime(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + client := newTestClient(ctx, t) + defer cancel() + defer client.Close() + + type RT struct { + TimeCreated time.Time + } + + rt1 := RT{time.Now()} + k := NameKey("RT", "ReadTime", nil) + + tx, err := client.NewTransaction(ctx) + if err != nil { + t.Fatal(err) + } + + if _, err := tx.Put(k, &rt1); err != nil { + t.Fatalf("Transaction.Put: %v\n", err) + } + + if _, err := tx.Commit(); err != nil { + t.Fatalf("Transaction.Commit: %v\n", err) + } + + testutil.Retry(t, 5, time.Duration(10*time.Second), func(r *testutil.R) { + got := RT{} + tm := ReadTime(time.Now()) + + client.WithReadOptions(tm) + + // If the Entity isn't available at the requested read time, we get + // a "datastore: no such entity" error. 
The ReadTime is otherwise not + // exposed in anyway in the response. + err = client.Get(ctx, k, &got) + client.Run(ctx, NewQuery("RT")) + if err != nil { + r.Errorf("client.Get: %v", err) + } + }) + + // Cleanup + _ = client.Delete(ctx, k) +} + func TestIntegration_TopLevelKeyLoaded(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) defer cancel() @@ -1225,6 +1271,8 @@ func TestIntegration_AggregationQueries(t *testing.T) { client := newTestClient(ctx, t) defer client.Close() + beforeCreate := time.Now().Truncate(time.Millisecond) + parent := NameKey("SQParent", keyPrefix+"AggregationQueries"+suffix, nil) now := timeNow.Truncate(time.Millisecond).Unix() children := []*SQChild{ @@ -1255,12 +1303,13 @@ func TestIntegration_AggregationQueries(t *testing.T) { }() testCases := []struct { - desc string - aggQuery *AggregationQuery - transactionOpts []TransactionOption - wantFailure bool - wantErrMsg string - wantAggResult AggregationResult + desc string + aggQuery *AggregationQuery + transactionOpts []TransactionOption + clientReadOptions []ReadOption + wantFailure bool + wantErrMsg string + wantAggResult AggregationResult }{ { @@ -1280,6 +1329,26 @@ func TestIntegration_AggregationQueries(t *testing.T) { "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}, }, }, + { + desc: "Count success before create with client read time", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3). + NewAggregationQuery(). + WithCount("count"), + clientReadOptions: []ReadOption{ReadTime(beforeCreate)}, + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 0}}, + }, + }, + { + desc: "Count success after create with client read time", + aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Filter("I>=", 3). + NewAggregationQuery(). 
+ WithCount("count"), + clientReadOptions: []ReadOption{ReadTime(time.Now().Truncate(time.Millisecond))}, + wantAggResult: map[string]interface{}{ + "count": &pb.Value{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}, + }, + }, { desc: "Multiple aggregations", aggQuery: NewQuery("SQChild").Ancestor(parent).Filter("T=", now). @@ -1352,8 +1421,16 @@ func TestIntegration_AggregationQueries(t *testing.T) { } for _, testCase := range testCases { + testClient := client + if testCase.clientReadOptions != nil { + clientWithReadTime := newTestClient(ctx, t) + clientWithReadTime.WithReadOptions(testCase.clientReadOptions...) + defer clientWithReadTime.Close() + + testClient = clientWithReadTime + } testutil.Retry(t, 10, time.Second, func(r *testutil.R) { - gotAggResult, gotErr := client.RunAggregationQuery(ctx, testCase.aggQuery) + gotAggResult, gotErr := testClient.RunAggregationQuery(ctx, testCase.aggQuery) gotFailure := gotErr != nil if gotFailure != testCase.wantFailure || diff --git a/datastore/query.go b/datastore/query.go index f5dd66d81236..0b241c81ca2e 100644 --- a/datastore/query.go +++ b/datastore/query.go @@ -29,6 +29,7 @@ import ( "cloud.google.com/go/internal/protostruct" "cloud.google.com/go/internal/trace" "google.golang.org/api/iterator" + "google.golang.org/protobuf/types/known/timestamppb" wrapperspb "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -501,13 +502,13 @@ func (q *Query) End(c Cursor) *Query { } // toRunQueryRequest converts the query to a protocol buffer. 
-func (q *Query) toRunQueryRequest(req *pb.RunQueryRequest) error { +func (q *Query) toRunQueryRequest(req *pb.RunQueryRequest, clientReadSettings *readSettings) error { dst, err := q.toProto() if err != nil { return err } - req.ReadOptions, err = parseQueryReadOptions(q.eventual, q.trans) + req.ReadOptions, err = parseQueryReadOptions(q.eventual, q.trans, clientReadSettings) if err != nil { return err } @@ -879,7 +880,7 @@ func (c *Client) run(ctx context.Context, q *Query, opts ...RunOption) *Iterator t.req.ExplainOptions = runSettings.explainOptions } - if err := q.toRunQueryRequest(t.req); err != nil { + if err := q.toRunQueryRequest(t.req, c.readSettings); err != nil { t.err = err } return t @@ -948,7 +949,7 @@ func (c *Client) RunAggregationQueryWithOptions(ctx context.Context, aq *Aggrega defer txn.stateLockDeferUnlock()() } - req.ReadOptions, err = parseQueryReadOptions(aq.query.eventual, txn) + req.ReadOptions, err = parseQueryReadOptions(aq.query.eventual, txn, c.readSettings) if err != nil { return ar, err } @@ -976,22 +977,30 @@ func (c *Client) RunAggregationQueryWithOptions(ctx context.Context, aq *Aggrega return ar, nil } -func validateReadOptions(eventual bool, t *Transaction) error { - if t == nil { +func validateReadOptions(eventual bool, t *Transaction, clientReadSettings *readSettings) error { + if !clientReadSettings.readTimeExists() { + if t == nil { + return nil + } + if eventual { + return errEventualConsistencyTransaction + } + if t.state == transactionStateExpired { + return errExpiredTransaction + } return nil } - if eventual { - return errEventualConsistencyTransaction - } - if t.state == transactionStateExpired { - return errExpiredTransaction + + if t != nil || eventual { + return errEventualConsistencyTxnClientReadTime } + return nil } // parseQueryReadOptions translates Query read options into protobuf format. 
-func parseQueryReadOptions(eventual bool, t *Transaction) (*pb.ReadOptions, error) { - err := validateReadOptions(eventual, t) +func parseQueryReadOptions(eventual bool, t *Transaction, clientReadSettings *readSettings) (*pb.ReadOptions, error) { + err := validateReadOptions(eventual, t, clientReadSettings) if err != nil { return nil, err } @@ -1004,6 +1013,13 @@ func parseQueryReadOptions(eventual bool, t *Transaction) (*pb.ReadOptions, erro return &pb.ReadOptions{ConsistencyType: &pb.ReadOptions_ReadConsistency_{ReadConsistency: pb.ReadOptions_EVENTUAL}}, nil } + if clientReadSettings.readTimeExists() { + return &pb.ReadOptions{ + ConsistencyType: &pb.ReadOptions_ReadTime{ + ReadTime: timestamppb.New(clientReadSettings.readTime), + }, + }, nil + } return nil, nil } @@ -1117,7 +1133,7 @@ func (t *Iterator) nextBatch() error { } var err error - t.req.ReadOptions, err = parseQueryReadOptions(t.eventual, txn) + t.req.ReadOptions, err = parseQueryReadOptions(t.eventual, txn, t.client.readSettings) if err != nil { return err } diff --git a/datastore/query_test.go b/datastore/query_test.go index 414583c7acdc..f3ef6b720def 100644 --- a/datastore/query_test.go +++ b/datastore/query_test.go @@ -22,12 +22,14 @@ import ( "sort" "strings" "testing" + "time" pb "cloud.google.com/go/datastore/apiv1/datastorepb" "cloud.google.com/go/internal/testutil" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" ) var ( @@ -723,16 +725,29 @@ func TestNamespaceQuery(t *testing.T) { } } -func TestReadOptions(t *testing.T) { +func TestToRunQueryRequest(t *testing.T) { + clientReadTime := time.Now() tid := []byte{1} for _, test := range []struct { q *Query + crs *readSettings want *pb.ReadOptions }{ { q: NewQuery(""), want: nil, }, + { + q: NewQuery(""), + crs: &readSettings{ + readTime: clientReadTime, + }, + want: &pb.ReadOptions{ + ConsistencyType: &pb.ReadOptions_ReadTime{ + ReadTime: 
timestamppb.New(clientReadTime), + }, + }, + }, { q: NewQuery("").Transaction(nil), want: nil, @@ -755,20 +770,10 @@ func TestReadOptions(t *testing.T) { }, } { req := &pb.RunQueryRequest{} - if err := test.q.toRunQueryRequest(req); err != nil { + if err := test.q.toRunQueryRequest(req, test.crs); err != nil { t.Fatalf("%+v: got %v, want no error", test.q, err) } } - // Test errors. - for _, q := range []*Query{ - NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}), - NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(), - } { - req := &pb.RunQueryRequest{} - if err := q.toRunQueryRequest(req); err == nil { - t.Errorf("%+v: got nil, wanted error", q) - } - } } func TestInvalidFilters(t *testing.T) { @@ -956,6 +961,7 @@ func TestValidateReadOptions(t *testing.T) { desc string eventual bool trans *Transaction + crs *readSettings wantErr error }{ { @@ -991,8 +997,39 @@ func TestValidateReadOptions(t *testing.T) { desc: "No transaction in eventual query", eventual: true, }, + { + desc: "Eventual query with client read time", + eventual: true, + crs: &readSettings{ + readTime: time.Now(), + }, + wantErr: errEventualConsistencyTxnClientReadTime, + }, + { + desc: "Eventual query and transaction with client read time", + eventual: true, + crs: &readSettings{ + readTime: time.Now(), + }, + trans: &Transaction{ + id: []byte("test id"), + state: transactionStateInProgress, + }, + wantErr: errEventualConsistencyTxnClientReadTime, + }, + { + desc: "Transaction with client read time", + crs: &readSettings{ + readTime: time.Now(), + }, + trans: &Transaction{ + id: []byte("test id"), + state: transactionStateInProgress, + }, + wantErr: errEventualConsistencyTxnClientReadTime, + }, } { - gotErr := validateReadOptions(test.eventual, test.trans) + gotErr := validateReadOptions(test.eventual, test.trans, test.crs) gotErrMsg := "" if gotErr != nil { gotErrMsg = gotErr.Error() diff --git 
a/datastore/transaction.go b/datastore/transaction.go index 2f86e1506973..e835291aa44d 100644 --- a/datastore/transaction.go +++ b/datastore/transaction.go @@ -35,8 +35,10 @@ const maxIndividualReqTxnRetry = 5 var ErrConcurrentTransaction = errors.New("datastore: concurrent transaction") var ( - errExpiredTransaction = errors.New("datastore: transaction expired") - errEventualConsistencyTransaction = errors.New("datastore: cannot use EventualConsistency query in a transaction") + errExpiredTransaction = errors.New("datastore: transaction expired") + errEventualConsistencyTransaction = errors.New("datastore: cannot use EventualConsistency query in a transaction") + errEventualConsistencyTxnClientReadTime = errors.New("datastore: cannot use EventualConsistency query when read time is specified on client or query is in a transaction") + errTxnClientReadTime = errors.New("datastore: cannot use query in a transaction when read time is specified on client") txnBackoff = gax.Backoff{ Initial: 20 * time.Millisecond, @@ -542,6 +544,10 @@ func (t *Transaction) Rollback() (err error) { } func (t *Transaction) parseReadOptions() (*pb.ReadOptions, error) { + if t.client != nil && t.client.readSettings.readTimeExists() { + return nil, errTxnClientReadTime + } + var opts *pb.ReadOptions switch t.state { case transactionStateExpired: