diff --git a/provider.go b/provider.go index fe80ba8..f9938a2 100644 --- a/provider.go +++ b/provider.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "path" + "sync" "sync/atomic" "time" @@ -38,9 +39,14 @@ type InstanceGroup struct { settings provider.Settings log hclog.Logger instanceCounter atomic.Int32 + shutdownC chan struct{} + mu sync.RWMutex } func (g *InstanceGroup) Init(ctx context.Context, log hclog.Logger, settings provider.Settings) (provider.ProviderInfo, error) { + // g.mu.Lock() + // defer g.mu.Unlock() + pOpts := []clouds.ParseOption{clouds.WithCloudName(g.Cloud)} if g.CloudsConfig != "" { pOpts = append(pOpts, clouds.WithLocations(g.CloudsConfig)) @@ -54,17 +60,27 @@ func (g *InstanceGroup) Init(ctx context.Context, log hclog.Logger, settings pro // plugin is a long running process. force allow reauth ao.AllowReauth = true - pc, err := config.NewProviderClient(ctx, ao, config.WithTLSConfig(tlsCfg)) - if err != nil { - return provider.ProviderInfo{}, fmt.Errorf("Failed to connect to OpenStack Keystone: %w", err) + reauth := func(ctx context.Context) (*gophercloud.ServiceClient, error) { + pc, err := config.NewProviderClient(ctx, ao, config.WithTLSConfig(tlsCfg)) + if err != nil { + return nil, fmt.Errorf("Failed to connect to OpenStack Keystone: %w", err) + } + + cli, err := openstack.NewComputeV2(pc, eo) + if err != nil { + return nil, fmt.Errorf("Failed to connect to OpenStack Nova: %w", err) + } + + cli.Context = nil // ensure no global context + cli.Microversion = "2.79" // train+ + return cli, nil } - cli, err := openstack.NewComputeV2(pc, eo) + cli, err := reauth(ctx) if err != nil { - return provider.ProviderInfo{}, fmt.Errorf("Failed to connect to OpenStack Nova: %w", err) + return provider.ProviderInfo{}, err } - cli.Microversion = "2.79" // train+ g.computeClient = cli if !settings.ConnectorConfig.UseStaticCredentials { @@ -91,6 +107,37 @@ func (g *InstanceGroup) Init(ctx context.Context, log hclog.Logger, settings pro return provider.ProviderInfo{}, err } + // FIXME: workaround to fix token reauth problem: + // > Unable to re-authenticate: Expected HTTP response code [202 204] when accessing [DELETE https://ci.cloud/compute/v2.1/servers/50b75183-a43a-434f-b3ea-689f90b4ac6b], but got 401 instead + // Issue: https://github.com/gophercloud/gophercloud/issues/2931 + g.shutdownC = make(chan struct{}, 1) + reauthT := time.NewTicker(1 * time.Hour) + go func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for { + select { + case <-reauthT.C: + log.Debug("Re-authenticating...") + cli, err := reauth(ctx) + if err != nil { + log.Error("Re-authenticate failed", "err", err) + continue + } + + log.Info("Re-authenticated successful") + g.mu.Lock() + g.computeClient = cli + g.mu.Unlock() + + case <-g.shutdownC: + reauthT.Stop() + return + } + } + }() + return provider.ProviderInfo{ ID: path.Join("openstack", g.Cloud, g.Name), MaxSize: 1000, @@ -122,7 +169,7 @@ func (g *InstanceGroup) Update(ctx context.Context, update func(instance string, // treat all nodes running long enough as Running state = provider.StateRunning } else { - log, err := servers.ShowConsoleOutput(g.computeClient, srv.ID, servers.ShowConsoleOutputOpts{ + log, err := servers.ShowConsoleOutput(g.cli(), srv.ID, servers.ShowConsoleOutputOpts{ Length: 100, }).Extract() if err != nil { @@ -186,9 +233,17 @@ func (g *InstanceGroup) Decrease(ctx context.Context, instances []string) (succe return instances, err } +func (g *InstanceGroup) cli() *gophercloud.ServiceClient { + g.mu.RLock() + defer g.mu.RUnlock() + + return g.computeClient +} + func (g *InstanceGroup) getInstances(ctx context.Context, initial bool) ([]servers.Server, error) { - page, err := servers.List(g.computeClient, nil).AllPagesWithContext(ctx) + page, err := servers.List(g.cli(), nil).AllPagesWithContext(ctx) if err != nil { + return nil, fmt.Errorf("Server listing error: %w", err) } @@ -232,7 +287,7 @@ func (g *InstanceGroup) createInstance(ctx context.Context) (string, error) { } spec.Metadata[MetadataKey] = g.Name - srv, err := servers.Create(g.computeClient, spec).Extract() + srv, err := servers.Create(g.cli(), spec).Extract() if err != nil { return "", err } @@ -241,11 +296,11 @@ func (g *InstanceGroup) createInstance(ctx context.Context) (string, error) { } func (g *InstanceGroup) deleteInstance(ctx context.Context, id string) error { - return servers.Delete(g.computeClient, id).ExtractErr() + return servers.Delete(g.cli(), id).ExtractErr() } func (g *InstanceGroup) getInstance(ctx context.Context, id string) (*servers.Server, error) { - return servers.Get(g.computeClient, id).Extract() + return servers.Get(g.cli(), id).Extract() } func (g *InstanceGroup) ConnectInfo(ctx context.Context, instanceID string) (provider.ConnectInfo, error) { @@ -314,5 +369,6 @@ func (g *InstanceGroup) ConnectInfo(ctx context.Context, instanceID string) (pro } func (g *InstanceGroup) Shutdown(ctx context.Context) error { + g.shutdownC <- struct{}{} return nil }