From 861085560f7f9e4d94f6cea7fda5eb76c82d2f73 Mon Sep 17 00:00:00 2001 From: Andrey Ivanov Date: Sun, 26 Jul 2020 19:22:06 +0300 Subject: [PATCH] HW5 is completed --- hw05_parallel_execution/mixed.go | 12 +++++++++-- hw05_parallel_execution/run.go | 37 ++++++++++++++++++++------------ 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/hw05_parallel_execution/mixed.go b/hw05_parallel_execution/mixed.go index 6bb35c8..be6378d 100644 --- a/hw05_parallel_execution/mixed.go +++ b/hw05_parallel_execution/mixed.go @@ -16,13 +16,16 @@ func RunMixed(tasks []Task, n int, m int) error { wg.Add(n) for i := 0; i < n; i++ { - go consMixed(ch, &wg, &errs, ignore) + go consumer(ch, &wg, &errs, ignore) } for _, task := range tasks { + errs.mx.RLock() if !ignore && errs.count <= 0 { + errs.mx.RUnlock() break } + errs.mx.RUnlock() ch <- task } close(ch) @@ -34,15 +37,20 @@ func RunMixed(tasks []Task, n int, m int) error { return nil } -func consMixed(ch <-chan Task, wg *sync.WaitGroup, errs *Errors, ignore bool) { +func consumer(ch <-chan Task, wg *sync.WaitGroup, errs *Errors, ignore bool) { defer wg.Done() for task := range ch { + errs.mx.RLock() if !ignore && errs.count <= 0 { + errs.mx.RUnlock() return } + errs.mx.RUnlock() err := task() if !ignore && err != nil { + errs.mx.Lock() errs.count-- + errs.mx.Unlock() } } } diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go index 52cf1fb..5014c20 100644 --- a/hw05_parallel_execution/run.go +++ b/hw05_parallel_execution/run.go @@ -1,27 +1,36 @@ package hw05_parallel_execution //nolint:golint,stylecheck +import "sync" func Run(tasks []Task, n int, m int) error { if m == -1 { m = len(tasks) } - pool := make(chan int, n) - errs := make(chan int) - for _, task := range tasks { - pool <- 1 - go func(task Task, errs chan int, pool chan int) { - if _, ok := <-errs; !ok { - return + task, errs, wg, done := make(chan Task), make(chan error, len(tasks)), sync.WaitGroup{}, make(chan struct{}) + defer func() { + close(done) + wg.Wait() + }() + wg.Add(n) + for i := 1; i <= n; i++ { + go func(task chan Task, errs chan error, wg *sync.WaitGroup, done chan struct{}) { + for { + select { + case <-done: + wg.Done() + return + case t := <-task: + if err := t(); err != nil { + errs <- err + } + } } - if task() != nil { - errs <- 1 - } - <-pool - }(task, errs, pool) + }(task, errs, &wg, done) + } + for _, t := range tasks { + task <- t if len(errs) >= m { - close(errs) return ErrErrorsLimitExceeded } } - close(errs) return nil }