From af095204fab39c80fa937db86a1a1b41ae7512b5 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Wed, 21 Aug 2024 10:30:56 +0200 Subject: [PATCH] fix(p2p): avoid starting the node twice (#3349) * fix(p2p): avoid starting the node twice Signed-off-by: Ettore Di Giacinto * fix(p2p): keep exposing service if we don't start the llama.cpp runner Signed-off-by: Ettore Di Giacinto --------- Signed-off-by: Ettore Di Giacinto --- core/cli/run.go | 9 +++++- core/cli/worker/worker_p2p.go | 54 +++++++++++++++++------------------ core/p2p/p2p.go | 6 +--- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/core/cli/run.go b/core/cli/run.go index c469f05fcfbc..4fbcd73c93a4 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -120,9 +120,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { if err != nil { return err } + nodeContext := context.Background() + + err = node.Start(nodeContext) + if err != nil { + return fmt.Errorf("starting new node: %w", err) + } log.Info().Msg("Starting P2P server discovery...") - if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) { + if err := p2p.ServiceDiscoverer(nodeContext, node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) { var tunnelAddresses []string for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) { if v.IsOnline() { @@ -146,6 +152,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { return err } fedCtx := context.Background() + node, err := p2p.ExposeService(fedCtx, "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID)) if err != nil { return err diff --git a/core/cli/worker/worker_p2p.go b/core/cli/worker/worker_p2p.go index 7c900667abf3..a65d338175b0 100644 --- a/core/cli/worker/worker_p2p.go +++ b/core/cli/worker/worker_p2p.go @@ -65,44 +65,42 @@ func (r *P2P) Run(ctx *cliContext.Context) error { return err } log.Info().Msgf("You need to start llama-cpp-rpc-server on '%s:%s'", address, p) + } else { + // Start llama.cpp directly from the version we have pre-packaged + go func() { + for { + log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port) - return nil - } - - // Start llama.cpp directly from the version we have pre-packaged - go func() { - for { - log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port) + grpcProcess := assets.ResolvePath( + r.BackendAssetsPath, + "util", + "llama-cpp-rpc-server", + ) - grpcProcess := assets.ResolvePath( - r.BackendAssetsPath, - "util", - "llama-cpp-rpc-server", - ) + args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...) + args, grpcProcess = library.LoadLDSO(r.BackendAssetsPath, args, grpcProcess) - args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...) - args, grpcProcess = library.LoadLDSO(r.BackendAssetsPath, args, grpcProcess) + cmd := exec.Command( + grpcProcess, args..., + ) - cmd := exec.Command( - grpcProcess, args..., - ) + cmd.Env = os.Environ() - cmd.Env = os.Environ() + cmd.Stderr = os.Stdout + cmd.Stdout = os.Stdout - cmd.Stderr = os.Stdout - cmd.Stdout = os.Stdout + if err := cmd.Start(); err != nil { + log.Error().Any("grpcProcess", grpcProcess).Any("args", args).Err(err).Msg("Failed to start llama-cpp-rpc-server") + } - if err := cmd.Start(); err != nil { - log.Error().Any("grpcProcess", grpcProcess).Any("args", args).Err(err).Msg("Failed to start llama-cpp-rpc-server") + cmd.Wait() } + }() - cmd.Wait() + _, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) + if err != nil { + return err } - }() - - _, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) - if err != nil { - return err } for { diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index fe55346a7f1a..53ae63b51592 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -202,13 +202,9 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) { tunnels := make(chan NodeData) - err := n.Start(ctx) - if err != nil { - return nil, fmt.Errorf("creating a new node: %w", err) - } ledger, err := n.Ledger() if err != nil { - return nil, fmt.Errorf("creating a new node: %w", err) + return nil, fmt.Errorf("getting the ledger: %w", err) } // get new services, allocate and return to the channel