Skip to content

Commit

Permalink
feat(p2p): add network explorer and community pools (mudler#3125)
Browse files Browse the repository at this point in the history
* WIP

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Fixups

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Wire up a simple explorer DB

Signed-off-by: Ettore Di Giacinto <[email protected]>

* wip

Signed-off-by: Ettore Di Giacinto <[email protected]>

* WIP

Signed-off-by: Ettore Di Giacinto <[email protected]>

* refactor: group services id so can be identified easily in the ledger table

Signed-off-by: Ettore Di Giacinto <[email protected]>

* feat(discovery): discovery service now gather worker informations correctly

Signed-off-by: Ettore Di Giacinto <[email protected]>

* feat(explorer): display network token

Signed-off-by: Ettore Di Giacinto <[email protected]>

* feat(explorer): display form to add new networks

Signed-off-by: Ettore Di Giacinto <[email protected]>

* feat(explorer): stop from overwriting networks

Signed-off-by: Ettore Di Giacinto <[email protected]>

* feat(explorer): display only networks with active workers

Signed-off-by: Ettore Di Giacinto <[email protected]>

* feat(explorer): list only clusters in a network if it has online workers

Signed-off-by: Ettore Di Giacinto <[email protected]>

* remove invalid and inactive networks

if networks have no workers delete them from the database, similarly,
if invalid.

Signed-off-by: Ettore Di Giacinto <[email protected]>

* ci: add workflow to deploy new explorer versions automatically

Signed-off-by: Ettore Di Giacinto <[email protected]>

* build-api: build with p2p tag

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Allow to specify a connection timeout

Signed-off-by: Ettore Di Giacinto <[email protected]>

* logging

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Better p2p defaults

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Set loglevel

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Fix dht enable

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Default to info for loglevel

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Add navbar

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Slightly improve rendering

Signed-off-by: Ettore Di Giacinto <[email protected]>

* Allow to copy the token easily

Signed-off-by: Ettore Di Giacinto <[email protected]>

* ci fixups

Signed-off-by: Ettore Di Giacinto <[email protected]>

---------

Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler authored Aug 9, 2024
1 parent 5fcafc3 commit 9e3e892
Show file tree
Hide file tree
Showing 19 changed files with 1,082 additions and 17 deletions.
64 changes: 64 additions & 0 deletions .github/workflows/deploy-explorer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: Explorer deployment

on:
push:
branches:
- master
tags:
- 'v*'

concurrency:
group: ci-deploy-${{ github.head_ref || github.ref }}-${{ github.repository }}

jobs:
build-linux:
runs-on: ubuntu-latest
steps:
- name: Clone
uses: actions/checkout@v4
with:
submodules: true
- uses: actions/setup-go@v5
with:
go-version: '1.21.x'
cache: false
- name: Dependencies
run: |
sudo apt-get update
sudo apt-get install -y wget curl build-essential ffmpeg protobuf-compiler ccache upx-ucl gawk cmake libgmock-dev
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af
go install google.golang.org/protobuf/cmd/[email protected]
make protogen-go
- name: Build api
run: |
make build-api
- name: rm
uses: appleboy/[email protected]
with:
host: ${{ secrets.EXPLORER_SSH_HOST }}
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
key: ${{ secrets.EXPLORER_SSH_KEY }}
port: ${{ secrets.EXPLORER_SSH_PORT }}
script: |
sudo rm -rf local-ai/ || true
- name: copy file via ssh
uses: appleboy/[email protected]
with:
host: ${{ secrets.EXPLORER_SSH_HOST }}
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
key: ${{ secrets.EXPLORER_SSH_KEY }}
port: ${{ secrets.EXPLORER_SSH_PORT }}
source: "local-ai"
overwrite: true
rm: true
target: ./local-ai
- name: restarting
uses: appleboy/[email protected]
with:
host: ${{ secrets.EXPLORER_SSH_HOST }}
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
key: ${{ secrets.EXPLORER_SSH_KEY }}
port: ${{ secrets.EXPLORER_SSH_PORT }}
script: |
sudo cp -rfv local-ai/local-ai /usr/bin/local-ai
sudo systemctl restart local-ai
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ build-minimal:
BUILD_GRPC_FOR_BACKEND_LLAMA=true GRPC_BACKENDS="backend-assets/grpc/llama-cpp-avx2" GO_TAGS=p2p $(MAKE) build

build-api:
BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=none $(MAKE) build
BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=p2p $(MAKE) build

backend-assets/lib:
mkdir -p backend-assets/lib
Expand Down
1 change: 1 addition & 0 deletions core/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ var CLI struct {
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
Worker worker.Worker `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"`
Util UtilCMD `cmd:"" help:"Utility commands"`
Explorer ExplorerCMD `cmd:"" help:"Run p2p explorer"`
}
35 changes: 35 additions & 0 deletions core/cli/explorer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cli

import (
"context"
"time"

cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http"
)

type ExplorerCMD struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
}

func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {

db, err := explorer.NewDatabase(e.PoolDatabase)
if err != nil {
return err
}

dur, err := time.ParseDuration(e.ConnectionTimeout)
if err != nil {
return err
}
ds := explorer.NewDiscoveryServer(db, dur)

go ds.Start(context.Background())
appHTTP := http.Explorer(db, ds)

return appHTTP.Listen(e.Address)
}
4 changes: 2 additions & 2 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}

log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) {
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/cli/worker/worker_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}

err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()

err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
Expand Down
106 changes: 106 additions & 0 deletions core/explorer/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package explorer

// A simple JSON database for storing and retrieving p2p network tokens and a name and description.

import (
"encoding/json"
"os"
"sort"
"sync"
)

// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description.
type Database struct {
sync.RWMutex
path string
data map[string]TokenData
}

// TokenData is a p2p network token with a name and description.
type TokenData struct {
Name string `json:"name"`
Description string `json:"description"`
}

// NewDatabase creates a new Database with the given path.
func NewDatabase(path string) (*Database, error) {
db := &Database{
data: make(map[string]TokenData),
path: path,
}
return db, db.load()
}

// Get retrieves a Token from the Database by its token.
func (db *Database) Get(token string) (TokenData, bool) {
db.RLock()
defer db.RUnlock()
t, ok := db.data[token]
return t, ok
}

// Set stores a Token in the Database by its token.
func (db *Database) Set(token string, t TokenData) error {
db.Lock()
db.data[token] = t
db.Unlock()

return db.Save()
}

// Delete removes a Token from the Database by its token.
func (db *Database) Delete(token string) error {
db.Lock()
delete(db.data, token)
db.Unlock()
return db.Save()
}

func (db *Database) TokenList() []string {
db.RLock()
defer db.RUnlock()
tokens := []string{}
for k := range db.data {
tokens = append(tokens, k)
}

sort.Slice(tokens, func(i, j int) bool {
// sort by token
return tokens[i] < tokens[j]
})

return tokens
}

// load reads the Database from disk.
func (db *Database) load() error {
db.Lock()
defer db.Unlock()

if _, err := os.Stat(db.path); os.IsNotExist(err) {
return nil
}

// Read the file from disk
// Unmarshal the JSON into db.data
f, err := os.ReadFile(db.path)
if err != nil {
return err
}
return json.Unmarshal(f, &db.data)
}

// Save writes the Database to disk.
func (db *Database) Save() error {
db.RLock()
defer db.RUnlock()

// Marshal db.data into JSON
// Write the JSON to the file
f, err := os.Create(db.path)
if err != nil {
return err
}
defer f.Close()
return json.NewEncoder(f).Encode(db.data)
}
92 changes: 92 additions & 0 deletions core/explorer/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package explorer_test

import (
"os"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/mudler/LocalAI/core/explorer"
)

var _ = Describe("Database", func() {
var (
dbPath string
db *explorer.Database
err error
)

BeforeEach(func() {
// Create a temporary file path for the database
dbPath = "test_db.json"
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())
})

AfterEach(func() {
// Clean up the temporary database file
os.Remove(dbPath)
})

Context("when managing tokens", func() {
It("should add and retrieve a token", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}

err = db.Set(token, t)
Expect(err).To(BeNil())

retrievedToken, exists := db.Get(token)
Expect(exists).To(BeTrue())
Expect(retrievedToken).To(Equal(t))
})

It("should delete a token", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}

err = db.Set(token, t)
Expect(err).To(BeNil())

err = db.Delete(token)
Expect(err).To(BeNil())

_, exists := db.Get(token)
Expect(exists).To(BeFalse())
})

It("should persist data to disk", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}

err = db.Set(token, t)
Expect(err).To(BeNil())

// Recreate the database object to simulate reloading from disk
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())

retrievedToken, exists := db.Get(token)
Expect(exists).To(BeTrue())
Expect(retrievedToken).To(Equal(t))

// Check the token list
tokenList := db.TokenList()
Expect(tokenList).To(ContainElement(token))
})
})

Context("when loading an empty or non-existent file", func() {
It("should start with an empty database", func() {
dbPath = "empty_db.json"
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())

_, exists := db.Get("nonexistent")
Expect(exists).To(BeFalse())

// Clean up
os.Remove(dbPath)
})
})
})
Loading

0 comments on commit 9e3e892

Please sign in to comment.