From db3853d9dc8910f585200bcc6ed824e219387c8f Mon Sep 17 00:00:00 2001 From: sam-at-luther Date: Tue, 6 Aug 2024 13:31:31 -0700 Subject: [PATCH] Add PR feedback --- oracle/httpmiddleware.go | 8 +-- oracle/oracle.go | 120 ++++++++++++++------------------------- oracle/oraclerun.go | 34 ++++++----- oracle/oracletester.go | 39 +++++++++---- 4 files changed, 94 insertions(+), 107 deletions(-) diff --git a/oracle/httpmiddleware.go b/oracle/httpmiddleware.go index c231883..da4ff4e 100644 --- a/oracle/httpmiddleware.go +++ b/oracle/httpmiddleware.go @@ -33,11 +33,11 @@ import ( // response header. func (orc *Oracle) addServerHeader() midware.Middleware { return midware.ServerResponseHeader( - midware.ServerFixed(orc.serviceName, orc.version), + midware.ServerFixed(orc.cfg.ServiceName, orc.cfg.Version), func() string { cachedPhylumVersion := orc.getLastPhylumVersion() if cachedPhylumVersion != "" { - return fmt.Sprintf("%s/%s", orc.phylumServiceName, cachedPhylumVersion) + return fmt.Sprintf("%s/%s", orc.cfg.PhylumServiceName, cachedPhylumVersion) } return "" }) @@ -85,8 +85,8 @@ func (orc *Oracle) healthCheckHandler() http.Handler { resp = &healthcheck.GetHealthCheckResponse{ Reports: []*healthcheck.HealthCheckReport{ { - ServiceName: orc.serviceName, - ServiceVersion: orc.version, + ServiceName: orc.cfg.ServiceName, + ServiceVersion: orc.cfg.Version, Timestamp: time.Now().Format(timestampFormat), Status: "DOWN", }, diff --git a/oracle/oracle.go b/oracle/oracle.go index ada822d..066fe1f 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -44,8 +44,8 @@ const ( metricsAddr = ":9600" ) -// defaultConfig returns a default config. -func defaultConfig() *Config { +// DefaultConfig returns a default config. +func DefaultConfig() *Config { return &Config{ Verbose: true, EmulateCC: false, @@ -53,7 +53,6 @@ func defaultConfig() *Config { // FakeAuth IDP. Only change this if you know what you're doing! ListenAddress: ":8080", PhylumPath: "./phylum", - GatewayEndpoint: "http://shiroclient_gw:8082", PhylumServiceName: "phylum", ServiceName: "oracle", RequestIDHeader: "X-Request-ID", @@ -63,64 +62,45 @@ func defaultConfig() *Config { // Config configures an oracle. type Config struct { + // swaggerHandler configures an endpoint to serve the + // swagger API. + swaggerHandler http.Handler // ListenAddress is an address the oracle HTTP listens on. ListenAddress string `yaml:"listen-address"` // PhylumPath is the the path for the business logic. PhylumPath string `yaml:"phylum-path"` // GatewayEndpoint is an address to the shiroclient gateway. GatewayEndpoint string `yaml:"gateway-endpoint"` - // OTLPEndpoint optionally configures OTLP tracing and sends traces to - // the supplied OTLP endpoint - OTLPEndpoint string `yaml:"otlp-endpoint"` // PhylumServiceName is the app-specific name of the conneted phylum. PhylumServiceName string `yaml:"phylum-service-name"` - // PhylumServiceName is the app-specific name of the Oracle. + // ServiceName is the app-specific name of the Oracle. ServiceName string `yaml:"service-name"` // RequestIDHeader is the HTTP header encoding the request ID. RequestIDHeader string `yaml:"request-id-header"` // Version is the oracle version. Version string `yaml:"version"` + // TraceOpts are tracing options. + TraceOpts []opttrace.Option // Verbose increases logging. Verbose bool `yaml:"verbose"` // EmulateCC emulates chaincode in memory (for testing). EmulateCC bool `yaml:"emulate-cc"` - - // swaggerHandler configures an endpoint to serve the - // swagger API. - swaggerHandler http.Handler } // SetSwaggerHandler configures an endpoint to serve the swagger API. func (c *Config) SetSwaggerHandler(h http.Handler) { + if c == nil { + return + } c.swaggerHandler = h } -func (c *Config) SetDefaults() { - d := defaultConfig() - if c.ListenAddress == "" { - c.ListenAddress = d.ListenAddress - } - if c.PhylumPath == "" { - c.PhylumPath = d.PhylumPath - } - if c.GatewayEndpoint == "" { - c.GatewayEndpoint = d.GatewayEndpoint - } - if c.OTLPEndpoint == "" { - c.OTLPEndpoint = d.OTLPEndpoint - } - if c.PhylumServiceName == "" { - c.PhylumServiceName = d.PhylumServiceName - } - if c.ServiceName == "" { - c.ServiceName = d.ServiceName - } - if c.RequestIDHeader == "" { - c.RequestIDHeader = d.RequestIDHeader - } - if c.Version == "" { - c.Version = d.Version +// SetOTLPEndpoint is a helper to set the OTLP trace endpoint. +func (c *Config) SetOTLPEndpoint(endpoint string) { + if c == nil { + return } + c.TraceOpts = append(c.TraceOpts, opttrace.WithOTLPExporter(endpoint)) } // Valid validates an oracle configuration. @@ -163,10 +143,7 @@ const ( // Oracle provides services. type Oracle struct { - // stateMut guards state. - stateMut sync.RWMutex - - state oracleState + swaggerHandler http.Handler // log provides logging. logBase *logrus.Entry @@ -174,29 +151,23 @@ type Oracle struct { // phylum interacts with phylum. phylum *phylum.Client - // txConfigs generates default transaction configs - txConfigs func(context.Context, ...shiroclient.Config) []shiroclient.Config - // Optional application tracing provider tracer *opttrace.Tracer - listenAddress string - - // version is the version of the oracle. - version string - - // phylumVersionMut guards cachedPhylumVersion. - phylumVersionMut sync.RWMutex + // txConfigs generates default transaction configs + txConfigs func(context.Context, ...shiroclient.Config) []shiroclient.Config cachedPhylumVersion string - phylumServiceName string + cfg Config - serviceName string + state oracleState - requestIDHeader string + // stateMut guards state. + stateMut sync.RWMutex - swaggerHandler http.Handler + // phylumVersionMut guards cachedPhylumVersion. + phylumVersionMut sync.RWMutex } // option provides additional configuration to the oracle. Primarily for @@ -255,7 +226,6 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) { logrus.SetLevel(logrus.DebugLevel) } - config.SetDefaults() if err := config.Valid(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) } @@ -268,12 +238,8 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) { return nil, fmt.Errorf("invalid config: %w", err) } oracle := &Oracle{ - listenAddress: config.ListenAddress, - serviceName: config.ServiceName, - phylumServiceName: config.PhylumServiceName, - requestIDHeader: config.RequestIDHeader, - version: config.Version, - swaggerHandler: config.swaggerHandler, + cfg: *config, + swaggerHandler: config.swaggerHandler, } oracle.logBase = logrus.StandardLogger().WithFields(nil) for _, opt := range opts { @@ -283,17 +249,16 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) { } } if oracle.phylum == nil { - err := withPhylum(config.GatewayEndpoint)(oracle) + if oracle.cfg.GatewayEndpoint == "" { + oracle.cfg.GatewayEndpoint = fmt.Sprintf("http://shiroclient_gw_%s:8082", oracle.cfg.PhylumServiceName) + } + err := withPhylum(oracle.cfg.GatewayEndpoint)(oracle) if err != nil { return nil, err } } oracle.txConfigs = txConfigs() - traceOpts := []opttrace.Option{} - if config.OTLPEndpoint != "" { - traceOpts = append(traceOpts, opttrace.WithOTLPExporter(config.OTLPEndpoint)) - } - t, err := opttrace.New(context.Background(), "oracle", traceOpts...) + t, err := opttrace.New(context.Background(), "oracle", oracle.cfg.TraceOpts...) if err != nil { return nil, err } @@ -301,10 +266,9 @@ func newOracle(config *Config, opts ...option) (*Oracle, error) { oracle.tracer = t oracle.log(context.Background()).WithFields(logrus.Fields{ - "emulate_cc": config.EmulateCC, - "phylum_path": config.PhylumPath, - "gateway_endpoint": config.GatewayEndpoint, - "otlp_endpoint": config.OTLPEndpoint, + "emulate_cc": oracle.cfg.EmulateCC, + "phylum_path": oracle.cfg.PhylumPath, + "gateway_endpoint": oracle.cfg.GatewayEndpoint, }).Infof("new oracle") return oracle, nil @@ -321,7 +285,7 @@ func txConfigs() func(context.Context, ...shiroclient.Config) []shiroclient.Conf shiroclient.WithLogrusFields(fields), } if fields["req_id"] != nil { - logrus.WithField("req_id", fields["req_id"]).Infof("setting request id") + logrus.WithField("req_id", fields["req_id"]).Debugf("setting request id") configs = append(configs, shiroclient.WithID(fmt.Sprint(fields["req_id"]))) } configs = append(configs, extend...) @@ -335,7 +299,7 @@ func (orc *Oracle) setPhylumVersion(version string) { defer orc.phylumVersionMut.Unlock() orc.cachedPhylumVersion = version if orc.cachedPhylumVersion != "" { - versionTotal.WithLabelValues(orc.serviceName, orc.version, orc.phylumServiceName, orc.cachedPhylumVersion).Inc() + versionTotal.WithLabelValues(orc.cfg.ServiceName, orc.cfg.Version, orc.cfg.PhylumServiceName, orc.cachedPhylumVersion).Inc() } } @@ -351,7 +315,7 @@ func (orc *Oracle) phylumHealthCheck(ctx context.Context) []*healthcheck.HealthC ccHealth, err := orc.phylum.GetHealthCheck(ctx, []string{"phylum"}, sopts...) if err != nil && !errors.Is(err, context.Canceled) { return []*healthcheck.HealthCheckReport{{ - ServiceName: orc.phylumServiceName, + ServiceName: orc.cfg.PhylumServiceName, ServiceVersion: "", Timestamp: time.Now().Format(timestampFormat), Status: "DOWN", @@ -359,13 +323,13 @@ func (orc *Oracle) phylumHealthCheck(ctx context.Context) []*healthcheck.HealthC } reports := ccHealth.GetReports() for _, report := range reports { - if strings.EqualFold(report.GetServiceName(), orc.phylumServiceName) { + if strings.EqualFold(report.GetServiceName(), orc.cfg.PhylumServiceName) { orc.setPhylumVersion(report.GetServiceVersion()) break } } if orc.getLastPhylumVersion() == "" { - orc.log(ctx).Errorf("missing phylum version") + orc.log(ctx).Warnf("missing phylum version") } return reports } @@ -386,8 +350,8 @@ func (orc *Oracle) GetHealthCheck(ctx context.Context, req *healthcheck.GetHealt } } reports = append(reports, &healthcheck.HealthCheckReport{ - ServiceName: orc.serviceName, - ServiceVersion: orc.version, + ServiceName: orc.cfg.ServiceName, + ServiceVersion: orc.cfg.Version, Timestamp: time.Now().Format(timestampFormat), Status: "UP", }) diff --git a/oracle/oraclerun.go b/oracle/oraclerun.go index b6ef5f0..1e139f6 100644 --- a/oracle/oraclerun.go +++ b/oracle/oraclerun.go @@ -54,7 +54,7 @@ func (orc *Oracle) gatewayForwardedHeaders() []string { "User-Agent", "X-Forwarded-User-Agent", "Referer", - orc.requestIDHeader, + orc.cfg.RequestIDHeader, } } @@ -98,7 +98,7 @@ func (orc *Oracle) grpcGateway(swaggerHandler http.Handler) (*runtime.ServeMux, // The trace header middleware appears early in the chain // because of how important it is that they happen for essentially all // requests. - midware.TraceHeaders(orc.requestIDHeader, true), + midware.TraceHeaders(orc.cfg.RequestIDHeader, true), orc.addServerHeader(), // PathOverrides and other middleware that may serve requests or have // potential failure states should appear below here so they may rely @@ -109,16 +109,15 @@ func (orc *Oracle) grpcGateway(swaggerHandler http.Handler) (*runtime.ServeMux, return jsonapi, middleware.Wrap(jsonapi) } -type PortalConfig interface { +type GrpcGatewayConfig interface { // RegisterServiceServer is required to be overidden by the implementation. RegisterServiceServer(grpcServer *grpc.Server) // RegisterServiceClient is required to be overidden by the implementation. RegisterServiceClient(ctx context.Context, grpcCon *grpc.ClientConn, mux *runtime.ServeMux) error } -func (orc *Oracle) Run(portalConfig PortalConfig) error { +func (orc *Oracle) StartGateway(grpcConfig GrpcGatewayConfig) error { orc.stateMut.Lock() - defer orc.stateMut.Unlock() if orc.state != oracleStateInit { return fmt.Errorf("run: invalid oracle state: %d", orc.state) } @@ -139,17 +138,16 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error { defer cancel() defer func() { - err := orc.close() - if err != nil { + if err := orc.close(); err != nil { orc.log(ctx).WithError(err).Warn("failed to close oracle") } }() orc.log(ctx).WithFields(logrus.Fields{ - "version": orc.version, - "service": orc.serviceName, - "phylum_name": orc.phylumServiceName, - "listen_address": orc.listenAddress, + "version": orc.cfg.Version, + "service": orc.cfg.ServiceName, + "phylum_name": orc.cfg.PhylumServiceName, + "listen_address": orc.cfg.ListenAddress, }).Infof("starting oracle") // Start a grpc server listening on the unix socket at grpcAddr @@ -163,12 +161,20 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error { grpclogging.RealTime()), svcerr.AppErrorUnaryInterceptor(orc.log)))) - portalConfig.RegisterServiceServer(grpcServer) + grpcConfig.RegisterServiceServer(grpcServer) + + orc.stateMut.Unlock() listener, err := net.Listen("unix", grpcAddr) if err != nil { return fmt.Errorf("grpc listen: %w", err) } + defer func() { + if err := listener.Close(); err != nil { + orc.log(ctx).WithError(err).Warn("failed to close listener") + } + }() + go func() { trySendError(errServe, grpcServer.Serve(listener)) }() @@ -183,7 +189,7 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error { } mux, httpHandler := orc.grpcGateway(orc.swaggerHandler) - if err := portalConfig.RegisterServiceClient(ctx, grpcConn, mux); err != nil { + if err := grpcConfig.RegisterServiceClient(ctx, grpcConn, mux); err != nil { return fmt.Errorf("register service client: %w", err) } @@ -197,7 +203,7 @@ func (orc *Oracle) Run(portalConfig PortalConfig) error { go func() { orc.log(ctx).Infof("oracle listen") server := &http.Server{ - Addr: orc.listenAddress, + Addr: orc.cfg.ListenAddress, Handler: httpHandler, ReadHeaderTimeout: 3 * time.Second, } diff --git a/oracle/oracletester.go b/oracle/oracletester.go index dab7b98..b629025 100644 --- a/oracle/oracletester.go +++ b/oracle/oracletester.go @@ -49,28 +49,45 @@ func (orc *Oracle) Snapshot(t *testing.T) []byte { return snapshot.Bytes() } -// NewTestOracleFrom is used to create an oracle for testing, loading the -// state from an optional snapshot. -func NewTestOracleFrom(t *testing.T, phylumPath string, snapshot []byte) (*Oracle, func()) { - cfg := defaultConfig() +type testCfg struct { + snapshot []byte +} + +// TestOpt configures a test oracle. +type TestOpt func(*testCfg) + +// WithSnapshot restores the test oracle from a snapshot. +func WithSnapshot(b []byte) TestOpt { + return func(cfg *testCfg) { + cfg.snapshot = make([]byte, len(b)) + copy(cfg.snapshot, b) + } +} + +// NewTestOracle is used to create an oracle for testing. +func NewTestOracle(t *testing.T, testOpts ...TestOpt) (*Oracle, func()) { + testCfg := &testCfg{} + for _, opt := range testOpts { + opt(testCfg) + } + cfg := DefaultConfig() cfg.Verbose = testing.Verbose() logger := logrus.New() logger.SetOutput(newTestWriter(t)) var r io.Reader - if snapshot != nil { - r = bytes.NewReader(snapshot) + if testCfg.snapshot != nil { + r = bytes.NewReader(testCfg.snapshot) } - opts := []option{ + orcOpts := []option{ withLogBase(logger.WithFields(nil)), - withMockPhylumFrom(phylumPath, r), + withMockPhylumFrom("../../../phylum", r), } - server, err := newOracle(cfg, opts...) + server, err := newOracle(cfg, orcOpts...) + server.state = oracleStateTesting if err != nil { t.Fatal(err) } - server.state = oracleStateTesting - if cfg.Verbose { logger.SetLevel(logrus.DebugLevel) }