-
Notifications
You must be signed in to change notification settings - Fork 9
/
optimistic_delete_stream_failure_test.go
70 lines (60 loc) · 2.1 KB
/
optimistic_delete_stream_failure_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package rangedbserver_test
import (
"context"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"github.com/inklabs/rangedb/pkg/grpc/rangedbserver"
"github.com/inklabs/rangedb"
"github.com/inklabs/rangedb/pkg/clock/provider/sequentialclock"
"github.com/inklabs/rangedb/pkg/grpc/rangedbpb"
"github.com/inklabs/rangedb/provider/inmemorystore"
"github.com/inklabs/rangedb/rangedbtest"
)
// ExampleRangeDBServer_OptimisticDeleteStream_failure saves two events to a
// stream and then requests an optimistic delete with an expected stream
// sequence number of 5. The expected output below shows the resulting error,
// which reports the mismatch against the actual sequence number.
func ExampleRangeDBServer_OptimisticDeleteStream_failure() {
	// Arrange: an in-memory store driven by a sequential clock.
	store := inmemorystore.New(inmemorystore.WithClock(sequentialclock.New()))
	rangedbtest.BindEvents(store)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Two events for the same aggregate are persisted to a single stream.
	const aggregateID = "c129a8fa3d8945928c3e300beb0b6d58"
	firstRecord := &rangedb.EventRecord{
		Event: rangedbtest.ThingWasDone{ID: aggregateID, Number: 100},
	}
	secondRecord := &rangedb.EventRecord{
		Event: rangedbtest.ThingWasDone{ID: aggregateID, Number: 200},
	}
	streamName := "thing-c129a8fa3d8945928c3e300beb0b6d58"
	PrintError(IgnoreFirstNumber(store.Save(ctx, streamName, firstRecord, secondRecord)))

	// Start a gRPC server that listens on an in-process buffered connection.
	listener := bufconn.Listen(7)
	grpcServer := grpc.NewServer()
	defer grpcServer.Stop()

	rangeDBServer, err := rangedbserver.New(rangedbserver.WithStore(store))
	PrintError(err)
	defer rangeDBServer.Stop()

	rangedbpb.RegisterRangeDBServer(grpcServer, rangeDBServer)
	go func() {
		PrintError(grpcServer.Serve(listener))
	}()

	// Dial the server through the same buffered listener; the target name and
	// the dialer's arguments are not used to pick a connection.
	dialBuffer := func(context.Context, string) (net.Conn, error) {
		return listener.Dial()
	}
	conn, err := grpc.DialContext(
		ctx,
		"bufnet",
		grpc.WithContextDialer(dialBuffer),
		grpc.WithInsecure(),
		grpc.WithBlock(),
	)
	defer Close(conn)
	PrintError(err)

	// Build the client and a delete request carrying the sequence number 5.
	client := rangedbpb.NewRangeDBClient(conn)
	request := &rangedbpb.OptimisticDeleteStreamRequest{
		StreamName:                   streamName,
		ExpectedStreamSequenceNumber: 5,
	}

	// Act
	_, err = client.OptimisticDeleteStream(ctx, request)
	PrintError(err)

	// Output:
	// rpc error: code = Unknown desc = unexpected sequence number: 5, actual: 2
}