Skip to content

Commit

Permalink
feat: add custom actions for Numaflow CRDs (argoproj#20332)
Browse files Browse the repository at this point in the history
* feat: add pause/unpause actions for Numaflow CRDs

Signed-off-by: Dillen Padhiar <[email protected]>

* fix: codegen

Signed-off-by: Dillen Padhiar <[email protected]>

---------

Signed-off-by: Dillen Padhiar <[email protected]>
  • Loading branch information
dpadhiar authored Oct 11, 2024
1 parent b8f85c9 commit b05cafd
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 0 deletions.
4 changes: 4 additions & 0 deletions docs/operator-manual/resource_actions_builtin.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
actionTests:
- action: pause
inputPath: testdata/monovertex.yaml
expectedOutputPath: testdata/monovertex-paused.yaml
- action: unpause
inputPath: testdata/monovertex-paused.yaml
expectedOutputPath: testdata/monovertex.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local actions = {}
actions["pause"] = {["disabled"] = true}
actions["unpause"] = {["disabled"] = true}

local paused = false
if obj.spec.lifecycle ~= nil and obj.spec.lifecycle.desiredPhase ~= nil and obj.spec.lifecycle.desiredPhase == "Paused" then
paused = true
end
if paused then
actions["unpause"]["disabled"] = false
else
actions["pause"]["disabled"] = false
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
if obj.spec.lifecycle == nil then
obj.spec.lifecycle = {}
end
obj.spec.lifecycle.desiredPhase = "Paused"
return obj
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
creationTimestamp: "2024-10-09T21:18:37Z"
generation: 1
name: simple-mono-vertex
namespace: numaflow-system
resourceVersion: "1382"
uid: b7b9e4f8-cd4b-4771-9e4b-2880cc50467a
spec:
lifecycle:
desiredPhase: Paused
replicas: 1
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:stable
source:
transformer:
container:
image: quay.io/numaio/numaflow-rs/source-transformer-now:stable
udsource:
container:
image: quay.io/numaio/numaflow-java/source-simple-source:stable
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
status:
conditions:
- lastTransitionTime: "2024-10-09T21:18:41Z"
message: Successful
reason: Successful
status: "True"
type: DaemonHealthy
- lastTransitionTime: "2024-10-09T21:18:37Z"
message: Successful
reason: Successful
status: "True"
type: Deployed
- lastTransitionTime: "2024-10-09T21:18:37Z"
message: All pods are healthy
reason: Running
status: "True"
type: PodsHealthy
currentHash: 8ed34d9058faa60997ee13083ccb3d80691df37b45a34eaa347af99f237e8df6
desiredReplicas: 1
lastScaledAt: "2024-10-09T21:18:37Z"
lastUpdated: "2024-10-09T21:18:41Z"
observedGeneration: 1
phase: Running
replicas: 1
selector: app.kubernetes.io/component=mono-vertex,numaflow.numaproj.io/mono-vertex-name=simple-mono-vertex
updateHash: 8ed34d9058faa60997ee13083ccb3d80691df37b45a34eaa347af99f237e8df6
updatedReplicas: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: MonoVertex
metadata:
creationTimestamp: "2024-10-09T21:18:37Z"
generation: 1
name: simple-mono-vertex
namespace: numaflow-system
resourceVersion: "1382"
uid: b7b9e4f8-cd4b-4771-9e4b-2880cc50467a
spec:
lifecycle:
desiredPhase: Running
replicas: 1
sink:
udsink:
container:
image: quay.io/numaio/numaflow-java/simple-sink:stable
source:
transformer:
container:
image: quay.io/numaio/numaflow-rs/source-transformer-now:stable
udsource:
container:
image: quay.io/numaio/numaflow-java/source-simple-source:stable
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
status:
conditions:
- lastTransitionTime: "2024-10-09T21:18:41Z"
message: Successful
reason: Successful
status: "True"
type: DaemonHealthy
- lastTransitionTime: "2024-10-09T21:18:37Z"
message: Successful
reason: Successful
status: "True"
type: Deployed
- lastTransitionTime: "2024-10-09T21:18:37Z"
message: All pods are healthy
reason: Running
status: "True"
type: PodsHealthy
currentHash: 8ed34d9058faa60997ee13083ccb3d80691df37b45a34eaa347af99f237e8df6
desiredReplicas: 1
lastScaledAt: "2024-10-09T21:18:37Z"
lastUpdated: "2024-10-09T21:18:41Z"
observedGeneration: 1
phase: Running
replicas: 1
selector: app.kubernetes.io/component=mono-vertex,numaflow.numaproj.io/mono-vertex-name=simple-mono-vertex
updateHash: 8ed34d9058faa60997ee13083ccb3d80691df37b45a34eaa347af99f237e8df6
updatedReplicas: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
obj.spec.lifecycle.desiredPhase = "Running"
return obj
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
actionTests:
- action: pause
inputPath: testdata/pipeline.yaml
expectedOutputPath: testdata/pipeline-paused.yaml
- action: unpause
inputPath: testdata/pipeline-paused.yaml
expectedOutputPath: testdata/pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
local actions = {}
actions["pause"] = {["disabled"] = true}
actions["unpause"] = {["disabled"] = true}

local paused = false
if obj.spec.lifecycle ~= nil and obj.spec.lifecycle.desiredPhase ~= nil and obj.spec.lifecycle.desiredPhase == "Paused" then
paused = true
end
if paused then
actions["unpause"]["disabled"] = false
else
actions["pause"]["disabled"] = false
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
if obj.spec.lifecycle == nil then
obj.spec.lifecycle = {}
end
obj.spec.lifecycle.desiredPhase = "Paused"
return obj
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
creationTimestamp: "2024-10-08T18:22:18Z"
finalizers:
- pipeline-controller
generation: 1
name: simple-pipeline
namespace: numaflow-system
resourceVersion: "382381"
uid: bb6cc91c-eb05-4fe7-9380-63b9532a85db
spec:
edges:
- from: in
to: cat
- from: cat
to: out
lifecycle:
deleteGracePeriodSeconds: 30
desiredPhase: Paused
pauseGracePeriodSeconds: 30
limits:
bufferMaxLength: 30000
bufferUsageLimit: 80
readBatchSize: 500
readTimeout: 1s
vertices:
- name: in
scale:
min: 1
source:
generator:
duration: 1s
jitter: 0s
msgSize: 8
rpu: 5
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
- name: cat
scale:
min: 1
udf:
builtin:
name: cat
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
- name: out
scale:
min: 1
sink:
log: {}
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
watermark:
disabled: false
maxDelay: 0s
status:
conditions:
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: Successful
reason: Successful
status: "True"
type: Configured
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: Successful
reason: Successful
status: "True"
type: DaemonServiceHealthy
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: Successful
reason: Successful
status: "True"
type: Deployed
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: No Side Inputs attached to the pipeline
reason: NoSideInputs
status: "True"
type: SideInputsManagersHealthy
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: All vertices are healthy
reason: Successful
status: "True"
type: VerticesHealthy
lastUpdated: "2024-10-09T20:26:54Z"
mapUDFCount: 1
observedGeneration: 1
phase: Running
reduceUDFCount: 0
sinkCount: 1
sourceCount: 1
udfCount: 1
vertexCount: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
creationTimestamp: "2024-10-08T18:22:18Z"
finalizers:
- pipeline-controller
generation: 1
name: simple-pipeline
namespace: numaflow-system
resourceVersion: "382381"
uid: bb6cc91c-eb05-4fe7-9380-63b9532a85db
spec:
edges:
- from: in
to: cat
- from: cat
to: out
lifecycle:
deleteGracePeriodSeconds: 30
desiredPhase: Running
pauseGracePeriodSeconds: 30
limits:
bufferMaxLength: 30000
bufferUsageLimit: 80
readBatchSize: 500
readTimeout: 1s
vertices:
- name: in
scale:
min: 1
source:
generator:
duration: 1s
jitter: 0s
msgSize: 8
rpu: 5
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
- name: cat
scale:
min: 1
udf:
builtin:
name: cat
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
- name: out
scale:
min: 1
sink:
log: {}
updateStrategy:
rollingUpdate:
maxUnavailable: 25%
type: RollingUpdate
watermark:
disabled: false
maxDelay: 0s
status:
conditions:
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: Successful
reason: Successful
status: "True"
type: Configured
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: Successful
reason: Successful
status: "True"
type: DaemonServiceHealthy
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: Successful
reason: Successful
status: "True"
type: Deployed
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: No Side Inputs attached to the pipeline
reason: NoSideInputs
status: "True"
type: SideInputsManagersHealthy
- lastTransitionTime: "2024-10-09T20:26:54Z"
message: All vertices are healthy
reason: Successful
status: "True"
type: VerticesHealthy
lastUpdated: "2024-10-09T20:26:54Z"
mapUDFCount: 1
observedGeneration: 1
phase: Running
reduceUDFCount: 0
sinkCount: 1
sourceCount: 1
udfCount: 1
vertexCount: 3
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
obj.spec.lifecycle.desiredPhase = "Running"
return obj

0 comments on commit b05cafd

Please sign in to comment.