Skip to content

Commit

Permalink
Fix rpc error: code = Canceled desc = context canceled when falling…
Browse files Browse the repository at this point in the history
… back to non-TLS client

Longhorn 7040

Signed-off-by: Derek Su <[email protected]>
  • Loading branch information
derekbit committed Jan 26, 2024
1 parent 5f0b239 commit 6e2d55c
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 43 deletions.
15 changes: 5 additions & 10 deletions controller/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,7 @@ func (ec *EngineController) CreateInstance(obj interface{}) (*longhorn.InstanceP
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -593,8 +592,7 @@ func (ec *EngineController) DeleteInstance(obj interface{}) (err error) {
return nil
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return err
}
Expand Down Expand Up @@ -684,8 +682,7 @@ func (ec *EngineController) GetInstance(obj interface{}) (*longhorn.InstanceProc
return nil, err
}
}
ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return nil, err
}
Expand All @@ -705,8 +702,7 @@ func (ec *EngineController) LogInstance(ctx context.Context, obj interface{}) (*
return nil, nil, err
}

ctx, cancel := context.WithCancel(ctx)
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -2037,8 +2033,7 @@ func (ec *EngineController) UpgradeEngineInstance(e *longhorn.Engine, log *logru
return err
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return err
}
Expand Down
6 changes: 2 additions & 4 deletions controller/instance_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ type InstanceManagerMonitor struct {
}

func updateInstanceManagerVersion(im *longhorn.InstanceManager) error {
ctx, cancel := context.WithCancel(context.Background())
cli, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
cli, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return err
}
Expand Down Expand Up @@ -1369,8 +1368,7 @@ func (imc *InstanceManagerController) startMonitoring(im *longhorn.InstanceManag
}

// TODO: #2441 refactor this when we do the resource monitoring refactor
ctx, cancel := context.WithCancel(context.Background())
client, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
client, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
log.WithError(err).Errorf("Failed to initialize im client to %v before monitoring", im.Name)
return
Expand Down
7 changes: 3 additions & 4 deletions controller/monitor/disk_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (m *NodeMonitor) getRunningInstanceManagerRO(dataEngine longhorn.DataEngine
return nil, fmt.Errorf("unknown data engine %v", dataEngine)
}

func (m *NodeMonitor) newDiskServiceClients(ctx context.Context, ctxCancel context.CancelFunc, node *longhorn.Node) map[longhorn.DataEngineType]*DiskServiceClient {
func (m *NodeMonitor) newDiskServiceClients(node *longhorn.Node) map[longhorn.DataEngineType]*DiskServiceClient {
clients := map[longhorn.DataEngineType]*DiskServiceClient{}

dataEngines := m.ds.GetDataEngines()
Expand All @@ -187,7 +187,7 @@ func (m *NodeMonitor) newDiskServiceClients(ctx context.Context, ctxCancel conte

im, err := m.getRunningInstanceManagerRO(dataEngine)
if err == nil {
client, err = engineapi.NewDiskServiceClient(ctx, ctxCancel, im, m.logger)
client, err = engineapi.NewDiskServiceClient(im, m.logger)
}

clients[dataEngine] = &DiskServiceClient{
Expand All @@ -211,8 +211,7 @@ func (m *NodeMonitor) closeDiskServiceClients(clients map[longhorn.DataEngineTyp
func (m *NodeMonitor) collectDiskData(node *longhorn.Node) map[string]*CollectedDiskInfo {
diskInfoMap := make(map[string]*CollectedDiskInfo, 0)

ctx, cancel := context.WithCancel(context.Background())
diskServiceClients := m.newDiskServiceClients(ctx, cancel, node)
diskServiceClients := m.newDiskServiceClients(node)
defer func() {
m.closeDiskServiceClients(diskServiceClients)
}()
Expand Down
4 changes: 1 addition & 3 deletions controller/node_controller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"context"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -1502,8 +1501,7 @@ func (nc *NodeController) deleteDisk(node *longhorn.Node, diskType longhorn.Disk
return errors.Wrapf(err, "failed to get default engine instance manager")
}

ctx, cancel := context.WithCancel(context.Background())
diskServiceClient, err := engineapi.NewDiskServiceClient(ctx, cancel, im, nc.logger)
diskServiceClient, err := engineapi.NewDiskServiceClient(im, nc.logger)
if err != nil {
return errors.Wrapf(err, "failed to create disk service client")
}
Expand Down
4 changes: 1 addition & 3 deletions controller/orphan_controller.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package controller

import (
"context"
"fmt"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -341,8 +340,7 @@ func (oc *OrphanController) DeleteSpdkReplicaInstance(diskName, diskUUID, replic
return errors.Wrapf(err, "failed to get instance manager for node %v for deleting SPDK replica instance %v", oc.controllerID, replicaInstanceName)
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewDiskServiceClient(ctx, cancel, im, oc.logger)
c, err := engineapi.NewDiskServiceClient(im, oc.logger)
if err != nil {
return err
}
Expand Down
12 changes: 4 additions & 8 deletions controller/replica_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,7 @@ func (rc *ReplicaController) CreateInstance(obj interface{}) (*longhorn.Instance
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -541,8 +540,7 @@ func (rc *ReplicaController) DeleteInstance(obj interface{}) error {
return nil
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return err
}
Expand Down Expand Up @@ -665,8 +663,7 @@ func (rc *ReplicaController) GetInstance(obj interface{}) (*longhorn.InstancePro
}
}

ctx, cancel := context.WithCancel(context.Background())
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -697,8 +694,7 @@ func (rc *ReplicaController) LogInstance(ctx context.Context, obj interface{}) (
return nil, nil, err
}

ctx, cancel := context.WithCancel(ctx)
c, err := engineapi.NewInstanceManagerClient(ctx, cancel, im)
c, err := engineapi.NewInstanceManagerClient(im)
if err != nil {
return nil, nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions engineapi/disk_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
longhorn "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2"
)

func NewDiskServiceClient(ctx context.Context, ctxCancel context.CancelFunc, im *longhorn.InstanceManager, logger logrus.FieldLogger) (c *DiskService, err error) {
func NewDiskServiceClient(im *longhorn.InstanceManager, logger logrus.FieldLogger) (c *DiskService, err error) {
defer func() {
err = errors.Wrap(err, "failed to get disk service client")
}()
Expand All @@ -30,8 +30,9 @@ func NewDiskServiceClient(ctx context.Context, ctxCancel context.CancelFunc, im
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
endpoint := "tcp://" + imutil.GetURL(im.Status.IP, InstanceManagerDiskServiceDefaultPort)
client, err := imclient.NewDiskServiceClient(ctx, ctxCancel, endpoint, nil)
client, err := imclient.NewDiskServiceClient(ctx, cancel, endpoint, nil)
if err != nil {
return nil, err
}
Expand Down
28 changes: 19 additions & 9 deletions engineapi/instance_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func CheckInstanceManagerProxySupport(im *longhorn.InstanceManager) error {
}

// NewInstanceManagerClient creates a new instance manager client
func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc, im *longhorn.InstanceManager) (*InstanceManagerClient, error) {
func NewInstanceManagerClient(im *longhorn.InstanceManager) (*InstanceManagerClient, error) {
// Do not check the major version here. Since IM cannot get the major version without using this client to call VersionGet().
if im.Status.CurrentState != longhorn.InstanceManagerStateRunning || im.Status.IP == "" {
return nil, fmt.Errorf("invalid Instance Manager %v, state: %v, IP: %v", im.Name, im.Status.CurrentState, im.Status.IP)
Expand All @@ -104,11 +104,14 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,
defer func() {
if err != nil && processManagerClient != nil {
_ = processManagerClient.Close()
processManagerClient = nil
}
}()

ctx, cancel := context.WithCancel(context.Background())

// check for tls cert file presence
processManagerClient, err = imclient.NewProcessManagerClientWithTLS(ctx, ctxCancel, endpoint,
processManagerClient, err = imclient.NewProcessManagerClientWithTLS(ctx, cancel, endpoint,
filepath.Join(types.TLSDirectoryInContainer, types.TLSCAFile),
filepath.Join(types.TLSDirectoryInContainer, types.TLSCertFile),
filepath.Join(types.TLSDirectoryInContainer, types.TLSKeyFile),
Expand All @@ -119,7 +122,7 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,
}

if err = processManagerClient.CheckConnection(); err != nil {
return processManagerClient, errors.Wrapf(err, "failed to check Instance Manager Process Manager Service Client connection for %v ip %v",
return processManagerClient, errors.Wrapf(err, "failed to check Instance Manager Process Manager Service Client TLS connection for %v IP %v",
im.Name, im.Status.IP)
}

Expand All @@ -134,11 +137,14 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,
defer func() {
if err != nil && instanceServiceClient != nil {
_ = instanceServiceClient.Close()
instanceServiceClient = nil
}
}()

ctx, cancel := context.WithCancel(context.Background())

// check for tls cert file presence
instanceServiceClient, err = imclient.NewInstanceServiceClientWithTLS(ctx, ctxCancel, endpoint,
instanceServiceClient, err = imclient.NewInstanceServiceClientWithTLS(ctx, cancel, endpoint,
filepath.Join(types.TLSDirectoryInContainer, types.TLSCAFile),
filepath.Join(types.TLSDirectoryInContainer, types.TLSCertFile),
filepath.Join(types.TLSDirectoryInContainer, types.TLSKeyFile),
Expand All @@ -149,7 +155,7 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,
}

if err = instanceServiceClient.CheckConnection(); err != nil {
return instanceServiceClient, errors.Wrapf(err, "failed to check Instance Manager Instance Service Client connection for %v IP %v",
return instanceServiceClient, errors.Wrapf(err, "failed to check Instance Manager Instance Service Client TLS connection for %v IP %v",
im.Name, im.Status.IP)
}

Expand All @@ -170,14 +176,16 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,
defer func() {
if err != nil && processManagerClient != nil {
processManagerClient.Close()
processManagerClient = nil
}
}()
if err != nil {
logrus.WithError(err).Tracef("Falling back to non-tls client for Instance Manager Process Manager Service Client for %v IP %v",
im.Name, im.Status.IP)
// fallback to non tls client, there is no way to differentiate between im versions unless we get the version via the im client
// TODO: remove this im client fallback mechanism in a future version maybe 2.4 / 2.5 or the next time we update the api version
processManagerClient, err = imclient.NewProcessManagerClient(ctx, ctxCancel, endpoint, nil)
ctx, cancel := context.WithCancel(context.Background())
processManagerClient, err = imclient.NewProcessManagerClient(ctx, cancel, endpoint, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize Instance Manager Process Manager Service Client for %v IP %v",
im.Name, im.Status.IP)
Expand All @@ -189,7 +197,7 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,

version, err := processManagerClient.VersionGet()
if err != nil {
return nil, errors.Wrapf(err, "failed to check version of Instance Manager Process Manager Service Client without TLS for %v IP %v",
return nil, errors.Wrapf(err, "failed to check version of Instance Manager Process Manager Service Client for %v IP %v",
im.Name, im.Status.IP)
}
logrus.Tracef("Instance Manager Process Manager Service Client Version: %+v", version)
Expand All @@ -209,14 +217,16 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,
defer func() {
if err != nil && instanceServiceClient != nil {
instanceServiceClient.Close()
instanceServiceClient = nil
}
}()
if err != nil {
logrus.WithError(err).Tracef("Falling back to non-tls client for Instance Manager Instance Service Client for %v, IP %v",
im.Name, im.Status.IP)
// fallback to non tls client, there is no way to differentiate between im versions unless we get the version via the im client
// TODO: remove this im client fallback mechanism in a future version maybe 2.4 / 2.5 or the next time we update the api version
instanceServiceClient, err = imclient.NewInstanceServiceClient(ctx, ctxCancel, endpoint, nil)
ctx, cancel := context.WithCancel(context.Background())
instanceServiceClient, err = imclient.NewInstanceServiceClient(ctx, cancel, endpoint, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to initialize Instance Manager Instance Service Client for %v IP %v",
im.Name, im.Status.IP)
Expand All @@ -228,7 +238,7 @@ func NewInstanceManagerClient(ctx context.Context, ctxCancel context.CancelFunc,

version, err := instanceServiceClient.VersionGet()
if err != nil {
return nil, errors.Wrapf(err, "failed to check version of Instance Manager Instance Service Client without TLS for %v IP %v",
return nil, errors.Wrapf(err, "failed to check version of Instance Manager Instance Service Client for %v IP %v",
im.Name, im.Status.IP)
}
logrus.Tracef("Instance Manager Instance Service Client Version: %+v", version)
Expand Down
2 changes: 2 additions & 0 deletions engineapi/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func NewEngineClientProxy(im *longhorn.InstanceManager, logger logrus.FieldLogge
defer func() {
if err != nil && proxyClient != nil {
proxyClient.Close()
proxyClient = nil
}
}()

Expand Down Expand Up @@ -115,6 +116,7 @@ func NewEngineClientProxy(im *longhorn.InstanceManager, logger logrus.FieldLogge
defer func() {
if err != nil && proxyClient != nil {
proxyClient.Close()
proxyClient = nil
}
}()
if err != nil {
Expand Down

0 comments on commit 6e2d55c

Please sign in to comment.