From 502c1eedaa61ae742bfd6eb2e074e6f1180c2c66 Mon Sep 17 00:00:00 2001 From: cryptk <421501+cryptk@users.noreply.github.com> Date: Wed, 17 Apr 2024 22:21:55 -0500 Subject: [PATCH] feat: refactor the dynamic json configs for api_keys and external_backends (#2055) * feat: refactor the dynamic json configs for api_keys and external_backends Signed-off-by: Chris Jowett <421501+cryptk@users.noreply.github.com> * fix: remove commented code Signed-off-by: Chris Jowett <421501+cryptk@users.noreply.github.com> --------- Signed-off-by: Chris Jowett <421501+cryptk@users.noreply.github.com> Signed-off-by: Ettore Di Giacinto Co-authored-by: Ettore Di Giacinto --- core/cli/run.go | 13 +-- core/config/application_config.go | 7 ++ core/startup/config_file_watcher.go | 156 +++++++++++++++++++--------- core/startup/startup.go | 5 + 4 files changed, 118 insertions(+), 63 deletions(-) diff --git a/core/cli/run.go b/core/cli/run.go index 0f3ba2ded303..d729f9460f5b 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -2,7 +2,6 @@ package cli import ( "fmt" - "os" "strings" "time" @@ -65,6 +64,7 @@ func (r *RunCMD) Run(ctx *Context) error { config.WithAudioDir(r.AudioPath), config.WithUploadDir(r.UploadPath), config.WithConfigsDir(r.ConfigPath), + config.WithDynamicConfigDir(r.LocalaiConfigDir), config.WithF16(r.F16), config.WithStringGalleries(r.Galleries), config.WithModelLibraryURL(r.RemoteLibrary), @@ -134,17 +134,6 @@ func (r *RunCMD) Run(ctx *Context) error { return fmt.Errorf("failed basic startup tasks with error %s", err.Error()) } - // Watch the configuration directory - // If the directory does not exist, we don't watch it - if _, err := os.Stat(r.LocalaiConfigDir); err == nil { - closeConfigWatcherFn, err := startup.WatchConfigDirectory(r.LocalaiConfigDir, options) - defer closeConfigWatcherFn() - - if err != nil { - return fmt.Errorf("failed while watching configuration directory %s", r.LocalaiConfigDir) - } - } - appHTTP, err := http.App(cl, ml, options) if err != nil { log.Error().Err(err).Msg("error during HTTP App construction") diff --git a/core/config/application_config.go b/core/config/application_config.go index 9525553a6868..778176164370 100644 --- a/core/config/application_config.go +++ b/core/config/application_config.go @@ -22,6 +22,7 @@ type ApplicationConfig struct { AudioDir string UploadDir string ConfigsDir string + DynamicConfigsDir string CORS bool PreloadJSONModels string PreloadModelsFromPath string @@ -264,6 +265,12 @@ func WithConfigsDir(configsDir string) AppOption { } } +func WithDynamicConfigDir(dynamicConfigsDir string) AppOption { + return func(o *ApplicationConfig) { + o.DynamicConfigsDir = dynamicConfigsDir + } +} + func WithApiKeys(apiKeys []string) AppOption { return func(o *ApplicationConfig) { o.ApiKeys = apiKeys diff --git a/core/startup/config_file_watcher.go b/core/startup/config_file_watcher.go index 9c758e252ddc..5d213df5adff 100644 --- a/core/startup/config_file_watcher.go +++ b/core/startup/config_file_watcher.go @@ -12,89 +12,143 @@ import ( "github.com/rs/zerolog/log" ) -type WatchConfigDirectoryCloser func() error - -func ReadApiKeysJson(configDir string, appConfig *config.ApplicationConfig) error { - fileContent, err := os.ReadFile(path.Join(configDir, "api_keys.json")) - if err == nil { - // Parse JSON content from the file - var fileKeys []string - err := json.Unmarshal(fileContent, &fileKeys) - if err == nil { - appConfig.ApiKeys = append(appConfig.ApiKeys, fileKeys...) - return nil - } - return err - } - return err +type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error + +type configFileHandler struct { + handlers map[string]fileHandler + + watcher *fsnotify.Watcher + + configDir string + appConfig *config.ApplicationConfig } -func ReadExternalBackendsJson(configDir string, appConfig *config.ApplicationConfig) error { - fileContent, err := os.ReadFile(path.Join(configDir, "external_backends.json")) - if err != nil { - return err +// TODO: This should be a singleton eventually so other parts of the code can register config file handlers, +// then we can export it to other packages +func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler { + c := configFileHandler{ + handlers: make(map[string]fileHandler), + configDir: appConfig.DynamicConfigsDir, + appConfig: appConfig, } - // Parse JSON content from the file - var fileBackends map[string]string - err = json.Unmarshal(fileContent, &fileBackends) - if err != nil { - return err + c.Register("api_keys.json", readApiKeysJson(*appConfig), true) + c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true) + return c +} + +func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error { + _, ok := c.handlers[filename] + if ok { + return fmt.Errorf("handler already registered for file %s", filename) } - err = mergo.Merge(&appConfig.ExternalGRPCBackends, fileBackends) - if err != nil { - return err + c.handlers[filename] = handler + if runNow { + c.callHandler(path.Join(c.appConfig.DynamicConfigsDir, filename), handler) } return nil } -var CONFIG_FILE_UPDATES = map[string]func(configDir string, appConfig *config.ApplicationConfig) error{ - "api_keys.json": ReadApiKeysJson, - "external_backends.json": ReadExternalBackendsJson, -} +func (c *configFileHandler) callHandler(filename string, handler fileHandler) { + fileContent, err := os.ReadFile(filename) + if err != nil && !os.IsNotExist(err) { + log.Error().Err(err).Str("filename", filename).Msg("could not read file") + } -func WatchConfigDirectory(configDir string, appConfig *config.ApplicationConfig) (WatchConfigDirectoryCloser, error) { - if len(configDir) == 0 { - return nil, fmt.Errorf("configDir blank") + if err = handler(fileContent, c.appConfig); err != nil { + log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options") } +} + +func (c *configFileHandler) Watch() error { configWatcher, err := fsnotify.NewWatcher() + c.watcher = configWatcher if err != nil { - log.Fatal().Msgf("Unable to create a watcher for the LocalAI Configuration Directory: %+v", err) - } - ret := func() error { - configWatcher.Close() - return nil + log.Fatal().Err(err).Str("configdir", c.configDir).Msg("wnable to create a watcher for configuration directory") } // Start listening for events. go func() { for { select { - case event, ok := <-configWatcher.Events: + case event, ok := <-c.watcher.Events: if !ok { return } - if event.Has(fsnotify.Write) { - for targetName, watchFn := range CONFIG_FILE_UPDATES { - if event.Name == targetName { - err := watchFn(configDir, appConfig) - log.Warn().Msgf("WatchConfigDirectory goroutine for %s: failed to update options: %+v", targetName, err) - } + if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) { + handler, ok := c.handlers[path.Base(event.Name)] + if !ok { + continue } + + c.callHandler(event.Name, handler) } - case _, ok := <-configWatcher.Errors: + case err, ok := <-c.watcher.Errors: + log.Error().Err(err).Msg("config watcher error received") if !ok { return } - log.Error().Err(err).Msg("error encountered while watching config directory") } } }() // Add a path. - err = configWatcher.Add(configDir) + err = c.watcher.Add(c.appConfig.DynamicConfigsDir) if err != nil { - return ret, fmt.Errorf("unable to establish watch on the LocalAI Configuration Directory: %+v", err) + return fmt.Errorf("unable to establish watch on the LocalAI Configuration Directory: %+v", err) } - return ret, nil + return nil +} + +// TODO: When we institute graceful shutdown, this should be called +func (c *configFileHandler) Stop() { + c.watcher.Close() +} + +func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler { + handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { + log.Debug().Msg("processing api_keys.json") + + if len(fileContent) > 0 { + // Parse JSON content from the file + var fileKeys []string + err := json.Unmarshal(fileContent, &fileKeys) + if err != nil { + return err + } + + appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...) + } else { + appConfig.ApiKeys = startupAppConfig.ApiKeys + } + log.Debug().Msg("api keys loaded from api_keys.json") + return nil + } + + return handler +} + +func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler { + handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error { + log.Debug().Msg("processing external_backends.json") + + if len(fileContent) > 0 { + // Parse JSON content from the file + var fileBackends map[string]string + err := json.Unmarshal(fileContent, &fileBackends) + if err != nil { + return err + } + appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends + err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends) + if err != nil { + return err + } + } else { + appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends + } + log.Debug().Msg("external backends loaded from external_backends.json") + return nil + } + return handler } diff --git a/core/startup/startup.go b/core/startup/startup.go index 6298f03426ed..af92f0e1772d 100644 --- a/core/startup/startup.go +++ b/core/startup/startup.go @@ -125,6 +125,11 @@ func Startup(opts ...config.AppOption) (*config.BackendConfigLoader, *model.Mode }() } + // Watch the configuration directory + // If the directory does not exist, we don't watch it + configHandler := newConfigFileHandler(options) + configHandler.Watch() + log.Info().Msg("core/startup process completed!") return cl, ml, options, nil }