diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 54bfeef..da2eb65 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -9,25 +9,31 @@ import ( var ErrErrorsLimitExceeded = errors.New("errors limit exceeded") type Task func() error +type Errors struct { + count int + mx sync.Mutex +} -// Run starts tasks in N goroutines and stops its work when receiving M errors from tasks -func Run(tasks []Task, N int, M int) error { - log.Println("Tasks:", len(tasks), "| Goroutines:", N, "| Errors:", M) - errs := 0 - for i := 0; i < len(tasks); i = i + N { +func Run(tasks []Task, n int, m int) error { + log.Println("Tasks:", len(tasks), "| Goroutines:", n, "| Errors:", m) + errs := Errors{} + for i := 0; i < len(tasks); i += n { wg := sync.WaitGroup{} - for g := 1; g <= N && i+g < len(tasks); g++ { + for g := 1; g <= n && i+g < len(tasks); g++ { wg.Add(1) - go func(rt Task, i int, g int, errs *int) { - if err := rt; err != nil { - *errs++ + go func(rt Task, errs *Errors) { + err := rt + if err != nil { + errs.mx.Lock() + errs.count++ + errs.mx.Unlock() } wg.Done() - }(tasks[i+g], i, g, &errs) + }(tasks[i+g], &errs) } wg.Wait() - if errs > M { - log.Println("Produced", errs, "errors of", M) + if errs.count > m { + log.Println("Produced", errs.count, "errors of", m) return ErrErrorsLimitExceeded } }