Skip to content

Commit

Permalink
fix: use non-streaming events API
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Dec 28, 2024
1 parent 74fde81 commit eac88c4
Show file tree
Hide file tree
Showing 18 changed files with 114 additions and 418 deletions.
12 changes: 10 additions & 2 deletions api/grpc/events/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func (cm clientMock) SetStream(ctx context.Context, req *SetStreamRequest, opts
return
}

func (cm clientMock) Publish(ctx context.Context, opts ...grpc.CallOption) (Service_PublishClient, error) {
return newPublishStreamMock(), nil
func (cm clientMock) PublishBatch(ctx context.Context, req *PublishRequest, opts ...grpc.CallOption) (resp *PublishResponse, err error) {
switch req.Topic {
case "fail":
err = status.Error(codes.Internal, "internal failure")
default:
resp = &PublishResponse{
AckCount: 42,
}
}
return
}
10 changes: 4 additions & 6 deletions api/grpc/events/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,16 @@ func (cp clientPool) SetStream(ctx context.Context, req *SetStreamRequest, opts
return
}

func (cp clientPool) Publish(ctx context.Context, opts ...grpc.CallOption) (stream Service_PublishClient, err error) {
func (cp clientPool) PublishBatch(ctx context.Context, req *PublishRequest, opts ...grpc.CallOption) (resp *PublishResponse, err error) {
var conn *grpcpool.ClientConn
conn, err = cp.connPool.Get(ctx)
var c *grpc.ClientConn
if err == nil {
c = conn.ClientConn
conn.Close() // return back to the conn pool immediately
defer conn.Close()
}
var client ServiceClient
if err == nil {
client = NewServiceClient(c)
stream, err = client.Publish(ctx, opts...)
client = NewServiceClient(conn)
resp, err = client.PublishBatch(ctx, req, opts...)
}
return
}
13 changes: 6 additions & 7 deletions api/grpc/events/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package events
import (
"context"
"fmt"
"github.com/awakari/pub/model"
"github.com/awakari/pub/util"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"log/slog"
)

Expand All @@ -25,11 +26,9 @@ func (lm loggingMiddleware) SetStream(ctx context.Context, topic string, limit u
return
}

func (lm loggingMiddleware) NewPublisher(ctx context.Context, topic string) (p model.MessagesWriter, err error) {
p, err = lm.svc.NewPublisher(ctx, topic)
lm.log.Debug(fmt.Sprintf("events.Publish(topic=%s): err=%s", topic, err))
if err == nil {
p = model.NewMessagesWriterLogging(p, lm.log, topic)
}
func (lm loggingMiddleware) Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) {
ackCount, err = lm.svc.Publish(ctx, topic, evts)
ll := util.LogLevel(err)
lm.log.Log(ctx, ll, fmt.Sprintf("events.Publish(%s, %d): ack=%d, err=%s", topic, len(evts), ackCount, err))
return
}
83 changes: 0 additions & 83 deletions api/grpc/events/publish_stream_mock.go

This file was deleted.

47 changes: 0 additions & 47 deletions api/grpc/events/publisher.go

This file was deleted.

32 changes: 0 additions & 32 deletions api/grpc/events/publisher_mock.go

This file was deleted.

84 changes: 0 additions & 84 deletions api/grpc/events/publisher_test.go

This file was deleted.

17 changes: 10 additions & 7 deletions api/grpc/events/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"context"
"errors"
"fmt"
"github.com/awakari/pub/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
)

type Service interface {
SetStream(ctx context.Context, topic string, limit uint32) (err error)
NewPublisher(ctx context.Context, topic string) (p model.MessagesWriter, err error)
Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error)
}

type service struct {
Expand All @@ -39,11 +39,14 @@ func (svc service) SetStream(ctx context.Context, topic string, limit uint32) (e
return
}

func (svc service) NewPublisher(ctx context.Context, topic string) (p model.MessagesWriter, err error) {
var stream Service_PublishClient
stream, err = svc.client.Publish(ctx)
if err == nil {
p = newPublisher(stream, topic)
func (svc service) Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) {
var resp *PublishResponse
resp, err = svc.client.PublishBatch(ctx, &PublishRequest{
Topic: topic,
Evts: evts,
})
if resp != nil {
ackCount = resp.AckCount
}
err = decodeError(err)
return
Expand Down
2 changes: 1 addition & 1 deletion api/grpc/events/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ service Service {
rpc SetStream(SetStreamRequest) returns (SetStreamResponse);

// Publish events to the specified topic.
rpc Publish(stream PublishRequest) returns (stream PublishResponse);
rpc PublishBatch(PublishRequest) returns (PublishResponse);
}

message SetStreamRequest {
Expand Down
17 changes: 6 additions & 11 deletions api/grpc/events/service_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package events

import (
"context"
"github.com/awakari/pub/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
)

type serviceMock struct {
}

func NewServiceMock() Service {
return serviceMock{}
}

func (sm serviceMock) SetStream(ctx context.Context, topic string, limit uint32) (err error) {
switch topic {
case "":
Expand All @@ -18,16 +22,7 @@ func (sm serviceMock) SetStream(ctx context.Context, topic string, limit uint32)
return
}

func NewServiceMock() Service {
return serviceMock{}
}
func (sm serviceMock) Publish(ctx context.Context, topic string, evts []*pb.CloudEvent) (ackCount uint32, err error) {

func (sm serviceMock) NewPublisher(ctx context.Context, topic string) (mw model.MessagesWriter, err error) {
switch topic {
case "fail":
err = ErrInternal
default:
mw = NewPublisherMock()
}
return
}
Loading

0 comments on commit eac88c4

Please sign in to comment.