Skip to content

Commit

Permalink
Merge pull request gocelery#48 from haritsfahreza/master
Browse files Browse the repository at this point in the history
Add AMQP Celery backend and broker initialize using conn and channel
  • Loading branch information
yoonsio authored Aug 7, 2018
2 parents 85e5f9f + 2ec4687 commit c13b080
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
13 changes: 9 additions & 4 deletions amqp_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@ type AMQPCeleryBackend struct {
host string
}

// NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
conn, channel := NewAMQPConnection(host)
// NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP conn and channel
func NewAMQPCeleryBackendByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBackend {
// ensure exchange is initialized
backend := &AMQPCeleryBackend{
Channel: channel,
connection: conn,
host: host,
}
return backend
}

// NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
backend.host = host
return backend
}

// Reconnect reconnects to AMQP server
func (b *AMQPCeleryBackend) Reconnect() {
b.connection.Close()
Expand Down
8 changes: 6 additions & 2 deletions amqp_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel) {
if err != nil {
panic(err)
}
//defer connection.Close()

channel, err := connection.Channel()
if err != nil {
panic(err)
Expand All @@ -67,7 +67,11 @@ func NewAMQPConnection(host string) (*amqp.Connection, *amqp.Channel) {

// NewAMQPCeleryBroker creates new AMQPCeleryBroker
func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker {
conn, channel := NewAMQPConnection(host)
return NewAMQPCeleryBrokerByConnAndChannel(NewAMQPConnection(host))
}

// NewAMQPCeleryBrokerByConnAndChannel creates new AMQPCeleryBroker using AMQP conn and channel
func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBroker {
// ensure exchange is initialized
broker := &AMQPCeleryBroker{
Channel: channel,
Expand Down

0 comments on commit c13b080

Please sign in to comment.