Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): add network explorer and community pools #3125

Merged
merged 25 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions .github/workflows/deploy-explorer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: Explorer deployment

on:
push:
branches:
- master
tags:
- 'v*'

concurrency:
group: ci-deploy-${{ github.head_ref || github.ref }}-${{ github.repository }}

jobs:
build-linux:
runs-on: ubuntu-latest
steps:
- name: Clone
uses: actions/checkout@v4
with:
submodules: true
- uses: actions/setup-go@v5
with:
go-version: '1.21.x'
cache: false
- name: Dependencies
run: |
sudo apt-get update
sudo apt-get install -y wget curl build-essential ffmpeg protobuf-compiler ccache upx-ucl gawk cmake libgmock-dev
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@1958fcbe2ca8bd93af633f11e97d44e567e945af
go install google.golang.org/protobuf/cmd/[email protected]
make protogen-go
- name: Build api
run: |
make build-api
- name: rm
uses: appleboy/[email protected]
with:
host: ${{ secrets.EXPLORER_SSH_HOST }}
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
key: ${{ secrets.EXPLORER_SSH_KEY }}
port: ${{ secrets.EXPLORER_SSH_PORT }}
script: |
sudo rm -rf local-ai/ || true
- name: copy file via ssh
uses: appleboy/[email protected]
with:
host: ${{ secrets.EXPLORER_SSH_HOST }}
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
key: ${{ secrets.EXPLORER_SSH_KEY }}
port: ${{ secrets.EXPLORER_SSH_PORT }}
source: "local-ai"
overwrite: true
rm: true
target: ./local-ai
- name: restarting
uses: appleboy/[email protected]
with:
host: ${{ secrets.EXPLORER_SSH_HOST }}
username: ${{ secrets.EXPLORER_SSH_USERNAME }}
key: ${{ secrets.EXPLORER_SSH_KEY }}
port: ${{ secrets.EXPLORER_SSH_PORT }}
script: |
sudo cp -rfv local-ai/local-ai /usr/bin/local-ai
sudo systemctl restart local-ai
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ build-minimal:
BUILD_GRPC_FOR_BACKEND_LLAMA=true GRPC_BACKENDS="backend-assets/grpc/llama-cpp-avx2" GO_TAGS=p2p $(MAKE) build

build-api:
BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=none $(MAKE) build
BUILD_GRPC_FOR_BACKEND_LLAMA=true BUILD_API_ONLY=true GO_TAGS=p2p $(MAKE) build

backend-assets/lib:
mkdir -p backend-assets/lib
Expand Down
1 change: 1 addition & 0 deletions core/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ var CLI struct {
Transcript TranscriptCMD `cmd:"" help:"Convert audio to text"`
Worker worker.Worker `cmd:"" help:"Run workers to distribute workload (llama.cpp-only)"`
Util UtilCMD `cmd:"" help:"Utility commands"`
Explorer ExplorerCMD `cmd:"" help:"Run p2p explorer"`
}
35 changes: 35 additions & 0 deletions core/cli/explorer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cli

import (
"context"
"time"

cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http"
)

type ExplorerCMD struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
}

func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {

db, err := explorer.NewDatabase(e.PoolDatabase)
if err != nil {
return err
}

dur, err := time.ParseDuration(e.ConnectionTimeout)
if err != nil {
return err
}
ds := explorer.NewDiscoveryServer(db, dur)

go ds.Start(context.Background())
appHTTP := http.Explorer(db, ds)

return appHTTP.Listen(e.Address)
}
4 changes: 2 additions & 2 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}

log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, ""), func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, "")) {
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/cli/worker/worker_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}

err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()

err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, ""))
err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
Expand Down
106 changes: 106 additions & 0 deletions core/explorer/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package explorer

// A simple JSON database for storing and retrieving p2p network tokens and a name and description.

import (
"encoding/json"
"os"
"sort"
"sync"
)

// Database is a simple JSON database for storing and retrieving p2p network tokens and a name and description.
type Database struct {
sync.RWMutex
path string
data map[string]TokenData
}

// TokenData is a p2p network token with a name and description.
type TokenData struct {
Name string `json:"name"`
Description string `json:"description"`
}

// NewDatabase creates a new Database with the given path.
func NewDatabase(path string) (*Database, error) {
db := &Database{
data: make(map[string]TokenData),
path: path,
}
return db, db.load()
}

// Get retrieves a Token from the Database by its token.
func (db *Database) Get(token string) (TokenData, bool) {
db.RLock()
defer db.RUnlock()
t, ok := db.data[token]
return t, ok
}

// Set stores a Token in the Database by its token.
func (db *Database) Set(token string, t TokenData) error {
db.Lock()
db.data[token] = t
db.Unlock()

return db.Save()
}

// Delete removes a Token from the Database by its token.
func (db *Database) Delete(token string) error {
db.Lock()
delete(db.data, token)
db.Unlock()
return db.Save()
}

func (db *Database) TokenList() []string {
db.RLock()
defer db.RUnlock()
tokens := []string{}
for k := range db.data {
tokens = append(tokens, k)
}

sort.Slice(tokens, func(i, j int) bool {
// sort by token
return tokens[i] < tokens[j]
})

return tokens
}

// load reads the Database from disk.
func (db *Database) load() error {
db.Lock()
defer db.Unlock()

if _, err := os.Stat(db.path); os.IsNotExist(err) {
return nil
}

// Read the file from disk
// Unmarshal the JSON into db.data
f, err := os.ReadFile(db.path)
if err != nil {
return err
}
return json.Unmarshal(f, &db.data)
}

// Save writes the Database to disk.
func (db *Database) Save() error {
db.RLock()
defer db.RUnlock()

// Marshal db.data into JSON
// Write the JSON to the file
f, err := os.Create(db.path)
if err != nil {
return err
}
defer f.Close()
return json.NewEncoder(f).Encode(db.data)
}
92 changes: 92 additions & 0 deletions core/explorer/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package explorer_test

import (
"os"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/mudler/LocalAI/core/explorer"
)

var _ = Describe("Database", func() {
var (
dbPath string
db *explorer.Database
err error
)

BeforeEach(func() {
// Create a temporary file path for the database
dbPath = "test_db.json"
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())
})

AfterEach(func() {
// Clean up the temporary database file
os.Remove(dbPath)
})

Context("when managing tokens", func() {
It("should add and retrieve a token", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}

err = db.Set(token, t)
Expect(err).To(BeNil())

retrievedToken, exists := db.Get(token)
Expect(exists).To(BeTrue())
Expect(retrievedToken).To(Equal(t))
})

It("should delete a token", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}

err = db.Set(token, t)
Expect(err).To(BeNil())

err = db.Delete(token)
Expect(err).To(BeNil())

_, exists := db.Get(token)
Expect(exists).To(BeFalse())
})

It("should persist data to disk", func() {
token := "token123"
t := explorer.TokenData{Name: "TokenName", Description: "A test token"}

err = db.Set(token, t)
Expect(err).To(BeNil())

// Recreate the database object to simulate reloading from disk
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())

retrievedToken, exists := db.Get(token)
Expect(exists).To(BeTrue())
Expect(retrievedToken).To(Equal(t))

// Check the token list
tokenList := db.TokenList()
Expect(tokenList).To(ContainElement(token))
})
})

Context("when loading an empty or non-existent file", func() {
It("should start with an empty database", func() {
dbPath = "empty_db.json"
db, err = explorer.NewDatabase(dbPath)
Expect(err).To(BeNil())

_, exists := db.Get("nonexistent")
Expect(exists).To(BeFalse())

// Clean up
os.Remove(dbPath)
})
})
})
Loading
Loading