From d076ec68022457b228766758ae7bfeefbaa8140d Mon Sep 17 00:00:00 2001 From: Andy Chrzaszcz Date: Fri, 10 Aug 2018 23:21:47 -0400 Subject: [PATCH] Allow multiple calls to thunk/thunkMany Allow the returned Thunk/ThunkMany functions to be called multiple times without blocking or executing the batch function per invocation --- strategies/once/once.go | 33 ++++++++++++--- strategies/once/once_test.go | 14 +++++++ strategies/sozu/sozu.go | 27 ++++++++++--- strategies/sozu/sozu_test.go | 60 ++++++++++++++++++++++++++++ strategies/standard/standard.go | 34 ++++++++++++---- strategies/standard/standard_test.go | 42 +++++++++++++++++++ 6 files changed, 191 insertions(+), 19 deletions(-) diff --git a/strategies/once/once.go b/strategies/once/once.go index 225c042..f106279 100644 --- a/strategies/once/once.go +++ b/strategies/once/once.go @@ -63,6 +63,8 @@ func WithInBackground() Option { // Load returns a Thunk which either calls the batch function when invoked or waits for a result from a // background go routine (blocking if no data is available). func (s *onceStrategy) Load(ctx context.Context, key dataloader.Key) dataloader.Thunk { + var result dataloader.Result + if s.options.inBackground { resultChan := make(chan dataloader.Result) @@ -72,20 +74,31 @@ func (s *onceStrategy) Load(ctx context.Context, key dataloader.Key) dataloader. // call batch in background and block util it returns return func() dataloader.Result { - return <-resultChan + if result.Result != nil || result.Err != nil { + return result + } + + result = <-resultChan + return result } } // call batch when thunk is called return func() dataloader.Result { - return (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key) - } + if result.Result != nil || result.Err != nil { + return result + } + result = (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key) + return result + } } // LoadMany returns a ThunkMany which either calls the batch function when invoked or waits for a result from // a background go routine (blocking if no data is available). func (s *onceStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) dataloader.ThunkMany { + var result dataloader.ResultMap + if s.options.inBackground { resultChan := make(chan dataloader.ResultMap) @@ -95,13 +108,23 @@ func (s *onceStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d // call batch in background and block util it returnsS return func() dataloader.ResultMap { - return <-resultChan + if result != nil { + return result + } + + result = <-resultChan + return result } } // call batch when thunk is called return func() dataloader.ResultMap { - return *s.batchFunc(ctx, dataloader.NewKeysWith(keyArr...)) + if result != nil { + return result + } + + result = *s.batchFunc(ctx, dataloader.NewKeysWith(keyArr...)) + return result } } diff --git a/strategies/once/once_test.go b/strategies/once/once_test.go index c335a6c..6dabdd8 100644 --- a/strategies/once/once_test.go +++ b/strategies/once/once_test.go @@ -81,6 +81,10 @@ func TestBatchLoadInForegroundCalled(t *testing.T) { r := thunk() assert.Equal(t, 1, callCount, "Batch function expected to be called on thunk()") assert.Equal(t, expectedResult, r.Result.(string), "Expected result from batch function") + + r = thunk() + assert.Equal(t, 1, callCount, "Batch function expected to be called on thunk()") + assert.Equal(t, expectedResult, r.Result.(string), "Expected result from batch function") } // TestBatchInForegroundCalled asserts that the once strategy will call the batch function only @@ -106,6 +110,10 @@ func TestBatchLoadManyInForegroundCalled(t *testing.T) { r := thunkMany() assert.Equal(t, 1, callCount, "Batch function expected to be called on thunkMany()") assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function") + + r = thunkMany() + assert.Equal(t, 1, callCount, "Batch function expected to be called on thunkMany()") + assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function") } // ========================= background calls ========================= @@ -148,6 +156,9 @@ func TestBatchLoadInBackgroundCalled(t *testing.T) { r := thunk() assert.Equal(t, expectedResult, r.Result.(string), "Expected value from batch function") + r = thunk() + assert.Equal(t, expectedResult, r.Result.(string), "Expected value from batch function") + assert.Equal(t, 1, callCount, "Batch function expected to be called on Load() in background") } // TestBatchLoadManyInBackgroundCalled asserts that the once strategy will call the batch function @@ -183,4 +194,7 @@ func TestBatchLoadManyInBackgroundCalled(t *testing.T) { r := thunkMany() assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function") + r = thunkMany() + assert.Equal(t, expectedResult, r.GetValue(key).Result.(string), "Expected result from batch function") + assert.Equal(t, 1, callCount, "Batch function expected to be called on LoadMany()") } diff --git a/strategies/sozu/sozu.go b/strategies/sozu/sozu.go index f80ddca..db4b1e7 100644 --- a/strategies/sozu/sozu.go +++ b/strategies/sozu/sozu.go @@ -114,6 +114,7 @@ func (s *sozuStrategy) Load(ctx context.Context, key dataloader.Key) dataloader. message := workerMessage{k: []dataloader.Key{key}, resultChan: resultChan} s.keyChan <- message // pass key to the worker go routine + var result dataloader.Result /* TODO: clean up If a worker go routine is in the process of calling the batch function and another @@ -130,19 +131,25 @@ func (s *sozuStrategy) Load(ctx context.Context, key dataloader.Key) dataloader. and process it. */ return func() dataloader.Result { + if result.Result != nil || result.Err != nil { + return result + } + for { /* dual select statements allow prioritization of cases in situations where both channels have data */ select { - case result := <-resultChan: - return result.GetValue(key) + case r := <-resultChan: + result = r.GetValue(key) + return result default: } select { - case result := <-resultChan: - return result.GetValue(key) + case r := <-resultChan: + result = r.GetValue(key) + return result case <-s.closeChan: /* Current worker closed, therefore no readers reading off of the key chan to get @@ -167,8 +174,14 @@ func (s *sozuStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d message := workerMessage{k: keyArr, resultChan: resultChan} s.keyChan <- message + var resultMap dataloader.ResultMap + // See comments in Load method above return func() dataloader.ResultMap { + if resultMap != nil { + return resultMap + } + for { /* dual select statements allow prioritization of cases in situations where both channels have data @@ -181,7 +194,8 @@ func (s *sozuStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d result.Set(k.String(), r.GetValue(k)) } - return result + resultMap = result + return resultMap default: } @@ -193,7 +207,8 @@ func (s *sozuStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Key) d result.Set(k.String(), r.GetValue(k)) } - return result + resultMap = result + return resultMap case <-s.closeChan: s.startWorker(ctx) } diff --git a/strategies/sozu/sozu_test.go b/strategies/sozu/sozu_test.go index d61e55e..e868409 100644 --- a/strategies/sozu/sozu_test.go +++ b/strategies/sozu/sozu_test.go @@ -140,6 +140,15 @@ func TestLoadTimeoutTriggered(t *testing.T) { "Expected batch function to return result", ) + // test double call to thunk + r = thunk() + assert.Equal( + t, + fmt.Sprintf("2_%s", expectedResult), + r.Result.(string), + "Expected batch function to return result", + ) + assert.Equal(t, 1, callCount, "Batch function expected to be called once") } // TestLoadManyTimeoutTriggered ensures that the timeout function triggers the batch function @@ -220,6 +229,16 @@ func TestLoadManyTimeoutTriggered(t *testing.T) { r1.(dataloader.ResultMap).Length()+r2.(dataloader.ResultMap).Length(), "Expected 3 total results from both thunkMany function", ) + + // test double call to thunk + r2 = thunkMany() + assert.Equal( + t, + 3, + r1.(dataloader.ResultMap).Length()+r2.(dataloader.ResultMap).Length(), + "Expected 3 total results from both thunkMany function", + ) + assert.Equal(t, 1, callCount, "Batch function expected to be called once") } // ========================= test non-timeout ========================= @@ -291,6 +310,16 @@ func TestLoadTriggered(t *testing.T) { ) assert.False(t, timedOut, "Expected function to not timeout") + + // test double call to thunk + r1 = thunk() + assert.Equal( + t, + fmt.Sprintf("1_%s", expectedResult), + r1.Result.(string), + "Expected batch function to return on thunk()", + ) + assert.Equal(t, 1, callCount, "Batch function expected to be called once") } // TestLoadManyTriggered asserts that load calls do not timeout and call the batch function after @@ -361,6 +390,16 @@ func TestLoadManyTriggered(t *testing.T) { ) assert.False(t, timedOut, "Expected function to not timeout") + + // test double call to thunk + r1 = thunk() // don't block on second call + assert.Equal( + t, + fmt.Sprintf("1_%s", expectedResult), + r1.(dataloader.ResultMap).GetValue(key).Result, + "Expected batch function to return on thunk()", + ) + assert.Equal(t, 1, callCount, "Batch function expected to be called once ") } // TestLoadBlocked calls thunk without using a wait group and expects to be blocked before getting data back. @@ -413,6 +452,14 @@ func TestLoadBlocked(t *testing.T) { assert.False(t, timedOut, "Batch function should not have timed out") assert.Equal(t, 1, len(k), "Should have been called with one key") assert.Equal(t, fmt.Sprintf("1_%s", expectedResult), r.Result.(string), "Expected result from thunk()") + + // test double call to thunk + r = thunk() // don't block on second call + + assert.Equal(t, 1, callCount, "Batch function should have been called once") + assert.False(t, timedOut, "Batch function should not have timed out") + assert.Equal(t, 1, len(k), "Should have been called with one key") + assert.Equal(t, fmt.Sprintf("1_%s", expectedResult), r.Result.(string), "Expected result from thunk()") } // TestLoadManyBlocked calls thunkMany without using a wait group and expects to be blocked before @@ -472,4 +519,17 @@ func TestLoadManyBlocked(t *testing.T) { r.(dataloader.ResultMap).GetValue(key2).Result.(string), "Expected result from thunkMany()", ) + + // test double call to thunk + r = thunkMany() // don't block on second call + + assert.Equal(t, 1, callCount, "Batch function should have been called once") + assert.False(t, timedOut, "Batch function should not have timed out") + assert.Equal(t, 2, len(k), "Should have been called with two keys") + assert.Equal( + t, + fmt.Sprintf("2_%s", expectedResult), + r.(dataloader.ResultMap).GetValue(key2).Result.(string), + "Expected result from thunkMany()", + ) } diff --git a/strategies/standard/standard.go b/strategies/standard/standard.go index 73e3d8b..2c7e017 100644 --- a/strategies/standard/standard.go +++ b/strategies/standard/standard.go @@ -99,21 +99,30 @@ func (s *standardStrategy) Load(ctx context.Context, key dataloader.Key) dataloa message := workerMessage{k: []dataloader.Key{key}, resultChan: resultChan} s.keyChan <- message // pass key to the worker go routine (buffered channel) + var result dataloader.Result + return func() dataloader.Result { + if result.Result != nil || result.Err != nil { + return result + } + /* dual select statements allow prioritization of cases in situations where both channels have data */ select { - case result := <-resultChan: - return result.GetValue(key) + case r := <-resultChan: + result = r.GetValue(key) + return result default: } select { - case result := <-resultChan: - return result.GetValue(key) + case r := <-resultChan: + result = r.GetValue(key) + return result case <-s.closeChan: - return (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key) + result = (*s.batchFunc(ctx, dataloader.NewKeysWith(key))).GetValue(key) + return result } } } @@ -128,22 +137,31 @@ func (s *standardStrategy) LoadMany(ctx context.Context, keyArr ...dataloader.Ke message := workerMessage{k: keyArr, resultChan: resultChan} s.keyChan <- message + var resultMap dataloader.ResultMap + return func() dataloader.ResultMap { + if resultMap != nil { + return resultMap + } + /* dual select statements allow prioritization of cases in situations where both channels have data */ select { case r := <-resultChan: - return buildResultMap(keyArr, r) + resultMap = buildResultMap(keyArr, r) + return resultMap default: } select { case r := <-resultChan: - return buildResultMap(keyArr, r) + resultMap = buildResultMap(keyArr, r) + return resultMap case <-s.closeChan: // batch the keys if closed r := *s.batchFunc(ctx, dataloader.NewKeysWith(keyArr...)) - return buildResultMap(keyArr, r) + resultMap = buildResultMap(keyArr, r) + return resultMap } } } diff --git a/strategies/standard/standard_test.go b/strategies/standard/standard_test.go index 67650de..0630e8a 100644 --- a/strategies/standard/standard_test.go +++ b/strategies/standard/standard_test.go @@ -137,6 +137,16 @@ func TestLoadNoTimeout(t *testing.T) { ) assert.False(t, timedOut, "Expected loader not to timeout") + + // test double call to thunk + r = thunk() + assert.Equal( + t, + fmt.Sprintf("2_%s", expectedResult), + r.Result.(string), + "Expected result from thunk()", + ) + assert.Equal(t, 1, callCount, "Batch function expected to be called once") } // TestLoadManyNoTimeout tests calling the load function without timing out @@ -207,6 +217,16 @@ func TestLoadManyNoTimeout(t *testing.T) { ) assert.False(t, timedOut, "Expected loader not to timeout") + + // test double call to thunk + r = thunk() + assert.Equal( + t, + fmt.Sprintf("2_%s", expectedResult), + r.(dataloader.ResultMap).GetValue(key2).Result.(string), + "Expected result from thunk()", + ) + assert.Equal(t, 1, callCount, "Batch function expected to be called once") } // ================================================= timeout ================================================= @@ -293,6 +313,17 @@ func TestLoadTimeout(t *testing.T) { r.Result.(string), "Expected result from thunk", ) + + // test double call to thunk + r = thunk() + + // called once in go routine after timeout, once in thunk + assert.Equal(t, 2, callCount, "Batch function expected to be called twice") + assert.Equal(t, + fmt.Sprintf("1_%s", expectedResult), + r.Result.(string), + "Expected result from thunk", + ) } // TestLoadManyTimeout tests that the first load call is performed after a timeout and the second @@ -379,4 +410,15 @@ func TestLoadManyTimeout(t *testing.T) { "Expected result from thunkMany", ) assert.Equal(t, 2, len(k), "Expected to be called with 2 keys") // second function call + + // test double call to thunk + r = thunkMany() + + // called once in go routine after timeout, once in thunkMany + assert.Equal(t, 2, callCount, "Batch function expected to be called twice") + assert.Equal(t, + fmt.Sprintf("3_%s", expectedResult), + r.(dataloader.ResultMap).GetValue(key3).Result.(string), + "Expected result from thunkMany", + ) }