From cd897e09eb1876421ff7cdc4ba67f4004f0f30ab Mon Sep 17 00:00:00 2001
From: Andrey Ivanov
Date: Wed, 22 Jul 2020 11:20:32 +0300
Subject: [PATCH] HW5 is completed

---
 hw05_parallel_execution/run.go            |  68 +++++++++-----
 hw05_parallel_execution/run_test.go       |  14 +--
 hw05_parallel_execution/saw_shift.go      |  30 +++++++
 hw05_parallel_execution/saw_shift_test.go | 103 ++++++++++++++++++++++
 4 files changed, 186 insertions(+), 29 deletions(-)
 create mode 100644 hw05_parallel_execution/saw_shift.go
 create mode 100644 hw05_parallel_execution/saw_shift_test.go

diff --git a/hw05_parallel_execution/run.go b/hw05_parallel_execution/run.go
index a01b12e..b603505 100644
--- a/hw05_parallel_execution/run.go
+++ b/hw05_parallel_execution/run.go
@@ -2,40 +2,64 @@ package hw05_parallel_execution //nolint:golint,stylecheck
 
 import (
 	"errors"
-	"log"
 	"sync"
 )
 
 var ErrErrorsLimitExceeded = errors.New("errors limit exceeded")
 
-type Task func() error
 type Errors struct {
 	count int
-	mx    sync.Mutex
+	mx    sync.RWMutex
 }
 
+type Task func() error
 func Run(tasks []Task, n int, m int) error {
-	log.Println("Tasks:", len(tasks), "| Goroutines:", n, "| Errors:", m)
+	ch := make(chan Task)
+	wg := sync.WaitGroup{}
 	errs := Errors{}
-	for i := 0; i < len(tasks); i += n {
-		wg := sync.WaitGroup{}
-		for g := 1; g <= n && i+g < len(tasks); g++ {
-			wg.Add(1)
-			go func(rt Task, errs *Errors) {
-				defer wg.Done()
-				err := rt()
-				if err != nil {
-					errs.mx.Lock()
-					errs.count++
-					errs.mx.Unlock()
-				}
-			}(tasks[i+g], &errs)
-		}
-		wg.Wait()
-		if m > 0 && errs.count > m {
-			log.Println("Produced", errs.count, "errors of", m)
-			return ErrErrorsLimitExceeded
+	errs.count = m
+	var ignore bool
+	if errs.count < 0 {
+		ignore = true
+	}
+
+	wg.Add(n)
+	for i := 0; i < n; i++ {
+		go consumer(ch, &wg, &errs, ignore)
+	}
+
+	for _, task := range tasks {
+		errs.mx.RLock()
+		if !ignore && errs.count <= 0 {
+			errs.mx.RUnlock()
+			break
 		}
+		errs.mx.RUnlock()
+		ch <- task
+	}
+	close(ch)
+	wg.Wait()
+
+	if errs.count <= 0 && !ignore {
+		return ErrErrorsLimitExceeded
 	}
 	return nil
 }
+
+func consumer(ch <-chan Task, wg *sync.WaitGroup, errs *Errors, ignore bool) {
+	defer wg.Done()
+	for task := range ch {
+		errs.mx.RLock()
+		if !ignore && errs.count <= 0 {
+			errs.mx.RUnlock()
+			return
+		}
+		errs.mx.RUnlock()
+		err := task()
+		if !ignore && err != nil {
+			errs.mx.Lock()
+			errs.count--
+			errs.mx.Unlock()
+		}
+	}
+}
diff --git a/hw05_parallel_execution/run_test.go b/hw05_parallel_execution/run_test.go
index 78b742d..37e3e3c 100644
--- a/hw05_parallel_execution/run_test.go
+++ b/hw05_parallel_execution/run_test.go
@@ -14,7 +14,7 @@ import (
 func TestRun(t *testing.T) {
 	defer goleak.VerifyNone(t)
 
-	t.Run("if were errors in first M tasks, than finished not more N+M tasks", func(t *testing.T) {
+	t.Run("50 tasks in 10 goroutines with 100% errors and maxErrors=23 should run not more N+M (=33) tasks", func(t *testing.T) {
 		tasksCount := 50
 		tasks := make([]Task, 0, tasksCount)
 
@@ -37,11 +37,11 @@ func TestRun(t *testing.T) {
 		require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started")
 	})
 
-	t.Run("if m<0 then ignore errores", func(t *testing.T) {
+	t.Run("50 tasks in 5 goroutines with 50% errors and m<0 should ignore errors", func(t *testing.T) {
 		tasksCount := 50
 		tasks := make([]Task, 0, tasksCount)
 
-		var runTasksCount int32 = 1
+		var runTasksCount int32
 		var sumTime time.Duration
 
 		for i := 0; i < tasksCount; i++ {
@@ -71,11 +71,11 @@ func TestRun(t *testing.T) {
 		require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
 	})
 
-	t.Run("tasks without errors", func(t *testing.T) {
-		tasksCount := 50
+	t.Run("143 tasks in 7 goroutines without errors", func(t *testing.T) {
+		tasksCount := 143
 		tasks := make([]Task, 0, tasksCount)
 
-		var runTasksCount int32 = 1
+		var runTasksCount int32
 		var sumTime time.Duration
 
 		for i := 0; i < tasksCount; i++ {
@@ -89,7 +89,7 @@ func TestRun(t *testing.T) {
 			})
 		}
 
-		workersCount := 5
+		workersCount := 7
 		maxErrorsCount := 1
 
 		start := time.Now()
diff --git a/hw05_parallel_execution/saw_shift.go b/hw05_parallel_execution/saw_shift.go
new file mode 100644
index 0000000..a4619f0
--- /dev/null
+++ b/hw05_parallel_execution/saw_shift.go
@@ -0,0 +1,30 @@
+package hw05_parallel_execution //nolint:golint,stylecheck
+
+import (
+	"sync"
+)
+
+// Функция, решающая задачу методом пилообразного сдвига.
+func RunSawShift(tasks []Task, n int, m int) error {
+	errs := Errors{}
+	for i := 0; i < len(tasks); i += n {
+		wg := sync.WaitGroup{}
+		for g := 1; g <= n && i+g < len(tasks); g++ {
+			wg.Add(1)
+			go func(rt Task, errs *Errors) {
+				defer wg.Done()
+				err := rt()
+				if err != nil {
+					errs.mx.Lock()
+					errs.count++
+					errs.mx.Unlock()
+				}
+			}(tasks[i+g], &errs)
+		}
+		wg.Wait()
+		if m > 0 && errs.count > m {
+			return ErrErrorsLimitExceeded
+		}
+	}
+	return nil
+}
diff --git a/hw05_parallel_execution/saw_shift_test.go b/hw05_parallel_execution/saw_shift_test.go
new file mode 100644
index 0000000..622c88c
--- /dev/null
+++ b/hw05_parallel_execution/saw_shift_test.go
@@ -0,0 +1,103 @@
+package hw05_parallel_execution //nolint:golint,stylecheck
+
+import (
+	"fmt"
+	"math/rand"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
+)
+
+func TestRunSawShift(t *testing.T) {
+	defer goleak.VerifyNone(t)
+
+	t.Run("50 tasks in 10 goroutines with 100% errors and maxErrors=23 should run not more N+M (=33) tasks", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32
+
+		for i := 0; i < tasksCount; i++ {
+			err := fmt.Errorf("error from task %d", i)
+			tasks = append(tasks, func() error {
+				time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
+				atomic.AddInt32(&runTasksCount, 1)
+				return err
+			})
+		}
+
+		workersCount := 10
+		maxErrorsCount := 23
+		result := RunSawShift(tasks, workersCount, maxErrorsCount)
+
+		require.Equal(t, ErrErrorsLimitExceeded, result)
+		require.LessOrEqual(t, runTasksCount, int32(workersCount+maxErrorsCount), "extra tasks were started")
+	})
+
+	t.Run("50 tasks in 5 goroutines with 50% errors and m<0 should ignore errors", func(t *testing.T) {
+		tasksCount := 50
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32 = 1
+		var sumTime time.Duration
+
+		for i := 0; i < tasksCount; i++ {
+			err := fmt.Errorf("error from task %d", i)
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				if i%2 == 0 {
+					return err
+				}
+				return nil
+			})
+		}
+
+		workersCount := 5
+		maxErrorsCount := -1
+
+		start := time.Now()
+		result := RunSawShift(tasks, workersCount, maxErrorsCount)
+		elapsedTime := time.Since(start)
+		require.Nil(t, result)
+
+		require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
+		require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
+	})
+
+	t.Run("143 tasks in 7 goroutines without errors", func(t *testing.T) {
+		tasksCount := 143
+		tasks := make([]Task, 0, tasksCount)
+
+		var runTasksCount int32 = 1
+		var sumTime time.Duration
+
+		for i := 0; i < tasksCount; i++ {
+			taskSleep := time.Millisecond * time.Duration(rand.Intn(100))
+			sumTime += taskSleep
+
+			tasks = append(tasks, func() error {
+				time.Sleep(taskSleep)
+				atomic.AddInt32(&runTasksCount, 1)
+				return nil
+			})
+		}
+
+		workersCount := 7
+		maxErrorsCount := 1
+
+		start := time.Now()
+		result := RunSawShift(tasks, workersCount, maxErrorsCount)
+		elapsedTime := time.Since(start)
+		require.Nil(t, result)
+
+		require.Equal(t, runTasksCount, int32(tasksCount), "not all tasks were completed")
+		require.LessOrEqual(t, int64(elapsedTime), int64(sumTime/2), "tasks were run sequentially?")
+	})
+}