Skip to content

Commit

Permalink
fix(p2p): avoid starting the node twice (#3349)
Browse files Browse the repository at this point in the history
* fix(p2p): avoid starting the node twice

Signed-off-by: Ettore Di Giacinto <[email protected]>

* fix(p2p): keep exposing service if we don't start the llama.cpp runner

Signed-off-by: Ettore Di Giacinto <[email protected]>

---------

Signed-off-by: Ettore Di Giacinto <[email protected]>
  • Loading branch information
mudler authored Aug 21, 2024
1 parent 70e53bc commit af09520
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 34 deletions.
9 changes: 8 additions & 1 deletion core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,15 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
if err != nil {
return err
}
nodeContext := context.Background()

err = node.Start(nodeContext)
if err != nil {
return fmt.Errorf("starting new node: %w", err)
}

log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(nodeContext, node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID)) {
if v.IsOnline() {
Expand All @@ -146,6 +152,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
return err
}
fedCtx := context.Background()

node, err := p2p.ExposeService(fedCtx, "localhost", port, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID))
if err != nil {
return err
Expand Down
54 changes: 26 additions & 28 deletions core/cli/worker/worker_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,44 +65,42 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
return err
}
log.Info().Msgf("You need to start llama-cpp-rpc-server on '%s:%s'", address, p)
} else {
// Start llama.cpp directly from the version we have pre-packaged
go func() {
for {
log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port)

return nil
}

// Start llama.cpp directly from the version we have pre-packaged
go func() {
for {
log.Info().Msgf("Starting llama-cpp-rpc-server on '%s:%d'", address, port)
grpcProcess := assets.ResolvePath(
r.BackendAssetsPath,
"util",
"llama-cpp-rpc-server",
)

grpcProcess := assets.ResolvePath(
r.BackendAssetsPath,
"util",
"llama-cpp-rpc-server",
)
args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...)
args, grpcProcess = library.LoadLDSO(r.BackendAssetsPath, args, grpcProcess)

args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, r.ExtraLLamaCPPArgs...)
args, grpcProcess = library.LoadLDSO(r.BackendAssetsPath, args, grpcProcess)
cmd := exec.Command(
grpcProcess, args...,
)

cmd := exec.Command(
grpcProcess, args...,
)
cmd.Env = os.Environ()

cmd.Env = os.Environ()
cmd.Stderr = os.Stdout
cmd.Stdout = os.Stdout

cmd.Stderr = os.Stdout
cmd.Stdout = os.Stdout
if err := cmd.Start(); err != nil {
log.Error().Any("grpcProcess", grpcProcess).Any("args", args).Err(err).Msg("Failed to start llama-cpp-rpc-server")
}

if err := cmd.Start(); err != nil {
log.Error().Any("grpcProcess", grpcProcess).Any("args", args).Err(err).Msg("Failed to start llama-cpp-rpc-server")
cmd.Wait()
}
}()

cmd.Wait()
_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
}()

_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}

for {
Expand Down
6 changes: 1 addition & 5 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,9 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
tunnels := make(chan NodeData)

err := n.Start(ctx)
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
}
ledger, err := n.Ledger()
if err != nil {
return nil, fmt.Errorf("creating a new node: %w", err)
return nil, fmt.Errorf("getting the ledger: %w", err)
}
// get new services, allocate and return to the channel

Expand Down

0 comments on commit af09520

Please sign in to comment.