Skip to content

Commit

Permalink
refactor(cloudevents-server): add kafka topic to store event (#165)
Browse files Browse the repository at this point in the history
Signed-off-by: wuhuizuo <[email protected]>

Signed-off-by: wuhuizuo <[email protected]>
  • Loading branch information
wuhuizuo authored Sep 8, 2024
1 parent 7b7ca02 commit bbd423a
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 49 deletions.
34 changes: 20 additions & 14 deletions cloudevents-server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ require (
)

require (
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
)

require (
Expand All @@ -35,8 +36,11 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful v2.15.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
Expand Down Expand Up @@ -72,6 +76,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand All @@ -85,32 +90,33 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.11.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/segmentio/kafka-go v0.4.47
github.com/shopspring/decimal v1.2.0 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/zclconf/go-cty v1.8.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/mod v0.15.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/oauth2 v0.15.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
Expand Down
80 changes: 54 additions & 26 deletions cloudevents-server/go.sum

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions cloudevents-server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ func newEventsHandlerFunc(cfg *config.Config) gin.HandlerFunc {
log.Fatal().Err(err).Msg("Failed to create protocol")
}

handler, err := newCloudEventsHandler(cfg)
handler, err := handler.NewEventProducer(cfg.Kafka)
if err != nil {
log.Fatal().Err(err).Msg("failed to create cloudevents handler")
log.Fatal().Err(err).Msg("failed to create broker handler")
}
log.Debug().Any("types", handler.SupportEventTypes()).Msgf("registered event handlers")

h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.Handle)
h, err := cloudevents.NewHTTPReceiveHandler(nil, p, handler.HandleCloudEvent)
if err != nil {
log.Fatal().Err(err).Msg("failed to create handler")
}
Expand Down
17 changes: 16 additions & 1 deletion cloudevents-server/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"net/http"

Expand All @@ -9,6 +10,7 @@ import (
"github.com/rs/zerolog/log"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/events/handler"
)

func main() {
Expand Down Expand Up @@ -44,8 +46,21 @@ func main() {
_ = r.SetTrustedProxies(nil)

setRouters(r, cfg)
log.Info().Str("address", serveAddr).Msg("server started.")

hd, err := newCloudEventsHandler(cfg)
if err != nil {
log.Fatal().Err(err).Msg("failed to create cloudevents handler")
}
log.Debug().Any("types", hd.SupportEventTypes()).Msgf("registered event handlers")

cg, err := handler.NewEventConsumerGroup(cfg.Kafka, hd)
if err != nil {
log.Fatal().Err(err).Msg("failed to create consumer group")
}
defer cg.Close()
go cg.Start(context.Background())

log.Info().Str("address", serveAddr).Msg("server started.")
if err := http.ListenAndServe(serveAddr, r); err != nil {
log.Fatal().Err(err).Send()
}
Expand Down
21 changes: 17 additions & 4 deletions cloudevents-server/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"regexp"

"gopkg.in/yaml.v3"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/kafka"
)

type Store struct {
Expand Down Expand Up @@ -50,14 +52,25 @@ type Tekton struct {
FailedStepTailLines int `yaml:"failed_step_tail_lines,omitempty" json:"failed_step_tail_lines,omitempty"`
}

type Kafka struct {
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty"`
ClientID string `yaml:"client_id,omitempty" json:"client_id,omitempty"`
Authentication kafka.Authentication `yaml:"authentication,omitempty" json:"authentication,omitempty"`
Producer kafka.Producer `yaml:"producer,omitempty" json:"producer,omitempty"`
Consumer kafka.Consumer `yaml:"consumer,omitempty" json:"consumer,omitempty"`
}

type TiBuild struct {
ResultSinkURL string `yaml:"result_sink_url,omitempty" json:"result_sink_url,omitempty"`
TriggerSinkURL string `yaml:"trigger_sink_url,omitempty" json:"trigger_sink_url,omitempty"`
}

type Config struct {
Store Store `yaml:"store,omitempty" json:"store,omitempty"`
Lark LarkBotApp `yaml:"lark,omitempty" json:"lark,omitempty"`
Tekton Tekton `yaml:"tekton,omitempty" json:"tekton,omitempty"`
TiBuild struct {
ResultSinkURL string `yaml:"result_sink_url,omitempty" json:"result_sink_url,omitempty"`
TriggerSinkURL string `yaml:"trigger_sink_url,omitempty" json:"trigger_sink_url,omitempty"`
} `yaml:"tibuild,omitempty" json:"tibuild,omitempty"`
TiBuild TiBuild `yaml:"tibuild,omitempty" json:"tibuild,omitempty"`
Kafka Kafka `yaml:"kafka,omitempty" json:"kafka,omitempty"`
}

func (c *Config) LoadFromFile(file string) error {
Expand Down
173 changes: 173 additions & 0 deletions cloudevents-server/pkg/events/handler/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package handler

import (
"context"
"os"
"os/signal"
"sync"
"syscall"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/rs/zerolog/log"
kafka "github.com/segmentio/kafka-go"

"github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/config"
skakfa "github.com/PingCAP-QE/ee-apps/cloudevents-server/pkg/kafka"
)

func NewEventProducer(cfg config.Kafka) (*EventProducer, error) {
writer, err := skakfa.NewWriter(cfg.Authentication, cfg.Brokers, "", cfg.ClientID)
if err != nil {
return nil, err
}

return &EventProducer{
writer: writer,
unknowEventTopic: cfg.Producer.DefaultTopic,
topicMapping: cfg.Producer.TopicMapping,
}, nil
}

func NewEventConsumer(cfg config.Kafka, topic string, hander EventHandler) (*EventConsumer, error) {
reader, err := skakfa.NewReader(cfg.Authentication, cfg.Brokers, topic, cfg.Consumer.GroupID, cfg.ClientID)
if err != nil {
return nil, err
}

return &EventConsumer{
reader: reader,
handler: hander,
}, nil
}

func NewEventConsumerGroup(cfg config.Kafka, hander EventHandler) (EventConsumerGroup, error) {
consumerGroup := make(EventConsumerGroup)
for _, topic := range cfg.Consumer.TopicMapping {
if consumerGroup[topic] != nil {
continue
}
consumer, err := NewEventConsumer(cfg, topic, hander)
if err != nil {
return nil, err
}

consumerGroup[topic] = consumer
}

return consumerGroup, nil
}

// EventProducer is the main structure for our event broker
type EventProducer struct {
writer *kafka.Writer
unknowEventTopic string
topicMapping map[string]string // Map event type to Kafka topic
}

func (eb *EventProducer) HandleCloudEvent(ctx context.Context, event cloudevents.Event) cloudevents.Result {
eventType := event.Type()
topic, ok := eb.topicMapping[eventType]

// Use default topic if not found in mapping
if !ok {
log.Debug().Str("event-type", eventType).Msg("No topic found for event type, using default topic")
topic = eb.unknowEventTopic
}

cloudEventBytes, err := event.MarshalJSON()
if err != nil {
log.Err(err).Msg("error marshalling Cloud Event")
return cloudevents.ResultNACK
}

message := kafka.Message{
Topic: topic,
Key: []byte(event.ID()),
Value: cloudEventBytes,
}

err = eb.writer.WriteMessages(ctx, message)
if err != nil {
log.Err(err).Str("topic", topic).Str("ce-id", event.ID()).Msg("error writing message to Kafka")
return err
}

log.Debug().Str("topic", topic).Str("ce-id", event.ID()).Msg("message written to Kafka")
return cloudevents.ResultACK
}

type EventConsumerGroup map[string]*EventConsumer

func (ecs EventConsumerGroup) Close() {
for _, ec := range ecs {
if ec != nil {
ec.Close()
}
}
}

func (ecs EventConsumerGroup) Start(ctx context.Context) {
wg := new(sync.WaitGroup)
for _, ec := range ecs {
if ec != nil {
wg.Add(1)
go func(c *EventConsumer) {
c.Start(ctx)
wg.Done()
}(ec)
}
}
wg.Wait()
}

type EventConsumer struct {
reader *kafka.Reader
writer *kafka.Writer // used for ack and put into dead letter queue.
handler EventHandler
faultTopic string // dead letter topic
}

// consumer workers
func (ec *EventConsumer) Start(ctx context.Context) error {
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

defer ec.Close()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-sigterm:
// When SIGTERM received, try to flush remaining messages
// and exit gracefully
return nil
default:
m, err := ec.reader.ReadMessage(ctx)
if err != nil {
return err
}

var event cloudevents.Event
err = event.UnmarshalJSON(m.Value)
if err != nil {
return err
}

result := ec.handler.Handle(event)
if !cloudevents.IsACK(result) {
log.Error().Err(err).Msg("error handling event")
// ec.writer.WriteMessages(ctx, kafka.Message{Topic: ec.faultTopic, Key: m.Key, Value: m.Value})
}
}
}
}

func (ec *EventConsumer) Close() {
if ec.reader != nil {
ec.reader.Close()
}
if ec.writer != nil {
ec.writer.Close()
}
}
Loading

0 comments on commit bbd423a

Please sign in to comment.