Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added retrier on api requests #4

Merged
merged 1 commit into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 94 additions & 17 deletions internal/tfhandler/tfhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,34 +114,96 @@ func (i informer) Run() {
log.Println("Stopped informer")
}

type eventQueryRetryResult struct {
next bool
err error
}

// Send a query to the API and only return upon a successful response from the API withith a given time threshed.
// Upon reaching the threshold, exit the program to prevent the tf resouce on the API (vcluster) from being
// out-of-sync with the local resource
func eventQueryRetrier(crud tfoapiclient.CrudResource, crudType string, data any) *eventQueryRetryResult {
expiration := time.Now().Add(time.Duration(60 * time.Minute))
done := make(chan eventQueryRetryResult)

go func() {
for {
var result *tfoapiclient.Result
var err error
switch crudType {
case "CREATE":
result, err = crud.Create(context.TODO(), data)
case "UPDATE":
result, err = crud.Update(context.TODO(), data)
case "READ":
result, err = crud.Read(context.TODO(), data)
default:
done <- eventQueryRetryResult{false, fmt.Errorf("verb '%s' undefined", crudType)}
return
}

if err == nil {
if result.IsSuccess && result.ErrMsg == "" {
done <- eventQueryRetryResult{false, nil}
return
} else if result.IsSuccess {
done <- eventQueryRetryResult{true, fmt.Errorf(result.ErrMsg)}
return
}
}
if time.Now().After(expiration) {
// Terminate the controller to prevent silent errors
if err != nil {
log.Fatal(err)
}
log.Fatal(result.ErrMsg)
}
if err != nil {
log.Println(err)
} else {
// Case where err is not nil but the result was not successful
log.Println(result.ErrMsg)
}

time.Sleep(20 * time.Second)
}
}()

e := <-done
return &e

}

func (i informer) addEvent(obj interface{}) {
tf, err := assertTf(obj)
if err != nil {
log.Printf("ERROR in add event: %s", err)
return
}
log.Printf("Add event observed '%s'", tf.Name)
log.Printf("Add event observed for tf resource '%s/%s'", tf.Namespace, tf.Name)

log.Printf("Gathering all resources to sync for tf resource '%s/%s'", tf.Namespace, tf.Name)
err = i.SyncDependencies(tf)
if err != nil {
log.Println(err.Error())
return
}

postResult, err := i.clientset.Cluster(i.clusterName).Event().Create(context.TODO(), tf)
if err != nil {
log.Println(err.Error())
result := eventQueryRetrier(i.clientset.Cluster(i.clusterName).Event(), "CREATE", tf)
if result == nil {
log.Println("An unknown error has occurred")
return
}

if postResult.IsSuccess {
if !result.next {
if result.err != nil {
log.Println(result.err.Error())
return
}
log.Printf("Successful POST request sent to api for tf resource '%s/%s'", tf.Namespace, tf.Name)
return
}

if !strings.Contains(postResult.ErrMsg, "TFOResource already exists") {
log.Println(postResult.ErrMsg)
return
}
// This used to continue if next was not defined

putResult, err := i.clientset.Cluster(i.clusterName).Event().Update(context.TODO(), tf)
if err != nil {
Expand All @@ -154,7 +216,9 @@ func (i informer) addEvent(obj interface{}) {
return
}

log.Printf("Successful PUT request sent to api for tf resource '%s/%s'", tf.Namespace, tf.Name)
i.queue.PushBack(*tf)
log.Printf("tf resource '%s/%s' in queue to determine completion", tf.Namespace, tf.Name)
}

func (i informer) updateEvent(old, new interface{}) {
Expand All @@ -174,6 +238,7 @@ func (i informer) updateEvent(old, new interface{}) {
log.Println("Observed update event: ", tfnew.Name)
}

log.Printf("Gathering all resources to sync for tf resource '%s/%s'", tfnew.Namespace, tfnew.Name)
err = i.SyncDependencies(tfnew)
if err != nil {
log.Println(err.Error())
Expand Down Expand Up @@ -250,6 +315,7 @@ func (i informer) gatherDependenciesToSync(tf *tfv1beta1.Terraform) (*corev1.Lis
for _, c := range tf.Spec.Credentials {
// credentials.secretNameRef
if c.SecretNameRef.Name != "" {
log.Printf("...found secret/%s", c.SecretNameRef.Name)
secretNames = append(secretNames, c.SecretNameRef.Name)
}
}
Expand All @@ -260,11 +326,13 @@ func (i informer) gatherDependenciesToSync(tf *tfv1beta1.Terraform) (*corev1.Lis
if c.Git != nil {
if c.Git.SSH != nil {
if c.Git.SSH.SSHKeySecretRef != nil {
log.Printf("...found secret/%s", c.Git.SSH.SSHKeySecretRef.Name)
secretNames = append(secretNames, c.Git.SSH.SSHKeySecretRef.Name)
}
}
if c.Git.HTTPS != nil {
if c.Git.HTTPS.TokenSecretRef != nil {
log.Printf("...found secret/%s", c.Git.HTTPS.TokenSecretRef.Name)
secretNames = append(secretNames, c.Git.HTTPS.TokenSecretRef.Name)
}
}
Expand All @@ -273,11 +341,13 @@ func (i informer) gatherDependenciesToSync(tf *tfv1beta1.Terraform) (*corev1.Lis

if tf.Spec.SSHTunnel != nil {
// sshtunnel.sshkeysecretref
log.Printf("...found secret/%s", tf.Spec.SSHTunnel.SSHKeySecretRef.Name)
secretNames = append(secretNames, tf.Spec.SSHTunnel.SSHKeySecretRef.Name)
}

for tf.Spec.TerraformModule.ConfigMapSelector != nil {
if tf.Spec.TerraformModule.ConfigMapSelector != nil {
// terraformmodule.configmapselector
log.Printf("...found configmap/%s", tf.Spec.TerraformModule.ConfigMapSelector.Name)
configMapNames = append(configMapNames, tf.Spec.TerraformModule.ConfigMapSelector.Name)
}

Expand All @@ -287,9 +357,11 @@ func (i informer) gatherDependenciesToSync(tf *tfv1beta1.Terraform) (*corev1.Lis
// taskoptions[].env[].valuefrom.secretkeyref
if e.ValueFrom != nil {
if e.ValueFrom.SecretKeyRef != nil {
log.Printf("...found secret/%s", e.ValueFrom.SecretKeyRef.Name)
secretNames = append(secretNames, e.ValueFrom.SecretKeyRef.Name)
}
if e.ValueFrom.ConfigMapKeyRef != nil {
log.Printf("...found configmap/%s", e.ValueFrom.ConfigMapKeyRef.Name)
configMapNames = append(configMapNames, e.ValueFrom.ConfigMapKeyRef.Name)
}
}
Expand All @@ -298,14 +370,17 @@ func (i informer) gatherDependenciesToSync(tf *tfv1beta1.Terraform) (*corev1.Lis
// taskoptions[].envfrom[].configmapref
// taskoptions[].envfrom[].secretref
if e.SecretRef != nil {
log.Printf("...found secret/%s", e.SecretRef.Name)
secretNames = append(secretNames, e.SecretRef.Name)
}
if e.ConfigMapRef != nil {
log.Printf("...found configmap/%s", e.ConfigMapRef.Name)
configMapNames = append(configMapNames, e.ConfigMapRef.Name)
}
}
if c.Script.ConfigMapSelector != nil {
// taskoptions[].script.configmapselector
log.Printf("...found configmap/%s", c.Script.ConfigMapSelector.Name)
configMapNames = append(configMapNames, c.Script.ConfigMapSelector.Name)
}
}
Expand Down Expand Up @@ -377,19 +452,21 @@ func (i informer) SyncDependencies(tf *tfv1beta1.Terraform) error {
// hub cluster (ie the vcluster).
dependencies, err := i.gatherDependenciesToSync(tf)
if err != nil {
return fmt.Errorf("An error occurred gathering dependencies from '%s/%s': %s\n", tf.Namespace, tf.Name, err.Error())
return fmt.Errorf("an error occurred gathering dependencies from '%s/%s': %s", tf.Namespace, tf.Name, err.Error())
}
data := map[string][]byte{
"raw": raw(dependencies),
"namespace": []byte(tf.Namespace),
}
syncResult, err := i.clientset.Cluster(i.clusterName).SyncDependencies().Update(context.TODO(), data)
if err != nil {
return err
result := eventQueryRetrier(i.clientset.Cluster(i.clusterName).SyncDependencies(), "UPDATE", data)
if result == nil {

return fmt.Errorf("an unknown error has occurred")
}
if !syncResult.IsSuccess {
return fmt.Errorf(syncResult.ErrMsg)
if result.err != nil {
return result.err
}

return nil
}

Expand Down
42 changes: 21 additions & 21 deletions pkg/tfoapiclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
)

type CRUDInterface interface {
Create(context.Context, any) (*result, error)
Read(context.Context, any) (*result, error)
Update(context.Context, any) (*result, error)
Delete(context.Context, any) (*result, error)
Create(context.Context, any) (*Result, error)
Read(context.Context, any) (*Result, error)
Update(context.Context, any) (*Result, error)
Delete(context.Context, any) (*Result, error)
}

type crudResource struct {
type CrudResource struct {
Clientset
url string
}
Expand All @@ -38,7 +38,7 @@ type ResourceClient struct {
resourceUID string
}

type result struct {
type Result struct {
Data api.Response
IsSuccess bool
ErrMsg string
Expand Down Expand Up @@ -83,7 +83,7 @@ func NewClientset(host, username, password string, insecureSkipVerify bool) (*Cl
return &tfoapiClientset, nil
}

func (c Clientset) AccessToken() crudResource {
func (c Clientset) AccessToken() CrudResource {
return newCRUDResource(c, fmt.Sprintf("%s/login", c.config.Host))
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func (c *Clientset) Resource(resourceUID string) *ResourceClient {
}
}

func (c *Clientset) do(method, url string, bodyData any) (*result, error) {
func (c *Clientset) do(method, url string, bodyData any) (*Result, error) {
jsonData, err := json.Marshal(bodyData)
if err != nil {
return nil, fmt.Errorf("ERROR marshaling added tf: %s", err)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (c *Clientset) do(method, url string, bodyData any) (*result, error) {
}

if response.StatusCode == http.StatusNoContent {
return &result{IsSuccess: true}, nil
return &Result{IsSuccess: true}, nil
}

var hasData bool = true
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *Clientset) do(method, url string, bodyData any) (*result, error) {

}

return &result{Data: structuredResponse, IsSuccess: status200 && hasData, ErrMsg: fmt.Sprint(errMsg)}, nil
return &Result{Data: structuredResponse, IsSuccess: status200 && hasData, ErrMsg: fmt.Sprint(errMsg)}, nil
}

func (c *Clientset) authenticate() error {
Expand Down Expand Up @@ -212,42 +212,42 @@ func (c RegistrationClient) Register(setupData ClientSetup) error {
return nil
}

func (c ClusterClient) Event() crudResource {
func (c ClusterClient) Event() CrudResource {
return newCRUDResource(c.Clientset, fmt.Sprintf("%s/api/v1/cluster/%s/event", c.config.Host, c.clientName))
}

func (c ClusterClient) Poll(namespace, name string) crudResource {
func (c ClusterClient) Poll(namespace, name string) CrudResource {
return newCRUDResource(c.Clientset, fmt.Sprintf("%s/api/v1/cluster/%s/resource/%s/%s/poll", c.config.Host, c.clientName, namespace, name))
}

func (c ClusterClient) SyncDependencies() crudResource {
func (c ClusterClient) SyncDependencies() CrudResource {
return newCRUDResource(c.Clientset, fmt.Sprintf("%s/api/v1/cluster/%s/sync-dependencies", c.config.Host, c.clientName))
}

func (c ClusterClient) Health() crudResource {
func (c ClusterClient) Health() CrudResource {
return newCRUDResource(c.Clientset, fmt.Sprintf("%s/api/v1/cluster/%s/health", c.config.Host, c.clientName))
}

func (c ClusterClient) TFOHealth() crudResource {
func (c ClusterClient) TFOHealth() CrudResource {
return newCRUDResource(c.Clientset, fmt.Sprintf("%s/api/v1/cluster/%s/tfohealth", c.config.Host, c.clientName))
}

func newCRUDResource(c Clientset, url string) crudResource {
return crudResource{Clientset: c, url: url}
func newCRUDResource(c Clientset, url string) CrudResource {
return CrudResource{Clientset: c, url: url}
}

func (c crudResource) Create(ctx context.Context, data any) (*result, error) {
func (c CrudResource) Create(ctx context.Context, data any) (*Result, error) {
return c.Clientset.do("POST", c.url, data)
}

func (c crudResource) Read(ctx context.Context, data any) (*result, error) {
func (c CrudResource) Read(ctx context.Context, data any) (*Result, error) {
return c.Clientset.do("GET", c.url, data)
}

func (c crudResource) Update(ctx context.Context, data any) (*result, error) {
func (c CrudResource) Update(ctx context.Context, data any) (*Result, error) {
return c.Clientset.do("PUT", c.url, data)
}

func (c crudResource) Delete(ctx context.Context, data any) (*result, error) {
func (c CrudResource) Delete(ctx context.Context, data any) (*Result, error) {
return c.Clientset.do("DELETE", c.url, data)
}