Skip to content

Commit

Permalink
feat: rabbitmq pubsub (#21)
Browse files Browse the repository at this point in the history
* first try to implement steps for RabbitMQ

* fix ci

* Added new steps and ATs

* added return statement

* revert change in return statement

* correlator added in the message steps

* fix: apply suggested changes

Co-authored-by: Ismael Taboada Rodero <[email protected]>

* fix: typos

* fix: step is added for parallel execution of ATs

* fix: feedback received with the steps

* fix: debugf format

* fix: new step added for failure cases

* fix: feedback received

* Remove correlationId use

* fix: refactor unsubscribe step

Co-authored-by: Ismael Taboada Rodero <[email protected]>
Co-authored-by: Cesar Brea <[email protected]>
Co-authored-by: Mario LG <[email protected]>
  • Loading branch information
4 people authored Apr 27, 2021
1 parent 5ccba19 commit ea4da6c
Show file tree
Hide file tree
Showing 10 changed files with 685 additions and 0 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ jobs:
ports:
- 6379:6379

rabbit:
image: rabbitmq:3.8.3
ports:
- 5672:5672
options: >-
--health-cmd "rabbitmqctl node_health_check"
--health-interval 10s
--health-timeout 5s
--health-retries 5
elasticsearch:
image: elasticsearch:7.12.0
ports:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ require (
github.com/kelseyhightower/envconfig v1.4.0
github.com/lestrrat-go/jwx v1.1.4
github.com/miekg/dns v1.1.40
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.8.0
github.com/spf13/pflag v1.0.5
github.com/streadway/amqp v1.0.0
github.com/tidwall/gjson v1.6.8
github.com/tidwall/pretty v1.1.0 // indirect
github.com/tidwall/sjson v1.1.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
Expand Down
37 changes: 37 additions & 0 deletions steps/rabbit/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 Telefonica Cybersecurity & Cloud Tech SL
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rabbit

import (
"context"
)

// ContextKey defines a type to store the rabbit session in context.Context.
type ContextKey string

var contextKey ContextKey = "rabbitSession"

// InitializeContext adds the rabbit session to the context.
// The new context is returned because context is immutable.
func InitializeContext(ctx context.Context) context.Context {
var session Session
return context.WithValue(ctx, contextKey, &session)
}

// GetSession returns the rabbit session stored in context.
// Note that the context should be previously initialized with InitializeContext function.
func GetSession(ctx context.Context) *Session {
return ctx.Value(contextKey).(*Session)
}
68 changes: 68 additions & 0 deletions steps/rabbit/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2021 Telefonica Cybersecurity & Cloud Tech SL
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rabbit

import (
"log"
"os"
"path"

"github.com/Telefonica/golium"
"github.com/sirupsen/logrus"
)

var logger *Logger

// GetLogger returns the logger for rabbit messages in publish/subscribe.
// If the logger is not created yet, it creates a new instance of Logger.
func GetLogger() *Logger {
if logger != nil {
return logger
}
dir := golium.GetConfig().Log.Directory
path := path.Join(dir, "rabbit-pubsub.log")
logger, err := NewLogger(path)
if err != nil {
logrus.Fatalf("Error creating rabbit logger with file: '%s'. %s", path, err)
}
return logger
}

// Logger logs the HTTP request and response in a configurable file.
type Logger struct {
log *log.Logger
}

// NewLogger creates an instance of the logger.
// It configures the file path where the RabbitMQ interactions are written.
func NewLogger(path string) (*Logger, error) {
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, err
}
return &Logger{
log: log.New(file, "", log.Ldate|log.Lmicroseconds|log.LUTC),
}, nil
}

// LogPublishedMessage logs a rabbit message published to a topic.
func (l Logger) LogPublishedMessage(msg, topic, corr string) {
l.log.Printf("Publish to %s [%s]:\n%s\n\n", topic, corr, msg)
}

// LogReceivedMessage logs a rabbit message received from a topic.
func (l Logger) LogReceivedMessage(msg, topic, corr string) {
l.log.Printf("Received from %s [%s]:\n%s\n\n", topic, corr, msg)
}
Loading

0 comments on commit ea4da6c

Please sign in to comment.