Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: trigger http protocol push event support gateway #641

Merged
merged 4 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/trigger/client/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ func NewHTTPClient(url string) EventClient {
}
}

func NewHTTPClientWithGateway(url, gateway, headerKey string) EventClient {
c, _ := ce.NewClientHTTP(ce.WithTarget(gateway), ce.WithHeader(headerKey, url))
return &http{
client: c,
}
}

func (c *http) Send(ctx context.Context, events ...*ce.Event) Result {
event := events[0]
res := c.client.Send(ctx, *event)
Expand Down
8 changes: 5 additions & 3 deletions server/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ import (
"time"

"github.com/vanus-labs/vanus/pkg/observability"
"github.com/vanus-labs/vanus/server/trigger/trigger"
)

type Config struct {
Observability observability.Config `yaml:"observability"`
TriggerAddr string
Port int `yaml:"port"`
IP string `yaml:"ip"`
ControllerAddr []string `yaml:"controllers"`
Port int `yaml:"port"`
IP string `yaml:"ip"`
ControllerAddr []string `yaml:"controllers"`
Proxy *trigger.TargetGateway `yaml:"proxy"`

HeartbeatInterval time.Duration `yaml:"heartbeat_interval"`
// send event goroutine size
Expand Down
12 changes: 7 additions & 5 deletions server/trigger/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
lookupReadableLogsTimeout = 5 * time.Second
readEventTimeout = 5 * time.Second
readErrSleepTime = 2 * time.Second
checkEventlogInterval = 2 * time.Minute
logFrequencyMini = 10
)

type Config struct {
Expand Down Expand Up @@ -143,12 +145,12 @@ func (r *reader) Start() error {
r.wg.Add(1)
go func() {
defer r.wg.Done()
ticker := time.NewTicker(time.Minute * 2)
ticker := time.NewTicker(checkEventlogInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
r.findEventlog(ctx)
_ = r.findEventlog(ctx)
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func (elReader *eventlogReader) stop() {
elReader.cancel()
}

// get earliest offset
// getOffset get earliest offset.
func (elReader *eventlogReader) getOffset(ctx context.Context) (int64, error) {
logs, err := elReader.config.Client.Eventbus(ctx,
api.WithID(elReader.config.EventbusID.Uint64())).ListLog(ctx)
Expand Down Expand Up @@ -251,15 +253,15 @@ func (elReader *eventlogReader) run(parentCtx context.Context) {
Uint64("offset", elReader.offset).
Msg("eventlog reader init success")

min := time.Now().Minute() / 10
min := time.Now().Minute() / logFrequencyMini
for {
select {
case <-ctx.Done():
return
default:
}
err := elReader.loop(ctx, r)
currMin := time.Now().Minute() / 10
currMin := time.Now().Minute() / logFrequencyMini
if currMin != min {
min = currMin
log.Info().Err(err).
Expand Down
12 changes: 12 additions & 0 deletions server/trigger/trigger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ type Config struct {
SendBatchSize int
PullBatchSize int
MaxUACKNumber int
TargetGateway *TargetGateway
}

type TargetGateway struct {
Address string `yaml:"address"`
TargetHeaderName string `yaml:"header_name"`
}

func defaultConfig() Config {
Expand Down Expand Up @@ -155,3 +161,9 @@ func WithMaxUACKNumber(maxUACKNumber int) Option {
t.config.MaxUACKNumber = maxUACKNumber
}
}

func WithProxy(proxy *TargetGateway) Option {
return func(t *trigger) {
t.config.TargetGateway = proxy
}
}
20 changes: 15 additions & 5 deletions server/trigger/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type Trigger interface {

type trigger struct {
subscriptionIDStr string
eventbusIDStr string

subscription *primitive.Subscription
offsetManager *offset.SubscriptionOffset
Expand Down Expand Up @@ -119,6 +120,7 @@ func newTrigger(subscription *primitive.Subscription, opts ...Option) (*trigger,
filter: filter.GetFilter(subscription.Filters),
subscription: subscription,
subscriptionIDStr: subscription.ID.String(),
eventbusIDStr: subscription.EventbusID.String(),
transformer: trans,
}
if subscription.Protocol == primitive.GRPC {
Expand Down Expand Up @@ -154,7 +156,11 @@ func (t *trigger) getClient() client.EventClient {
func (t *trigger) changeTarget(
sink primitive.URI, protocol primitive.Protocol, credential primitive.SinkCredential,
) error {
eventCli := newEventClient(sink, protocol, credential)
eventCli := newEventClient(clientConfig{
sink: sink,
protocol: protocol,
credential: credential,
gateway: t.config.TargetGateway})
t.lock.Lock()
defer t.lock.Unlock()
t.eventCli = eventCli
Expand Down Expand Up @@ -395,7 +401,7 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) {
es := make([]*ce.Event, l)
for i := range events {
if events[i].retry {
retryEventCnt += 1
retryEventCnt++
}
es[i] = events[i].transform
}
Expand Down Expand Up @@ -427,11 +433,11 @@ func (t *trigger) processEvent(ctx context.Context, events ...*toSendEvent) {
Msg("send event success")
}
if retryEventCnt > 0 {
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.subscription.EventbusID.String(), metrics.LabelTrue, result).
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.eventbusIDStr, metrics.LabelTrue, result).
Add(float64(retryEventCnt))
}
if l > retryEventCnt {
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.subscription.EventbusID.String(), metrics.LabelFalse, result).
metrics.TriggerPushEventCounter.WithLabelValues(t.subscriptionIDStr, t.eventbusIDStr, metrics.LabelFalse, result).
Add(float64(l - retryEventCnt))
}
}
Expand Down Expand Up @@ -572,7 +578,11 @@ func getOffset(sub *primitive.Subscription) map[vanus.ID]uint64 {
}

func (t *trigger) Init(ctx context.Context) error {
t.eventCli = newEventClient(t.subscription.Sink, t.subscription.Protocol, t.subscription.SinkCredential)
t.eventCli = newEventClient(clientConfig{
sink: t.subscription.Sink,
protocol: t.subscription.Protocol,
credential: t.subscription.SinkCredential,
gateway: t.config.TargetGateway})
t.client = eb.Connect(t.config.Controllers)

t.timerEventWriter = t.client.Eventbus(ctx, api.WithID(
Expand Down
29 changes: 19 additions & 10 deletions server/trigger/trigger/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,29 @@ import (
"github.com/vanus-labs/vanus/server/trigger/client"
)

func newEventClient(sink primitive.URI,
protocol primitive.Protocol,
credential primitive.SinkCredential) client.EventClient {
switch protocol {
type clientConfig struct {
gateway *TargetGateway
sink primitive.URI
protocol primitive.Protocol
credential primitive.SinkCredential
}

func newEventClient(cfg clientConfig) client.EventClient {
sink := string(cfg.sink)
switch cfg.protocol {
case primitive.AwsLambdaProtocol:
_credential, _ := credential.(*primitive.AkSkSinkCredential)
return client.NewAwsLambdaClient(_credential.AccessKeyID, _credential.SecretAccessKey, string(sink))
_credential, _ := cfg.credential.(*primitive.AkSkSinkCredential)
return client.NewAwsLambdaClient(_credential.AccessKeyID, _credential.SecretAccessKey, sink)
case primitive.GCloudFunctions:
_credential, _ := credential.(*primitive.GCloudSinkCredential)
return client.NewGCloudFunctionClient(string(sink), _credential.CredentialJSON)
_credential, _ := cfg.credential.(*primitive.GCloudSinkCredential)
return client.NewGCloudFunctionClient(sink, _credential.CredentialJSON)
case primitive.GRPC:
return client.NewGRPCClient(string(sink))
return client.NewGRPCClient(sink)
default:
return client.NewHTTPClient(string(sink))
if cfg.gateway != nil {
return client.NewHTTPClientWithGateway(sink, cfg.gateway.Address, cfg.gateway.TargetHeaderName)
}
return client.NewHTTPClient(sink)
}
}

Expand Down
8 changes: 4 additions & 4 deletions server/trigger/trigger/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ import (
func TestNewEventClient(t *testing.T) {
Convey("test new event client", t, func() {
Convey("new lambda client", func() {
cli := newEventClient("test", primitive.AwsLambdaProtocol,
primitive.NewAkSkSinkCredential("ak", "sk"))
cli := newEventClient(clientConfig{sink: "test", protocol: primitive.AwsLambdaProtocol,
credential: primitive.NewAkSkSinkCredential("ak", "sk")})
So(cli, ShouldNotBeNil)
})
Convey("new http client", func() {
cli := newEventClient("test", primitive.HTTPProtocol,
primitive.NewPlainSinkCredential("identifier", "secret"))
cli := newEventClient(clientConfig{sink: "test", protocol: primitive.HTTPProtocol,
credential: primitive.NewPlainSinkCredential("identifier", "secret")})
So(cli, ShouldNotBeNil)
})
})
Expand Down
3 changes: 2 additions & 1 deletion server/trigger/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func (w *worker) getTriggerOptions(subscription *primitive.Subscription) []trigg
trigger.WithGoroutineSize(w.config.SendEventGoroutineSize),
trigger.WithSendBatchSize(w.config.SendEventBatchSize),
trigger.WithPullBatchSize(w.config.PullEventBatchSize),
trigger.WithMaxUACKNumber(w.config.MaxUACKEventNumber))
trigger.WithMaxUACKNumber(w.config.MaxUACKEventNumber),
trigger.WithProxy(w.config.Proxy))
return opts
}
Loading