Skip to content

Commit

Permalink
Merge pull request #556 from xmidt-org/feat/hashing
Browse files Browse the repository at this point in the history
Hashing Logic
  • Loading branch information
denopink authored Oct 24, 2024
2 parents 5f15c55 + a444a1c commit 9cedf8c
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ _testmain.go
.vscode/*
.dev/*

caduceus
caduceus.yaml
.ignore


!deploy/helm/caduceus
deploy/helm/caduceus/rendered.*

File renamed without changes.
77 changes: 77 additions & 0 deletions internal/sink/hasher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-FileCopyrightText: 2024 Comcast Cable Communications Management, LLC
// SPDX-License-Identifier: Apache-2.0
package sink

import (
"fmt"
"hash/crc32"
"reflect"
"sort"

"github.com/xmidt-org/wrp-go/v3"
)

type Node struct {
hash int
sink string
}

type HashRing []Node

func (h HashRing) Len() int {
return len(h)
}
func (h HashRing) Less(i, j int) bool {
return h[i].hash < h[j].hash
}
func (h HashRing) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h HashRing) Get(key string) string {
if len(h) == 0 {
return ""
}
hash := int(crc32.ChecksumIEEE([]byte(key)))
idx := sort.Search(len(h), func(i int) bool {
return h[i].hash >= hash
})
if idx == len(h) {
idx = 0
}
return h[idx].sink
}

func (h *HashRing) Add(server string) {
hash := int(crc32.ChecksumIEEE([]byte(server)))
node := Node{hash: hash, sink: server}
*h = append(*h, node)
sort.Sort(h)
}

func (h *HashRing) Remove(server string) {
hash := int(crc32.ChecksumIEEE([]byte(server)))
for i, node := range *h {
if node.hash == hash {
*h = append((*h)[:i], (*h)[i+1:]...)
break
}
}
sort.Sort(h)
}

func GetKey(field string, msg *wrp.Message) string {

v := reflect.ValueOf(msg)
if v.Kind() == reflect.Ptr {
v = v.Elem() // Dereference pointer if necessary
}

value := v.FieldByName(field)
if value.IsValid() {
return fmt.Sprintf("%v", value.Interface())
}

return ""

}
54 changes: 38 additions & 16 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ type CommonWebhook struct {
mutex sync.RWMutex
logger *zap.Logger
}
type Kafkas []*Kafka
type KafkaSink struct {
Kafkas map[string]*Kafka
Hash *HashRing
HashField string
}
type Kafka struct {
brokerAddr []string
topic string
Expand All @@ -76,12 +80,22 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
return whs
}
if len(l.Registration.Kafkas) > 0 {
var sink Kafkas
for _, k := range l.Registration.Kafkas {
var sink KafkaSink
r := &HashRing{}
sink.HashField = l.Registration.Hash.Field
for i, k := range l.Registration.Kafkas {
kafka := &Kafka{}
kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
sink = append(sink, kafka)
err := kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
if err != nil {
return nil
}
sink.Kafkas[strconv.Itoa(i)] = kafka
if l.Registration.Hash.Field != "" {
r.Add(strconv.Itoa(i))

}
}
sink.Hash = r
return sink
}
default:
Expand All @@ -90,9 +104,8 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
return nil
}

func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) (err error) {
//TODO: is there anything else that needs to be done for this?
//do we need to return an error
func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
//TODO: do we need to return an error if not - we should get rid of the error return
v1.id = id
v1.failureUrl = failureUrl
v1.deliveryInterval = c.DeliveryInterval
Expand All @@ -105,7 +118,6 @@ func (v1 *WebhookV1) Update(c Config, l *zap.Logger, altUrls []string, id, failu
}
v1.updateUrls(urlCount, receiverUrl, altUrls)

return nil
}

func (v1 *WebhookV1) updateUrls(urlCount int, url string, urls []string) {
Expand Down Expand Up @@ -338,14 +350,24 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger *
return nil
}

func (k Kafkas) Send(secret string, acceptType string, msg *wrp.Message) error {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error {
var errs error
for _, kafka := range k {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
if len(*k.Hash) == len(k.Kafkas) {
//TODO: flush out the error handling for kafka
if kafka, ok := k.Kafkas[k.Hash.Get(GetKey(k.HashField, msg))]; ok {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
for _, kafka := range k.Kafkas {
err := kafka.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
Expand Down

0 comments on commit 9cedf8c

Please sign in to comment.