Skip to content

Commit

Permalink
refactor: remove unnecessary PriorityMutex implementation (argoproj…
Browse files Browse the repository at this point in the history
…#13337)

Signed-off-by: Alan Clucas <[email protected]>
  • Loading branch information
Joibel authored Jul 23, 2024
1 parent 1ed1368 commit c551304
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 89 deletions.
75 changes: 3 additions & 72 deletions workflow/sync/mutex.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,6 @@
package sync

import (
"sync"
"time"
)

type PriorityMutex struct {
name string
mutex *PrioritySemaphore
lock *sync.Mutex
}

func (m *PriorityMutex) getCurrentPending() []string {
return m.mutex.getCurrentPending()
}

var _ Semaphore = &PriorityMutex{}

// NewMutex creates new mutex lock object
// name of the mutex
// callbackFunc is a release notification function.
func NewMutex(name string, nextWorkflow NextWorkflow) *PriorityMutex {
return &PriorityMutex{
name: name,
lock: &sync.Mutex{},
mutex: NewSemaphore(name, 1, nextWorkflow, "mutex"),
}
}

func (m *PriorityMutex) getName() string {
return m.name
}

func (m *PriorityMutex) getLimit() int {
return m.mutex.limit
}

func (m *PriorityMutex) getCurrentHolders() []string {
return m.mutex.getCurrentHolders()
}

func (m *PriorityMutex) resize(n int) bool {
return false
}

func (m *PriorityMutex) release(key string) bool {
m.lock.Lock()
defer m.lock.Unlock()
return m.mutex.release(key)
}

func (m *PriorityMutex) acquire(holderKey string) bool {
m.lock.Lock()
defer m.lock.Unlock()
return m.mutex.acquire(holderKey)
}

func (m *PriorityMutex) addToQueue(holderKey string, priority int32, creationTime time.Time) {
m.lock.Lock()
defer m.lock.Unlock()
m.mutex.addToQueue(holderKey, priority, creationTime)
}

func (m *PriorityMutex) removeFromQueue(holderKey string) {
m.lock.Lock()
defer m.lock.Unlock()
m.mutex.removeFromQueue(holderKey)
}

func (m *PriorityMutex) tryAcquire(holderKey string) (bool, string) {
m.lock.Lock()
defer m.lock.Unlock()
return m.mutex.tryAcquire(holderKey)
// NewMutex creates a size 1 semaphore
func NewMutex(name string, nextWorkflow NextWorkflow) *PrioritySemaphore {
return NewSemaphore(name, 1, nextWorkflow, "mutex")
}
27 changes: 10 additions & 17 deletions workflow/sync/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,14 @@ metadata:
selfLink: /apis/argoproj.io/v1alpha1/namespaces/default/workflows/hello-world-prtl9
uid: 790f5c47-211f-4a3b-8949-514ae916633b
spec:
entrypoint: whalesay
synchronization:
semaphore:
configMapKeyRef:
key: workflow
name: my-config
templates:
-
container:
- container:
args:
- hello world
command:
Expand Down Expand Up @@ -118,17 +116,14 @@ metadata:
name: semaphore-tmpl-level-xjvln
namespace: default
spec:
entrypoint: semaphore-tmpl-level-example
templates:
-
inputs: {}
- inputs: {}
metadata: {}
name: semaphore-tmpl-level-example
outputs: {}
steps:
- -
name: generate
- - name: generate
template: gen-number-list
- - arguments:
parameters:
Expand All @@ -137,8 +132,7 @@ spec:
name: sleep
template: sleep-n-sec
withParam: '{{steps.generate.outputs.result}}'
-
inputs: {}
- inputs: {}
metadata: {}
name: gen-number-list
outputs: {}
Expand All @@ -152,8 +146,7 @@ spec:
import json
import sys
json.dump([i for i in range(1, 3)], sys.stdout)
-
container:
- container:
args:
- echo sleeping for {{inputs.parameters.seconds}} seconds; sleep 10; echo done
command:
Expand Down Expand Up @@ -654,13 +647,13 @@ func TestMutexWfLevel(t *testing.T) {
assert.False(t, status)
assert.True(t, wfUpdate)

mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex)
mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PrioritySemaphore)
assert.NotNil(t, mutex)
assert.Len(t, mutex.mutex.pending.items, 2)
assert.Len(t, mutex.pending.items, 2)
concurrenyMgr.ReleaseAll(wf1)
assert.Len(t, mutex.mutex.pending.items, 1)
assert.Len(t, mutex.pending.items, 1)
concurrenyMgr.ReleaseAll(wf2)
assert.Len(t, mutex.mutex.pending.items, 0)
assert.Len(t, mutex.pending.items, 0)
})
}

Expand Down Expand Up @@ -692,7 +685,7 @@ func TestCheckWorkflowExistence(t *testing.T) {
_, _, _, _ = concurrenyMgr.TryAcquire(wfMutex1, "", wfMutex.Spec.Synchronization)
_, _, _, _ = concurrenyMgr.TryAcquire(wfSema, "", wfSema.Spec.Synchronization)
_, _, _, _ = concurrenyMgr.TryAcquire(wfSema1, "", wfSema.Spec.Synchronization)
mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PriorityMutex)
mutex := concurrenyMgr.syncLockMap["default/Mutex/my-mutex"].(*PrioritySemaphore)
semaphore := concurrenyMgr.syncLockMap["default/ConfigMap/my-config/workflow"]

assert.Len(mutex.getCurrentHolders(), 1)
Expand Down

0 comments on commit c551304

Please sign in to comment.