Skip to content

Commit

Permalink
feat: trigger http protocol push event support gateway (#641)
Browse files Browse the repository at this point in the history
* feat: trigger http protocol push event support gateway

* feat: add trigger log
  • Loading branch information
xdlbdy authored Oct 20, 2023
1 parent 5712f51 commit 7af72c2
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 39 deletions.
7 changes: 7 additions & 0 deletions server/trigger/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ func NewHTTPClient(url string) EventClient {
}
}

func NewHTTPClientWithGateway(url, gateway, headerKey string) EventClient {
c, _ := ce.NewClientHTTP(ce.WithTarget(gateway), ce.WithHeader(headerKey, url))
return &http{
client: c,
}
}

func (c *http) Send(ctx context.Context, events ...*ce.Event) Result {
event := events[0]
res := c.client.Send(ctx, *event)
Expand Down
8 changes: 5 additions & 3 deletions server/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ import (
"time"

"github.com/vanus-labs/vanus/pkg/observability"
"github.com/vanus-labs/vanus/server/trigger/trigger"
)

type Config struct {
Observability observability.Config `yaml:"observability"`
TriggerAddr string
Port int `yaml:"port"`
IP string `yaml:"ip"`
ControllerAddr []string `yaml:"controllers"`
Port int `yaml:"port"`
IP string `yaml:"ip"`
ControllerAddr []string `yaml:"controllers"`
Gateway *trigger.TargetGateway `yaml:"gateway"`

HeartbeatInterval time.Duration `yaml:"heartbeat_interval"`
// send event goroutine size
Expand Down
12 changes: 7 additions & 5 deletions server/trigger/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
lookupReadableLogsTimeout = 5 * time.Second
readEventTimeout = 5 * time.Second
readErrSleepTime = 2 * time.Second
checkEventlogInterval = 2 * time.Minute
logFrequencyMini = 10
)

type Config struct {
Expand Down Expand Up @@ -143,12 +145,12 @@ func (r *reader) Start() error {
r.wg.Add(1)
go func() {
defer r.wg.Done()
ticker := time.NewTicker(time.Minute * 2)
ticker := time.NewTicker(checkEventlogInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.findEventlog(ctx)
_ = r.findEventlog(ctx)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func (elReader *eventlogReader) stop() {
elReader.cancel()
}

// get earliest offset
// getOffset get earliest offset.
func (elReader *eventlogReader) getOffset(ctx context.Context) (int64, error) {
logs, err := elReader.config.Client.Eventbus(ctx,
api.WithID(elReader.config.EventbusID.Uint64())).ListLog(ctx)
Expand Down Expand Up @@ -251,15 +253,15 @@ func (elReader *eventlogReader) run(parentCtx context.Context) {
Uint64("offset", elReader.offset).
Msg("eventlog reader init success")

min := time.Now().Minute() / 10
min := time.Now().Minute() / logFrequencyMini
for {
select {
case <-ctx.Done():
return
default:
}
err := elReader.loop(ctx, r)
currMin := time.Now().Minute() / 10
currMin := time.Now().Minute() / logFrequencyMini
if currMin != min {
min = currMin
log.Info().Err(err).
Expand Down
12 changes: 12 additions & 0 deletions server/trigger/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ type Config struct {
SendBatchSize int
PullBatchSize int
MaxUACKNumber int
TargetGateway *TargetGateway
}

type TargetGateway struct {
Address string `yaml:"address"`
TargetHeaderName string `yaml:"header_name"`
}

func defaultConfig() Config {
Expand Down Expand Up @@ -155,3 +161,9 @@ func WithMaxUACKNumber(maxUACKNumber int) Option {
t.config.MaxUACKNumber = maxUACKNumber
}
}

func WithProxy(proxy *TargetGateway) Option {
return func(t *trigger) {
t.config.TargetGateway = proxy
}
}
52 changes: 36 additions & 16 deletions server/trigger/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Trigger interface {

type trigger struct {
subscriptionIDStr string
eventbusIDStr string

subscription *primitive.Subscription
offsetManager *offset.SubscriptionOffset
Expand Down Expand Up @@ -119,6 +120,7 @@ func newTrigger(subscription *primitive.Subscription, opts ...Option) (*trigger,
filter: filter.GetFilter(subscription.Filters),
subscription: subscription,
subscriptionIDStr: subscription.ID.String(),
eventbusIDStr: subscription.EventbusID.String(),
transformer: trans,
}
if subscription.Protocol == primitive.GRPC {
Expand Down Expand Up @@ -154,7 +156,11 @@ func (t *trigger) getClient() client.EventClient {
func (t *trigger) changeTarget(
sink primitive.URI, protocol primitive.Protocol, credential primitive.SinkCredential,
) error {
eventCli := newEventClient(sink, protocol, credential)
eventCli := newEventClient(clientConfig{
sink: sink,
protocol: protocol,
credential: credential,
gateway: t.config.TargetGateway})
t.lock.Lock()
defer t.lock.Unlock()
t.eventCli = eventCli
Expand Down Expand Up @@ -280,7 +286,10 @@ func (t *trigger) runRetryEventFilterTransform(ctx context.Context) {
if err != nil {
log.Info(ctx).Err(err).
Str("event_id", event.record.Event.ID()).
Interface("event_offset", event.record.OffsetInfo).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Str(log.KeyEventbusID, t.eventbusIDStr).
Stringer(log.KeyEventlogID, event.record.EventlogID).
Uint64("event_offset", event.record.OffsetInfo.Offset).
Msg("event transform error")
t.writeFailEvent(ctx, record.Event, ErrTransformCode, err)
t.offsetManager.EventCommit(record.OffsetInfo)
Expand Down Expand Up @@ -315,7 +324,10 @@ func (t *trigger) runEventFilterTransform(ctx context.Context) {
if err != nil {
log.Info(ctx).Err(err).
Str("event_id", event.record.Event.ID()).
Interface("event_offset", event.record.OffsetInfo).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Str(log.KeyEventbusID, t.eventbusIDStr).
Stringer(log.KeyEventlogID, event.record.EventlogID).
Uint64("event_offset", event.record.OffsetInfo.Offset).
Msg("event transform error")
t.writeFailEvent(ctx, record.Event, ErrTransformCode, err)
t.offsetManager.EventCommit(record.OffsetInfo)
Expand Down Expand Up @@ -395,7 +407,7 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) {
es := make([]*ce.Event, l)
for i := range events {
if events[i].retry {
retryEventCnt += 1
retryEventCnt++
}
es[i] = events[i].transform
}
Expand All @@ -406,7 +418,11 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) {
Int("code", r.StatusCode).
Int("count", l).
Str("event_id", events[0].record.Event.ID()).
Interface("event_offset", events[0].record.OffsetInfo).
Interface("target", t.subscription.Sink).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Str(log.KeyEventbusID, t.eventbusIDStr).
Stringer(log.KeyEventlogID, events[0].record.EventlogID).
Uint64("event_offset", events[0].record.OffsetInfo.Offset).
Msg("send event fail")
code := r.StatusCode
if t.config.Ordered {
Expand All @@ -427,11 +443,11 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) {
Msg("send event success")
}
if retryEventCnt > 0 {
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.subscription.EventbusID.String(), metrics.LabelTrue, result).
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.eventbusIDStr, metrics.LabelTrue, result).
Add(float64(retryEventCnt))
}
if l > retryEventCnt {
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.subscription.EventbusID.String(), metrics.LabelFalse, result).
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.eventbusIDStr, metrics.LabelFalse, result).
Add(float64(l - retryEventCnt))
}
}
Expand Down Expand Up @@ -488,7 +504,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
Observe(time.Since(startTime).Seconds())
if err != nil {
log.Info(ctx).Err(err).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Int("attempt", writeAttempt).
Interface("event", e).
Msg("write retry event error")
Expand All @@ -502,7 +518,7 @@ func (t *trigger) writeEventToRetry(ctx context.Context, e *ce.Event, attempts i
}
}
log.Debug(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Interface("event", e).
Msg("write retry event success")
}
Expand All @@ -523,7 +539,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
Observe(time.Since(startTime).Seconds())
if err != nil {
log.Info(ctx).Err(err).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Int("attempt", writeAttempt).
Interface("event", e).
Msg("write dl event error")
Expand All @@ -536,7 +552,7 @@ func (t *trigger) writeEventToDeadLetter(ctx context.Context, e *ce.Event, reaso
}
}
log.Debug(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Interface("event", e).
Msg("write dl event success")
}
Expand Down Expand Up @@ -572,7 +588,11 @@ func getOffset(sub *primitive.Subscription) map[vanus.ID]uint64 {
}

func (t *trigger) Init(ctx context.Context) error {
t.eventCli = newEventClient(t.subscription.Sink, t.subscription.Protocol, t.subscription.SinkCredential)
t.eventCli = newEventClient(clientConfig{
sink: t.subscription.Sink,
protocol: t.subscription.Protocol,
credential: t.subscription.SinkCredential,
gateway: t.config.TargetGateway})
t.client = eb.Connect(t.config.Controllers)

t.timerEventWriter = t.client.Eventbus(ctx, api.WithID(
Expand All @@ -592,7 +612,7 @@ func (t *trigger) Init(ctx context.Context) error {

func (t *trigger) Start(ctx context.Context) error {
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger start...")
ctx, cancel := context.WithCancel(context.Background())
t.stop = cancel
Expand All @@ -613,14 +633,14 @@ func (t *trigger) Start(ctx context.Context) error {
t.wg.StartWithContext(ctx, t.runRetryEventFilterTransform)
t.state = TriggerRunning
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger started")
return nil
}

func (t *trigger) Stop(ctx context.Context) error {
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger stop...")

if t.state == TriggerStopped {
Expand All @@ -638,7 +658,7 @@ func (t *trigger) Stop(ctx context.Context) error {
t.offsetManager.Close()
t.state = TriggerStopped
log.Info(ctx).
Stringer(log.KeySubscriptionID, t.subscription.ID).
Str(log.KeySubscriptionID, t.subscriptionIDStr).
Msg("trigger stopped")
return nil
}
Expand Down
29 changes: 19 additions & 10 deletions server/trigger/trigger/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,29 @@ import (
"github.com/vanus-labs/vanus/server/trigger/client"
)

func newEventClient(sink primitive.URI,
protocol primitive.Protocol,
credential primitive.SinkCredential) client.EventClient {
switch protocol {
type clientConfig struct {
gateway *TargetGateway
sink primitive.URI
protocol primitive.Protocol
credential primitive.SinkCredential
}

func newEventClient(cfg clientConfig) client.EventClient {
sink := string(cfg.sink)
switch cfg.protocol {
case primitive.AwsLambdaProtocol:
_credential, _ := credential.(*primitive.AkSkSinkCredential)
return client.NewAwsLambdaClient(_credential.AccessKeyID, _credential.SecretAccessKey, string(sink))
_credential, _ := cfg.credential.(*primitive.AkSkSinkCredential)
return client.NewAwsLambdaClient(_credential.AccessKeyID, _credential.SecretAccessKey, sink)
case primitive.GCloudFunctions:
_credential, _ := credential.(*primitive.GCloudSinkCredential)
return client.NewGCloudFunctionClient(string(sink), _credential.CredentialJSON)
_credential, _ := cfg.credential.(*primitive.GCloudSinkCredential)
return client.NewGCloudFunctionClient(sink, _credential.CredentialJSON)
case primitive.GRPC:
return client.NewGRPCClient(string(sink))
return client.NewGRPCClient(sink)
default:
return client.NewHTTPClient(string(sink))
if cfg.gateway != nil {
return client.NewHTTPClientWithGateway(sink, cfg.gateway.Address, cfg.gateway.TargetHeaderName)
}
return client.NewHTTPClient(sink)
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/trigger/trigger/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
func TestNewEventClient(t *testing.T) {
Convey("test new event client", t, func() {
Convey("new lambda client", func() {
cli := newEventClient("test", primitive.AwsLambdaProtocol,
primitive.NewAkSkSinkCredential("ak", "sk"))
cli := newEventClient(clientConfig{sink: "test", protocol: primitive.AwsLambdaProtocol,
credential: primitive.NewAkSkSinkCredential("ak", "sk")})
So(cli, ShouldNotBeNil)
})
Convey("new http client", func() {
cli := newEventClient("test", primitive.HTTPProtocol,
primitive.NewPlainSinkCredential("identifier", "secret"))
cli := newEventClient(clientConfig{sink: "test", protocol: primitive.HTTPProtocol,
credential: primitive.NewPlainSinkCredential("identifier", "secret")})
So(cli, ShouldNotBeNil)
})
})
Expand Down
3 changes: 2 additions & 1 deletion server/trigger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (w *worker) getTriggerOptions(subscription *primitive.Subscription) []trigg
trigger.WithGoroutineSize(w.config.SendEventGoroutineSize),
trigger.WithSendBatchSize(w.config.SendEventBatchSize),
trigger.WithPullBatchSize(w.config.PullEventBatchSize),
trigger.WithMaxUACKNumber(w.config.MaxUACKEventNumber))
trigger.WithMaxUACKNumber(w.config.MaxUACKEventNumber),
trigger.WithProxy(w.config.Gateway))
return opts
}

0 comments on commit 7af72c2

Please sign in to comment.