Skip to content

Commit

Permalink
add method to control if a phase can be run synchronously (#800)
Browse files Browse the repository at this point in the history
* add method to control if a phase can be run synchronously

Signed-off-by: spacewander <[email protected]>

* check it per phase

Signed-off-by: spacewander <[email protected]>

* tweak test

Signed-off-by: spacewander <[email protected]>

---------

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Nov 19, 2024
1 parent 9f234b9 commit 0eac2c0
Show file tree
Hide file tree
Showing 10 changed files with 581 additions and 284 deletions.
11 changes: 7 additions & 4 deletions api/internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@ type Consumer struct {
FilterConfigs map[string]*fmModel.ParsedFilterConfig

// fields that generated from the configuration
CanSkipMethod map[string]bool
FilterNames []string
InitOnce sync.Once
CanSkipMethod map[string]bool
CanSkipMethodOnce sync.Once
CanSyncRunMethod map[string]bool
// CanSyncRunMethod share the same sync.Once with CanSkipMethodOnce
}

func (c *Consumer) Unmarshal(s string) error {
Expand Down Expand Up @@ -92,9 +94,10 @@ func (c *Consumer) InitConfigs() error {
}

c.FilterConfigs[name] = &fmModel.ParsedFilterConfig{
Name: name,
ParsedConfig: conf,
Factory: p.Factory,
Name: name,
ParsedConfig: conf,
Factory: p.Factory,
SyncRunPhases: p.ConfigParser.NonBlockingPhases(),
}
}

Expand Down
63 changes: 63 additions & 0 deletions api/pkg/filtermanager/api/phase.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

type Phase int

const (
PhaseDecodeHeaders Phase = 0x01
PhaseDecodeData Phase = 0x02
PhaseDecodeTrailers Phase = 0x04
PhaseDecodeRequest Phase = 0x08
PhaseEncodeHeaders Phase = 0x10
PhaseEncodeData Phase = 0x20
PhaseEncodeTrailers Phase = 0x40
PhaseEncodeResponse Phase = 0x80
PhaseOnLog Phase = 0x100
)

var (
AllPhases = PhaseDecodeHeaders | PhaseDecodeData | PhaseDecodeTrailers | PhaseDecodeRequest |
PhaseEncodeHeaders | PhaseEncodeData | PhaseEncodeTrailers | PhaseEncodeResponse | PhaseOnLog
)

func (p Phase) Contains(phases Phase) bool {
return p&phases == phases
}

func MethodToPhase(meth string) Phase {
switch meth {
case "DecodeHeaders":
return PhaseDecodeHeaders
case "DecodeData":
return PhaseDecodeData
case "DecodeTrailers":
return PhaseDecodeTrailers
case "DecodeRequest":
return PhaseDecodeRequest
case "EncodeHeaders":
return PhaseEncodeHeaders
case "EncodeData":
return PhaseEncodeData
case "EncodeTrailers":
return PhaseEncodeTrailers
case "EncodeResponse":
return PhaseEncodeResponse
case "OnLog":
return PhaseOnLog
default:
return 0
}
}
7 changes: 4 additions & 3 deletions api/pkg/filtermanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
})
} else {
conf.parsed = append(conf.parsed, &model.ParsedFilterConfig{
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
Name: proto.Name,
ParsedConfig: config,
Factory: plugin.Factory,
SyncRunPhases: plugin.ConfigParser.NonBlockingPhases(),
})

_, ok := pkgPlugins.LoadPlugin(name).(pkgPlugins.ConsumerPlugin)
Expand Down
Loading

0 comments on commit 0eac2c0

Please sign in to comment.