Implement TranslateMapParallel() as generalized concurrent map iterator.

Integrate `TranslateMapParallel()` into datamodel for `Paths` to replace specialized async logic.
This commit is contained in:
Shawn Poulson
2023-07-27 10:22:17 -04:00
parent 5918b8a187
commit 0c3137aaf9
3 changed files with 223 additions and 22 deletions

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sort"
"strconv"
"sync"
"sync/atomic"
@@ -157,6 +158,135 @@ func TestTranslateSliceParallel(t *testing.T) {
}
}
func TestTranslateMapParallel(t *testing.T) {
const mapSize = 1000
t.Run("Happy path", func(t *testing.T) {
var expectedResults []string
m := make(map[string]int)
for i := 0; i < mapSize; i++ {
m[fmt.Sprintf("key%d", i)] = i + 1000
expectedResults = append(expectedResults, fmt.Sprintf("foobar %d", i+1000))
}
var translateCounter int64
translateFunc := func(_ string, value int) (string, error) {
result := fmt.Sprintf("foobar %d", value)
atomic.AddInt64(&translateCounter, 1)
return result, nil
}
var results []string
resultFunc := func(value string) error {
results = append(results, value)
return nil
}
err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc)
require.NoError(t, err)
assert.Equal(t, int64(mapSize), translateCounter)
assert.Equal(t, mapSize, len(results))
sort.Strings(results)
assert.Equal(t, expectedResults, results)
})
t.Run("Error in translate", func(t *testing.T) {
m := make(map[string]int)
for i := 0; i < mapSize; i++ {
m[fmt.Sprintf("key%d", i)] = i + 1000
}
var translateCounter int64
translateFunc := func(_ string, _ int) (string, error) {
atomic.AddInt64(&translateCounter, 1)
return "", errors.New("Foobar")
}
resultFunc := func(_ string) error {
t.Fatal("Expected no call to resultFunc()")
return nil
}
err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc)
require.ErrorContains(t, err, "Foobar")
assert.Less(t, translateCounter, int64(mapSize))
})
t.Run("Error in result", func(t *testing.T) {
m := make(map[string]int)
for i := 0; i < mapSize; i++ {
m[fmt.Sprintf("key%d", i)] = i + 1000
}
translateFunc := func(_ string, value int) (string, error) {
return "", nil
}
var resultCounter int
resultFunc := func(_ string) error {
resultCounter++
return errors.New("Foobar")
}
err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc)
require.ErrorContains(t, err, "Foobar")
assert.Less(t, resultCounter, mapSize)
})
t.Run("EOF in translate", func(t *testing.T) {
m := make(map[string]int)
for i := 0; i < mapSize; i++ {
m[fmt.Sprintf("key%d", i)] = i + 1000
}
var translateCounter int64
translateFunc := func(_ string, _ int) (string, error) {
atomic.AddInt64(&translateCounter, 1)
return "", io.EOF
}
resultFunc := func(_ string) error {
t.Fatal("Expected no call to resultFunc()")
return nil
}
err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc)
require.NoError(t, err)
assert.Less(t, translateCounter, int64(mapSize))
})
t.Run("EOF in result", func(t *testing.T) {
m := make(map[string]int)
for i := 0; i < mapSize; i++ {
m[fmt.Sprintf("key%d", i)] = i + 1000
}
translateFunc := func(_ string, value int) (string, error) {
return "", nil
}
var resultCounter int
resultFunc := func(_ string) error {
resultCounter++
return io.EOF
}
err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc)
require.NoError(t, err)
assert.Less(t, resultCounter, mapSize)
})
t.Run("Continue in translate", func(t *testing.T) {
m := make(map[string]int)
for i := 0; i < mapSize; i++ {
m[fmt.Sprintf("key%d", i)] = i + 1000
}
var translateCounter int64
translateFunc := func(_ string, _ int) (string, error) {
atomic.AddInt64(&translateCounter, 1)
return "", datamodel.Continue
}
resultFunc := func(_ string) error {
t.Fatal("Expected no call to resultFunc()")
return nil
}
err := datamodel.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc)
require.NoError(t, err)
assert.Equal(t, int64(mapSize), translateCounter)
})
}
func TestTranslatePipeline(t *testing.T) {
testCases := []struct {
ItemCount int