From 24a8eebcefba10e762be1d539088bcaca4b07e53 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Thu, 18 Jul 2024 19:15:15 +0200 Subject: [PATCH] refactor: move federated server logic to its own service (#2914) Signed-off-by: Ettore Di Giacinto --- core/cli/federated.go | 120 +-------------------------------- core/p2p/federated.go | 15 +++++ core/p2p/federated_server.go | 127 +++++++++++++++++++++++++++++++++++ core/p2p/node.go | 1 - core/p2p/p2p.go | 10 +-- core/p2p/p2p_disabled.go | 4 ++ 6 files changed, 153 insertions(+), 124 deletions(-) create mode 100644 core/p2p/federated.go create mode 100644 core/p2p/federated_server.go diff --git a/core/cli/federated.go b/core/cli/federated.go index b1de18403c03..84440a9fea01 100644 --- a/core/cli/federated.go +++ b/core/cli/federated.go @@ -2,20 +2,9 @@ package cli import ( "context" - "errors" - "fmt" - "io" - "net" - "time" - - "math/rand/v2" cliContext "github.com/mudler/LocalAI/core/cli/context" "github.com/mudler/LocalAI/core/p2p" - "github.com/mudler/edgevpn/pkg/node" - "github.com/mudler/edgevpn/pkg/protocol" - "github.com/mudler/edgevpn/pkg/types" - "github.com/rs/zerolog/log" ) type FederatedCLI struct { @@ -25,112 +14,7 @@ type FederatedCLI struct { func (f *FederatedCLI) Run(ctx *cliContext.Context) error { - n, err := p2p.NewNode(f.Peer2PeerToken) - if err != nil { - return fmt.Errorf("creating a new node: %w", err) - } - err = n.Start(context.Background()) - if err != nil { - return fmt.Errorf("creating a new node: %w", err) - } - - if err := p2p.ServiceDiscoverer(context.Background(), n, f.Peer2PeerToken, p2p.FederatedID, func(servicesID string, tunnel p2p.NodeData) { - log.Debug().Msgf("Discovered node: %s", tunnel.ID) - }); err != nil { - return err - } - - return Proxy(context.Background(), n, f.Address, p2p.FederatedID) -} - -func Proxy(ctx context.Context, node *node.Node, listenAddr, service string) error { - - log.Info().Msgf("Allocating service '%s' on: %s", service, listenAddr) - // Open local port for listening - l, err := net.Listen("tcp", listenAddr) - if err != nil { - log.Error().Err(err).Msg("Error listening") - return err - } - // ll.Info("Binding local port on", srcaddr) - - ledger, _ := node.Ledger() - - // Announce ourselves so nodes accepts our connection - ledger.Announce( - ctx, - 10*time.Second, - func() { - // Retrieve current ID for ip in the blockchain - //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) - // If mismatch, update the blockchain - //if !found { - updatedMap := map[string]interface{}{} - updatedMap[node.Host().ID().String()] = &types.User{ - PeerID: node.Host().ID().String(), - Timestamp: time.Now().String(), - } - ledger.Add(protocol.UsersLedgerKey, updatedMap) - // } - }, - ) - - defer l.Close() - for { - select { - case <-ctx.Done(): - return errors.New("context canceled") - default: - log.Debug().Msg("New for connection") - // Listen for an incoming connection. - conn, err := l.Accept() - if err != nil { - fmt.Println("Error accepting: ", err.Error()) - continue - } - - // Handle connections in a new goroutine, forwarding to the p2p service - go func() { - var tunnelAddresses []string - for _, v := range p2p.GetAvailableNodes(p2p.FederatedID) { - if v.IsOnline() { - tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) - } else { - log.Info().Msgf("Node %s is offline", v.ID) - } - } - - if len(tunnelAddresses) == 0 { - log.Error().Msg("No available nodes yet") - return - } - // open a TCP stream to one of the tunnels - // chosen randomly - // TODO: optimize this and track usage - tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] - - tunnelConn, err := net.Dial("tcp", tunnelAddr) - if err != nil { - log.Error().Err(err).Msg("Error connecting to tunnel") - return - } - - log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) - closer := make(chan struct{}, 2) - go copyStream(closer, tunnelConn, conn) - go copyStream(closer, conn, tunnelConn) - <-closer - - tunnelConn.Close() - conn.Close() - // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) - }() - } - } - -} + fs := p2p.NewFederatedServer(f.Address, p2p.FederatedID, f.Peer2PeerToken) -func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { - defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy - io.Copy(dst, src) + return fs.Start(context.Background()) } diff --git a/core/p2p/federated.go b/core/p2p/federated.go new file mode 100644 index 000000000000..c76ff7b08e06 --- /dev/null +++ b/core/p2p/federated.go @@ -0,0 +1,15 @@ +package p2p + +const FederatedID = "federated" + +type FederatedServer struct { + listenAddr, service, p2ptoken string +} + +func NewFederatedServer(listenAddr, service, p2pToken string) *FederatedServer { + return &FederatedServer{ + listenAddr: listenAddr, + service: service, + p2ptoken: p2pToken, + } +} diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go new file mode 100644 index 000000000000..db5957e71ed5 --- /dev/null +++ b/core/p2p/federated_server.go @@ -0,0 +1,127 @@ +//go:build p2p +// +build p2p + +package p2p + +import ( + "context" + "errors" + "fmt" + "net" + "time" + + "math/rand/v2" + + "github.com/mudler/edgevpn/pkg/node" + "github.com/mudler/edgevpn/pkg/protocol" + "github.com/mudler/edgevpn/pkg/types" + "github.com/rs/zerolog/log" +) + +func (f *FederatedServer) Start(ctx context.Context) error { + + n, err := NewNode(f.p2ptoken) + if err != nil { + return fmt.Errorf("creating a new node: %w", err) + } + err = n.Start(ctx) + if err != nil { + return fmt.Errorf("creating a new node: %w", err) + } + + if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) { + log.Debug().Msgf("Discovered node: %s", tunnel.ID) + }); err != nil { + return err + } + + return f.proxy(ctx, n) +} + +func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { + + log.Info().Msgf("Allocating service '%s' on: %s", fs.service, fs.listenAddr) + // Open local port for listening + l, err := net.Listen("tcp", fs.listenAddr) + if err != nil { + log.Error().Err(err).Msg("Error listening") + return err + } + // ll.Info("Binding local port on", srcaddr) + + ledger, _ := node.Ledger() + + // Announce ourselves so nodes accepts our connection + ledger.Announce( + ctx, + 10*time.Second, + func() { + // Retrieve current ID for ip in the blockchain + //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) + // If mismatch, update the blockchain + //if !found { + updatedMap := map[string]interface{}{} + updatedMap[node.Host().ID().String()] = &types.User{ + PeerID: node.Host().ID().String(), + Timestamp: time.Now().String(), + } + ledger.Add(protocol.UsersLedgerKey, updatedMap) + // } + }, + ) + + defer l.Close() + for { + select { + case <-ctx.Done(): + return errors.New("context canceled") + default: + log.Debug().Msg("New for connection") + // Listen for an incoming connection. + conn, err := l.Accept() + if err != nil { + fmt.Println("Error accepting: ", err.Error()) + continue + } + + // Handle connections in a new goroutine, forwarding to the p2p service + go func() { + var tunnelAddresses []string + for _, v := range GetAvailableNodes(fs.service) { + if v.IsOnline() { + tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) + } else { + log.Info().Msgf("Node %s is offline", v.ID) + } + } + + if len(tunnelAddresses) == 0 { + log.Error().Msg("No available nodes yet") + return + } + + // open a TCP stream to one of the tunnels + // chosen randomly + // TODO: optimize this and track usage + tunnelAddr := tunnelAddresses[rand.IntN(len(tunnelAddresses))] + + tunnelConn, err := net.Dial("tcp", tunnelAddr) + if err != nil { + log.Error().Err(err).Msg("Error connecting to tunnel") + return + } + + log.Info().Msgf("Redirecting %s to %s", conn.LocalAddr().String(), tunnelConn.RemoteAddr().String()) + closer := make(chan struct{}, 2) + go copyStream(closer, tunnelConn, conn) + go copyStream(closer, conn, tunnelConn) + <-closer + + tunnelConn.Close() + conn.Close() + // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String()) + }() + } + } + +} diff --git a/core/p2p/node.go b/core/p2p/node.go index 1d5356e6d385..6394498fd416 100644 --- a/core/p2p/node.go +++ b/core/p2p/node.go @@ -6,7 +6,6 @@ import ( ) const defaultServicesID = "services_localai" -const FederatedID = "federated" type NodeData struct { Name string diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index 9b71f7ded976..927f0e241319 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -137,11 +137,6 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv } -func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { - defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy - io.Copy(dst, src) -} - // This is the main of the server (which keeps the env variable updated) // This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error { @@ -396,3 +391,8 @@ func newNodeOpts(token string) ([]node.Option, error) { return nodeOpts, nil } + +func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) { + defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy + io.Copy(dst, src) +} diff --git a/core/p2p/p2p_disabled.go b/core/p2p/p2p_disabled.go index b1d1d04a467c..ab1d69dc26be 100644 --- a/core/p2p/p2p_disabled.go +++ b/core/p2p/p2p_disabled.go @@ -14,6 +14,10 @@ func GenerateToken() string { return "not implemented" } +func (f *FederatedServer) Start(ctx context.Context) error { + return fmt.Errorf("not implemented") +} + func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error { return fmt.Errorf("not implemented") }