Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
code sanitization
Browse files Browse the repository at this point in the history
  • Loading branch information
rajveermalviya committed May 31, 2020
1 parent 6336ad3 commit 034301f
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *StreamHandler) Unsubscribe(ctx context.Context, consumerID string, topi
These methods can be used to add or remove subscriptions for a consumer.

If you want to give subscription control to the client look at
[the implementation](examples/nats_example) in the example.
[the implementation](examples/nats_example/main.go#L95) in the example.

To know more, check out the [example](examples/nats_example)

Expand Down
1 change: 1 addition & 0 deletions drivers/gcpdriver/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package gcpdriver contains Google Cloud Pub/Sub driver for unifrost.StreamHandler
package gcpdriver

import (
Expand Down
1 change: 1 addition & 0 deletions drivers/kafkadriver/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package kafkadriver contains Apache Kafka message bus driver for unifrost.StreamHandler
package kafkadriver

import (
Expand Down
1 change: 1 addition & 0 deletions drivers/memdriver/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package memdriver contains In-memory testing driver for unifrost.StreamHandler
package memdriver

import (
Expand Down
1 change: 1 addition & 0 deletions drivers/natsdriver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package natsdriver contains NATS driver for unifrost.StreamHandler
package natsdriver

import (
Expand Down
1 change: 1 addition & 0 deletions drivers/rabbitdriver/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package rabbitdriver contains RabbitMQ driver for unifrost.StreamHandler
package rabbitdriver

import (
Expand Down
1 change: 1 addition & 0 deletions drivers/sqsdriver/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sqsdriver contains Amazon SQS driver for unifrost.StreamHandler
package sqsdriver

import (
Expand Down
2 changes: 1 addition & 1 deletion examples/nats_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {

// add a signal notifier to close the streamer gracefully
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt, os.Kill)
signal.Notify(sigs, os.Interrupt)

go func() {
log.Println("sig:", <-sigs)
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ module github.com/unifrost/unifrost
go 1.14

require (
cloud.google.com/go/pubsub v1.3.1
cloud.google.com/go/pubsub v1.4.0
github.com/Shopify/sarama v1.26.4
github.com/aws/aws-sdk-go v1.31.5
github.com/aws/aws-sdk-go v1.31.7
github.com/google/uuid v1.1.1
github.com/nats-io/nats.go v1.10.0
github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71
Expand All @@ -14,5 +14,4 @@ require (
gocloud.dev/pubsub/natspubsub v0.19.0
gocloud.dev/pubsub/rabbitpubsub v0.19.0
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.24.0 // indirect
)
62 changes: 60 additions & 2 deletions go.sum

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions unifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
// StreamHandler handles all the consumers and subscriptions.
// It implements the http.Handler interface for easy embedding with any API server.
type StreamHandler struct {
subClient drivers.SubscriberClient
consumers map[string]*Consumer
consumerTTLMillis time.Duration
subscriptions map[string]*subscription
subClient drivers.SubscriberClient
consumers map[string]*Consumer
consumerTTL time.Duration
subscriptions map[string]*subscription

addConsumerChan chan consumerMessageg
closeConsumerChan chan consumerMessageg
Expand Down Expand Up @@ -113,7 +113,7 @@ func NewStreamHandler(ctx context.Context, subClient drivers.SubscriberClient, o
subClient: subClient,
consumers: map[string]*Consumer{},
subscriptions: map[string]*subscription{},
consumerTTLMillis: time.Duration(time.Minute.Milliseconds()),
consumerTTL: time.Duration(time.Minute),
addConsumerChan: make(chan consumerMessageg),
closeConsumerChan: make(chan consumerMessageg),
removeConsumerTopicChan: make(chan consumerTopicMessage),
Expand All @@ -137,7 +137,7 @@ func NewStreamHandler(ctx context.Context, subClient drivers.SubscriberClient, o
// default TTL is 1 minute
func ConsumerTTL(t time.Duration) Option {
return func(s *StreamHandler) error {
s.consumerTTLMillis = time.Duration(t.Milliseconds())
s.consumerTTL = t
return nil
}
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// Stop the timeout timer if it is running.
if consumer.connected == false {
if !consumer.connected {
consumer.connected = true
consumer.ttlTimer.Stop()
consumer.timerStopped <- true
Expand All @@ -379,7 +379,7 @@ func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
d, _ := json.Marshal(map[string]interface{}{
"config": map[string]interface{}{
"consumer_id": consumerID,
"consumer_ttl_millis": s.consumerTTLMillis,
"consumer_ttl_millis": s.consumerTTL.Milliseconds(),
},
"subscriptions": s.GetConsumerTopics(ctx, consumer),
})
Expand All @@ -399,7 +399,7 @@ func (s *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Printf("consumer %s disconnected", consumerID)

// When consumer gets disconnected start the timer
timer := time.NewTimer(s.consumerTTLMillis * time.Millisecond)
timer := time.NewTimer(s.consumerTTL)

consumer.ttlTimer = timer
consumer.connected = false
Expand Down Expand Up @@ -474,7 +474,7 @@ func (s *StreamHandler) NewConsumer(ctx context.Context) (*Consumer, error) {
ID: uuid.New().String(),
messageChannel: make(chan message, 10),
topics: map[string]struct{}{},
ttlTimer: time.NewTimer(s.consumerTTLMillis * time.Millisecond),
ttlTimer: time.NewTimer(s.consumerTTL * time.Millisecond),
connected: true,
}

Expand All @@ -496,7 +496,7 @@ func (s *StreamHandler) NewCustomConsumer(ctx context.Context, consumerID string
ID: consumerID,
messageChannel: make(chan message, 10),
topics: map[string]struct{}{},
ttlTimer: time.NewTimer(s.consumerTTLMillis * time.Millisecond),
ttlTimer: time.NewTimer(s.consumerTTL * time.Millisecond),
connected: true,
}

Expand Down

0 comments on commit 034301f

Please sign in to comment.