From ad28f4b2b16e7cae92568c800957cf00da61060f Mon Sep 17 00:00:00 2001 From: Andrey Ivanov Date: Wed, 29 Jul 2020 12:26:34 +0300 Subject: [PATCH] HW6 is completed --- .golangci.yml | 1 + hw06_pipeline_execution/additional_test.go | 88 ++++++++++++++++++++++ hw06_pipeline_execution/go.mod | 2 +- hw06_pipeline_execution/pipeline.go | 24 +++++- 4 files changed, 112 insertions(+), 3 deletions(-) create mode 100644 hw06_pipeline_execution/additional_test.go diff --git a/.golangci.yml b/.golangci.yml index c37e4a2..85eb8d5 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -13,3 +13,4 @@ linters: - lll - nakedret - wsl + - gofumpt diff --git a/hw06_pipeline_execution/additional_test.go b/hw06_pipeline_execution/additional_test.go new file mode 100644 index 0000000..e7dd680 --- /dev/null +++ b/hw06_pipeline_execution/additional_test.go @@ -0,0 +1,88 @@ +package hw06_pipeline_execution //nolint:golint,stylecheck + +import ( + "errors" + "github.com/stretchr/testify/require" + "strconv" + "testing" + "time" +) + +var stageError = errors.New("Error in stage") + +func TestAdditional(t *testing.T) { + // Stage generator + g := func(name string, f func(v interface{}) interface{}) Stage { + return func(in In) Out { + out := make(Bi) + go func() { + defer close(out) + for v := range in { + time.Sleep(sleepPerStage) + out <- f(v) + } + }() + return out + } + } + + type Results struct { + v interface{} + err error + } + + stages := []Stage{ + g("Dummy", func(v interface{}) interface{} { + return Results{v: v, err: nil} + }), + g("Divider (10/x)", func(v interface{}) interface{} { + if v.(Results).v.(int) == 0 || v.(Results).err != nil { + return Results{v: nil, err: stageError} + } + return Results{v: 10 / v.(Results).v.(int), err: nil} + }), + g("Multiplier (* 2)", func(v interface{}) interface{} { + if v.(Results).err != nil { + return Results{v: nil, err: stageError} + } + return Results{v: v.(Results).v.(int) * 2, err: nil} + }), + g("Adder (+ 100)", func(v interface{}) interface{} { + if v.(Results).err != nil { + return Results{v: nil, err: stageError} + } + return Results{v: v.(Results).v.(int) + 100, err: nil} + }), + g("Stringifier", func(v interface{}) interface{} { + if v.(Results).err != nil { + return Results{v: nil, err: stageError} + } + return Results{v: strconv.Itoa(v.(Results).v.(int)), err: nil} + }), + } + + t.Run("error case", func(t *testing.T) { + in := make(Bi) + data := []int{1, 0, 2} + + go func() { + for _, v := range data { + in <- v + } + close(in) + }() + + result := make([]Results, 0, 10) + start := time.Now() + for s := range ExecutePipeline(in, nil, stages...) { + result = append(result, s.(Results)) + } + elapsed := time.Since(start) + + require.Equal(t, result, []Results{{"120", nil}, {nil, stageError}, {"110", nil}}) + require.Less(t, + int64(elapsed), + // ~0.8s for processing 5 values in 4 stages (100ms every) concurrently + int64(sleepPerStage)*int64(len(stages)+len(data)-1)+int64(fault)) + }) +} diff --git a/hw06_pipeline_execution/go.mod b/hw06_pipeline_execution/go.mod index fbc0ee1..faf9ae5 100644 --- a/hw06_pipeline_execution/go.mod +++ b/hw06_pipeline_execution/go.mod @@ -1,4 +1,4 @@ -module github.com/fixme_my_friend/hw06_pipeline_execution +module github.com/tiburon-777/HW_OTUS/hw06_pipeline_execution go 1.14 diff --git a/hw06_pipeline_execution/pipeline.go b/hw06_pipeline_execution/pipeline.go index cfaa7ed..125dc57 100644 --- a/hw06_pipeline_execution/pipeline.go +++ b/hw06_pipeline_execution/pipeline.go @@ -10,6 +10,26 @@ type ( type Stage func(in In) (out Out) func ExecutePipeline(in In, done In, stages ...Stage) Out { - // Place your code here - return nil + out := in + for _, stage := range stages { + out = func(in In, done In, stage Stage) (out Out) { + ch := make(chan interface{}) + go func(ch chan interface{}) { + defer close(ch) + for { + select { + case <-done: + return + case v, ok := <-in: + if !ok { + return + } + ch <- v + } + } + }(ch) + return stage(ch) + }(out, done, stage) + } + return out }