Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lease service: SendManifest #212

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
21 changes: 21 additions & 0 deletions cmd/provider-services/cmd/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cmd

import (
"fmt"
"net"
"net/url"
)

func grpcURI(hostURI string) (string, error) {
u, err := url.Parse(hostURI)
if err != nil {
return "", fmt.Errorf("url parse: %w", err)
}

h, _, err := net.SplitHostPort(u.Host)
if err != nil {
return "", fmt.Errorf("split host port: %w", err)
}

return net.JoinHostPort(h, "8444"), nil
}
88 changes: 63 additions & 25 deletions cmd/provider-services/cmd/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package cmd

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/spf13/cobra"
Expand All @@ -14,16 +16,17 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"

dtypes "github.com/akash-network/akash-api/go/node/deployment/v1beta3"
ptypes "github.com/akash-network/akash-api/go/node/provider/v1beta3"
leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1"
"github.com/akash-network/node/sdl"
cutils "github.com/akash-network/node/x/cert/utils"

aclient "github.com/akash-network/provider/client"
gwgrpc "github.com/akash-network/provider/gateway/grpc"
gwrest "github.com/akash-network/provider/gateway/rest"
)

var (
errSubmitManifestFailed = errors.New("submit manifest to some providers has been failed")
)
var errSubmitManifestFailed = errors.New("submit manifest to some providers has been failed")

// SendManifestCmd looks up the Providers blockchain information,
// and POSTs the SDL file to the Gateway address.
Expand Down Expand Up @@ -94,32 +97,68 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error {
ErrorMessage string `json:"errorMessage,omitempty" yaml:"errorMessage,omitempty"`
}

results := make([]result, len(leases))

submitFailed := false
var (
results = make([]result, len(leases))
submitFailed = false
)

for i, lid := range leases {
prov, _ := sdk.AccAddressFromBech32(lid.Provider)
gclient, err := gwrest.NewClient(cl, prov, []tls.Certificate{cert})
if err != nil {
return err
}
err = func() error {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

provAddr, _ := sdk.AccAddressFromBech32(lid.Provider)
prov, err := cl.Provider(ctx, &ptypes.QueryProviderRequest{Owner: provAddr.String()})
if err != nil {
return fmt.Errorf("query client provider: %w", err)
}

err = gclient.SubmitManifest(cmd.Context(), dseq, mani)
res := result{
Provider: prov,
Status: "PASS",
}
if err != nil {
res.Error = err.Error()
if e, valid := err.(gwrest.ClientResponseError); valid {
res.ErrorMessage = e.Message
hostURIgRPC, err := grpcURI(prov.GetProvider().HostURI)
if err != nil {
return fmt.Errorf("grpc uri: %w", err)
}
res.Status = "FAIL"
submitFailed = true
}

results[i] = res
res := result{
Provider: provAddr,
Status: "PASS",
}

c, err := gwgrpc.NewClient(ctx, hostURIgRPC, cert, cl)
if err == nil {
defer c.Close()

if _, err = c.SendManifest(ctx, &leasev1.SendManifestRequest{
LeaseId: lid,
Manifest: mani,
}); err != nil {
res.Error = err.Error()
res.Status = "FAIL"
submitFailed = true
}
} else {
gclient, err := gwrest.NewClient(cl, provAddr, []tls.Certificate{cert})
if err != nil {
return fmt.Errorf("gwrest new client: %w", err)
}

err = gclient.SubmitManifest(cmd.Context(), dseq, mani)
if err != nil {
res.Error = err.Error()
if e, valid := err.(gwrest.ClientResponseError); valid {
res.ErrorMessage = e.Message
}
res.Status = "FAIL"
submitFailed = true
}
}

results[i] = res

return nil
}()
if err != nil {
return err
}
}

buf := &bytes.Buffer{}
Expand All @@ -146,7 +185,6 @@ func doSendManifest(cmd *cobra.Command, sdlpath string) error {
}

_, err = fmt.Fprint(cmd.OutOrStdout(), buf.String())

if err != nil {
return err
}
Expand Down
21 changes: 12 additions & 9 deletions cmd/provider-services/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"
"time"

cltypes "github.com/akash-network/akash-api/go/node/client/types"
sdkclient "github.com/cosmos/cosmos-sdk/client"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
Expand All @@ -21,6 +20,8 @@ import (
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"

cltypes "github.com/akash-network/akash-api/go/node/client/types"

"github.com/tendermint/tendermint/libs/log"

"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -109,9 +110,7 @@ const (
serviceHostnameOperator = "hostname-operator"
)

var (
errInvalidConfig = errors.New("Invalid configuration")
)
var errInvalidConfig = errors.New("Invalid configuration")

// RunCmd launches the Akash Provider service
func RunCmd() *cobra.Command {
Expand Down Expand Up @@ -206,7 +205,7 @@ func RunCmd() *cobra.Command {
panic(err)
}

cmd.Flags().String(FlagGatewayGRPCListenAddress, "0.0.0.0:8444", "Gateway listen address")
cmd.Flags().String(FlagGatewayGRPCListenAddress, "0.0.0.0:8444", "Gateway gRPC listen address")
if err := viper.BindPFlag(FlagGatewayGRPCListenAddress, cmd.Flags().Lookup(FlagGatewayGRPCListenAddress)); err != nil {
panic(err)
}
Expand Down Expand Up @@ -420,9 +419,11 @@ var allowedBidPricingStrategies = [...]string{
bidPricingStrategyShellScript,
}

var errNoSuchBidPricingStrategy = fmt.Errorf("No such bid pricing strategy. Allowed: %v", allowedBidPricingStrategies)
var errInvalidValueForBidPrice = errors.New("not a valid bid price")
var errBidPriceNegative = errors.New("Bid price cannot be a negative number")
var (
errNoSuchBidPricingStrategy = fmt.Errorf("No such bid pricing strategy. Allowed: %v", allowedBidPricingStrategies)
errInvalidValueForBidPrice = errors.New("not a valid bid price")
errBidPriceNegative = errors.New("Bid price cannot be a negative number")
)

func strToBidPriceScale(val string) (decimal.Decimal, error) {
v, err := decimal.NewFromString(val)
Expand Down Expand Up @@ -746,7 +747,9 @@ func doRunCmd(ctx context.Context, cmd *cobra.Command, _ []string) error {
return err
}

err = gwgrpc.NewServer(ctx, grpcaddr, []tls.Certificate{tlsCert}, service)
ctx = gwgrpc.ContextWithQueryClient(ctx, cl.Query())

err = gwgrpc.Serve(ctx, grpcaddr, []tls.Certificate{tlsCert}, service)
andrewhare marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand Down
57 changes: 57 additions & 0 deletions gateway/grpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package grpc

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

ctypes "github.com/akash-network/akash-api/go/node/cert/v1beta3"
leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1"
providerv1 "github.com/akash-network/akash-api/go/provider/v1"

"github.com/akash-network/provider/gateway/utils"
)

type Client struct {
providerv1.ProviderRPCClient
leasev1.LeaseRPCClient

conn *grpc.ClientConn
}

func (c *Client) Close() error {
return c.conn.Close()
}

func NewClient(ctx context.Context, addr string, cert tls.Certificate, cquery ctypes.QueryClient) (*Client, error) {
tlsConfig := tls.Config{
InsecureSkipVerify: true,

Check failure on line 32 in gateway/grpc/client.go

View workflow job for this annotation

GitHub Actions / lint

G402: TLS InsecureSkipVerify set true. (gosec)
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS13,
VerifyPeerCertificate: func(certificates [][]byte, _ [][]*x509.Certificate) error {
if _, err := utils.VerifyOwnerCertBytes(ctx, certificates, "", x509.ExtKeyUsageClientAuth, cquery); err != nil {
return err
}
return nil
},
}

conn, err := grpc.DialContext(ctx, addr,
grpc.WithBlock(),
grpc.WithTransportCredentials(credentials.NewTLS(&tlsConfig)),
)
if err != nil {
return nil, fmt.Errorf("grpc dial context %s: %w", addr, err)
}

return &Client{
ProviderRPCClient: providerv1.NewProviderRPCClient(conn),
LeaseRPCClient: leasev1.NewLeaseRPCClient(conn),

conn: conn,
}, nil
}
41 changes: 41 additions & 0 deletions gateway/grpc/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package grpc

import (
"context"

ctypes "github.com/akash-network/akash-api/go/node/cert/v1beta3"
sdk "github.com/cosmos/cosmos-sdk/types"
)

type ContextKey string

const (
ContextKeyQueryClient = ContextKey("query-client")
ContextKeyOwner = ContextKey("owner")
)

func ContextWithQueryClient(ctx context.Context, c ctypes.QueryClient) context.Context {
return context.WithValue(ctx, ContextKeyQueryClient, c)
}

func MustQueryClientFromCtx(ctx context.Context) ctypes.QueryClient {
val := ctx.Value(ContextKeyQueryClient)
if val == nil {
panic("context does not have query client set")
}

return val.(ctypes.QueryClient)
}

func ContextWithOwner(ctx context.Context, address sdk.Address) context.Context {
return context.WithValue(ctx, ContextKeyOwner, address)
}

func OwnerFromCtx(ctx context.Context) sdk.Address {
val := ctx.Value(ContextKeyOwner)
if val == nil {
return sdk.AccAddress{}
}

return val.(sdk.Address)
}
69 changes: 69 additions & 0 deletions gateway/grpc/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package grpc

import (
"context"
"errors"

manifestValidation "github.com/akash-network/akash-api/go/manifest/v2beta2"
leasev1 "github.com/akash-network/akash-api/go/provider/lease/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

types "github.com/akash-network/akash-api/go/node/types/v1beta3"

"github.com/akash-network/provider"
pmanifest "github.com/akash-network/provider/manifest"
)

type leaseV1 struct {
c provider.Client
ctx context.Context
}

func (l *leaseV1) SendManifest(ctx context.Context, r *leasev1.SendManifestRequest) (*leasev1.SendManifestResponse, error) {
var (
id = r.GetLeaseId().DeploymentID()
m = r.GetManifest()
)

// HACK(andrewhare): Existing manifests expected service resource endpoints
// to be JSON serialized as [] instead of null when determining the manifest
// version hash. This forces Go to do the right thing.
for g := range m {
for s := range m[g].Services {
if len(m[g].Services[s].Resources.Endpoints) == 0 {
m[g].Services[s].Resources.Endpoints = make(types.Endpoints, 0)
}
}
}

err := l.c.Manifest().Submit(ctx, id, m)
if err == nil {
return &leasev1.SendManifestResponse{}, nil
}

switch {
case errors.Is(err, manifestValidation.ErrInvalidManifest):
return nil, status.Error(codes.InvalidArgument, "invalid manifest")
case errors.Is(err, pmanifest.ErrNoLeaseForDeployment):
return nil, status.Error(codes.NotFound, "no lease for deployment")
}

return nil, status.Errorf(codes.Internal, "manifest submit: %v", err)
}

func (l *leaseV1) ServiceLogs(context.Context, *leasev1.ServiceLogsRequest) (*leasev1.ServiceLogsResponse, error) {
panic("unimplemented")
}

func (l *leaseV1) ServiceStatus(context.Context, *leasev1.ServiceStatusRequest) (*leasev1.ServiceStatusResponse, error) {
panic("unimplemented")
}

func (l *leaseV1) StreamServiceLogs(*leasev1.ServiceLogsRequest, leasev1.LeaseRPC_StreamServiceLogsServer) error {
panic("unimplemented")
}

func (l *leaseV1) StreamServiceStatus(*leasev1.ServiceStatusRequest, leasev1.LeaseRPC_StreamServiceStatusServer) error {
panic("unimplemented")
}
Loading
Loading