Skip to content

Commit

Permalink
upgrade go to 1.22; upgrade packages; migrate azure/go-amqp to 1.0.5
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Silva <[email protected]>
  • Loading branch information
cx-joses committed Apr 24, 2024
1 parent 9a61fca commit 9b65c4f
Show file tree
Hide file tree
Showing 15 changed files with 185 additions and 235 deletions.
6 changes: 3 additions & 3 deletions protocol/amqp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ go 1.18
replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-amqp v1.0.5
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.9.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
Expand Down
18 changes: 6 additions & 12 deletions protocol/amqp/v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand All @@ -19,18 +18,13 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion protocol/amqp/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const prefix = "cloudEvents:" // Name prefix for AMQP properties that hold CE at

var (
// Use the package path as AMQP error condition name
condition = amqp.ErrorCondition(reflect.TypeOf(Message{}).PkgPath())
condition = amqp.ErrCond(reflect.TypeOf(Message{}).PkgPath())
specs = spec.WithPrefix(prefix)
)

Expand Down
53 changes: 0 additions & 53 deletions protocol/amqp/v2/options.go

This file was deleted.

127 changes: 60 additions & 67 deletions protocol/amqp/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ import (
)

type Protocol struct {
connOpts []amqp.ConnOption
sessionOpts []amqp.SessionOption
senderLinkOpts []amqp.LinkOption
receiverLinkOpts []amqp.LinkOption

// AMQP
Client *amqp.Client
Client *amqp.Conn
Session *amqp.Session
ownedClient bool
Node string
Expand All @@ -35,54 +30,60 @@ type Protocol struct {
}

// NewProtocolFromClient creates a new amqp transport.
func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error) {
func NewProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
queue string,
senderOptions amqp.SenderOptions,
receiverOptions amqp.ReceiverOptions,
) (*Protocol, error) {
t := &Protocol{
Node: queue,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
Node: queue,
Client: client,
Session: session,
}

t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(queue))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, queue, &senderOptions)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(t.Node))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, &receiverOptions)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver)
return t, nil
}

// NewProtocol creates a new amqp transport.
func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewProtocol(
ctx context.Context,
server, queue string,
connOptions amqp.ConnOptions,
sessionOptions amqp.SessionOptions,
senderOptions amqp.SenderOptions,
receiverOptions amqp.ReceiverOptions,
) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewProtocolFromClient(client, session, queue, opts...)
p, err := NewProtocolFromClient(ctx, client, session, queue, senderOptions, receiverOptions)
if err != nil {
return nil, err
}
Expand All @@ -92,69 +93,70 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti
}

// NewSenderProtocolFromClient creates a new amqp sender transport.
func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewSenderProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
senderOptions amqp.SenderOptions,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}
t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(address))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, address, &senderOptions)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

return t, nil
}

// NewReceiverProtocolFromClient creates a new receiver amqp transport.
func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewReceiverProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
receiverOptions amqp.ReceiverOptions,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
Node: address,
Client: client,
Session: session,
}

t.Node = address
t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(address))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, address, &receiverOptions)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver)
return t, nil
}

// NewSenderProtocol creates a new sender amqp transport.
func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, senderOptions amqp.SenderOptions) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewSenderProtocolFromClient(client, session, address, opts...)
p, err := NewSenderProtocolFromClient(ctx, client, session, address, senderOptions)
if err != nil {
return nil, err
}
Expand All @@ -164,20 +166,20 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses
}

// NewReceiverProtocol creates a new receiver amqp transport.
func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, receiverOptions amqp.ReceiverOptions) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewReceiverProtocolFromClient(client, session, address, opts...)
p, err := NewReceiverProtocolFromClient(ctx, client, session, address, receiverOptions)

if err != nil {
return nil, err
Expand All @@ -187,15 +189,6 @@ func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, s
return p, nil
}

func (t *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(t); err != nil {
return err
}
}
return nil
}

func (t *Protocol) Close(ctx context.Context) (err error) {
if t.ownedClient {
// Closing the client will close at cascade sender and receiver
Expand Down
11 changes: 7 additions & 4 deletions protocol/amqp/v2/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
const serverDown = "session ended by server"

// receiver wraps an amqp.Receiver as a binding.Receiver
type receiver struct{ amqp *amqp.Receiver }
type receiver struct {
amqp *amqp.Receiver
options amqp.ReceiveOptions
}

func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
m, err := r.amqp.Receive(ctx)
m, err := r.amqp.Receive(ctx, &r.options)
if err != nil {
if err == ctx.Err() {
return nil, io.EOF
Expand All @@ -38,6 +41,6 @@ func (r *receiver) Receive(ctx context.Context) (binding.Message, error) {
}

// NewReceiver create a new Receiver which wraps an amqp.Receiver in a binding.Receiver
func NewReceiver(amqp *amqp.Receiver) protocol.Receiver {
return &receiver{amqp: amqp}
func NewReceiver(amqp *amqp.Receiver, options amqp.ReceiveOptions) protocol.Receiver {
return &receiver{amqp: amqp, options: options}
}
Loading

0 comments on commit 9b65c4f

Please sign in to comment.