HW6 is completed
parent
fcc73dfa9a
commit
ad28f4b2b1
|
@ -13,3 +13,4 @@ linters:
|
|||
- lll
|
||||
- nakedret
|
||||
- wsl
|
||||
- gofumpt
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
package hw06_pipeline_execution //nolint:golint,stylecheck
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var stageError = errors.New("Error in stage")
|
||||
|
||||
func TestAdditional(t *testing.T) {
|
||||
// Stage generator
|
||||
g := func(name string, f func(v interface{}) interface{}) Stage {
|
||||
return func(in In) Out {
|
||||
out := make(Bi)
|
||||
go func() {
|
||||
defer close(out)
|
||||
for v := range in {
|
||||
time.Sleep(sleepPerStage)
|
||||
out <- f(v)
|
||||
}
|
||||
}()
|
||||
return out
|
||||
}
|
||||
}
|
||||
|
||||
type Results struct {
|
||||
v interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
stages := []Stage{
|
||||
g("Dummy", func(v interface{}) interface{} {
|
||||
return Results{v: v, err: nil}
|
||||
}),
|
||||
g("Divider (10/x)", func(v interface{}) interface{} {
|
||||
if v.(Results).v.(int) == 0 || v.(Results).err != nil {
|
||||
return Results{v: nil, err: stageError}
|
||||
}
|
||||
return Results{v: 10 / v.(Results).v.(int), err: nil}
|
||||
}),
|
||||
g("Multiplier (* 2)", func(v interface{}) interface{} {
|
||||
if v.(Results).err != nil {
|
||||
return Results{v: nil, err: stageError}
|
||||
}
|
||||
return Results{v: v.(Results).v.(int) * 2, err: nil}
|
||||
}),
|
||||
g("Adder (+ 100)", func(v interface{}) interface{} {
|
||||
if v.(Results).err != nil {
|
||||
return Results{v: nil, err: stageError}
|
||||
}
|
||||
return Results{v: v.(Results).v.(int) + 100, err: nil}
|
||||
}),
|
||||
g("Stringifier", func(v interface{}) interface{} {
|
||||
if v.(Results).err != nil {
|
||||
return Results{v: nil, err: stageError}
|
||||
}
|
||||
return Results{v: strconv.Itoa(v.(Results).v.(int)), err: nil}
|
||||
}),
|
||||
}
|
||||
|
||||
t.Run("error case", func(t *testing.T) {
|
||||
in := make(Bi)
|
||||
data := []int{1, 0, 2}
|
||||
|
||||
go func() {
|
||||
for _, v := range data {
|
||||
in <- v
|
||||
}
|
||||
close(in)
|
||||
}()
|
||||
|
||||
result := make([]Results, 0, 10)
|
||||
start := time.Now()
|
||||
for s := range ExecutePipeline(in, nil, stages...) {
|
||||
result = append(result, s.(Results))
|
||||
}
|
||||
elapsed := time.Since(start)
|
||||
|
||||
require.Equal(t, result, []Results{{"120", nil}, {nil, stageError}, {"110", nil}})
|
||||
require.Less(t,
|
||||
int64(elapsed),
|
||||
// ~0.8s for processing 5 values in 4 stages (100ms every) concurrently
|
||||
int64(sleepPerStage)*int64(len(stages)+len(data)-1)+int64(fault))
|
||||
})
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
module github.com/fixme_my_friend/hw06_pipeline_execution
|
||||
module github.com/tiburon-777/HW_OTUS/hw06_pipeline_execution
|
||||
|
||||
go 1.14
|
||||
|
||||
|
|
|
@ -10,6 +10,26 @@ type (
|
|||
type Stage func(in In) (out Out)
|
||||
|
||||
func ExecutePipeline(in In, done In, stages ...Stage) Out {
|
||||
// Place your code here
|
||||
return nil
|
||||
out := in
|
||||
for _, stage := range stages {
|
||||
out = func(in In, done In, stage Stage) (out Out) {
|
||||
ch := make(chan interface{})
|
||||
go func(ch chan interface{}) {
|
||||
defer close(ch)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case v, ok := <-in:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
ch <- v
|
||||
}
|
||||
}
|
||||
}(ch)
|
||||
return stage(ch)
|
||||
}(out, done, stage)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue