Skip to content

Commit

Permalink
add TFO_ORIGIN envs used for logging
Browse files Browse the repository at this point in the history
the TFO_ORIGIN environment variables are used to tie the original resource that is sent from external agents thru the API to get added to the vCluster. This allows the usage the the UUID to match what will exist in the TFO API's database.
  • Loading branch information
isaaguilar committed Sep 3, 2023
1 parent 3fe8c9c commit 2ddc564
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 31 deletions.
13 changes: 9 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gammazero/deque"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"gorm.io/gorm"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -22,7 +23,7 @@ import (

var (
port string
dbUrl string
dbURL string
ssoLoginURL string
samlIssuer string
samlRecipient string
Expand All @@ -42,7 +43,7 @@ func main() {
pflag.CommandLine.Set("logtostderr", "true")
pflag.StringVar(&port, "port", "", "Port to expose the API on")
viper.BindPFlag("port", pflag.Lookup("port"))
pflag.StringVar(&dbUrl, "db-url", "", "Database url format (Example: 'postgres://user:password@srv:5432/db')")
pflag.StringVar(&dbURL, "db-url", "", "Database url format (Example: 'postgres://user:password@srv:5432/db')")
viper.BindPFlag("db-url", pflag.Lookup("db-url"))
pflag.StringVar(&ssoLoginURL, "sso-login-url", "", "IDP Login URL ")
viper.BindPFlag("sso-login-url", pflag.Lookup("sso-login-url"))
Expand All @@ -64,14 +65,18 @@ func main() {
klog.Warning("Don't show this warning")

port = viper.GetString("port")
dbUrl = viper.GetString("db-url")
dbURL = viper.GetString("db-url")
ssoLoginURL = viper.GetString("sso-login-url")
samlIssuer = viper.GetString("saml-issuer")
samlRecipient = viper.GetString("saml-recipient")
samlMetadataURL = viper.GetString("saml-metadata-url")

clientset := kubernetes.NewForConfigOrDie(NewConfigOrDie(os.Getenv("KUBECONFIG")))
database := db.Init(dbUrl)
var database *gorm.DB
if dbURL != "" {
database = db.Init(dbURL)
}

queue := deque.Deque[tfv1beta1.Terraform]{}

qworker.BackgroundWorker(&queue)
Expand Down
26 changes: 20 additions & 6 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"crypto/x509"
"fmt"
"net/http"

tfv1beta1 "github.com/galleybytes/terraform-operator/pkg/apis/tf/v1beta1"
"github.com/gammazero/deque"
Expand Down Expand Up @@ -70,6 +71,20 @@ func (h APIHandler) RegisterRoutes() {
auth.GET("/sso", h.ssoRedirecter)
auth.POST("/sso/saml", h.samlConnecter)

empty := h.Server.Group("/")
empty.GET("/placeholder", func(c *gin.Context) {
c.JSON(200, response(200, "", []string{"Please come again!"}))
})
empty.POST("/placeholder", func(c *gin.Context) {
var data any
err := c.BindJSON(&data)
if err != nil {
c.AbortWithError(http.StatusNotAcceptable, err)
return
}
c.JSON(200, response(200, "", []any{data}))
})

routes := h.Server.Group("/api/v1/")
routes.Use(validateJwt)
routes.GET("/", h.Index)
Expand Down Expand Up @@ -100,18 +115,17 @@ func (h APIHandler) RegisterRoutes() {
routes.GET("/resource/:tfo_resource_uuid/generations", h.GetDistinctGeneration)
// ReourceSpec
routes.GET("/resource/:tfo_resource_uuid/resource-spec/generation/:generation", h.GetResourceSpec)
// // Poll for resource objects in the cluster
// routes.GET("/resource/:tfo_resource_uuid/poll", h.ResourcePoll)
// Logs
routes.POST("/logs", h.AddTFOTaskLogs) // requires access to fs of logs

routes.GET("/resource/:tfo_resource_uuid/logs", h.GetClustersResourcesLogs)
routes.GET("/resource/:tfo_resource_uuid/logs/generation/:generation", h.GetClustersResourcesLogs)
routes.GET("/resource/:tfo_resource_uuid/logs/generation/:generation/task/:task_type", h.GetClustersResourcesLogs)
routes.GET("/resource/:tfo_resource_uuid/logs/generation/:generation/task/:task_type/rerun/:rerun", h.GetClustersResourcesLogs)
routes.GET("/task/:task_pod_uuid/logs", h.GetTFOTaskLogsViaTask)

// Tasks
routes.POST("/task", h.AddTaskPod) // requires access to fs of logs
routes.GET("/task/:task_pod_uuid", h.GetTaskPod)
routes.POST("/task", h.AddTaskPod)
routes.GET("/task/:task_pod_uuid", h.GetTaskPod) // TODO Should getting a task out of band (ie not with cluster info) be allowed?

// Approval
routes.GET("/resource/:tfo_resource_uuid/approval-status", h.GetApprovalStatus)
routes.GET("/task/:task_pod_uuid/approval-status", h.GetApprovalStatusViaTaskPodUUID)
Expand Down
13 changes: 6 additions & 7 deletions pkg/api/get_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (h APIHandler) ResourceLogs(generationFilter, rerunFilter, taskTypeFilter,
taskPodUUIDs = append(taskPodUUIDs, t.UUID)
}
var tfoTaskLogs []models.TFOTaskLog
if result := h.DB.Where("tfo_resource_uuid = ? AND task_pod_uuid IN ?", &uuid, taskPodUUIDs).Find(&tfoTaskLogs); result.Error != nil {
if result := h.DB.Where("task_pod_uuid IN ?", taskPodUUIDs).Find(&tfoTaskLogs); result.Error != nil {
return logs, result.Error
}

Expand All @@ -284,13 +284,12 @@ func (h APIHandler) ResourceLogs(generationFilter, rerunFilter, taskTypeFilter,
for _, taskPod := range taskPodsOfHighestRerun {
for _, log := range tfoTaskLogs {
if log.TaskPodUUID == taskPod.UUID {
// TODO does the size need to be sent?
logs = append(logs, ResourceLog{
ID: log.ID,
LogMessage: log.Message,
LineNo: log.LineNo,
Rerun: taskPod.Rerun,
TaskType: taskPod.TaskType,
TFOResourceUUID: log.TFOResourceUUID,
ID: log.ID,
LogMessage: log.Message,
Rerun: taskPod.Rerun,
TaskType: taskPod.TaskType,
})
}
}
Expand Down
69 changes: 64 additions & 5 deletions pkg/api/log.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package api

import (
"fmt"
"log"
"net/http"
"strconv"

"github.com/galleybytes/terraform-operator-api/pkg/common/models"
"github.com/gin-gonic/gin"
Expand All @@ -11,27 +13,84 @@ import (
func (h APIHandler) AddTaskPod(c *gin.Context) {

jsonData := struct {
TaskPod models.TaskPod `json:"task_pod"`
TFOResourceUUID string `json:"tfo_resource_uuid"`
Generation string `json:"tfo_resource_generation"`
RerunID string `json:"rerun_id"`
TaskName string `json:"task_name"`
UUID string `json:"uuid"`
Content string `json:"content"`
}{}
err := c.BindJSON(&jsonData)
if err != nil {
log.Println(err)
c.JSON(http.StatusUnprocessableEntity, response(http.StatusUnprocessableEntity, err.Error(), nil))
return
}
if jsonData.TaskPod.UUID == "" {
if jsonData.UUID == "" {
c.JSON(http.StatusUnprocessableEntity, response(http.StatusUnprocessableEntity, "missing request data", nil))
return
}

taskPod := jsonData.TaskPod
result := h.DB.Where("uuid = ?", &taskPod.UUID).FirstOrCreate(&taskPod)
rerunID, err := strconv.Atoi(jsonData.RerunID)
if err != nil {
c.JSON(http.StatusUnprocessableEntity, response(http.StatusUnprocessableEntity, fmt.Sprintf("rerun is must be an int, got %s", jsonData.RerunID), nil))
return
}

taskPod := models.TaskPod{
UUID: jsonData.UUID,
TaskType: jsonData.TaskName,
Generation: jsonData.Generation,
Rerun: rerunID,
TFOResourceUUID: jsonData.TFOResourceUUID,
}
result := h.DB.Where("uuid = ?", &jsonData.UUID).FirstOrCreate(&taskPod)
if result.Error != nil {
c.JSON(http.StatusUnprocessableEntity, response(http.StatusUnprocessableEntity, result.Error.Error(), nil))
return
}

c.JSON(http.StatusOK, response(http.StatusOK, "", []models.TaskPod{taskPod}))
if jsonData.Content == "" {
c.JSON(http.StatusOK, response(http.StatusOK, "", []models.TaskPod{taskPod}))
}

// Combine the creation or finding of the taskPod with logging when content is sent
err = h.saveTaskLog(taskPod.UUID, jsonData.Content)
if err != nil {
c.JSON(http.StatusUnprocessableEntity, response(http.StatusUnprocessableEntity, err.Error(), nil))
return
}

c.JSON(http.StatusNoContent, nil)

}

func (h APIHandler) saveTaskLog(taskUUID, content string) error {

taskLog := models.TFOTaskLog{
TaskPodUUID: taskUUID,
Message: content,
Size: uint64(len([]byte(content))),
}

if result := h.DB.Where("task_pod_uuid = ?", &taskLog.TaskPodUUID).FirstOrCreate(&taskLog); result.Error != nil {
return fmt.Errorf("failed to save task log: %+v, %+v", taskLog, result.Error)
}

if taskLog.Size != uint64(len([]byte(content))) {
if taskLog.Size > uint64(len([]byte(content))) {
return fmt.Errorf("The message size was smaller than already recorded")
}
// The content has been updated. Read the bytes after what has already been written to preserve the
// original content. We don't want to allow logs in the database to be changed once they are written.
taskLog.Message += string([]byte(content)[taskLog.Size:])
taskLog.Size = uint64(len([]byte(taskLog.Message)))
if result := h.DB.Save(&taskLog); result.Error != nil {
return result.Error
}
}

return nil
}

func (h APIHandler) AddTFOTaskLogs(c *gin.Context) {
Expand Down
25 changes: 22 additions & 3 deletions pkg/api/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,8 @@ func (h APIHandler) addResource(c *gin.Context) error {
return fmt.Errorf("error saving tfo_resource_spec: %s", result.Error)
}

h.appendClusterNameLabel(&jsonData.Terraform, cluster.Name)
appendClusterNameLabel(&jsonData.Terraform, cluster.Name)
addOriginEnvs(&jsonData.Terraform)
// TODO Allow using a different queue
h.Queue.PushBack(jsonData.Terraform)

Expand Down Expand Up @@ -716,7 +717,8 @@ func (h APIHandler) updateResource(c *gin.Context) error {
return fmt.Errorf("error occurred when looking for tfo_resource_spec: %v", result.Error)
}

h.appendClusterNameLabel(&jsonData.Terraform, cluster.Name)
appendClusterNameLabel(&jsonData.Terraform, cluster.Name)
addOriginEnvs(&jsonData.Terraform)
// TODO Allow using a different queue
h.Queue.PushBack(jsonData.Terraform)

Expand All @@ -725,7 +727,7 @@ func (h APIHandler) updateResource(c *gin.Context) error {

// appendClusterNameLabel will hack the cluster name to the resource's labels.
// This make it easier to identify the origin of the resource in a remote cluster.
func (h APIHandler) appendClusterNameLabel(tf *tfv1beta1.Terraform, clusterName string) {
func appendClusterNameLabel(tf *tfv1beta1.Terraform, clusterName string) {
if clusterName == "" {
return
}
Expand All @@ -735,6 +737,23 @@ func (h APIHandler) appendClusterNameLabel(tf *tfv1beta1.Terraform, clusterName
tf.Labels["tfo-api.galleybytes.com/cluster-name"] = clusterName
}

// addOriginEnvs will inject TFO_ORIGIN envs to the incoming resource
func addOriginEnvs(tf *tfv1beta1.Terraform) {
tf.Spec.TaskOptions = append(tf.Spec.TaskOptions, tfv1beta1.TaskOption{
For: []tfv1beta1.TaskName{"*"},
Env: []corev1.EnvVar{
{
Name: "TFO_ORIGIN_UUID",
Value: string(tf.UID),
},
{
Name: "TFO_ORIGIN_GENERATION",
Value: fmt.Sprintf("%d", tf.Generation),
},
},
})
}

func compare(s1, op, s2 string) bool {
i1, err := strconv.Atoi(s1)
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions pkg/common/models/tfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (

type TFOTaskLog struct {
gorm.Model
TaskPod TaskPod `json:"task_pod,omitempty"`
TaskPodUUID string `json:"task_pod_uuid"`
TFOResource TFOResource `json:"tfo_resource,omitempty"`
TFOResourceUUID string `json:"tfo_resource_uuid"`
Message string `json:"message"`
LineNo string `json:"line_no"`
TaskPod TaskPod `json:"task_pod,omitempty"`
TaskPodUUID string `json:"task_pod_uuid"`
Message string `json:"message" gorm:"type:varchar(1048576)"`
Size uint64 `json:"size"`
}

type TFOResource struct {
Expand Down

0 comments on commit 2ddc564

Please sign in to comment.