diff --git a/common/async.go b/common/async.go index 23d1a42b..e7bc71b1 100644 --- a/common/async.go +++ b/common/async.go @@ -1,5 +1,9 @@ package common +import ( + "sync/atomic" +) + // val: the value returned after task execution. // err: the error returned during task completion. // abort: tells Parallel to return, whether or not all tasks have completed. @@ -14,12 +18,15 @@ type TaskResult struct { type TaskResultCh <-chan TaskResult // Run tasks in parallel, with ability to abort early. +// Returns ok=false iff any of the tasks returned abort=true. // NOTE: Do not implement quit features here. Instead, provide convenient // concurrent quit-like primitives, passed implicitly via Task closures. (e.g. // it's not Parallel's concern how you quit/abort your tasks). -func Parallel(tasks ...Task) []TaskResultCh { +func Parallel(tasks ...Task) (chz []TaskResultCh, ok bool) { var taskResultChz = make([]TaskResultCh, len(tasks)) // To return. var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received. + var numPanics = new(int32) // Keep track of panics to set ok=false later. + ok = true // We will set it to false iff any tasks panic'd or returned abort. // Start all tasks in parallel in separate goroutines. // When the task is complete, it will appear in the @@ -31,6 +38,7 @@ func Parallel(tasks ...Task) []TaskResultCh { // Recovery defer func() { if pnk := recover(); pnk != nil { + atomic.AddInt32(numPanics, 1) taskResultCh <- TaskResult{nil, nil, pnk} taskDoneCh <- false } @@ -46,15 +54,21 @@ func Parallel(tasks ...Task) []TaskResultCh { } // Wait until all tasks are done, or until abort. + // DONE_LOOP: for i := 0; i < len(tasks); i++ { abort := <-taskDoneCh if abort { + ok = false break } } + // Ok is also false if there were any panics. + // We must do this check here (after DONE_LOOP). + ok = ok && (atomic.LoadInt32(numPanics) == 0) + // Caller can use this however they want. // TODO: implement convenience functions to // make sense of this structure safely. - return taskResultChz + return taskResultChz, ok } diff --git a/common/async_test.go b/common/async_test.go index 1d6b0e7b..f2a83d56 100644 --- a/common/async_test.go +++ b/common/async_test.go @@ -22,7 +22,8 @@ func TestParallel(t *testing.T) { } // Run in parallel. - var taskResultChz = Parallel(tasks...) + var taskResultChz, ok = Parallel(tasks...) + assert.True(t, ok) // Verify. assert.Equal(t, int(*counter), len(tasks), "Each task should have incremented the counter already") @@ -78,7 +79,8 @@ func TestParallelAbort(t *testing.T) { } // Run in parallel. - var taskResultChz = Parallel(tasks...) + var taskResultChz, ok = Parallel(tasks...) + assert.False(t, ok, "ok should be false since we aborted task #2.") // Verify task #3. // Initially taskResultCh[3] sends nothing since flow4 didn't send. @@ -109,7 +111,8 @@ func TestParallelRecover(t *testing.T) { } // Run in parallel. - var taskResultChz = Parallel(tasks...) + var taskResultChz, ok = Parallel(tasks...) + assert.False(t, ok, "ok should be false since we panic'd in task #2.") // Verify task #0, #1, #2. waitFor(t, taskResultChz[0], "Task #0", 0, nil, nil)