From 5e1ad14f1feb874063db2ff84c46f1c87a0b8712 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 1 Oct 2024 11:41:40 -0400 Subject: [PATCH] scaling policy: use request namespace as target if unset in jobspec (#24065) When jobs are submitted with a scaling policy, the scaling policy's target only includes the job's namespace if the `namespace` field is set in the jobspec, and not when the namespace comes only from the request. Normally jobs are canonicalized in the RPC handler before being written to Raft. But the scaling policy targets are instead written during the conversion from `api.Job` to `structs.Job`. We populate the `structs.Job` namespace from the request here as well, but only after the conversion has occurred. Swap the order of these operations so that the conversion always happens with the correct namespace. Long-term we should not be making mutations during conversion either. But we can't remove it immediately because API requests may come from any agent across upgrades. Move the scaling target creation into the `Canonicalize` method and mark it for future removal in the API conversion code path. 
Fixes: https://github.com/hashicorp/nomad/issues/24039 --- .changelog/24065.txt | 3 ++ command/agent/job_endpoint.go | 18 +++++++----- command/agent/scaling_endpoint.go | 6 +++- nomad/fsm.go | 2 +- nomad/job_endpoint_test.go | 3 +- nomad/mock/job.go | 2 +- nomad/scaling_endpoint_test.go | 6 ++-- nomad/structs/structs.go | 40 +++++++++++++------------- nomad/structs/structs_test.go | 47 +++++++++++++++++++++++++++---- 9 files changed, 88 insertions(+), 39 deletions(-) create mode 100644 .changelog/24065.txt diff --git a/.changelog/24065.txt b/.changelog/24065.txt new file mode 100644 index 000000000000..82a374f841c9 --- /dev/null +++ b/.changelog/24065.txt @@ -0,0 +1,3 @@ +```release-note:bug +scaling: Fixed a bug where scaling policies would not get created during job submission unless namespace field was set in jobspec +``` diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 19b30e81def0..4eba764fbeb7 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/acl" api "github.com/hashicorp/nomad/api" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/jobspec2" "github.com/hashicorp/nomad/nomad/structs" ) @@ -1008,15 +1009,17 @@ func (s *HTTPServer) apiJobAndRequestToStructs(job *api.Job, req *http.Request, job, queryRegion, writeReq.Region, s.agent.GetConfig().Region, ) - sJob := ApiJobToStructJob(job) - sJob.Region = jobRegion - writeReq.Region = requestRegion - + // mutate the namespace before we convert just in case anything is expecting + // the namespace to be correct queryNamespace := req.URL.Query().Get("namespace") namespace := namespaceForJob(job.Namespace, queryNamespace, writeReq.Namespace) - sJob.Namespace = namespace + job.Namespace = pointer.Of(namespace) writeReq.Namespace = namespace + sJob := ApiJobToStructJob(job) + sJob.Region = jobRegion + writeReq.Region = 
requestRegion + return sJob, writeReq } @@ -1264,7 +1267,8 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta } if taskGroup.Scaling != nil { - tg.Scaling = ApiScalingPolicyToStructs(tg.Count, taskGroup.Scaling).TargetTaskGroup(job, tg) + tg.Scaling = ApiScalingPolicyToStructs( + job, tg, nil, tg.Count, taskGroup.Scaling) } tg.EphemeralDisk = &structs.EphemeralDisk{ @@ -1401,7 +1405,7 @@ func ApiTaskToStructsTask(job *structs.Job, group *structs.TaskGroup, for _, policy := range apiTask.ScalingPolicies { structsTask.ScalingPolicies = append( structsTask.ScalingPolicies, - ApiScalingPolicyToStructs(0, policy).TargetTask(job, group, structsTask)) + ApiScalingPolicyToStructs(job, group, structsTask, 0, policy)) } } diff --git a/command/agent/scaling_endpoint.go b/command/agent/scaling_endpoint.go index 18a19a1c9612..473eb1681fc5 100644 --- a/command/agent/scaling_endpoint.go +++ b/command/agent/scaling_endpoint.go @@ -81,7 +81,7 @@ func (s *HTTPServer) scalingPolicyQuery(resp http.ResponseWriter, req *http.Requ return out.Policy, nil } -func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { +func ApiScalingPolicyToStructs(job *structs.Job, tg *structs.TaskGroup, task *structs.Task, count int, ap *api.ScalingPolicy) *structs.ScalingPolicy { p := structs.ScalingPolicy{ Type: ap.Type, Policy: ap.Policy, @@ -103,5 +103,9 @@ func ApiScalingPolicyToStructs(count int, ap *api.ScalingPolicy) *structs.Scalin } else { p.Min = int64(count) } + + // COMPAT(1.12.0) - canonicalization is done in Job.Register as of 1.9, + // remove this canonicalization in 1.12.0 LTS + p.Canonicalize(job, tg, task) return &p } diff --git a/nomad/fsm.go b/nomad/fsm.go index d862c36177c7..4e8494eac2d7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1776,7 +1776,7 @@ func (n *nomadFSM) restoreImpl(old io.ReadCloser, filter *FSMFilter) error { if filter.Include(scalingPolicy) { // Handle upgrade path: // - Set policy type if empty - 
scalingPolicy.Canonicalize() + scalingPolicy.Canonicalize(nil, nil, nil) if err := restore.ScalingPolicyRestore(scalingPolicy); err != nil { return err } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 6d37d303082a..f0f8cef1fa2f 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -6628,7 +6628,8 @@ func TestJobEndpoint_Plan_Scaling(t *testing.T) { tg := job.TaskGroups[0] tg.Tasks[0].Resources.MemoryMB = 999999999 scaling := &structs.ScalingPolicy{Min: 1, Max: 100, Type: structs.ScalingPolicyTypeHorizontal} - tg.Scaling = scaling.TargetTaskGroup(job, tg) + scaling.Canonicalize(job, tg, nil) + tg.Scaling = scaling planReq := &structs.JobPlanRequest{ Job: job, Diff: false, diff --git a/nomad/mock/job.go b/nomad/mock/job.go index b3592dbf9b9d..d3a709042861 100644 --- a/nomad/mock/job.go +++ b/nomad/mock/job.go @@ -189,7 +189,7 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { Policy: map[string]interface{}{}, Enabled: true, } - policy.TargetTaskGroup(job, job.TaskGroups[0]) + policy.Canonicalize(job, job.TaskGroups[0], nil) job.TaskGroups[0].Scaling = policy return job, policy } diff --git a/nomad/scaling_endpoint_test.go b/nomad/scaling_endpoint_test.go index 67a7e59061e6..0d39cea2c12b 100644 --- a/nomad/scaling_endpoint_test.go +++ b/nomad/scaling_endpoint_test.go @@ -163,15 +163,15 @@ func TestScalingEndpoint_ListPolicies(t *testing.T) { j1 := mock.Job() j1polV := mock.ScalingPolicy() j1polV.Type = "vertical-cpu" - j1polV.TargetTask(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) + j1polV.Canonicalize(j1, j1.TaskGroups[0], j1.TaskGroups[0].Tasks[0]) j1polH := mock.ScalingPolicy() j1polH.Type = "horizontal" - j1polH.TargetTaskGroup(j1, j1.TaskGroups[0]) + j1polH.Canonicalize(j1, j1.TaskGroups[0], nil) j2 := mock.Job() j2polH := mock.ScalingPolicy() j2polH.Type = "horizontal" - j2polH.TargetTaskGroup(j2, j2.TaskGroups[0]) + j2polH.Canonicalize(j2, j2.TaskGroups[0], nil) 
s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j1) s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, nil, j2) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 00282c8ba6bf..79f900b6e19d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6347,10 +6347,25 @@ const ( ScalingPolicyTypeHorizontal = "horizontal" ) -func (p *ScalingPolicy) Canonicalize() { +func (p *ScalingPolicy) Canonicalize(job *Job, tg *TaskGroup, task *Task) { if p.Type == "" { p.Type = ScalingPolicyTypeHorizontal } + + // during restore we canonicalize to update, but these values will already + // have been populated during submit and we don't have references to the + // job, group, and task + if job != nil && tg != nil { + p.Target = map[string]string{ + ScalingTargetNamespace: job.Namespace, + ScalingTargetJob: job.ID, + ScalingTargetGroup: tg.Name, + } + + if task != nil { + p.Target[ScalingTargetTask] = task.Name + } + } } func (p *ScalingPolicy) Copy() *ScalingPolicy { @@ -6439,23 +6454,6 @@ func (p *ScalingPolicy) Diff(p2 *ScalingPolicy) bool { return !reflect.DeepEqual(*p, copy) } -// TargetTaskGroup updates a ScalingPolicy target to specify a given task group -func (p *ScalingPolicy) TargetTaskGroup(job *Job, tg *TaskGroup) *ScalingPolicy { - p.Target = map[string]string{ - ScalingTargetNamespace: job.Namespace, - ScalingTargetJob: job.ID, - ScalingTargetGroup: tg.Name, - } - return p -} - -// TargetTask updates a ScalingPolicy target to specify a given task -func (p *ScalingPolicy) TargetTask(job *Job, tg *TaskGroup, task *Task) *ScalingPolicy { - p.TargetTaskGroup(job, tg) - p.Target[ScalingTargetTask] = task.Name - return p -} - func (p *ScalingPolicy) Stub() *ScalingPolicyListStub { stub := &ScalingPolicyListStub{ ID: p.ID, @@ -7050,7 +7048,7 @@ func (tg *TaskGroup) Canonicalize(job *Job) { } if tg.Scaling != nil { - tg.Scaling.Canonicalize() + tg.Scaling.Canonicalize(job, tg, nil) } for _, service := range tg.Services { 
@@ -8155,6 +8153,10 @@ func (t *Task) Canonicalize(job *Job, tg *TaskGroup) { t.KillTimeout = DefaultKillTimeout } + for _, policy := range t.ScalingPolicies { + policy.Canonicalize(job, tg, t) + } + if t.Vault != nil { t.Vault.Canonicalize() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 520252b9530d..c4105d822f10 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -6511,10 +6511,17 @@ func TestDispatchPayloadConfig_Validate(t *testing.T) { func TestScalingPolicy_Canonicalize(t *testing.T) { ci.Parallel(t) + job := &Job{Namespace: "prod", ID: "example"} + tg := &TaskGroup{Name: "web"} + task := &Task{Name: "httpd"} + cases := []struct { name string input *ScalingPolicy expected *ScalingPolicy + job *Job + tg *TaskGroup + task *Task }{ { name: "empty policy", @@ -6526,14 +6533,42 @@ func TestScalingPolicy_Canonicalize(t *testing.T) { input: &ScalingPolicy{Type: "other-type"}, expected: &ScalingPolicy{Type: "other-type"}, }, + { + name: "policy with type and task group", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + }, + }, + job: job, + tg: tg, + }, + { + name: "policy with type and task", + input: &ScalingPolicy{Type: "other-type"}, + expected: &ScalingPolicy{ + Type: "other-type", + Target: map[string]string{ + ScalingTargetNamespace: "prod", + ScalingTargetJob: "example", + ScalingTargetGroup: "web", + ScalingTargetTask: "httpd", + }, + }, + job: job, + tg: tg, + task: task, + }, } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - require := require.New(t) - - c.input.Canonicalize() - require.Equal(c.expected, c.input) + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc.input.Canonicalize(tc.job, tc.tg, tc.task) + must.Eq(t, tc.expected, tc.input) }) } }