diff --git a/go.mod b/go.mod index 7b931cd..5f0a20a 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,20 @@ module github.com/pb33f/libopenapi go 1.20 require ( - github.com/stretchr/testify v1.8.0 + github.com/lucasjones/reggen v0.0.0-20200904144131-37ba4fa293bb + github.com/stretchr/testify v1.8.1 github.com/vmware-labs/yaml-jsonpath v0.3.2 + github.com/wk8/go-ordered-map/v2 v2.1.8 golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb golang.org/x/sync v0.1.0 gopkg.in/yaml.v3 v3.0.1 ) require ( + github.com/bahlo/generic-list-go v0.2.0 // indirect + github.com/buger/jsonparser v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dprotaso/go-yit v0.0.0-20220510233725-9ba8df137936 // indirect - github.com/lucasjones/reggen v0.0.0-20200904144131-37ba4fa293bb // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index 674b83a..ec2a67b 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= +github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= +github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= +github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -27,6 +31,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -34,6 +39,8 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/lucasjones/reggen v0.0.0-20200904144131-37ba4fa293bb h1:w1g9wNDIE/pHSTmAaUhv4TZQuPBS6GV3mMz5hkgziIU= github.com/lucasjones/reggen v0.0.0-20200904144131-37ba4fa293bb/go.mod h1:5ELEyG+X8f+meRWHuqUOewBOhvHkl7M76pdGEansxW4= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -55,13 +62,17 @@ github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/vmware-labs/yaml-jsonpath v0.3.2 h1:/5QKeCBGdsInyDCyVNLbXyilb61MXGi9NP674f9Hobk= github.com/vmware-labs/yaml-jsonpath v0.3.2/go.mod h1:U6whw1z03QyqgWdgXxvVnQ90zN1BWz5V+51Ewf8k+rQ= +github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= +github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/orderedmap/orderedmap.go b/orderedmap/orderedmap.go new file mode 100644 index 0000000..83d4ea5 --- /dev/null +++ b/orderedmap/orderedmap.go @@ -0,0 +1,248 @@ +// Ordered map container +// Works like the Golang `map` built-in, but preserves order that key/value +// pairs were added when iterating. + +package orderedmap + +import ( + "context" + "io" + "runtime" + "sync" + + wk8orderedmap "github.com/wk8/go-ordered-map/v2" +) + +type Map[K comparable, V any] interface { + Lengthiness + Get(K) (V, bool) + GetOrZero(K) V + Set(K, V) (V, bool) + Delete(K) (V, bool) + First() Pair[K, V] +} + +type Lengthiness interface { + Len() int +} + +type Pair[K comparable, V any] interface { + Key() K + KeyPtr() *K + Value() V + ValuePtr() *V + Next() Pair[K, V] +} + +type wrapOrderedMap[K comparable, V any] struct { + *wk8orderedmap.OrderedMap[K, V] +} + +type wrapPair[K comparable, V any] struct { + *wk8orderedmap.Pair[K, V] +} + +type ActionFunc[K comparable, V any] func(Pair[K, V]) error +type TranslateFunc[IN any, OUT any] func(IN) (OUT, error) +type ResultFunc[V any] func(V) error + +// New creates an ordered map generic object. +func New[K comparable, V any]() Map[K, V] { + return &wrapOrderedMap[K, V]{ + OrderedMap: wk8orderedmap.New[K, V](), + } +} + +func (o *wrapOrderedMap[K, V]) GetOrZero(k K) V { + v, ok := o.OrderedMap.Get(k) + if !ok { + var zero V + return zero + } + return v +} + +func (o *wrapOrderedMap[K, V]) First() Pair[K, V] { + pair := o.OrderedMap.Oldest() + if pair == nil { + return nil + } + return &wrapPair[K, V]{ + Pair: pair, + } +} + +func (p *wrapPair[K, V]) Next() Pair[K, V] { + next := p.Pair.Next() + if next == nil { + return nil + } + return &wrapPair[K, V]{ + Pair: next, + } +} + +func (p *wrapPair[K, V]) Key() K { + return p.Pair.Key +} + +func (p *wrapPair[K, V]) KeyPtr() *K { + return &p.Pair.Key +} + +func (p *wrapPair[K, V]) Value() V { + return p.Pair.Value +} + +func (p *wrapPair[K, V]) ValuePtr() *V { + return &p.Pair.Value +} + +// Len returns the length of a container implementing a `Len()` method. +// Safely returns zero on nil pointer. +func Len(l Lengthiness) int { + if l == nil { + return 0 + } + return l.Len() +} + +// ToOrderedMap converts map built-in to OrderedMap. +// Iterate the map in order. +// Safely handles nil pointer. +// Be sure to iterate to end or cancel the context when done to release +// resources. +func Iterate[K comparable, V any](ctx context.Context, m Map[K, V]) <-chan Pair[K, V] { + c := make(chan Pair[K, V]) + if Len(m) == 0 { + close(c) + return c + } + go func() { + defer close(c) + for pair := m.First(); pair != nil; pair = pair.Next() { + select { + case c <- pair: + case <-ctx.Done(): + return + } + } + }() + return c +} + +// ToOrderedMap converts a `map` to `OrderedMap`. +func ToOrderedMap[K comparable, V any](m map[K]V) Map[K, V] { + om := New[K, V]() + for k, v := range m { + om.Set(k, v) + } + return om +} + +// For iterates a `Map` and calls action() on each map pair. +// action() may return `io.EOF` to break iteration. +// Safely handles nil pointer. +func For[K comparable, V any](m Map[K, V], action ActionFunc[K, V]) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := Iterate(ctx, m) + for pair := range c { + err := action(pair) + if err == io.EOF { + return nil + } + if err != nil { + return err + } + } + return nil +} + +// TranslateMapParallel iterates a `Map` in parallel and calls translate() +// asynchronously. +// translate() or result() may return `io.EOF` to break iteration. +// Safely handles nil pointer. +// Results are provided sequentially to result() in stable order from `Map`. +func TranslateMapParallel[K comparable, V any, RV any](m Map[K, V], translate TranslateFunc[Pair[K, V], RV], result ResultFunc[RV]) error { + if m == nil { + return nil + } + + type jobStatus struct { + done chan struct{} + result RV + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + concurrency := runtime.NumCPU() + c := Iterate(ctx, m) + jobChan := make(chan *jobStatus, concurrency) + var reterr error + var wg sync.WaitGroup + var mu sync.Mutex + + // Fan out translate jobs. + wg.Add(1) + go func() { + defer func() { + close(jobChan) + wg.Done() + }() + for pair := range c { + j := &jobStatus{ + done: make(chan struct{}), + } + select { + case jobChan <- j: + case <-ctx.Done(): + return + } + + wg.Add(1) + go func(pair Pair[K, V]) { + value, err := translate(pair) + if err != nil { + mu.Lock() + defer func() { + mu.Unlock() + wg.Done() + cancel() + }() + if reterr == nil { + reterr = err + } + return + } + j.result = value + close(j.done) + wg.Done() + }(pair) + } + }() + + // Iterate jobChan as jobs complete. + defer wg.Wait() +JOBLOOP: + for j := range jobChan { + select { + case <-j.done: + err := result(j.result) + if err != nil { + cancel() + if err == io.EOF { + return nil + } + return err + } + case <-ctx.Done(): + break JOBLOOP + } + } + + if reterr == io.EOF { + return nil + } + return reterr +} diff --git a/orderedmap/orderedmap_test.go b/orderedmap/orderedmap_test.go new file mode 100644 index 0000000..7c20a89 --- /dev/null +++ b/orderedmap/orderedmap_test.go @@ -0,0 +1,336 @@ +package orderedmap_test + +import ( + "context" + "errors" + "fmt" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/pb33f/libopenapi/orderedmap" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOrderedMap(t *testing.T) { + t.Run("Empty", func(t *testing.T) { + m := orderedmap.New[string, int]() + assert.Equal(t, m.Len(), 0) + assert.Nil(t, m.First()) + }) + + t.Run("First()", func(t *testing.T) { + const mapSize = 1000 + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("foobar_%d", i), i) + } + assert.Equal(t, m.Len(), mapSize) + + for i := 0; i < mapSize; i++ { + assert.Equal(t, i, m.GetOrZero(fmt.Sprintf("foobar_%d", i))) + } + + var i int + for pair := m.First(); pair != nil; pair = pair.Next() { + assert.Equal(t, fmt.Sprintf("foobar_%d", i), pair.Key()) + assert.Equal(t, fmt.Sprintf("foobar_%d", i), *pair.KeyPtr()) + assert.Equal(t, i, pair.Value()) + assert.Equal(t, i, *pair.ValuePtr()) + i++ + require.LessOrEqual(t, i, mapSize) + } + assert.Equal(t, mapSize, i) + }) + + t.Run("Get()", func(t *testing.T) { + const mapSize = 1000 + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), 1000+i) + } + + for i := 0; i < mapSize; i++ { + actual, ok := m.Get(fmt.Sprintf("key%d", i)) + assert.True(t, ok) + assert.Equal(t, 1000+i, actual) + } + + _, ok := m.Get("bogus") + assert.False(t, ok) + }) + + t.Run("GetOrZero()", func(t *testing.T) { + const mapSize = 1000 + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), 1000+i) + } + + for i := 0; i < mapSize; i++ { + actual := m.GetOrZero(fmt.Sprintf("key%d", i)) + assert.Equal(t, 1000+i, actual) + } + + assert.Equal(t, 0, m.GetOrZero("bogus")) + }) +} + +func TestMap(t *testing.T) { + t.Run("Len()", func(t *testing.T) { + const mapSize = 100 + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + assert.Equal(t, mapSize, m.Len()) + assert.Equal(t, mapSize, orderedmap.Len(m)) + + t.Run("Nil pointer", func(t *testing.T) { + var m orderedmap.Map[string, int] + assert.Zero(t, orderedmap.Len(m)) + }) + }) + + t.Run("Iterate()", func(t *testing.T) { + const mapSize = 10 + + t.Run("Empty", func(t *testing.T) { + m := orderedmap.New[string, int]() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := orderedmap.Iterate(ctx, m) + for range c { + t.Fatal("Expected no data") + } + requireClosed(t, c) + }) + + t.Run("Full iteration", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + var i int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := orderedmap.Iterate(ctx, m) + for pair := range c { + assert.Equal(t, fmt.Sprintf("key%d", i), pair.Key()) + assert.Equal(t, fmt.Sprintf("key%d", i), *pair.KeyPtr()) + assert.Equal(t, i+1000, pair.Value()) + assert.Equal(t, i+1000, *pair.ValuePtr()) + i++ + require.LessOrEqual(t, i, mapSize) + } + assert.Equal(t, mapSize, i) + requireClosed(t, c) + }) + + t.Run("Partial iteration", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + var i int + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := orderedmap.Iterate(ctx, m) + for pair := range c { + assert.Equal(t, fmt.Sprintf("key%d", i), pair.Key()) + assert.Equal(t, fmt.Sprintf("key%d", i), *pair.KeyPtr()) + assert.Equal(t, i+1000, pair.Value()) + assert.Equal(t, i+1000, *pair.ValuePtr()) + i++ + if i >= mapSize/2 { + break + } + } + + cancel() + time.Sleep(10 * time.Millisecond) + requireClosed(t, c) + assert.Equal(t, mapSize/2, i) + }) + }) + + t.Run("For()", func(t *testing.T) { + const mapSize = 10 + + t.Run("Nil pointer", func(t *testing.T) { + var m orderedmap.Map[string, int] + err := orderedmap.For(m, func(_ orderedmap.Pair[string, int]) error { + return errors.New("Expected no data") + }) + require.NoError(t, err) + }) + + t.Run("Empty", func(t *testing.T) { + m := orderedmap.New[string, int]() + err := orderedmap.For(m, func(_ orderedmap.Pair[string, int]) error { + return errors.New("Expected no data") + }) + require.NoError(t, err) + }) + + t.Run("Full iteration", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + var i int + err := orderedmap.For(m, func(pair orderedmap.Pair[string, int]) error { + assert.Equal(t, fmt.Sprintf("key%d", i), pair.Key()) + assert.Equal(t, fmt.Sprintf("key%d", i), *pair.KeyPtr()) + assert.Equal(t, i+1000, pair.Value()) + assert.Equal(t, i+1000, *pair.ValuePtr()) + i++ + require.LessOrEqual(t, i, mapSize) + return nil + }) + require.NoError(t, err) + assert.Equal(t, mapSize, i) + }) + + t.Run("Partial iteration", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + var i int + err := orderedmap.For(m, func(pair orderedmap.Pair[string, int]) error { + assert.Equal(t, fmt.Sprintf("key%d", i), pair.Key()) + assert.Equal(t, fmt.Sprintf("key%d", i), *pair.KeyPtr()) + assert.Equal(t, i+1000, pair.Value()) + assert.Equal(t, i+1000, *pair.ValuePtr()) + i++ + if i >= mapSize/2 { + return io.EOF + } + return nil + }) + require.NoError(t, err) + assert.Equal(t, mapSize/2, i) + }) + }) + + t.Run("TranslateMapParallel()", func(t *testing.T) { + const mapSize = 1000 + + t.Run("Happy path", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + var translateCounter int64 + translateFunc := func(pair orderedmap.Pair[string, int]) (string, error) { + result := fmt.Sprintf("foobar %d", pair.Value()) + atomic.AddInt64(&translateCounter, 1) + return result, nil + } + var resultCounter int + resultFunc := func(value string) error { + assert.Equal(t, fmt.Sprintf("foobar %d", resultCounter+1000), value) + resultCounter++ + return nil + } + err := orderedmap.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Equal(t, int64(mapSize), translateCounter) + assert.Equal(t, mapSize, resultCounter) + }) + + t.Run("Error in translate", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + translateFunc := func(pair orderedmap.Pair[string, int]) (string, error) { + return "", errors.New("Foobar") + } + var resultCounter int + resultFunc := func(value string) error { + resultCounter++ + return nil + } + err := orderedmap.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.ErrorContains(t, err, "Foobar") + assert.Zero(t, resultCounter) + }) + + t.Run("Error in result", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + translateFunc := func(pair orderedmap.Pair[string, int]) (string, error) { + return "", nil + } + var resultCounter int + resultFunc := func(value string) error { + resultCounter++ + return errors.New("Foobar") + } + err := orderedmap.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.ErrorContains(t, err, "Foobar") + assert.Equal(t, 1, resultCounter) + }) + + t.Run("EOF in translate", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + translateFunc := func(pair orderedmap.Pair[string, int]) (string, error) { + return "", io.EOF + } + var resultCounter int + resultFunc := func(value string) error { + resultCounter++ + return nil + } + err := orderedmap.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Zero(t, resultCounter) + }) + + t.Run("EOF in result", func(t *testing.T) { + m := orderedmap.New[string, int]() + for i := 0; i < mapSize; i++ { + m.Set(fmt.Sprintf("key%d", i), i+1000) + } + + translateFunc := func(pair orderedmap.Pair[string, int]) (string, error) { + return "", nil + } + var resultCounter int + resultFunc := func(value string) error { + resultCounter++ + return io.EOF + } + err := orderedmap.TranslateMapParallel[string, int, string](m, translateFunc, resultFunc) + require.NoError(t, err) + assert.Equal(t, 1, resultCounter) + }) + }) +} + +func requireClosed[K comparable, V any](t *testing.T, c <-chan orderedmap.Pair[K, V]) { + select { + case pair := <-c: + require.Nil(t, pair, "Expected channel to be closed") + case <-time.After(100 * time.Millisecond): + t.Fatal("Timeout reading channel; expected channel to be closed") + } +}