Skip to content

Commit

Permalink
Avoid stuttering in temporalite package
Browse files Browse the repository at this point in the history
  • Loading branch information
jlegrone committed Mar 10, 2023
1 parent c9b68f5 commit 5b0d733
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 51 deletions.
26 changes: 4 additions & 22 deletions temporal/temporalite/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,16 @@ import (

"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite"
"go.temporal.io/server/temporal"
)

const (
broadcastAddress = "127.0.0.1"
defaultFrontendPort = 7233
)

type liteConfig struct {
Ephemeral bool
DatabaseFilePath string
FrontendPort int
MetricsPort int
DynamicPorts bool
Namespaces []string
SQLitePragmas map[string]string
Logger log.Logger
ServerOptions []temporal.ServerOption
portProvider *portProvider
FrontendIP string
BaseConfig *config.Config
DynamicConfig dynamicconfig.StaticClient
}

var supportedPragmas = map[string]struct{}{
"journal_mode": {},
"synchronous": {},
Expand All @@ -57,13 +39,13 @@ func getAllowedPragmas() []string {
return allowedPragmaList
}

func newDefaultConfig() (*liteConfig, error) {
func newDefaultConfig() (*temporaliteConfig, error) {
userConfigDir, err := os.UserConfigDir()
if err != nil {
return nil, fmt.Errorf("cannot determine user config directory: %w", err)
}

return &liteConfig{
return &temporaliteConfig{
Ephemeral: false,
DatabaseFilePath: filepath.Join(userConfigDir, "temporalite", "db", "default.db"),
FrontendPort: 0,
Expand All @@ -82,7 +64,7 @@ func newDefaultConfig() (*liteConfig, error) {
}, nil
}

func convertLiteConfig(cfg *liteConfig) *config.Config {
func convertLiteConfig(cfg *temporaliteConfig) *config.Config {
defer func() {
if err := cfg.portProvider.Close(); err != nil {
panic(err)
Expand Down Expand Up @@ -196,7 +178,7 @@ func convertLiteConfig(cfg *liteConfig) *config.Config {
return baseConfig
}

func (cfg *liteConfig) mustGetService(frontendPortOffset int) config.Service {
func (cfg *temporaliteConfig) mustGetService(frontendPortOffset int) config.Service {
svc := config.Service{
RPC: config.RPC{
GRPCPort: cfg.FrontendPort + frontendPortOffset,
Expand Down
61 changes: 45 additions & 16 deletions temporal/temporalite/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,45 @@ import (
"go.temporal.io/server/temporal"
)

type temporaliteConfig struct {
// When true, Ephemeral disables file persistence and uses the in-memory storage driver.
// State will be reset on each process restart.
Ephemeral bool
// DatabaseFilePath persists state to the file at the specified path.
//
// This is required if Ephemeral is false.
DatabaseFilePath string
FrontendPort int
// WithMetricsPort sets the listening port for metrics.
//
// When unspecified, the port will be system-chosen.
MetricsPort int
DynamicPorts bool
Namespaces []string
SQLitePragmas map[string]string
// Logger overrides the default logger.
Logger log.Logger
ServerOptions []temporal.ServerOption
portProvider *portProvider
FrontendIP string
// BaseConfig sets the default Temporal server configuration.
//
// Storage and client configuration will always be overridden, however base config can be
// used to enable settings like TLS or authentication.
BaseConfig *config.Config
DynamicConfig dynamicconfig.StaticClient
}

// WithLogger overrides the default logger.
func WithLogger(logger log.Logger) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.Logger = logger
})
}

// WithDatabaseFilePath persists state to the file at the specified path.
func WithDatabaseFilePath(filepath string) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.Ephemeral = false
cfg.DatabaseFilePath = filepath
})
Expand All @@ -29,7 +58,7 @@ func WithDatabaseFilePath(filepath string) ServerOption {
// WithPersistenceDisabled disables file persistence and uses the in-memory storage driver.
// State will be reset on each process restart.
func WithPersistenceDisabled() ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.Ephemeral = true
})
}
Expand All @@ -38,7 +67,7 @@ func WithPersistenceDisabled() ServerOption {
//
// When unspecified, the default port number of 7233 is used.
func WithFrontendPort(port int) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.FrontendPort = port
})
}
Expand All @@ -47,7 +76,7 @@ func WithFrontendPort(port int) ServerOption {
//
// When unspecified, the port will be system-chosen.
func WithMetricsPort(port int) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.MetricsPort = port
})
}
Expand All @@ -57,28 +86,28 @@ func WithMetricsPort(port int) ServerOption {
//
// When unspecified, the frontend service will bind to localhost.
func WithFrontendIP(address string) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.FrontendIP = address
})
}

// WithDynamicPorts starts Temporal on system-chosen ports.
func WithDynamicPorts() ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
func WithDynamicPorts() Option {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.DynamicPorts = true
})
}

// WithNamespaces registers each namespace on Temporal start.
func WithNamespaces(namespaces ...string) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.Namespaces = append(cfg.Namespaces, namespaces...)
})
}

// WithSQLitePragmas applies pragma statements to SQLite on Temporal start.
func WithSQLitePragmas(pragmas map[string]string) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
if cfg.SQLitePragmas == nil {
cfg.SQLitePragmas = make(map[string]string)
}
Expand All @@ -90,7 +119,7 @@ func WithSQLitePragmas(pragmas map[string]string) ServerOption {

// WithOptions registers Temporal server options.
func WithOptions(options ...temporal.ServerOption) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.ServerOptions = append(cfg.ServerOptions, options...)
})
}
Expand All @@ -100,15 +129,15 @@ func WithOptions(options ...temporal.ServerOption) ServerOption {
// Storage and client configuration will always be overridden, however base config can be
// used to enable settings like TLS or authentication.
func WithBaseConfig(base *config.Config) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
cfg.BaseConfig = base
})
}

// WithDynamicConfigValue sets the given dynamic config key with the given set
// of values. This will overwrite a key if already set.
func WithDynamicConfigValue(key dynamicconfig.Key, value []dynamicconfig.ConstrainedValue) ServerOption {
return newApplyFuncContainer(func(cfg *liteConfig) {
return newApplyFuncContainer(func(cfg *temporaliteConfig) {
if cfg.DynamicConfig == nil {
cfg.DynamicConfig = dynamicconfig.StaticClient{}
}
Expand All @@ -128,14 +157,14 @@ func WithSearchAttributeCacheDisabled() ServerOption {
}

type applyFuncContainer struct {
applyInternal func(*liteConfig)
applyInternal func(*temporaliteConfig)
}

func (fso *applyFuncContainer) apply(cfg *liteConfig) {
func (fso *applyFuncContainer) apply(cfg *temporaliteConfig) {
fso.applyInternal(cfg)
}

func newApplyFuncContainer(apply func(*liteConfig)) *applyFuncContainer {
func newApplyFuncContainer(apply func(*temporaliteConfig)) *applyFuncContainer {
return &applyFuncContainer{
applyInternal: apply,
}
Expand Down
24 changes: 12 additions & 12 deletions temporal/temporalite/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,19 @@ import (
"go.temporal.io/server/temporal"
)

// TemporaliteServer is a high level wrapper for temporal.Server that automatically configures a sqlite backend.
type TemporaliteServer struct {
// Server is a high level wrapper for temporal.Server that automatically configures a sqlite backend.
type Server struct {
internal temporal.Server
frontendHostPort string
config *liteConfig
config *temporaliteConfig
}

type ServerOption interface {
apply(*liteConfig)
apply(*temporaliteConfig)
}

// NewServer returns a TemporaliteServer with a sqlite backend.
func NewServer(opts ...ServerOption) (*TemporaliteServer, error) {
// NewServer returns a Server with a sqlite backend.
func NewServer(opts ...ServerOption) (*Server, error) {
c, err := newDefaultConfig()
if err != nil {
return nil, err
Expand Down Expand Up @@ -112,7 +112,7 @@ func NewServer(opts ...ServerOption) (*TemporaliteServer, error) {
return nil, fmt.Errorf("unable to instantiate server: %w", err)
}

s := &TemporaliteServer{
s := &Server{
internal: srv,
frontendHostPort: cfg.PublicClient.HostPort,
config: c,
Expand All @@ -122,12 +122,12 @@ func NewServer(opts ...ServerOption) (*TemporaliteServer, error) {
}

// Start temporal server.
func (s *TemporaliteServer) Start() error {
func (s *Server) Start() error {
return s.internal.Start()
}

// Stop the server.
func (s *TemporaliteServer) Stop() {
func (s *Server) Stop() {
if s == nil {
return
}
Expand All @@ -136,7 +136,7 @@ func (s *TemporaliteServer) Stop() {

// NewClient initializes a client ready to communicate with the Temporal
// server in the target namespace.
func (s *TemporaliteServer) NewClient(ctx context.Context, namespace string) (client.Client, error) {
func (s *Server) NewClient(ctx context.Context, namespace string) (client.Client, error) {
return s.NewClientWithOptions(ctx, client.Options{Namespace: namespace})
}

Expand All @@ -145,7 +145,7 @@ func (s *TemporaliteServer) NewClient(ctx context.Context, namespace string) (cl
// To set the client's namespace, use the corresponding field in client.Options.
//
// Note that the HostPort and ConnectionOptions fields of client.Options will always be overridden.
func (s *TemporaliteServer) NewClientWithOptions(ctx context.Context, options client.Options) (client.Client, error) {
func (s *Server) NewClientWithOptions(ctx context.Context, options client.Options) (client.Client, error) {
options.HostPort = s.frontendHostPort
return client.NewClient(options)
}
Expand All @@ -154,6 +154,6 @@ func (s *TemporaliteServer) NewClientWithOptions(ctx context.Context, options cl
//
// When constructing a Temporalite client from within the same process,
// NewClient or NewClientWithOptions should be used instead.
func (s *TemporaliteServer) FrontendHostPort() string {
func (s *Server) FrontendHostPort() string {
return s.frontendHostPort
}
2 changes: 1 addition & 1 deletion temporal/temporaltest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// A TestServer is a Temporal server listening on a system-chosen port on the
// local loopback interface, for use in end-to-end tests.
type TestServer struct {
server *temporalite.TemporaliteServer
server *temporalite.Server
defaultTestNamespace string
defaultClient client.Client
clients []client.Client
Expand Down

0 comments on commit 5b0d733

Please sign in to comment.