Skip to content

Commit

Permalink
Add PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
sam-at-luther committed Aug 6, 2024
1 parent 0c5555a commit db3853d
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 107 deletions.
8 changes: 4 additions & 4 deletions oracle/httpmiddleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ import (
// response header.
func (orc *Oracle) addServerHeader() midware.Middleware {
return midware.ServerResponseHeader(
midware.ServerFixed(orc.serviceName, orc.version),
midware.ServerFixed(orc.cfg.ServiceName, orc.cfg.Version),
func() string {
cachedPhylumVersion := orc.getLastPhylumVersion()
if cachedPhylumVersion != "" {
return fmt.Sprintf("%s/%s", orc.phylumServiceName, cachedPhylumVersion)
return fmt.Sprintf("%s/%s", orc.cfg.PhylumServiceName, cachedPhylumVersion)
}
return ""
})
Expand Down Expand Up @@ -85,8 +85,8 @@ func (orc *Oracle) healthCheckHandler() http.Handler {
resp = &healthcheck.GetHealthCheckResponse{
Reports: []*healthcheck.HealthCheckReport{
{
ServiceName: orc.serviceName,
ServiceVersion: orc.version,
ServiceName: orc.cfg.ServiceName,
ServiceVersion: orc.cfg.Version,
Timestamp: time.Now().Format(timestampFormat),
Status: "DOWN",
},
Expand Down
120 changes: 42 additions & 78 deletions oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ const (
metricsAddr = ":9600"
)

// defaultConfig returns a default config.
func defaultConfig() *Config {
// DefaultConfig returns a default config.
func DefaultConfig() *Config {
return &Config{
Verbose: true,
EmulateCC: false,
// IMPORTANT: Phylum bootstrap expects ListenAddress on :8080 for
// FakeAuth IDP. Only change this if you know what you're doing!
ListenAddress: ":8080",
PhylumPath: "./phylum",
GatewayEndpoint: "http://shiroclient_gw:8082",
PhylumServiceName: "phylum",
ServiceName: "oracle",
RequestIDHeader: "X-Request-ID",
Expand All @@ -63,64 +62,45 @@ func defaultConfig() *Config {

// Config configures an oracle.
type Config struct {
// swaggerHandler configures an endpoint to serve the
// swagger API.
swaggerHandler http.Handler
// ListenAddress is an address the oracle HTTP listens on.
ListenAddress string `yaml:"listen-address"`
// PhylumPath is the the path for the business logic.
PhylumPath string `yaml:"phylum-path"`
// GatewayEndpoint is an address to the shiroclient gateway.
GatewayEndpoint string `yaml:"gateway-endpoint"`
// OTLPEndpoint optionally configures OTLP tracing and sends traces to
// the supplied OTLP endpoint
OTLPEndpoint string `yaml:"otlp-endpoint"`
// PhylumServiceName is the app-specific name of the conneted phylum.
PhylumServiceName string `yaml:"phylum-service-name"`
// PhylumServiceName is the app-specific name of the Oracle.
// ServiceName is the app-specific name of the Oracle.
ServiceName string `yaml:"service-name"`
// RequestIDHeader is the HTTP header encoding the request ID.
RequestIDHeader string `yaml:"request-id-header"`
// Version is the oracle version.
Version string `yaml:"version"`
// TraceOpts are tracing options.
TraceOpts []opttrace.Option
// Verbose increases logging.
Verbose bool `yaml:"verbose"`
// EmulateCC emulates chaincode in memory (for testing).
EmulateCC bool `yaml:"emulate-cc"`

// swaggerHandler configures an endpoint to serve the
// swagger API.
swaggerHandler http.Handler
}

// SetSwaggerHandler configures an endpoint to serve the swagger API.
func (c *Config) SetSwaggerHandler(h http.Handler) {
if c == nil {
return
}
c.swaggerHandler = h
}

func (c *Config) SetDefaults() {
d := defaultConfig()
if c.ListenAddress == "" {
c.ListenAddress = d.ListenAddress
}
if c.PhylumPath == "" {
c.PhylumPath = d.PhylumPath
}
if c.GatewayEndpoint == "" {
c.GatewayEndpoint = d.GatewayEndpoint
}
if c.OTLPEndpoint == "" {
c.OTLPEndpoint = d.OTLPEndpoint
}
if c.PhylumServiceName == "" {
c.PhylumServiceName = d.PhylumServiceName
}
if c.ServiceName == "" {
c.ServiceName = d.ServiceName
}
if c.RequestIDHeader == "" {
c.RequestIDHeader = d.RequestIDHeader
}
if c.Version == "" {
c.Version = d.Version
// SetOTLPEndpoint is a helper to set the OTLP trace endpoint.
func (c *Config) SetOTLPEndpoint(endpoint string) {
if c == nil {
return
}
c.TraceOpts = append(c.TraceOpts, opttrace.WithOTLPExporter(endpoint))
}

// Valid validates an oracle configuration.
Expand Down Expand Up @@ -163,40 +143,31 @@ const (

// Oracle provides services.
type Oracle struct {
// stateMut guards state.
stateMut sync.RWMutex

state oracleState
swaggerHandler http.Handler

// log provides logging.
logBase *logrus.Entry

// phylum interacts with phylum.
phylum *phylum.Client

// txConfigs generates default transaction configs
txConfigs func(context.Context, ...shiroclient.Config) []shiroclient.Config

// Optional application tracing provider
tracer *opttrace.Tracer

listenAddress string

// version is the version of the oracle.
version string

// phylumVersionMut guards cachedPhylumVersion.
phylumVersionMut sync.RWMutex
// txConfigs generates default transaction configs
txConfigs func(context.Context, ...shiroclient.Config) []shiroclient.Config

cachedPhylumVersion string

phylumServiceName string
cfg Config

serviceName string
state oracleState

requestIDHeader string
// stateMut guards state.
stateMut sync.RWMutex

swaggerHandler http.Handler
// phylumVersionMut guards cachedPhylumVersion.
phylumVersionMut sync.RWMutex
}

// option provides additional configuration to the oracle. Primarily for
Expand Down Expand Up @@ -255,7 +226,6 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) {
logrus.SetLevel(logrus.DebugLevel)
}

config.SetDefaults()
if err := config.Valid(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
}
Expand All @@ -268,12 +238,8 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) {
return nil, fmt.Errorf("invalid config: %w", err)
}
oracle := &Oracle{
listenAddress: config.ListenAddress,
serviceName: config.ServiceName,
phylumServiceName: config.PhylumServiceName,
requestIDHeader: config.RequestIDHeader,
version: config.Version,
swaggerHandler: config.swaggerHandler,
cfg: *config,
swaggerHandler: config.swaggerHandler,
}
oracle.logBase = logrus.StandardLogger().WithFields(nil)
for _, opt := range opts {
Expand All @@ -283,28 +249,26 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) {
}
}
if oracle.phylum == nil {
err := withPhylum(config.GatewayEndpoint)(oracle)
if oracle.cfg.GatewayEndpoint == "" {
oracle.cfg.GatewayEndpoint = fmt.Sprintf("http://shiroclient_gw_%s:8082", oracle.cfg.PhylumServiceName)
}
err := withPhylum(oracle.cfg.GatewayEndpoint)(oracle)
if err != nil {
return nil, err
}
}
oracle.txConfigs = txConfigs()
traceOpts := []opttrace.Option{}
if config.OTLPEndpoint != "" {
traceOpts = append(traceOpts, opttrace.WithOTLPExporter(config.OTLPEndpoint))
}
t, err := opttrace.New(context.Background(), "oracle", traceOpts...)
t, err := opttrace.New(context.Background(), "oracle", oracle.cfg.TraceOpts...)
if err != nil {
return nil, err
}
t.SetGlobalTracer()
oracle.tracer = t

oracle.log(context.Background()).WithFields(logrus.Fields{
"emulate_cc": config.EmulateCC,
"phylum_path": config.PhylumPath,
"gateway_endpoint": config.GatewayEndpoint,
"otlp_endpoint": config.OTLPEndpoint,
"emulate_cc": oracle.cfg.EmulateCC,
"phylum_path": oracle.cfg.PhylumPath,
"gateway_endpoint": oracle.cfg.GatewayEndpoint,
}).Infof("new oracle")

return oracle, nil
Expand All @@ -321,7 +285,7 @@ func txConfigs() func(context.Context, ...shiroclient.Config) []shiroclient.Conf
shiroclient.WithLogrusFields(fields),
}
if fields["req_id"] != nil {
logrus.WithField("req_id", fields["req_id"]).Infof("setting request id")
logrus.WithField("req_id", fields["req_id"]).Debugf("setting request id")
configs = append(configs, shiroclient.WithID(fmt.Sprint(fields["req_id"])))
}
configs = append(configs, extend...)
Expand All @@ -335,7 +299,7 @@ func (orc *Oracle) setPhylumVersion(version string) {
defer orc.phylumVersionMut.Unlock()
orc.cachedPhylumVersion = version
if orc.cachedPhylumVersion != "" {
versionTotal.WithLabelValues(orc.serviceName, orc.version, orc.phylumServiceName, orc.cachedPhylumVersion).Inc()
versionTotal.WithLabelValues(orc.cfg.ServiceName, orc.cfg.Version, orc.cfg.PhylumServiceName, orc.cachedPhylumVersion).Inc()
}
}

Expand All @@ -351,21 +315,21 @@ func (orc *Oracle) phylumHealthCheck(ctx context.Context) []*healthcheck.HealthC
ccHealth, err := orc.phylum.GetHealthCheck(ctx, []string{"phylum"}, sopts...)
if err != nil && !errors.Is(err, context.Canceled) {
return []*healthcheck.HealthCheckReport{{
ServiceName: orc.phylumServiceName,
ServiceName: orc.cfg.PhylumServiceName,
ServiceVersion: "",
Timestamp: time.Now().Format(timestampFormat),
Status: "DOWN",
}}
}
reports := ccHealth.GetReports()
for _, report := range reports {
if strings.EqualFold(report.GetServiceName(), orc.phylumServiceName) {
if strings.EqualFold(report.GetServiceName(), orc.cfg.PhylumServiceName) {
orc.setPhylumVersion(report.GetServiceVersion())
break
}
}
if orc.getLastPhylumVersion() == "" {
orc.log(ctx).Errorf("missing phylum version")
orc.log(ctx).Warnf("missing phylum version")
}
return reports
}
Expand All @@ -386,8 +350,8 @@ func (orc *Oracle) GetHealthCheck(ctx context.Context, req *healthcheck.GetHealt
}
}
reports = append(reports, &healthcheck.HealthCheckReport{
ServiceName: orc.serviceName,
ServiceVersion: orc.version,
ServiceName: orc.cfg.ServiceName,
ServiceVersion: orc.cfg.Version,
Timestamp: time.Now().Format(timestampFormat),
Status: "UP",
})
Expand Down
34 changes: 20 additions & 14 deletions oracle/oraclerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (orc *Oracle) gatewayForwardedHeaders() []string {
"User-Agent",
"X-Forwarded-User-Agent",
"Referer",
orc.requestIDHeader,
orc.cfg.RequestIDHeader,
}
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (orc *Oracle) grpcGateway(swaggerHandler http.Handler) (*runtime.ServeMux,
// The trace header middleware appears early in the chain
// because of how important it is that they happen for essentially all
// requests.
midware.TraceHeaders(orc.requestIDHeader, true),
midware.TraceHeaders(orc.cfg.RequestIDHeader, true),
orc.addServerHeader(),
// PathOverrides and other middleware that may serve requests or have
// potential failure states should appear below here so they may rely
Expand All @@ -109,16 +109,15 @@ func (orc *Oracle) grpcGateway(swaggerHandler http.Handler) (*runtime.ServeMux,
return jsonapi, middleware.Wrap(jsonapi)
}

type PortalConfig interface {
type GrpcGatewayConfig interface {
// RegisterServiceServer is required to be overidden by the implementation.
RegisterServiceServer(grpcServer *grpc.Server)
// RegisterServiceClient is required to be overidden by the implementation.
RegisterServiceClient(ctx context.Context, grpcCon *grpc.ClientConn, mux *runtime.ServeMux) error
}

func (orc *Oracle) Run(portalConfig PortalConfig) error {
func (orc *Oracle) StartGateway(grpcConfig GrpcGatewayConfig) error {
orc.stateMut.Lock()
defer orc.stateMut.Unlock()
if orc.state != oracleStateInit {
return fmt.Errorf("run: invalid oracle state: %d", orc.state)
}
Expand All @@ -139,17 +138,16 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error {
defer cancel()

defer func() {
err := orc.close()
if err != nil {
if err := orc.close(); err != nil {
orc.log(ctx).WithError(err).Warn("failed to close oracle")
}
}()

orc.log(ctx).WithFields(logrus.Fields{
"version": orc.version,
"service": orc.serviceName,
"phylum_name": orc.phylumServiceName,
"listen_address": orc.listenAddress,
"version": orc.cfg.Version,
"service": orc.cfg.ServiceName,
"phylum_name": orc.cfg.PhylumServiceName,
"listen_address": orc.cfg.ListenAddress,
}).Infof("starting oracle")

// Start a grpc server listening on the unix socket at grpcAddr
Expand All @@ -163,12 +161,20 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error {
grpclogging.RealTime()),
svcerr.AppErrorUnaryInterceptor(orc.log))))

portalConfig.RegisterServiceServer(grpcServer)
grpcConfig.RegisterServiceServer(grpcServer)

orc.stateMut.Unlock()

listener, err := net.Listen("unix", grpcAddr)
if err != nil {
return fmt.Errorf("grpc listen: %w", err)
}
defer func() {
if err := listener.Close(); err != nil {
orc.log(ctx).WithError(err).Warn("failed to close listener")
}
}()

go func() {
trySendError(errServe, grpcServer.Serve(listener))
}()
Expand All @@ -183,7 +189,7 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error {
}

mux, httpHandler := orc.grpcGateway(orc.swaggerHandler)
if err := portalConfig.RegisterServiceClient(ctx, grpcConn, mux); err != nil {
if err := grpcConfig.RegisterServiceClient(ctx, grpcConn, mux); err != nil {
return fmt.Errorf("register service client: %w", err)
}

Expand All @@ -197,7 +203,7 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error {
go func() {
orc.log(ctx).Infof("oracle listen")
server := &http.Server{
Addr: orc.listenAddress,
Addr: orc.cfg.ListenAddress,
Handler: httpHandler,
ReadHeaderTimeout: 3 * time.Second,
}
Expand Down
Loading

0 comments on commit db3853d

Please sign in to comment.