Skip to content

Commit

Permalink
ARCO-223: Call default tracer if tracing is enabled in each service
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Oct 30, 2024
1 parent a60f2c9 commit da0ed73
Show file tree
Hide file tree
Showing 34 changed files with 490 additions and 397 deletions.
67 changes: 7 additions & 60 deletions cmd/arc/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
Expand All @@ -13,15 +11,12 @@ import (
"os/signal"
"syscall"

"github.com/prometheus/client_golang/prometheus/promhttp"

cmd "github.com/bitcoin-sv/arc/cmd/arc/services"
"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/logger"
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/bitcoin-sv/arc/internal/version"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

func main() {
Expand Down Expand Up @@ -66,17 +61,6 @@ func run() error {

shutdownFns := make([]func(), 0)

var globalTracer trace.Tracer = nil
if arcConfig.Tracing != nil && arcConfig.Tracing.DialAddr != "" {
tracer, cleanup, err := enableTracing(logger, arcConfig.Tracing.DialAddr)
if err != nil {
logger.Error("failed to enable tracing", slog.String("err", err.Error()))
} else {
shutdownFns = append(shutdownFns, cleanup)
}
globalTracer = tracer
}

go func() {
if arcConfig.ProfilerAddr != "" {
logger.Info(fmt.Sprintf("Starting profiler on http://%s/debug/pprof", arcConfig.ProfilerAddr))
Expand Down Expand Up @@ -109,7 +93,7 @@ func run() error {

if startBlockTx {
logger.Info("Starting BlockTx")
shutdown, err := cmd.StartBlockTx(logger, arcConfig, globalTracer)
shutdown, err := cmd.StartBlockTx(logger, arcConfig)
if err != nil {
return fmt.Errorf("failed to start blocktx: %v", err)
}
Expand All @@ -118,7 +102,7 @@ func run() error {

if startMetamorph {
logger.Info("Starting Metamorph")
shutdown, err := cmd.StartMetamorph(logger, arcConfig, cacheStore, globalTracer)
shutdown, err := cmd.StartMetamorph(logger, arcConfig, cacheStore)
if err != nil {
return fmt.Errorf("failed to start metamorph: %v", err)
}
Expand All @@ -127,7 +111,7 @@ func run() error {

if startApi {
logger.Info("Starting API")
shutdown, err := cmd.StartAPIServer(logger, arcConfig, globalTracer)
shutdown, err := cmd.StartAPIServer(logger, arcConfig)
if err != nil {
return fmt.Errorf("failed to start api: %v", err)
}
Expand All @@ -137,15 +121,15 @@ func run() error {

if startK8sWatcher {
logger.Info("Starting K8s-Watcher")
shutdown, err := cmd.StartK8sWatcher(logger, arcConfig, globalTracer)
shutdown, err := cmd.StartK8sWatcher(logger, arcConfig)
if err != nil {
return fmt.Errorf("failed to start k8s-watcher: %v", err)
}
shutdownFns = append(shutdownFns, func() { shutdown() })
}

if startCallbacker {
shutdown, err := cmd.StartCallbacker(logger, arcConfig, globalTracer)
shutdown, err := cmd.StartCallbacker(logger, arcConfig)
if err != nil {
return fmt.Errorf("failed to start callbacker: %v", err)
}
Expand Down Expand Up @@ -226,40 +210,3 @@ func isAnyFlagPassed(flags ...string) bool {
}
return false
}

func enableTracing(logger *slog.Logger, tracingAddr string) (trace.Tracer, func(), error) {
if tracingAddr == "" {
return nil, nil, errors.New("tracing enabled, but tracing address empty")
}

ctx := context.Background()

exporter, err := tracing.NewExporter(ctx, tracingAddr)
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize exporter: %v", err)
}

tp, err := tracing.NewTraceProvider(exporter, "arc")
if err != nil {
return nil, nil, fmt.Errorf("failed to create trace provider: %v", err)
}

otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

cleanup := func() {
err = exporter.Shutdown(ctx)
if err != nil {
logger.Error("Failed to shutdown exporter", slog.String("err", err.Error()))
}

err = tp.Shutdown(ctx)
if err != nil {
logger.Error("Failed to shutdown tracing provider", slog.String("err", err.Error()))
}
}

tracer := otel.GetTracerProvider().Tracer("")

return tracer, cleanup, nil
}
52 changes: 35 additions & 17 deletions cmd/arc/services/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,30 @@ import (
"net/url"
"time"

"github.com/google/uuid"
"github.com/labstack/echo-contrib/echoprometheus"
"github.com/labstack/echo/v4"
echomiddleware "github.com/labstack/echo/v4/middleware"
"github.com/ordishs/go-bitcoin"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
arc_logger "github.com/bitcoin-sv/arc/internal/logger"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/bitcoin-sv/arc/pkg/api"
"github.com/bitcoin-sv/arc/pkg/api/handler"
"github.com/bitcoin-sv/arc/pkg/blocktx"
"github.com/bitcoin-sv/arc/pkg/metamorph"

"github.com/google/uuid"
"github.com/labstack/echo-contrib/echoprometheus"
"github.com/labstack/echo/v4"
echomiddleware "github.com/labstack/echo/v4/middleware"
"github.com/ordishs/go-bitcoin"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/trace"

arc_logger "github.com/bitcoin-sv/arc/internal/logger"
"github.com/prometheus/client_golang/prometheus"
)

func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig, tracer trace.Tracer) (func(), error) {
func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), error) {
logger = logger.With(slog.String("service", "api"))

e := setApiEcho(logger, arcConfig)
Expand All @@ -56,12 +56,24 @@ func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig, tracer tra
handler.WithCallbackUrlRestrictions(arcConfig.Metamorph.RejectCallbackContaining),
}

if tracer != nil {
mtmOpts = append(mtmOpts, metamorph.WithTracer(tracer))
apiOpts = append(apiOpts, handler.WithTracer(tracer))
shutdownFns := make([]func(), 0)

tracingEnabled := false
if arcConfig.Tracing != nil && arcConfig.Tracing.DialAddr != "" {
cleanup, err := tracing.Enable(logger, "api", arcConfig.Tracing.DialAddr)
if err != nil {
logger.Error("failed to enable tracing", slog.String("err", err.Error()))
} else {
shutdownFns = append(shutdownFns, cleanup)
}

tracingEnabled = true

mtmOpts = append(mtmOpts, metamorph.WithTracer())
apiOpts = append(apiOpts, handler.WithTracer())
}

conn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracer)
conn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracingEnabled)
if err != nil {
return nil, fmt.Errorf("failed to connect to metamorph server: %v", err)
}
Expand All @@ -71,7 +83,7 @@ func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig, tracer tra
mtmOpts...,
)

btcConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracer)
btcConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracingEnabled)
if err != nil {
return nil, fmt.Errorf("failed to connect to blocktx server: %v", err)
}
Expand Down Expand Up @@ -115,6 +127,10 @@ func StartAPIServer(logger *slog.Logger, arcConfig *config.ArcConfig, tracer tra
}

mqClient.Shutdown()

for _, fn := range shutdownFns {
fn()
}
}, nil
}

Expand Down Expand Up @@ -145,6 +161,8 @@ func setApiEcho(logger *slog.Logger, arcConfig *config.ArcConfig) *echo.Echo {
}
})

e.Use(otelecho.Middleware("api-server"))

// Log info about requests
e.Use(logRequestMiddleware(logger, arcConfig.Api.RequestExtendedLogs))

Expand Down
39 changes: 30 additions & 9 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,30 @@ import (
"log/slog"
"time"

"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/tracing"

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"
"github.com/bitcoin-sv/arc/internal/version"
"github.com/libsv/go-p2p"
"go.opentelemetry.io/otel/trace"
)

const (
maximumBlockSize = 4294967296 // 4Gb
blockProcessingBuffer = 100
)

func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig, tracer trace.Tracer) (func(), error) {
func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), error) {
logger = logger.With(slog.String("service", "blocktx"))
logger.Info("Starting")

tracingEnabled := arcConfig.Tracing != nil
btxConfig := arcConfig.Blocktx

var (
Expand All @@ -43,9 +43,23 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig, tracer trace
err error
)

shutdownFns := make([]func(), 0)

tracingEnabled := false
if arcConfig.Tracing != nil && arcConfig.Tracing.DialAddr != "" {
cleanup, err := tracing.Enable(logger, "blocktx", arcConfig.Tracing.DialAddr)
if err != nil {
logger.Error("failed to enable tracing", slog.String("err", err.Error()))
} else {
shutdownFns = append(shutdownFns, cleanup)
}

tracingEnabled = true
}

stopFn := func() {
logger.Info("Shutting down blocktx")
disposeBlockTx(logger, server, processor, pm, mqClient, blockStore, healthServer, workers)
disposeBlockTx(logger, server, processor, pm, mqClient, blockStore, healthServer, workers, shutdownFns)
logger.Info("Shutdown complete")
}

Expand Down Expand Up @@ -93,8 +107,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig, tracer trace
blocktx.WithRegisterTxsInterval(btxConfig.RegisterTxsInterval),
blocktx.WithMessageQueueClient(mqClient),
}
if tracer != nil {
processorOpts = append(processorOpts, blocktx.WithTracer(tracer))
if tracingEnabled {
processorOpts = append(processorOpts, blocktx.WithTracer())
}

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
Expand Down Expand Up @@ -157,7 +171,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig, tracer trace
workers.StartFillGaps(peers, btxConfig.FillGapsInterval, btxConfig.RecordRetentionDays, blockRequestCh)

server, err = blocktx.NewServer(arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, logger,
blockStore, pm, btxConfig.MaxAllowedBlockHeightMismatch, tracer)
blockStore, pm, btxConfig.MaxAllowedBlockHeightMismatch, tracingEnabled)
if err != nil {
stopFn()
return nil, fmt.Errorf("create GRPCServer failed: %v", err)
Expand Down Expand Up @@ -211,7 +225,9 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingEnab

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers) {
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {

// dispose the dependencies in the correct order:
// 1. server - ensure no new requests will be received
Expand All @@ -221,6 +237,7 @@ func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.P
// 5. mqClient
// 6. store
// 7. healthServer
// 8. run shutdown functions

if server != nil {
server.GracefulStop()
Expand Down Expand Up @@ -248,4 +265,8 @@ func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.P
if healthServer != nil {
healthServer.GracefulStop()
}

for _, shutdownFn := range shutdownFns {
shutdownFn()
}
}
5 changes: 2 additions & 3 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ Graceful Shutdown: on service termination, all components are stopped gracefully
import (
"context"
"fmt"
"go.opentelemetry.io/otel/trace"
"log/slog"
"net/http"
"time"
Expand All @@ -37,7 +36,7 @@ import (
"github.com/bitcoin-sv/arc/internal/grpc_opts"
)

func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig, tracer trace.Tracer) (func(), error) {
func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), error) {
logger = logger.With(slog.String("service", "callbacker"))
logger.Info("Starting")

Expand Down Expand Up @@ -82,7 +81,7 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig, tracer tr
workers.StartCallbackStoreCleanup(config.PruneInterval, config.PruneOlderThan)
workers.StartQuarantineCallbacksDispatch(config.QuarantineCheckInterval)

server, err = callbacker.NewServer(appConfig.PrometheusEndpoint, appConfig.GrpcMessageSize, logger, dispatcher, tracer)
server, err = callbacker.NewServer(appConfig.PrometheusEndpoint, appConfig.GrpcMessageSize, logger, dispatcher, false)
if err != nil {
stopFn()
return nil, fmt.Errorf("create GRPCServer failed: %v", err)
Expand Down
7 changes: 3 additions & 4 deletions cmd/arc/services/k8s_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/pkg/blocktx"
"github.com/bitcoin-sv/arc/pkg/metamorph"
"go.opentelemetry.io/otel/trace"
)

func StartK8sWatcher(logger *slog.Logger, arcConfig *config.ArcConfig, tracer trace.Tracer) (func(), error) {
func StartK8sWatcher(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), error) {
logger.With(slog.String("service", "k8s-watcher"))

mtmConn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracer)
mtmConn, err := metamorph.DialGRPC(arcConfig.Metamorph.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, false)
if err != nil {
return nil, fmt.Errorf("failed to connect to metamorph server: %v", err)
}
Expand All @@ -29,7 +28,7 @@ func StartK8sWatcher(logger *slog.Logger, arcConfig *config.ArcConfig, tracer tr
return nil, fmt.Errorf("failed to get k8s-client: %v", err)
}

blocktxConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, tracer)
blocktxConn, err := blocktx.DialGRPC(arcConfig.Blocktx.DialAddr, arcConfig.PrometheusEndpoint, arcConfig.GrpcMessageSize, false)
if err != nil {
return nil, fmt.Errorf("failed to connect to block-tx server: %v", err)
}
Expand Down
Loading

0 comments on commit da0ed73

Please sign in to comment.