From 9e3e892ac79b200586e3be5ebf04a619ae25b2a8 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 9 Aug 2024 20:12:01 +0200 Subject: [PATCH] feat(p2p): add network explorer and community pools (#3125) * WIP Signed-off-by: Ettore Di Giacinto * Fixups Signed-off-by: Ettore Di Giacinto * Wire up a simple explorer DB Signed-off-by: Ettore Di Giacinto * wip Signed-off-by: Ettore Di Giacinto * WIP Signed-off-by: Ettore Di Giacinto * refactor: group services id so can be identified easily in the ledger table Signed-off-by: Ettore Di Giacinto * feat(discovery): discovery service now gather worker informations correctly Signed-off-by: Ettore Di Giacinto * feat(explorer): display network token Signed-off-by: Ettore Di Giacinto * feat(explorer): display form to add new networks Signed-off-by: Ettore Di Giacinto * feat(explorer): stop from overwriting networks Signed-off-by: Ettore Di Giacinto * feat(explorer): display only networks with active workers Signed-off-by: Ettore Di Giacinto * feat(explorer): list only clusters in a network if it has online workers Signed-off-by: Ettore Di Giacinto * remove invalid and inactive networks if networks have no workers delete them from the database, similarly, if invalid. Signed-off-by: Ettore Di Giacinto * ci: add workflow to deploy new explorer versions automatically Signed-off-by: Ettore Di Giacinto * build-api: build with p2p tag Signed-off-by: Ettore Di Giacinto * Allow to specify a connection timeout Signed-off-by: Ettore Di Giacinto * logging Signed-off-by: Ettore Di Giacinto * Better p2p defaults Signed-off-by: Ettore Di Giacinto * Set loglevel Signed-off-by: Ettore Di Giacinto * Fix dht enable Signed-off-by: Ettore Di Giacinto * Default to info for loglevel Signed-off-by: Ettore Di Giacinto * Add navbar Signed-off-by: Ettore Di Giacinto * Slightly improve rendering Signed-off-by: Ettore Di Giacinto * Allow to copy the token easily Signed-off-by: Ettore Di Giacinto * ci fixups Signed-off-by: Ettore Di Giacinto --------- Signed-off-by: Ettore Di Giacinto --- .github/workflows/deploy-explorer.yaml | 64 ++++ Makefile | 2 +- core/cli/cli.go | 1 + core/cli/explorer.go | 35 ++ core/cli/run.go | 4 +- core/cli/worker/worker_p2p.go | 4 +- core/explorer/database.go | 106 ++++++ core/explorer/database_test.go | 92 +++++ core/explorer/discovery.go | 203 +++++++++++ core/explorer/explorer_suite_test.go | 13 + core/http/endpoints/explorer/dashboard.go | 105 ++++++ core/http/endpoints/localai/p2p.go | 2 +- core/http/explorer.go | 46 +++ core/http/routes/explorer.go | 13 + core/http/routes/ui.go | 4 +- core/http/views/explorer.html | 342 ++++++++++++++++++ core/http/views/partials/navbar_explorer.html | 39 ++ core/p2p/node.go | 5 +- core/p2p/p2p.go | 19 +- 19 files changed, 1082 insertions(+), 17 deletions(-) create mode 100644 .github/workflows/deploy-explorer.yaml create mode 100644 core/cli/explorer.go create mode 100644 core/explorer/database.go create mode 100644 core/explorer/database_test.go create mode 100644 core/explorer/discovery.go create mode 100644 core/explorer/explorer_suite_test.go create mode 100644 core/http/endpoints/explorer/dashboard.go create mode 100644 core/http/explorer.go create mode 100644 core/http/routes/explorer.go create mode 100644 core/http/views/explorer.html create mode 100644 core/http/views/partials/navbar_explorer.html diff --git a/.github/workflows/deploy-explorer.yaml b/.github/workflows/deploy-explorer.yaml new file mode 100644 index 000000000000..71a141830542 --- /dev/null +++ b/.github/workflows/deploy-explorer.yaml @@ -0,0 +1,64 @@ +name: Explorer deployment + +on: + push: + branches: + - master + tags: + - 'v*' + +concurrency: + group: ci-deploy-${{ github.head_ref || github.ref }}-${{ github.repository }} + +jobs: + build-linux: + runs-on: ubuntu-latest + steps: + - name: Clone + uses: actions/checkout@v4 + with: + submodules: true + - uses: actions/setup-go@v5 + with: + go-version: '1.21.x' + cache: false + - name: Dependencies + run: | + sudo apt-get update + sudo apt-get install -y wget curl build-essential ffmpeg protobuf-compiler ccache upx-ucl gawk cmake libgmock-dev + go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af + go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.34.2 + make protogen-go + - name: Build api + run: | + make build-api + - name: rm + uses: appleboy/ssh-action@v1.0.3 + with: + host: ${{ secrets.EXPLORER_SSH_HOST }} + username: ${{ secrets.EXPLORER_SSH_USERNAME }} + key: ${{ secrets.EXPLORER_SSH_KEY }} + port: ${{ secrets.EXPLORER_SSH_PORT }} + script: | + sudo rm -rf local-ai/ || true + - name: copy file via ssh + uses: appleboy/scp-action@v0.1.7 + with: + host: ${{ secrets.EXPLORER_SSH_HOST }} + username: ${{ secrets.EXPLORER_SSH_USERNAME }} + key: ${{ secrets.EXPLORER_SSH_KEY }} + port: ${{ secrets.EXPLORER_SSH_PORT }} + source: "local-ai" + overwrite: true + rm: true + target: ./local-ai + - name: restarting + uses: appleboy/ssh-action@v1.0.3 + with: + host: ${{ secrets.EXPLORER_SSH_HOST }} + username: ${{ secrets.EXPLORER_SSH_USERNAME }} + key: ${{ secrets.EXPLORER_SSH_KEY }} + port: ${{ secrets.EXPLORER_SSH_PORT }} + script: | + sudo cp -rfv local-ai/local-ai /usr/bin/local-ai + sudo systemctl restart local-ai diff --git a/Makefile b/Makefile index 1ed68c0810eb..d690e4830cd6 100644 --- a/Makefile +++ b/Makefile @@ -376,7 +376,7 @@ build-minimal: BUILD_GRPC_FOR_BACKEND_LLAMA=true GRPC_BACKENDS="backend-assets/grpc/llama-cpp-avx2" GO_TAGS=p2p $(MAKE) build build-api: - BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=none $(MAKE) build + BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=p2p $(MAKE) build backend-assets/lib: mkdir -p backend-assets/lib diff --git a/core/cli/cli.go b/core/cli/cli.go index 0fed33fdf0df..2073778d747f 100644 --- a/core/cli/cli.go +++ b/core/cli/cli.go @@ -15,4 +15,5 @@ var CLI struct { Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"` Worker worker.Worker `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"` Util UtilCMD `cmd:"" help:"Utility commands"` + Explorer ExplorerCMD `cmd:"" help:"Run p2p explorer"` } diff --git a/core/cli/explorer.go b/core/cli/explorer.go new file mode 100644 index 000000000000..0fcde7283e0b --- /dev/null +++ b/core/cli/explorer.go @@ -0,0 +1,35 @@ +package cli + +import ( + "context" + "time" + + cliContext "github.com/mudler/LocalAI/core/cli/context" + "github.com/mudler/LocalAI/core/explorer" + "github.com/mudler/LocalAI/core/http" +) + +type ExplorerCMD struct { + Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` + PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"` + ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"` +} + +func (e *ExplorerCMD) Run(ctx *cliContext.Context) error { + + db, err := explorer.NewDatabase(e.PoolDatabase) + if err != nil { + return err + } + + dur, err := time.ParseDuration(e.ConnectionTimeout) + if err != nil { + return err + } + ds := explorer.NewDiscoveryServer(db, dur) + + go ds.Start(context.Background()) + appHTTP := http.Explorer(db, ds) + + return appHTTP.Listen(e.Address) +} diff --git a/core/cli/run.go b/core/cli/run.go index 9d58f6d9cc71..707f6afbcc0a 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -121,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { } log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) { + if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) { var tunnelAddresses []string - for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) { + for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) { if v.IsOnline() { tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) } else { diff --git a/core/cli/worker/worker_p2p.go b/core/cli/worker/worker_p2p.go index 93a365cbd8c2..17b9ff082ab6 100644 --- a/core/cli/worker/worker_p2p.go +++ b/core/cli/worker/worker_p2p.go @@ -60,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { p = r.RunnerPort } - err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, "")) + err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) if err != nil { return err } @@ -100,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error { } }() - err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, "")) + err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) if err != nil { return err } diff --git a/core/explorer/database.go b/core/explorer/database.go new file mode 100644 index 000000000000..8535140c9097 --- /dev/null +++ b/core/explorer/database.go @@ -0,0 +1,106 @@ +package explorer + +// A simple JSON database for storing and retrieving p2p network tokens and a name and description. + +import ( + "encoding/json" + "os" + "sort" + "sync" +) + +// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description. +type Database struct { + sync.RWMutex + path string + data map[string]TokenData +} + +// TokenData is a p2p network token with a name and description. +type TokenData struct { + Name string `json:"name"` + Description string `json:"description"` +} + +// NewDatabase creates a new Database with the given path. +func NewDatabase(path string) (*Database, error) { + db := &Database{ + data: make(map[string]TokenData), + path: path, + } + return db, db.load() +} + +// Get retrieves a Token from the Database by its token. +func (db *Database) Get(token string) (TokenData, bool) { + db.RLock() + defer db.RUnlock() + t, ok := db.data[token] + return t, ok +} + +// Set stores a Token in the Database by its token. +func (db *Database) Set(token string, t TokenData) error { + db.Lock() + db.data[token] = t + db.Unlock() + + return db.Save() +} + +// Delete removes a Token from the Database by its token. +func (db *Database) Delete(token string) error { + db.Lock() + delete(db.data, token) + db.Unlock() + return db.Save() +} + +func (db *Database) TokenList() []string { + db.RLock() + defer db.RUnlock() + tokens := []string{} + for k := range db.data { + tokens = append(tokens, k) + } + + sort.Slice(tokens, func(i, j int) bool { + // sort by token + return tokens[i] < tokens[j] + }) + + return tokens +} + +// load reads the Database from disk. +func (db *Database) load() error { + db.Lock() + defer db.Unlock() + + if _, err := os.Stat(db.path); os.IsNotExist(err) { + return nil + } + + // Read the file from disk + // Unmarshal the JSON into db.data + f, err := os.ReadFile(db.path) + if err != nil { + return err + } + return json.Unmarshal(f, &db.data) +} + +// Save writes the Database to disk. +func (db *Database) Save() error { + db.RLock() + defer db.RUnlock() + + // Marshal db.data into JSON + // Write the JSON to the file + f, err := os.Create(db.path) + if err != nil { + return err + } + defer f.Close() + return json.NewEncoder(f).Encode(db.data) +} diff --git a/core/explorer/database_test.go b/core/explorer/database_test.go new file mode 100644 index 000000000000..7f2cbd268a36 --- /dev/null +++ b/core/explorer/database_test.go @@ -0,0 +1,92 @@ +package explorer_test + +import ( + "os" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/explorer" +) + +var _ = Describe("Database", func() { + var ( + dbPath string + db *explorer.Database + err error + ) + + BeforeEach(func() { + // Create a temporary file path for the database + dbPath = "test_db.json" + db, err = explorer.NewDatabase(dbPath) + Expect(err).To(BeNil()) + }) + + AfterEach(func() { + // Clean up the temporary database file + os.Remove(dbPath) + }) + + Context("when managing tokens", func() { + It("should add and retrieve a token", func() { + token := "token123" + t := explorer.TokenData{Name: "TokenName", Description: "A test token"} + + err = db.Set(token, t) + Expect(err).To(BeNil()) + + retrievedToken, exists := db.Get(token) + Expect(exists).To(BeTrue()) + Expect(retrievedToken).To(Equal(t)) + }) + + It("should delete a token", func() { + token := "token123" + t := explorer.TokenData{Name: "TokenName", Description: "A test token"} + + err = db.Set(token, t) + Expect(err).To(BeNil()) + + err = db.Delete(token) + Expect(err).To(BeNil()) + + _, exists := db.Get(token) + Expect(exists).To(BeFalse()) + }) + + It("should persist data to disk", func() { + token := "token123" + t := explorer.TokenData{Name: "TokenName", Description: "A test token"} + + err = db.Set(token, t) + Expect(err).To(BeNil()) + + // Recreate the database object to simulate reloading from disk + db, err = explorer.NewDatabase(dbPath) + Expect(err).To(BeNil()) + + retrievedToken, exists := db.Get(token) + Expect(exists).To(BeTrue()) + Expect(retrievedToken).To(Equal(t)) + + // Check the token list + tokenList := db.TokenList() + Expect(tokenList).To(ContainElement(token)) + }) + }) + + Context("when loading an empty or non-existent file", func() { + It("should start with an empty database", func() { + dbPath = "empty_db.json" + db, err = explorer.NewDatabase(dbPath) + Expect(err).To(BeNil()) + + _, exists := db.Get("nonexistent") + Expect(exists).To(BeFalse()) + + // Clean up + os.Remove(dbPath) + }) + }) +}) diff --git a/core/explorer/discovery.go b/core/explorer/discovery.go new file mode 100644 index 000000000000..73281dc0787b --- /dev/null +++ b/core/explorer/discovery.go @@ -0,0 +1,203 @@ +package explorer + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/rs/zerolog/log" + + "github.com/mudler/LocalAI/core/p2p" + "github.com/mudler/edgevpn/pkg/blockchain" +) + +type DiscoveryServer struct { + sync.Mutex + database *Database + networkState *NetworkState + connectionTime time.Duration +} + +type NetworkState struct { + Networks map[string]Network +} + +func (s *DiscoveryServer) NetworkState() *NetworkState { + s.Lock() + defer s.Unlock() + return s.networkState +} + +// NewDiscoveryServer creates a new DiscoveryServer with the given Database. +// it keeps the db state in sync with the network state +func NewDiscoveryServer(db *Database, dur time.Duration) *DiscoveryServer { + if dur == 0 { + dur = 50 * time.Second + } + return &DiscoveryServer{ + database: db, + connectionTime: dur, + networkState: &NetworkState{ + Networks: map[string]Network{}, + }, + } +} + +type Network struct { + Clusters []ClusterData +} + +func (s *DiscoveryServer) runBackground() { + if len(s.database.TokenList()) == 0 { + time.Sleep(5 * time.Second) // avoid busy loop + return + } + + for _, token := range s.database.TokenList() { + c, cancel := context.WithTimeout(context.Background(), s.connectionTime) + defer cancel() + + // Connect to the network + // Get the number of nodes + // save it in the current state (mutex) + // do not do in parallel + n, err := p2p.NewNode(token) + if err != nil { + log.Err(err).Msg("Failed to create node") + s.database.Delete(token) + continue + } + + err = n.Start(c) + if err != nil { + log.Err(err).Msg("Failed to start node") + s.database.Delete(token) + continue + } + + ledger, err := n.Ledger() + if err != nil { + log.Err(err).Msg("Failed to start ledger") + s.database.Delete(token) + continue + } + + networkData := make(chan ClusterData) + + // get the network data - it takes the whole timeout + // as we might not be connected to the network yet, + // and few attempts would have to be made before bailing out + go s.retrieveNetworkData(c, ledger, networkData) + + hasWorkers := false + ledgerK := []ClusterData{} + for key := range networkData { + ledgerK = append(ledgerK, key) + if len(key.Workers) > 0 { + hasWorkers = true + } + } + + log.Debug().Any("network", token).Msgf("Network has %d clusters", len(ledgerK)) + if len(ledgerK) != 0 { + for _, k := range ledgerK { + log.Debug().Any("network", token).Msgf("Clusterdata %+v", k) + } + } + + if hasWorkers { + s.Lock() + s.networkState.Networks[token] = Network{ + Clusters: ledgerK, + } + s.Unlock() + } else { + log.Info().Any("network", token).Msg("No workers found in the network. Removing it from the database") + s.database.Delete(token) + } + } +} + +type ClusterData struct { + Workers []string + Type string + NetworkID string +} + +func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockchain.Ledger, networkData chan ClusterData) { + clusters := map[string]ClusterData{} + + defer func() { + for _, n := range clusters { + networkData <- n + } + close(networkData) + }() + + for { + select { + case <-c.Done(): + return + default: + time.Sleep(5 * time.Second) + + data := ledger.LastBlock().Storage + LEDGER: + for d := range data { + toScanForWorkers := false + cd := ClusterData{} + isWorkerCluster := d == p2p.WorkerID || (strings.Contains(d, "_") && strings.Contains(d, p2p.WorkerID)) + isFederatedCluster := d == p2p.FederatedID || (strings.Contains(d, "_") && strings.Contains(d, p2p.FederatedID)) + switch { + case isWorkerCluster: + toScanForWorkers = true + cd.Type = "worker" + case isFederatedCluster: + toScanForWorkers = true + cd.Type = "federated" + } + + if strings.Contains(d, "_") { + cd.NetworkID = strings.Split(d, "_")[0] + } + + if !toScanForWorkers { + continue LEDGER + } + + atLeastOneWorker := false + DATA: + for _, v := range data[d] { + nd := &p2p.NodeData{} + if err := v.Unmarshal(nd); err != nil { + continue DATA + } + + if nd.IsOnline() { + atLeastOneWorker = true + (&cd).Workers = append(cd.Workers, nd.ID) + } + } + + if atLeastOneWorker { + clusters[d] = cd + } + } + } + } +} + +// Start the discovery server. This is meant to be run in to a goroutine. +func (s *DiscoveryServer) Start(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled") + default: + // Collect data + s.runBackground() + } + } +} diff --git a/core/explorer/explorer_suite_test.go b/core/explorer/explorer_suite_test.go new file mode 100644 index 000000000000..fc718d5f8dfa --- /dev/null +++ b/core/explorer/explorer_suite_test.go @@ -0,0 +1,13 @@ +package explorer_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestExplorer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Explorer test suite") +} diff --git a/core/http/endpoints/explorer/dashboard.go b/core/http/endpoints/explorer/dashboard.go new file mode 100644 index 000000000000..7cd9f3c98423 --- /dev/null +++ b/core/http/endpoints/explorer/dashboard.go @@ -0,0 +1,105 @@ +package explorer + +import ( + "encoding/base64" + "sort" + + "github.com/gofiber/fiber/v2" + "github.com/mudler/LocalAI/core/explorer" + "github.com/mudler/LocalAI/internal" +) + +func Dashboard() func(*fiber.Ctx) error { + return func(c *fiber.Ctx) error { + + summary := fiber.Map{ + "Title": "LocalAI API - " + internal.PrintableVersion(), + "Version": internal.PrintableVersion(), + } + + if string(c.Context().Request.Header.ContentType()) == "application/json" || len(c.Accepts("html")) == 0 { + // The client expects a JSON response + return c.Status(fiber.StatusOK).JSON(summary) + } else { + // Render index + return c.Render("views/explorer", summary) + } + } +} + +type AddNetworkRequest struct { + Token string `json:"token"` + Name string `json:"name"` + Description string `json:"description"` +} + +type Network struct { + explorer.Network + explorer.TokenData + Token string `json:"token"` +} + +func ShowNetworks(db *explorer.Database, ds *explorer.DiscoveryServer) func(*fiber.Ctx) error { + return func(c *fiber.Ctx) error { + networkState := ds.NetworkState() + results := []Network{} + for token, network := range networkState.Networks { + networkData, exists := db.Get(token) // get the token data + hasWorkers := false + for _, cluster := range network.Clusters { + if len(cluster.Workers) > 0 { + hasWorkers = true + break + } + } + if exists && hasWorkers { + results = append(results, Network{Network: network, TokenData: networkData, Token: token}) + } + } + + // order by number of clusters + sort.Slice(results, func(i, j int) bool { + return len(results[i].Clusters) > len(results[j].Clusters) + }) + + return c.JSON(results) + } +} + +func AddNetwork(db *explorer.Database) func(*fiber.Ctx) error { + return func(c *fiber.Ctx) error { + request := new(AddNetworkRequest) + if err := c.BodyParser(request); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Cannot parse JSON"}) + } + + if request.Token == "" { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Token is required"}) + } + + if request.Name == "" { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Name is required"}) + } + + if request.Description == "" { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Description is required"}) + } + + // TODO: check if token is valid, otherwise reject + // try to decode the token from base64 + _, err := base64.StdEncoding.DecodeString(request.Token) + if err != nil { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Invalid token"}) + } + + if _, exists := db.Get(request.Token); exists { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "Token already exists"}) + } + err = db.Set(request.Token, explorer.TokenData{Name: request.Name, Description: request.Description}) + if err != nil { + return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Cannot add token"}) + } + + return c.Status(fiber.StatusOK).JSON(fiber.Map{"message": "Token added"}) + } +} diff --git a/core/http/endpoints/localai/p2p.go b/core/http/endpoints/localai/p2p.go index 93e9b5d5b468..bbcee8c801e1 100644 --- a/core/http/endpoints/localai/p2p.go +++ b/core/http/endpoints/localai/p2p.go @@ -15,7 +15,7 @@ func ShowP2PNodes(appConfig *config.ApplicationConfig) func(*fiber.Ctx) error { // Render index return func(c *fiber.Ctx) error { return c.JSON(schema.P2PNodesResponse{ - Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")), + Nodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.WorkerID)), FederatedNodes: p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)), }) } diff --git a/core/http/explorer.go b/core/http/explorer.go new file mode 100644 index 000000000000..608ecdb51b81 --- /dev/null +++ b/core/http/explorer.go @@ -0,0 +1,46 @@ +package http + +import ( + "net/http" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/favicon" + "github.com/gofiber/fiber/v2/middleware/filesystem" + "github.com/mudler/LocalAI/core/explorer" + "github.com/mudler/LocalAI/core/http/routes" +) + +func Explorer(db *explorer.Database, discoveryServer *explorer.DiscoveryServer) *fiber.App { + + fiberCfg := fiber.Config{ + Views: renderEngine(), + // We disable the Fiber startup message as it does not conform to structured logging. + // We register a startup log line with connection information in the OnListen hook to keep things user friendly though + DisableStartupMessage: false, + // Override default error handler + } + + app := fiber.New(fiberCfg) + + routes.RegisterExplorerRoutes(app, db, discoveryServer) + + httpFS := http.FS(embedDirStatic) + + app.Use(favicon.New(favicon.Config{ + URL: "/favicon.ico", + FileSystem: httpFS, + File: "static/favicon.ico", + })) + + app.Use("/static", filesystem.New(filesystem.Config{ + Root: httpFS, + PathPrefix: "static", + Browse: true, + })) + + // Define a custom 404 handler + // Note: keep this at the bottom! + app.Use(notFoundHandler) + + return app +} diff --git a/core/http/routes/explorer.go b/core/http/routes/explorer.go new file mode 100644 index 000000000000..b3c0d40b9953 --- /dev/null +++ b/core/http/routes/explorer.go @@ -0,0 +1,13 @@ +package routes + +import ( + "github.com/gofiber/fiber/v2" + coreExplorer "github.com/mudler/LocalAI/core/explorer" + "github.com/mudler/LocalAI/core/http/endpoints/explorer" +) + +func RegisterExplorerRoutes(app *fiber.App, db *coreExplorer.Database, ds *coreExplorer.DiscoveryServer) { + app.Get("/", explorer.Dashboard()) + app.Post("/network/add", explorer.AddNetwork(db)) + app.Get("/networks", explorer.ShowNetworks(db, ds)) +} diff --git a/core/http/routes/ui.go b/core/http/routes/ui.go index 2996e9dceb52..0a9867fe6415 100644 --- a/core/http/routes/ui.go +++ b/core/http/routes/ui.go @@ -105,14 +105,14 @@ func RegisterUIRoutes(app *fiber.App, /* show nodes live! */ app.Get("/p2p/ui/workers", auth, func(c *fiber.Ctx) error { - return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")))) + return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.WorkerID)))) }) app.Get("/p2p/ui/workers-federation", auth, func(c *fiber.Ctx) error { return c.SendString(elements.P2PNodeBoxes(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)))) }) app.Get("/p2p/ui/workers-stats", auth, func(c *fiber.Ctx) error { - return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, "")))) + return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.WorkerID)))) }) app.Get("/p2p/ui/workers-federation-stats", auth, func(c *fiber.Ctx) error { return c.SendString(elements.P2PNodeStats(p2p.GetAvailableNodes(p2p.NetworkID(appConfig.P2PNetworkID, p2p.FederatedID)))) diff --git a/core/http/views/explorer.html b/core/http/views/explorer.html new file mode 100644 index 000000000000..91cb9720c9b2 --- /dev/null +++ b/core/http/views/explorer.html @@ -0,0 +1,342 @@ + + + +{{template "views/partials/head" .}} + + + + +
+ {{template "views/partials/navbar_explorer" .}} + +
+

Network Clusters Explorer

+

View the clusters and workers available in each network.

+
+ +
+ +
+ + The explorer is a global, community-driven tool to share network tokens and view available clusters in the globe. + Anyone can use the tokens to offload computation and use the clusters available or share resources. + This is provided without any warranty. Use it at your own risk. We are not responsible for any potential harm or misuse. Sharing tokens globally allows anyone from the internet to use your instances. + Although the community will address bugs, this is experimental software and may be insecure to deploy on your hardware unless you take all necessary precautions. +
+
+ + +
+ +
+

Add New Network

+
+ + +
+
+ + +
+
+ + +
+ + + +
+ + + + + + + + +
+ + + {{template "views/partials/footer" .}} +
+ + + + diff --git a/core/http/views/partials/navbar_explorer.html b/core/http/views/partials/navbar_explorer.html new file mode 100644 index 000000000000..ffc6c4d5ae7d --- /dev/null +++ b/core/http/views/partials/navbar_explorer.html @@ -0,0 +1,39 @@ + + + diff --git a/core/p2p/node.go b/core/p2p/node.go index 6394498fd416..b89bb7c694ff 100644 --- a/core/p2p/node.go +++ b/core/p2p/node.go @@ -5,7 +5,10 @@ import ( "time" ) -const defaultServicesID = "services_localai" +const ( + defaultServicesID = "services" + WorkerID = "worker" +) type NodeData struct { Name string diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index 927f0e241319..37b892d9564b 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -345,13 +345,16 @@ func newNodeOpts(token string) ([]node.Option, error) { // TODO: move this up, expose more config options when creating a node noDHT := os.Getenv("LOCALAI_P2P_DISABLE_DHT") == "true" - noLimits := os.Getenv("LOCALAI_P2P_DISABLE_LIMITS") == "true" + noLimits := os.Getenv("LOCALAI_P2P_ENABLE_LIMITS") == "true" - loglevel := "info" + loglevel := os.Getenv("LOCALAI_P2P_LOGLEVEL") + if loglevel == "" { + loglevel = "info" + } c := config.Config{ Limit: config.ResourceLimit{ - Enable: !noLimits, + Enable: noLimits, MaxConns: 100, }, NetworkToken: token, @@ -366,19 +369,19 @@ func newNodeOpts(token string) ([]node.Option, error) { Service: true, Map: true, RateLimit: true, - RateLimitGlobal: 10, - RateLimitPeer: 10, + RateLimitGlobal: 100, + RateLimitPeer: 100, RateLimitInterval: defaultInterval, }, Discovery: config.Discovery{ - DHT: noDHT, + DHT: !noDHT, MDNS: true, - Interval: 30 * time.Second, + Interval: 10 * time.Second, }, Connection: config.Connection{ HolePunch: true, AutoRelay: true, - MaxConnections: 100, + MaxConnections: 1000, }, }