Improve test coverage.

This commit is contained in:
Shawn Poulson
2023-09-22 15:39:44 -04:00
parent 8bfa8c6a2e
commit f95f6b48c6

View File

@@ -473,6 +473,75 @@ func TestTranslatePipeline(t *testing.T) {
require.NoError(t, inputErr) require.NoError(t, inputErr)
assert.Zero(t, resultCounter) assert.Zero(t, resultCounter)
}) })
// Target error handler that catches when internal context cancels
// while waiting on input.
t.Run("Error while waiting on input", func(t *testing.T) {
in := make(chan int)
out := make(chan string)
var wg sync.WaitGroup
wg.Add(1) // input goroutine
// Send input.
go func() {
in <- 1
wg.Done()
}()
// No need to capture output channel.
err := datamodel.TranslatePipeline[int, string](in, out,
func(value int) (string, error) {
// Returning an error causes TranslatePipline to cancel its internal context.
return "", errors.New("Foobar")
},
)
wg.Wait()
require.Error(t, err)
})
// Target error handler that catches when internal context cancels
// while sending a pipelineJobStatus to worker pool channel.
// This happens when one item returns an error, triggering a
// context cancel. Then the second item is aborted by this error
// handler.
t.Run("Error while waiting on worker", func(t *testing.T) {
const concurrency = 2
in := make(chan int)
out := make(chan string)
done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1) // input goroutine
// Send input.
go func() {
// Fill up worker pool with items.
for i := 0; i < concurrency; i++ {
select {
case in <- i:
case <-done:
}
}
wg.Done()
}()
// No need to capture output channel.
var itemCount atomic.Int64
err := datamodel.TranslatePipeline[int, string](in, out,
func(value int) (string, error) {
counter := itemCount.Add(1)
// Cause error on first call.
if counter == 1 {
return "", errors.New("Foobar")
}
return "", nil
},
)
close(done)
wg.Wait()
require.Error(t, err)
})
}) })
} }
} }