
Commit

updated hashing logic
maurafortino committed Oct 11, 2024
1 parent aebbdfd commit 7b89fda
Showing 2 changed files with 16 additions and 21 deletions.
10 changes: 3 additions & 7 deletions internal/sink/hasher.go
@@ -28,7 +28,7 @@ func (h HashRing) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

-func (h HashRing) GetServer(key string) string {
+func (h HashRing) Get(key string) string {
if len(h) == 0 {
return ""
}
@@ -42,14 +42,14 @@ func (h HashRing) GetServer(key string) string {
return h[idx].sink
}

-func (h *HashRing) AddServer(server string) {
+func (h *HashRing) Add(server string) {
hash := int(crc32.ChecksumIEEE([]byte(server)))
node := Node{hash: hash, sink: server}
*h = append(*h, node)
sort.Sort(h)
}

-func (h *HashRing) RemoveServer(server string) {
+func (h *HashRing) Remove(server string) {
hash := int(crc32.ChecksumIEEE([]byte(server)))
for i, node := range *h {
if node.hash == hash {
@@ -60,10 +60,6 @@ func (h *HashRing) RemoveServer(server string) {
sort.Sort(h)
}

-func NewRing() *HashRing {
-return &HashRing{}
-}

func GetKey(field string, msg *wrp.Message) string {

v := reflect.ValueOf(msg)
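For context on this refactor: the renamed Get/Add/Remove methods implement an ordinary consistent-hash ring. Add hashes each sink key with crc32 onto a slice of nodes kept sorted by hash, and Get hashes the lookup key and returns the sink of the first node at or past that hash. The middle of Get is collapsed in this view, so the sketch below is only a minimal, self-contained approximation of that behavior; the ring/node names, the sort.Search lookup body, and the sample keys are assumptions for illustration, not the repository's code.

package main

import (
    "fmt"
    "hash/crc32"
    "sort"
)

// node pairs a crc32 hash with the sink key it maps to, mirroring the Node
// type in hasher.go; the ring slice is kept sorted by hash.
type node struct {
    hash int
    sink string
}

type ring []node

// Add hashes the server key and re-sorts, like the renamed HashRing.Add.
func (r *ring) Add(server string) {
    h := int(crc32.ChecksumIEEE([]byte(server)))
    *r = append(*r, node{hash: h, sink: server})
    sort.Slice(*r, func(i, j int) bool { return (*r)[i].hash < (*r)[j].hash })
}

// Get hashes the key and returns the first node at or past that hash, wrapping
// to index 0; this is the standard lookup that ends in "return h[idx].sink".
func (r ring) Get(key string) string {
    if len(r) == 0 {
        return ""
    }
    h := int(crc32.ChecksumIEEE([]byte(key)))
    idx := sort.Search(len(r), func(i int) bool { return r[i].hash >= h })
    if idx == len(r) {
        idx = 0
    }
    return r[idx].sink
}

func main() {
    r := &ring{} // the commit drops NewRing() in favor of a literal like this
    for _, k := range []string{"0", "1", "2"} { // sink.go now adds each Kafka's index
        r.Add(k)
    }
    fmt.Println(r.Get("mac:112233445566")) // the same key always maps to the same index
}

With sinks registered as "0", "1", "2" (the way NewSink now does it in sink.go), Get returns the same index for the same key every time, which is what lets Send route a message to a single Kafka.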
27 changes: 13 additions & 14 deletions internal/sink/sink.go
@@ -51,7 +51,7 @@ type CommonWebhook struct {
logger *zap.Logger
}
type KafkaSink struct {
-Kafkas []*Kafka
+Kafkas map[string]*Kafka
Hash *HashRing
HashField string
}
@@ -81,15 +81,15 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) Sink {
}
if len(l.Registration.Kafkas) > 0 {
var sink KafkaSink
-r := NewRing()
+r := &HashRing{}
sink.HashField = l.Registration.Hash.Field
for i, k := range l.Registration.Kafkas {
kafka := &Kafka{}
kafka.Update(l.GetId(), "quickstart-events", k.RetryHint.MaxRetry, k.BootstrapServers, logger)
-sink.Kafkas = append(sink.Kafkas, kafka)
+sink.Kafkas[strconv.Itoa(i)] = kafka
if l.Registration.Hash.Field != "" {
-key := l.Registration.Hash.Field + strconv.Itoa(i)
-r.AddServer(key)
+r.Add(strconv.Itoa(i))

}
}
sink.Hash = r
@@ -351,18 +351,17 @@ func (k *Kafka) Update(id, topic string, retries int, servers []string, logger *
func (k KafkaSink) Send(secret string, acceptType string, msg *wrp.Message) error {
var errs error
if k.HashField != "" {
-key := GetKey(k.HashField, msg)
-for i, kafka := range k.Kafkas {
-hash := k.HashField + strconv.Itoa(i)
-server := k.Hash.GetServer(key)
-if server == hash {
-err := kafka.send(secret, acceptType, msg)
-if err != nil {
-errs = errors.Join(errs, err)
-}
-
+if kafka, ok := k.Kafkas[k.Hash.Get(GetKey(k.HashField, msg))]; ok {
+err := kafka.send(secret, acceptType, msg)
+if err != nil {
+errs = errors.Join(errs, err)
+}
+
+} else {
+errs = fmt.Errorf("could not find kakfa for the related hash %v", k.HashField)
+}

} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
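Taken together, the sink.go changes mean KafkaSink keys its Kafkas map with the same stringified index that was added to the ring, so Send does one ring lookup plus one comma-ok map read instead of looping over every Kafka and rebuilding hash strings. A hedged sketch of that dispatch shape follows; the kafkaStub type, its send signature, and the sample keys are stand-ins invented here, not the repository's *Kafka API.

package main

import (
    "errors"
    "fmt"
)

// kafkaStub stands in for *sink.Kafka; only the send call matters for the shape.
type kafkaStub struct{ name string }

func (k *kafkaStub) send(msg string) error {
    fmt.Printf("%s <- %s\n", k.name, msg)
    return nil
}

// dispatch mirrors the updated KafkaSink.Send branch: the ring picks the map
// key, and a comma-ok read reports a missing entry instead of scanning every Kafka.
func dispatch(kafkas map[string]*kafkaStub, ringKey, msg string) error {
    var errs error
    if k, ok := kafkas[ringKey]; ok {
        if err := k.send(msg); err != nil {
            errs = errors.Join(errs, err)
        }
    } else {
        errs = fmt.Errorf("could not find kafka for hash key %v", ringKey)
    }
    return errs
}

func main() {
    kafkas := map[string]*kafkaStub{"0": {name: "kafka-0"}, "1": {name: "kafka-1"}}
    fmt.Println(dispatch(kafkas, "1", "wrp event payload"))
}

The map-plus-ring split keeps membership changes (Add/Remove on the ring) independent of the senders themselves.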

