Skip to content

Commit

Permalink
refactor: move federated server logic to its own service (mudler#2914)
Browse files Browse the repository at this point in the history
Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler authored Jul 18, 2024
1 parent bf9dd1d commit 24a8eeb
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 124 deletions.
120 changes: 2 additions & 118 deletions core/cli/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,9 @@ package cli

import (
"context"
"errors"
"fmt"
"io"
"net"
"time"

"math/rand/v2"

cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)

type FederatedCLI struct {
Expand All @@ -25,112 +14,7 @@ type FederatedCLI struct {

func (f *FederatedCLI) Run(ctx *cliContext.Context) error {

n, err := p2p.NewNode(f.Peer2PeerToken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(context.Background())
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}

if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, func(servicesID string, tunnel p2p.NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
return err
}

return Proxy(context.Background(), n, f.Address, p2p.FederatedID)
}

func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error {

log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", listenAddr)
if err != nil {
log.Error().Err(err).Msg("Error listening")
return err
}
// ll.Info("Binding local port on", srcaddr)

ledger, _ := node.Ledger()

// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)

defer l.Close()
for {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
continue
}

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
log.Error().Msg("No available nodes yet")
return
}
// open a TCP stream to one of the tunnels
// chosen randomly
// TODO: optimize this and track usage
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]

tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
return
}

log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer

tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
}()
}
}

}
fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken)

func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
return fs.Start(context.Background())
}
15 changes: 15 additions & 0 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package p2p

const FederatedID = "federated"

type FederatedServer struct {
listenAddr, service, p2ptoken string
}

func NewFederatedServer(listenAddr, service, p2pToken string) *FederatedServer {
return &FederatedServer{
listenAddr: listenAddr,
service: service,
p2ptoken: p2pToken,
}
}
127 changes: 127 additions & 0 deletions core/p2p/federated_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
//go:build p2p
// +build p2p

package p2p

import (
"context"
"errors"
"fmt"
"net"
"time"

"math/rand/v2"

"github.com/mudler/edgevpn/pkg/node"
"github.com/mudler/edgevpn/pkg/protocol"
"github.com/mudler/edgevpn/pkg/types"
"github.com/rs/zerolog/log"
)

func (f *FederatedServer) Start(ctx context.Context) error {

n, err := NewNode(f.p2ptoken)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}
err = n.Start(ctx)
if err != nil {
return fmt.Errorf("creating a new node: %w", err)
}

if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
return err
}

return f.proxy(ctx, n)
}

func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {

log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr)
// Open local port for listening
l, err := net.Listen("tcp", fs.listenAddr)
if err != nil {
log.Error().Err(err).Msg("Error listening")
return err
}
// ll.Info("Binding local port on", srcaddr)

ledger, _ := node.Ledger()

// Announce ourselves so nodes accepts our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)

defer l.Close()
for {
select {
case <-ctx.Done():
return errors.New("context canceled")
default:
log.Debug().Msg("New for connection")
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
fmt.Println("Error accepting: ", err.Error())
continue
}

// Handle connections in a new goroutine, forwarding to the p2p service
go func() {
var tunnelAddresses []string
for _, v := range GetAvailableNodes(fs.service) {
if v.IsOnline() {
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress)
} else {
log.Info().Msgf("Node %s is offline", v.ID)
}
}

if len(tunnelAddresses) == 0 {
log.Error().Msg("No available nodes yet")
return
}

// open a TCP stream to one of the tunnels
// chosen randomly
// TODO: optimize this and track usage
tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))]

tunnelConn, err := net.Dial("tcp", tunnelAddr)
if err != nil {
log.Error().Err(err).Msg("Error connecting to tunnel")
return
}

log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String())
closer := make(chan struct{}, 2)
go copyStream(closer, tunnelConn, conn)
go copyStream(closer, conn, tunnelConn)
<-closer

tunnelConn.Close()
conn.Close()
// ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
}()
}
}

}
1 change: 0 additions & 1 deletion core/p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
)

const defaultServicesID = "services_localai"
const FederatedID = "federated"

type NodeData struct {
Name string
Expand Down
10 changes: 5 additions & 5 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv

}

func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
}

// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error {
Expand Down Expand Up @@ -396,3 +391,8 @@ func newNodeOpts(token string) ([]node.Option, error) {

return nodeOpts, nil
}

func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
io.Copy(dst, src)
}
4 changes: 4 additions & 0 deletions core/p2p/p2p_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ func GenerateToken() string {
return "not implemented"
}

func (f *FederatedServer) Start(ctx context.Context) error {
return fmt.Errorf("not implemented")
}

func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error {
return fmt.Errorf("not implemented")
}
Expand Down

0 comments on commit 24a8eeb

Please sign in to comment.