From 7bba30531b30885ab854f64d58ae90daebed1a9a Mon Sep 17 00:00:00 2001 From: oliverpool Date: Wed, 4 Dec 2024 10:21:43 +0100 Subject: [PATCH] bug: cancelling a stream blocks when http2 is enabled --- connect_ext_close_test.go | 81 +++++++++++++++++++++++++++++++++++++++ connect_ext_test.go | 2 + 2 files changed, 83 insertions(+) create mode 100644 connect_ext_close_test.go diff --git a/connect_ext_close_test.go b/connect_ext_close_test.go new file mode 100644 index 00000000..25f7b43f --- /dev/null +++ b/connect_ext_close_test.go @@ -0,0 +1,81 @@ +package connect_test + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + connect "connectrpc.com/connect" + "connectrpc.com/connect/internal/assert" + pingv1 "connectrpc.com/connect/internal/gen/connect/ping/v1" + "connectrpc.com/connect/internal/gen/connect/ping/v1/pingv1connect" +) + +func TestClientStream_CancelContext(t *testing.T) { + t.Run("HTTP2 disabled", func(t *testing.T) { + testClientStream_CancelContext(t, false) + }) + t.Run("HTTP2 enabled", func(t *testing.T) { + testClientStream_CancelContext(t, true) + }) +} + +func testClientStream_CancelContext(t *testing.T, enableHTTP2 bool) { + mux := http.NewServeMux() + mux.Handle(pingv1connect.NewPingServiceHandler(pingServer{ + delayCountUp: 3 * time.Second, + })) + + s := httptest.NewUnstartedServer(mux) + s.EnableHTTP2 = enableHTTP2 + s.StartTLS() + + client := pingv1connect.NewPingServiceClient( + s.Client(), + s.URL, + ) + + ctx, cancel := context.WithCancel(context.Background()) + stream, err := client.CountUp(ctx, connect.NewRequest(&pingv1.CountUpRequest{ + Number: 100, + })) + + assert.Nil(t, err) + + msg := make(chan int64) + go func() { + for stream.Receive() { + select { + case msg <- stream.Msg().Number: + default: + } + } + close(msg) + }() + + assert.Equal(t, <-msg, 1) + + closed := make(chan struct{}) + go func() { + t.Log("will close stream") + t.Log("stream closed:", stream.Close()) + close(closed) + }() + + time.Sleep(10 * time.Millisecond) // delay to ensure that stream.Close has already been called + cancel() + + select { + case <-closed: + case <-time.After(time.Second): + t.Error("stream was not closed within 1s") + } + select { + case _, ok := <-msg: + assert.False(t, ok) + case <-time.After(time.Second): + t.Error("stream was not done receiving within 1s") + } +} diff --git a/connect_ext_test.go b/connect_ext_test.go index b93c5708..5e3f775b 100644 --- a/connect_ext_test.go +++ b/connect_ext_test.go @@ -2813,6 +2813,7 @@ type pingServer struct { checkMetadata bool includeErrorDetails bool + delayCountUp time.Duration } func (p pingServer) Ping(ctx context.Context, request *connect.Request[pingv1.PingRequest]) (*connect.Response[pingv1.PingResponse], error) { @@ -2913,6 +2914,7 @@ func (p pingServer) CountUp( if err := stream.Send(&pingv1.CountUpResponse{Number: i}); err != nil { return err } + time.Sleep(p.delayCountUp) } return nil }