From f95f6b48c6d771ceae2fe62baad754e039e28a83 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Fri, 22 Sep 2023 15:39:44 -0400 Subject: [PATCH] Improve test coverage. --- datamodel/translate_test.go | 69 +++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/datamodel/translate_test.go b/datamodel/translate_test.go index ef7704b..406f8c2 100644 --- a/datamodel/translate_test.go +++ b/datamodel/translate_test.go @@ -473,6 +473,75 @@ func TestTranslatePipeline(t *testing.T) { require.NoError(t, inputErr) assert.Zero(t, resultCounter) }) + + // Target error handler that catches when internal context cancels + // while waiting on input. + t.Run("Error while waiting on input", func(t *testing.T) { + in := make(chan int) + out := make(chan string) + var wg sync.WaitGroup + wg.Add(1) // input goroutine + + // Send input. + go func() { + in <- 1 + wg.Done() + }() + + // No need to capture output channel. + + err := datamodel.TranslatePipeline[int, string](in, out, + func(value int) (string, error) { + // Returning an error causes TranslatePipline to cancel its internal context. + return "", errors.New("Foobar") + }, + ) + wg.Wait() + require.Error(t, err) + }) + + // Target error handler that catches when internal context cancels + // while sending a pipelineJobStatus to worker pool channel. + // This happens when one item returns an error, triggering a + // context cancel. Then the second item is aborted by this error + // handler. + t.Run("Error while waiting on worker", func(t *testing.T) { + const concurrency = 2 + in := make(chan int) + out := make(chan string) + done := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) // input goroutine + + // Send input. + go func() { + // Fill up worker pool with items. + for i := 0; i < concurrency; i++ { + select { + case in <- i: + case <-done: + } + } + wg.Done() + }() + + // No need to capture output channel. + + var itemCount atomic.Int64 + err := datamodel.TranslatePipeline[int, string](in, out, + func(value int) (string, error) { + counter := itemCount.Add(1) + // Cause error on first call. + if counter == 1 { + return "", errors.New("Foobar") + } + return "", nil + }, + ) + close(done) + wg.Wait() + require.Error(t, err) + }) }) } }