Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC Endpoints #232

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions cmd/provider-services/cmd/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package cmd

import (
"fmt"
"net"
"net/url"
)

const gRPCDefaultPort = "8444"

func grpcURI(hostURI string) (string, error) {
u, err := url.Parse(hostURI)
if err != nil {
return "", fmt.Errorf("url parse: %w", err)
}

h, _, err := net.SplitHostPort(u.Host)
if err != nil {
return "", fmt.Errorf("split host port: %w", err)
}

return net.JoinHostPort(h, gRPCDefaultPort), nil
}
177 changes: 153 additions & 24 deletions cmd/provider-services/cmd/leaseLogs.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
package cmd

import (
"context"
"crypto/tls"
"fmt"
"strings"
"sync"
"time"

sdkclient "github.com/cosmos/cosmos-sdk/client"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/akash-network/akash-api/go/node/client/v1beta2"
dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
mtypes "github.com/akash-network/akash-api/go/node/market/v1beta4"
ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3"
leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1"
cmdcommon "github.com/akash-network/node/cmd/common"
cutils "github.com/akash-network/node/x/cert/utils"

aclient "github.com/akash-network/provider/client"
gwgrpc "github.com/akash-network/provider/gateway/grpc"
gwrest "github.com/akash-network/provider/gateway/rest"
)

Expand Down Expand Up @@ -81,7 +89,7 @@ func doLeaseLogs(cmd *cobra.Command) error {
}

if outputFormat != outputText && outputFormat != outputJSON {
return errors.Errorf("invalid output format %s. expected text|json", outputFormat)
return fmt.Errorf("invalid output format %s. expected text|json", outputFormat)
}

follow, err := cmd.Flags().GetBool(flagFollow)
Expand All @@ -95,9 +103,127 @@ func doLeaseLogs(cmd *cobra.Command) error {
}

if tailLines < -1 {
return errors.Errorf("tail flag supplied with invalid value. must be >= -1")
return fmt.Errorf("tail flag supplied with invalid value. must be >= -1")
}

g := leaseLogGetter{
cert: cert,
cl: cl,
svcs: svcs,
follow: follow,
tailLines: tailLines,
printer: printer{
cctx: cctx,
fmt: outputFormat,
},
}

if err = g.run(ctx, leases); err != nil {
return fmt.Errorf("getting logs: %w", err)
}

return nil
}

type leaseLogGetter struct {
cert tls.Certificate
cl v1beta2.QueryClient
svcs string
follow bool
tailLines int64
printer printer
}

func (g leaseLogGetter) run(ctx context.Context, leases []mtypes.LeaseID) error {
var (
restLeases = make([]mtypes.LeaseID, 0, len(leases))
grpcLeases = make(map[mtypes.LeaseID]*gwgrpc.Client, len(leases))
)

for _, lid := range leases {
provAddr, _ := sdk.AccAddressFromBech32(lid.Provider)
prov, err := g.cl.Provider(ctx, &ptypes.QueryProviderRequest{Owner: provAddr.String()})
if err != nil {
return fmt.Errorf("query client provider: %w", err)
}

hostURIgRPC, err := grpcURI(prov.GetProvider().HostURI)
if err != nil {
return fmt.Errorf("grpc uri: %w", err)
}

ctxDial, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

client, err := gwgrpc.NewClient(ctxDial, hostURIgRPC, g.cert, g.cl)
if err == nil {
grpcLeases[lid] = client
} else {
restLeases = append(restLeases, lid)
}
}

g.grpc(ctx, grpcLeases)
g.rest(ctx, restLeases)

return nil
}

func (g leaseLogGetter) grpc(ctx context.Context, leases map[mtypes.LeaseID]*gwgrpc.Client) {
var wg sync.WaitGroup
wg.Add(len(leases))

for lid, cc := range leases {
go func(c *gwgrpc.Client, id mtypes.LeaseID) {
defer wg.Done()

req := leasev1.ServiceLogsRequest{
Services: strings.Split(g.svcs, " "),
LeaseId: id,
}

logErr := func(err error) {
fmt.Printf("[%s]: %v", id, err)
}

s, err := c.StreamServiceLogs(ctx, &req)
if err != nil {
logErr(fmt.Errorf("stream service logs: %w", err))
return
}

for {
select {
case <-ctx.Done():
return
default:
r, err := s.Recv()
if err != nil {
if e, ok := status.FromError(err); ok {
if e.Code() != codes.Canceled {
logErr(fmt.Errorf("recv: %w", err))
}
}

return
}

for _, s := range r.Services {
g.printer.write(logEntry{
Name: s.GetName(),
Message: string(s.GetLogs()),
Lid: id,
})
}
}
}
}(cc, lid)
}

wg.Wait()
}

func (g leaseLogGetter) rest(ctx context.Context, leases []mtypes.LeaseID) {
type result struct {
lid mtypes.LeaseID
error error
Expand All @@ -109,9 +235,9 @@ func doLeaseLogs(cmd *cobra.Command) error {
for _, lid := range leases {
stream := result{lid: lid}
prov, _ := sdk.AccAddressFromBech32(lid.Provider)
gclient, err := gwrest.NewClient(ctx, cl, prov, []tls.Certificate{cert})
gclient, err := gwrest.NewClient(ctx, g.cl, prov, []tls.Certificate{g.cert})
if err == nil {
stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, svcs, follow, tailLines)
stream.stream, stream.error = gclient.LeaseLogs(ctx, lid, g.svcs, g.follow, g.tailLines)
} else {
stream.error = err
}
Expand All @@ -121,26 +247,11 @@ func doLeaseLogs(cmd *cobra.Command) error {

var wgStreams sync.WaitGroup

type logEntry struct {
gwrest.ServiceLogMessage `json:",inline"`
Lid mtypes.LeaseID `json:"lease_id"`
}

outch := make(chan logEntry)

printFn := func(evt logEntry) {
fmt.Printf("[%s][%s] %s\n", evt.Lid, evt.Name, evt.Message)
}

if outputFormat == "json" {
printFn = func(evt logEntry) {
_ = cmdcommon.PrintJSON(cctx, evt)
}
}

go func() {
for evt := range outch {
printFn(evt)
g.printer.write(evt)
}
}()

Expand All @@ -155,15 +266,33 @@ func doLeaseLogs(cmd *cobra.Command) error {

for res := range stream.stream.Stream {
outch <- logEntry{
ServiceLogMessage: res,
Lid: stream.lid,
Name: res.Name,
Message: res.Message,
Lid: stream.lid,
}
}
}(stream)
}

wgStreams.Wait()
close(outch)
}

return nil
type logEntry struct {
Name string `json:"name"`
Message string `json:"message"`
Lid mtypes.LeaseID `json:"lease_id"`
}

type printer struct {
fmt string
cctx sdkclient.Context
}

func (p printer) write(e logEntry) {
if p.fmt == "json" {
cmdcommon.PrintJSON(p.cctx, e)
} else {
fmt.Printf("[%s][%s] %s\n", e.Lid, e.Name, e.Message)
}
}
Loading