Skip to content

Commit

Permalink
feat: refactor the dynamic json configs for api_keys and external_bac…
Browse files Browse the repository at this point in the history
…kends (mudler#2055)

* feat: refactor the dynamic json configs for api_keys and external_backends

Signed-off-by: Chris Jowett <[email protected]>

* fix: remove commented code

Signed-off-by: Chris Jowett <[email protected]>

---------

Signed-off-by: Chris Jowett <[email protected]>
Signed-off-by: Ettore Di Giacinto <[email protected]>
Co-authored-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
cryptk and mudler authored Apr 18, 2024
1 parent e9f0902 commit 502c1ee
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 63 deletions.
13 changes: 1 addition & 12 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cli

import (
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -65,6 +64,7 @@ func (r *RunCMD) Run(ctx *Context) error {
config.WithAudioDir(r.AudioPath),
config.WithUploadDir(r.UploadPath),
config.WithConfigsDir(r.ConfigPath),
config.WithDynamicConfigDir(r.LocalaiConfigDir),
config.WithF16(r.F16),
config.WithStringGalleries(r.Galleries),
config.WithModelLibraryURL(r.RemoteLibrary),
Expand Down Expand Up @@ -134,17 +134,6 @@ func (r *RunCMD) Run(ctx *Context) error {
return fmt.Errorf("failed basic startup tasks with error %s", err.Error())
}

// Watch the configuration directory
// If the directory does not exist, we don't watch it
if _, err := os.Stat(r.LocalaiConfigDir); err == nil {
closeConfigWatcherFn, err := startup.WatchConfigDirectory(r.LocalaiConfigDir, options)
defer closeConfigWatcherFn()

if err != nil {
return fmt.Errorf("failed while watching configuration directory %s", r.LocalaiConfigDir)
}
}

appHTTP, err := http.App(cl, ml, options)
if err != nil {
log.Error().Err(err).Msg("error during HTTP App construction")
Expand Down
7 changes: 7 additions & 0 deletions core/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ApplicationConfig struct {
AudioDir string
UploadDir string
ConfigsDir string
DynamicConfigsDir string
CORS bool
PreloadJSONModels string
PreloadModelsFromPath string
Expand Down Expand Up @@ -264,6 +265,12 @@ func WithConfigsDir(configsDir string) AppOption {
}
}

func WithDynamicConfigDir(dynamicConfigsDir string) AppOption {
return func(o *ApplicationConfig) {
o.DynamicConfigsDir = dynamicConfigsDir
}
}

func WithApiKeys(apiKeys []string) AppOption {
return func(o *ApplicationConfig) {
o.ApiKeys = apiKeys
Expand Down
156 changes: 105 additions & 51 deletions core/startup/config_file_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,89 +12,143 @@ import (
"github.com/rs/zerolog/log"
)

type WatchConfigDirectoryCloser func() error

func ReadApiKeysJson(configDir string, appConfig *config.ApplicationConfig) error {
fileContent, err := os.ReadFile(path.Join(configDir, "api_keys.json"))
if err == nil {
// Parse JSON content from the file
var fileKeys []string
err := json.Unmarshal(fileContent, &fileKeys)
if err == nil {
appConfig.ApiKeys = append(appConfig.ApiKeys, fileKeys...)
return nil
}
return err
}
return err
type fileHandler func(fileContent []byte, appConfig *config.ApplicationConfig) error

type configFileHandler struct {
handlers map[string]fileHandler

watcher *fsnotify.Watcher

configDir string
appConfig *config.ApplicationConfig
}

func ReadExternalBackendsJson(configDir string, appConfig *config.ApplicationConfig) error {
fileContent, err := os.ReadFile(path.Join(configDir, "external_backends.json"))
if err != nil {
return err
// TODO: This should be a singleton eventually so other parts of the code can register config file handlers,
// then we can export it to other packages
func newConfigFileHandler(appConfig *config.ApplicationConfig) configFileHandler {
c := configFileHandler{
handlers: make(map[string]fileHandler),
configDir: appConfig.DynamicConfigsDir,
appConfig: appConfig,
}
// Parse JSON content from the file
var fileBackends map[string]string
err = json.Unmarshal(fileContent, &fileBackends)
if err != nil {
return err
c.Register("api_keys.json", readApiKeysJson(*appConfig), true)
c.Register("external_backends.json", readExternalBackendsJson(*appConfig), true)
return c
}

func (c *configFileHandler) Register(filename string, handler fileHandler, runNow bool) error {
_, ok := c.handlers[filename]
if ok {
return fmt.Errorf("handler already registered for file %s", filename)
}
err = mergo.Merge(&appConfig.ExternalGRPCBackends, fileBackends)
if err != nil {
return err
c.handlers[filename] = handler
if runNow {
c.callHandler(path.Join(c.appConfig.DynamicConfigsDir, filename), handler)
}
return nil
}

var CONFIG_FILE_UPDATES = map[string]func(configDir string, appConfig *config.ApplicationConfig) error{
"api_keys.json": ReadApiKeysJson,
"external_backends.json": ReadExternalBackendsJson,
}
func (c *configFileHandler) callHandler(filename string, handler fileHandler) {
fileContent, err := os.ReadFile(filename)
if err != nil && !os.IsNotExist(err) {
log.Error().Err(err).Str("filename", filename).Msg("could not read file")
}

func WatchConfigDirectory(configDir string, appConfig *config.ApplicationConfig) (WatchConfigDirectoryCloser, error) {
if len(configDir) == 0 {
return nil, fmt.Errorf("configDir blank")
if err = handler(fileContent, c.appConfig); err != nil {
log.Error().Err(err).Msg("WatchConfigDirectory goroutine failed to update options")
}
}

func (c *configFileHandler) Watch() error {
configWatcher, err := fsnotify.NewWatcher()
c.watcher = configWatcher
if err != nil {
log.Fatal().Msgf("Unable to create a watcher for the LocalAI Configuration Directory: %+v", err)
}
ret := func() error {
configWatcher.Close()
return nil
log.Fatal().Err(err).Str("configdir", c.configDir).Msg("wnable to create a watcher for configuration directory")
}

// Start listening for events.
go func() {
for {
select {
case event, ok := <-configWatcher.Events:
case event, ok := <-c.watcher.Events:
if !ok {
return
}
if event.Has(fsnotify.Write) {
for targetName, watchFn := range CONFIG_FILE_UPDATES {
if event.Name == targetName {
err := watchFn(configDir, appConfig)
log.Warn().Msgf("WatchConfigDirectory goroutine for %s: failed to update options: %+v", targetName, err)
}
if event.Has(fsnotify.Write | fsnotify.Create | fsnotify.Remove) {
handler, ok := c.handlers[path.Base(event.Name)]
if !ok {
continue
}

c.callHandler(event.Name, handler)
}
case _, ok := <-configWatcher.Errors:
case err, ok := <-c.watcher.Errors:
log.Error().Err(err).Msg("config watcher error received")
if !ok {
return
}
log.Error().Err(err).Msg("error encountered while watching config directory")
}
}
}()

// Add a path.
err = configWatcher.Add(configDir)
err = c.watcher.Add(c.appConfig.DynamicConfigsDir)
if err != nil {
return ret, fmt.Errorf("unable to establish watch on the LocalAI Configuration Directory: %+v", err)
return fmt.Errorf("unable to establish watch on the LocalAI Configuration Directory: %+v", err)
}

return ret, nil
return nil
}

// TODO: When we institute graceful shutdown, this should be called
func (c *configFileHandler) Stop() {
c.watcher.Close()
}

func readApiKeysJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing api_keys.json")

if len(fileContent) > 0 {
// Parse JSON content from the file
var fileKeys []string
err := json.Unmarshal(fileContent, &fileKeys)
if err != nil {
return err
}

appConfig.ApiKeys = append(startupAppConfig.ApiKeys, fileKeys...)
} else {
appConfig.ApiKeys = startupAppConfig.ApiKeys
}
log.Debug().Msg("api keys loaded from api_keys.json")
return nil
}

return handler
}

func readExternalBackendsJson(startupAppConfig config.ApplicationConfig) fileHandler {
handler := func(fileContent []byte, appConfig *config.ApplicationConfig) error {
log.Debug().Msg("processing external_backends.json")

if len(fileContent) > 0 {
// Parse JSON content from the file
var fileBackends map[string]string
err := json.Unmarshal(fileContent, &fileBackends)
if err != nil {
return err
}
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
err = mergo.Merge(&appConfig.ExternalGRPCBackends, &fileBackends)
if err != nil {
return err
}
} else {
appConfig.ExternalGRPCBackends = startupAppConfig.ExternalGRPCBackends
}
log.Debug().Msg("external backends loaded from external_backends.json")
return nil
}
return handler
}
5 changes: 5 additions & 0 deletions core/startup/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func Startup(opts ...config.AppOption) (*config.BackendConfigLoader, *model.Mode
}()
}

// Watch the configuration directory
// If the directory does not exist, we don't watch it
configHandler := newConfigFileHandler(options)
configHandler.Watch()

log.Info().Msg("core/startup process completed!")
return cl, ml, options, nil
}

0 comments on commit 502c1ee

Please sign in to comment.