Skip to content

Commit

Permalink
Allow multiple calls to thunk/thunkMany
Browse files Browse the repository at this point in the history
Allow the returned Thunk/ThunkMany functions to be called multiple times
without blocking or executing the batch function per invocation
  • Loading branch information
andy9775 committed Aug 11, 2018
1 parent 1e0d7f4 commit d076ec6
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 19 deletions.
33 changes: 28 additions & 5 deletions strategies/once/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func WithInBackground() Option {
// Load returns a Thunk which either calls the batch function when invoked or waits for a result from a
// background go routine (blocking if no data is available).
func (s *onceStrategy) Load(ctx context.Context, key dataloader.Key) dataloader.Thunk {
var result dataloader.Result

if s.options.inBackground {
resultChan := make(chan dataloader.Result)

Expand All @@ -72,20 +74,31 @@ func (s *onceStrategy) Load(ctx context.Context, key dataloader.Key) dataloader.

// call batch in background and block util it returns
return func() dataloader.Result {
return <-resultChan
if result.Result != nil || result.Err != nil {
return result
}

result = <-resultChan
return result
}
}

// call batch when thunk is called
return func() dataloader.Result {
return (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key)
}
if result.Result != nil || result.Err != nil {
return result
}

result = (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key)
return result
}
}

// LoadMany returns a ThunkMany which either calls the batch function when invoked or waits for a result from
// a background go routine (blocking if no data is available).
func (s *onceStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) dataloader.ThunkMany {
var result dataloader.ResultMap

if s.options.inBackground {
resultChan := make(chan dataloader.ResultMap)

Expand All @@ -95,13 +108,23 @@ func (s *onceStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d

// call batch in background and block util it returnsS
return func() dataloader.ResultMap {
return <-resultChan
if result != nil {
return result
}

result = <-resultChan
return result
}
}

// call batch when thunk is called
return func() dataloader.ResultMap {
return *s.batchFunc(ctx, dataloader.NewKeysWith(keyArr...))
if result != nil {
return result
}

result = *s.batchFunc(ctx, dataloader.NewKeysWith(keyArr...))
return result
}

}
Expand Down
14 changes: 14 additions & 0 deletions strategies/once/once_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func TestBatchLoadInForegroundCalled(t *testing.T) {
r := thunk()
assert.Equal(t, 1, callCount, "Batch function expected to be called on thunk()")
assert.Equal(t, expectedResult, r.Result.(string), "Expected result from batch function")

r = thunk()
assert.Equal(t, 1, callCount, "Batch function expected to be called on thunk()")
assert.Equal(t, expectedResult, r.Result.(string), "Expected result from batch function")
}

// TestBatchInForegroundCalled asserts that the once strategy will call the batch function only
Expand All @@ -106,6 +110,10 @@ func TestBatchLoadManyInForegroundCalled(t *testing.T) {
r := thunkMany()
assert.Equal(t, 1, callCount, "Batch function expected to be called on thunkMany()")
assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function")

r = thunkMany()
assert.Equal(t, 1, callCount, "Batch function expected to be called on thunkMany()")
assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function")
}

// ========================= background calls =========================
Expand Down Expand Up @@ -148,6 +156,9 @@ func TestBatchLoadInBackgroundCalled(t *testing.T) {

r := thunk()
assert.Equal(t, expectedResult, r.Result.(string), "Expected value from batch function")
r = thunk()
assert.Equal(t, expectedResult, r.Result.(string), "Expected value from batch function")
assert.Equal(t, 1, callCount, "Batch function expected to be called on Load() in background")
}

// TestBatchLoadManyInBackgroundCalled asserts that the once strategy will call the batch function
Expand Down Expand Up @@ -183,4 +194,7 @@ func TestBatchLoadManyInBackgroundCalled(t *testing.T) {

r := thunkMany()
assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function")
r = thunkMany()
assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function")
assert.Equal(t, 1, callCount, "Batch function expected to be called on LoadMany()")
}
27 changes: 21 additions & 6 deletions strategies/sozu/sozu.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (s *sozuStrategy) Load(ctx context.Context, key dataloader.Key) dataloader.
message := workerMessage{k: []dataloader.Key{key}, resultChan: resultChan}
s.keyChan <- message // pass key to the worker go routine

var result dataloader.Result
/*
TODO: clean up
If a worker go routine is in the process of calling the batch function and another
Expand All @@ -130,19 +131,25 @@ func (s *sozuStrategy) Load(ctx context.Context, key dataloader.Key) dataloader.
and process it.
*/
return func() dataloader.Result {
if result.Result != nil || result.Err != nil {
return result
}

for {
/*
dual select statements allow prioritization of cases in situations where both channels have data
*/
select {
case result := <-resultChan:
return result.GetValue(key)
case r := <-resultChan:
result = r.GetValue(key)
return result
default:
}

select {
case result := <-resultChan:
return result.GetValue(key)
case r := <-resultChan:
result = r.GetValue(key)
return result
case <-s.closeChan:
/*
Current worker closed, therefore no readers reading off of the key chan to get
Expand All @@ -167,8 +174,14 @@ func (s *sozuStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d
message := workerMessage{k: keyArr, resultChan: resultChan}
s.keyChan <- message

var resultMap dataloader.ResultMap

// See comments in Load method above
return func() dataloader.ResultMap {
if resultMap != nil {
return resultMap
}

for {
/*
dual select statements allow prioritization of cases in situations where both channels have data
Expand All @@ -181,7 +194,8 @@ func (s *sozuStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d
result.Set(k.String(), r.GetValue(k))
}

return result
resultMap = result
return resultMap
default:
}

Expand All @@ -193,7 +207,8 @@ func (s *sozuStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d
result.Set(k.String(), r.GetValue(k))
}

return result
resultMap = result
return resultMap
case <-s.closeChan:
s.startWorker(ctx)
}
Expand Down
60 changes: 60 additions & 0 deletions strategies/sozu/sozu_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ func TestLoadTimeoutTriggered(t *testing.T) {
"Expected batch function to return result",
)

// test double call to thunk
r = thunk()
assert.Equal(
t,
fmt.Sprintf("2_%s", expectedResult),
r.Result.(string),
"Expected batch function to return result",
)
assert.Equal(t, 1, callCount, "Batch function expected to be called once")
}

// TestLoadManyTimeoutTriggered ensures that the timeout function triggers the batch function
Expand Down Expand Up @@ -220,6 +229,16 @@ func TestLoadManyTimeoutTriggered(t *testing.T) {
r1.(dataloader.ResultMap).Length()+r2.(dataloader.ResultMap).Length(),
"Expected 3 total results from both thunkMany function",
)

// test double call to thunk
r2 = thunkMany()
assert.Equal(
t,
3,
r1.(dataloader.ResultMap).Length()+r2.(dataloader.ResultMap).Length(),
"Expected 3 total results from both thunkMany function",
)
assert.Equal(t, 1, callCount, "Batch function expected to be called once")
}

// ========================= test non-timeout =========================
Expand Down Expand Up @@ -291,6 +310,16 @@ func TestLoadTriggered(t *testing.T) {
)

assert.False(t, timedOut, "Expected function to not timeout")

// test double call to thunk
r1 = thunk()
assert.Equal(
t,
fmt.Sprintf("1_%s", expectedResult),
r1.Result.(string),
"Expected batch function to return on thunk()",
)
assert.Equal(t, 1, callCount, "Batch function expected to be called once")
}

// TestLoadManyTriggered asserts that load calls do not timeout and call the batch function after
Expand Down Expand Up @@ -361,6 +390,16 @@ func TestLoadManyTriggered(t *testing.T) {
)

assert.False(t, timedOut, "Expected function to not timeout")

// test double call to thunk
r1 = thunk() // don't block on second call
assert.Equal(
t,
fmt.Sprintf("1_%s", expectedResult),
r1.(dataloader.ResultMap).GetValue(key).Result,
"Expected batch function to return on thunk()",
)
assert.Equal(t, 1, callCount, "Batch function expected to be called once ")
}

// TestLoadBlocked calls thunk without using a wait group and expects to be blocked before getting data back.
Expand Down Expand Up @@ -413,6 +452,14 @@ func TestLoadBlocked(t *testing.T) {
assert.False(t, timedOut, "Batch function should not have timed out")
assert.Equal(t, 1, len(k), "Should have been called with one key")
assert.Equal(t, fmt.Sprintf("1_%s", expectedResult), r.Result.(string), "Expected result from thunk()")

// test double call to thunk
r = thunk() // don't block on second call

assert.Equal(t, 1, callCount, "Batch function should have been called once")
assert.False(t, timedOut, "Batch function should not have timed out")
assert.Equal(t, 1, len(k), "Should have been called with one key")
assert.Equal(t, fmt.Sprintf("1_%s", expectedResult), r.Result.(string), "Expected result from thunk()")
}

// TestLoadManyBlocked calls thunkMany without using a wait group and expects to be blocked before
Expand Down Expand Up @@ -472,4 +519,17 @@ func TestLoadManyBlocked(t *testing.T) {
r.(dataloader.ResultMap).GetValue(key2).Result.(string),
"Expected result from thunkMany()",
)

// test double call to thunk
r = thunkMany() // don't block on second call

assert.Equal(t, 1, callCount, "Batch function should have been called once")
assert.False(t, timedOut, "Batch function should not have timed out")
assert.Equal(t, 2, len(k), "Should have been called with two keys")
assert.Equal(
t,
fmt.Sprintf("2_%s", expectedResult),
r.(dataloader.ResultMap).GetValue(key2).Result.(string),
"Expected result from thunkMany()",
)
}
34 changes: 26 additions & 8 deletions strategies/standard/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,30 @@ func (s *standardStrategy) Load(ctx context.Context, key dataloader.Key) dataloa
message := workerMessage{k: []dataloader.Key{key}, resultChan: resultChan}
s.keyChan <- message // pass key to the worker go routine (buffered channel)

var result dataloader.Result

return func() dataloader.Result {
if result.Result != nil || result.Err != nil {
return result
}

/*
dual select statements allow prioritization of cases in situations where both channels have data
*/
select {
case result := <-resultChan:
return result.GetValue(key)
case r := <-resultChan:
result = r.GetValue(key)
return result
default:
}

select {
case result := <-resultChan:
return result.GetValue(key)
case r := <-resultChan:
result = r.GetValue(key)
return result
case <-s.closeChan:
return (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key)
result = (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key)
return result
}
}
}
Expand All @@ -128,22 +137,31 @@ func (s *standardStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Ke
message := workerMessage{k: keyArr, resultChan: resultChan}
s.keyChan <- message

var resultMap dataloader.ResultMap

return func() dataloader.ResultMap {
if resultMap != nil {
return resultMap
}

/*
dual select statements allow prioritization of cases in situations where both channels have data
*/
select {
case r := <-resultChan:
return buildResultMap(keyArr, r)
resultMap = buildResultMap(keyArr, r)
return resultMap
default:
}

select {
case r := <-resultChan:
return buildResultMap(keyArr, r)
resultMap = buildResultMap(keyArr, r)
return resultMap
case <-s.closeChan: // batch the keys if closed
r := *s.batchFunc(ctx, dataloader.NewKeysWith(keyArr...))
return buildResultMap(keyArr, r)
resultMap = buildResultMap(keyArr, r)
return resultMap
}
}
}
Expand Down
Loading

0 comments on commit d076ec6

Please sign in to comment.