Improve QueryChunks signature to be easier to use

The changes made on this commit were designed by
Raí Tamarindo (raitamarindo@gmail.com) on a previous meeting.
pull/2/head
Vinícius Garcia 2020-10-28 08:55:55 -03:00
parent c1f645216c
commit 6978474d41
3 changed files with 76 additions and 60 deletions

View File

@ -30,14 +30,9 @@ type ChunkParser struct {
Params []interface{} Params []interface{}
ChunkSize int ChunkSize int
Chunk interface{} // Must be a pointer to a slice of structs
// The closure that will be called right after // This attribute must be a func(chunk []<Record>) error,
// filling the Chunk with ChunkSize records // where the actual Record should be a struct
// // representing the rows you are expecting to receive.
// Each chunk consecutively parsed will overwrite the ForEachChunk interface{}
// same slice, so don't keep references to it, if you
// need some data to be preserved after all chunks are
// processed copy the records by value.
ForEachChunk func() error
} }

View File

@ -139,28 +139,33 @@ func (c Client) QueryChunks(
} }
defer rows.Close() defer rows.Close()
sliceRef, structType, isSliceOfPtrs, err := decodeAsSliceOfStructs(parser.Chunk) fnValue := reflect.ValueOf(parser.ForEachChunk)
chunkType, err := parseInputFunc(parser.ForEachChunk)
if err != nil { if err != nil {
return err return err
} }
slice := sliceRef.Elem() chunk := reflect.MakeSlice(chunkType, 0, parser.ChunkSize)
if slice.Len() > parser.ChunkSize {
slice = slice.Slice(0, parser.ChunkSize) structType, isSliceOfPtrs, err := decodeAsSliceOfStructs(chunkType)
if err != nil {
return err
} }
var idx = 0 var idx = 0
for rows.Next() { for rows.Next() {
if slice.Len() <= idx { // Allocate new slice elements
// only if they are not already allocated:
if chunk.Len() <= idx {
var elemValue reflect.Value var elemValue reflect.Value
elemValue = reflect.New(structType) elemValue = reflect.New(structType)
if !isSliceOfPtrs { if !isSliceOfPtrs {
elemValue = elemValue.Elem() elemValue = elemValue.Elem()
} }
slice = reflect.Append(slice, elemValue) chunk = reflect.Append(chunk, elemValue)
} }
err = c.db.ScanRows(rows, slice.Index(idx).Addr().Interface()) err = c.db.ScanRows(rows, chunk.Index(idx).Addr().Interface())
if err != nil { if err != nil {
return err return err
} }
@ -171,8 +176,7 @@ func (c Client) QueryChunks(
} }
idx = 0 idx = 0
sliceRef.Elem().Set(slice) err, _ = fnValue.Call([]reflect.Value{chunk})[0].Interface().(error)
err = parser.ForEachChunk()
if err != nil { if err != nil {
if err == ErrAbortIteration { if err == ErrAbortIteration {
return nil return nil
@ -184,8 +188,9 @@ func (c Client) QueryChunks(
// If no rows were found or idx was reset to 0 // If no rows were found or idx was reset to 0
// on the last iteration skip this last call to ForEachChunk: // on the last iteration skip this last call to ForEachChunk:
if idx > 0 { if idx > 0 {
sliceRef.Elem().Set(slice.Slice(0, idx)) chunk = chunk.Slice(0, idx)
err = parser.ForEachChunk()
err, _ = fnValue.Call([]reflect.Value{chunk})[0].Interface().(error)
if err != nil { if err != nil {
if err == ErrAbortIteration { if err == ErrAbortIteration {
return nil return nil
@ -435,9 +440,18 @@ func FillStructWith(record interface{}, dbRow map[string]interface{}) error {
// and the second is a slice of maps representing the database rows you want // and the second is a slice of maps representing the database rows you want
// to use to update this struct. // to use to update this struct.
func FillSliceWith(entities interface{}, dbRows []map[string]interface{}) error { func FillSliceWith(entities interface{}, dbRows []map[string]interface{}) error {
sliceRef, structType, isSliceOfPtrs, err := decodeAsSliceOfStructs(entities) sliceRef := reflect.ValueOf(entities)
sliceType := sliceRef.Type()
if sliceType.Kind() != reflect.Ptr {
return fmt.Errorf(
"FillSliceWith: expected input to be a pointer to struct but got %v",
sliceType,
)
}
structType, isSliceOfPtrs, err := decodeAsSliceOfStructs(sliceType.Elem())
if err != nil { if err != nil {
return err return fmt.Errorf("FillSliceWith: %s", err.Error())
} }
info, found := tagInfoCache[structType] info, found := tagInfoCache[structType]
@ -468,34 +482,20 @@ func FillSliceWith(entities interface{}, dbRows []map[string]interface{}) error
return nil return nil
} }
func decodeAsSliceOfStructs(slice interface{}) ( func decodeAsSliceOfStructs(slice reflect.Type) (
sliceRef reflect.Value,
structType reflect.Type, structType reflect.Type,
isSliceOfPtrs bool, isSliceOfPtrs bool,
err error, err error,
) { ) {
slicePtrValue := reflect.ValueOf(slice) if slice.Kind() != reflect.Slice {
slicePtrType := slicePtrValue.Type()
if slicePtrType.Kind() != reflect.Ptr {
err = fmt.Errorf( err = fmt.Errorf(
"FillListWith: expected input to be a pointer to struct but got %T", "expected input kind to be a slice but got %v",
slice, slice,
) )
return return
} }
t := slicePtrType.Elem() elemType := slice.Elem()
if t.Kind() != reflect.Slice {
err = fmt.Errorf(
"FillListWith: expected input kind to be a slice but got %T",
slice,
)
return
}
elemType := t.Elem()
isPtr := elemType.Kind() == reflect.Ptr isPtr := elemType.Kind() == reflect.Ptr
if isPtr { if isPtr {
@ -504,11 +504,39 @@ func decodeAsSliceOfStructs(slice interface{}) (
if elemType.Kind() != reflect.Struct { if elemType.Kind() != reflect.Struct {
err = fmt.Errorf( err = fmt.Errorf(
"FillListWith: expected input to be a slice of structs but got %T", "expected input to be a slice of structs but got %v",
slice, slice,
) )
return return
} }
return slicePtrValue, elemType, isPtr, nil return elemType, isPtr, nil
}
var errType = reflect.TypeOf(new(error)).Elem()
func parseInputFunc(fn interface{}) (reflect.Type, error) {
t := reflect.TypeOf(fn)
if t.Kind() != reflect.Func {
return nil, fmt.Errorf("the ForEachChunk callback must be a function")
}
if t.NumIn() != 1 {
return nil, fmt.Errorf("the ForEachChunk callback must have 1 argument")
}
if t.NumOut() != 1 {
return nil, fmt.Errorf("the ForEachChunk callback must have a single return value")
}
if t.Out(0) != errType {
return nil, fmt.Errorf("the return value of the ForEachChunk callback must be of type error")
}
argsType := t.In(0)
if argsType.Kind() != reflect.Slice {
return nil, fmt.Errorf("the argument of the ForEachChunk callback must a slice of structs")
}
return argsType, nil
} }

View File

@ -565,14 +565,12 @@ func TestQueryChunks(t *testing.T) {
var length int var length int
var u User var u User
var users []User
err = c.QueryChunks(ctx, ChunkParser{ err = c.QueryChunks(ctx, ChunkParser{
Query: `select * from users where name = ?;`, Query: `select * from users where name = ?;`,
Params: []interface{}{"User1"}, Params: []interface{}{"User1"},
ChunkSize: 100, ChunkSize: 100,
Chunk: &users, ForEachChunk: func(users []User) error {
ForEachChunk: func() error {
length = len(users) length = len(users)
if length > 0 { if length > 0 {
u = users[0] u = users[0]
@ -605,20 +603,23 @@ func TestQueryChunks(t *testing.T) {
_ = c.Insert(ctx, &User{Name: "User1"}) _ = c.Insert(ctx, &User{Name: "User1"})
_ = c.Insert(ctx, &User{Name: "User2"}) _ = c.Insert(ctx, &User{Name: "User2"})
var lengths []int
var users []User var users []User
err = c.QueryChunks(ctx, ChunkParser{ err = c.QueryChunks(ctx, ChunkParser{
Query: `select * from users where name like ? order by name asc;`, Query: `select * from users where name like ? order by name asc;`,
Params: []interface{}{"User%"}, Params: []interface{}{"User%"},
ChunkSize: 2, ChunkSize: 2,
Chunk: &users, ForEachChunk: func(buffer []User) error {
ForEachChunk: func() error { users = append(users, buffer...)
lengths = append(lengths, len(buffer))
return nil return nil
}, },
}) })
assert.Equal(t, nil, err) assert.Equal(t, nil, err)
assert.Equal(t, 2, len(users)) assert.Equal(t, 1, len(lengths))
assert.Equal(t, 2, lengths[0])
assert.NotEqual(t, 0, users[0].ID) assert.NotEqual(t, 0, users[0].ID)
assert.Equal(t, "User1", users[0].Name) assert.Equal(t, "User1", users[0].Name)
assert.NotEqual(t, 0, users[1].ID) assert.NotEqual(t, 0, users[1].ID)
@ -645,14 +646,12 @@ func TestQueryChunks(t *testing.T) {
var lengths []int var lengths []int
var users []User var users []User
var buffer []User
err = c.QueryChunks(ctx, ChunkParser{ err = c.QueryChunks(ctx, ChunkParser{
Query: `select * from users where name like ? order by name asc;`, Query: `select * from users where name like ? order by name asc;`,
Params: []interface{}{"User%"}, Params: []interface{}{"User%"},
ChunkSize: 1, ChunkSize: 1,
Chunk: &buffer, ForEachChunk: func(buffer []User) error {
ForEachChunk: func() error {
lengths = append(lengths, len(buffer)) lengths = append(lengths, len(buffer))
users = append(users, buffer...) users = append(users, buffer...)
return nil return nil
@ -689,14 +688,12 @@ func TestQueryChunks(t *testing.T) {
var lengths []int var lengths []int
var users []User var users []User
var buffer []User
err = c.QueryChunks(ctx, ChunkParser{ err = c.QueryChunks(ctx, ChunkParser{
Query: `select * from users where name like ? order by name asc;`, Query: `select * from users where name like ? order by name asc;`,
Params: []interface{}{"User%"}, Params: []interface{}{"User%"},
ChunkSize: 2, ChunkSize: 2,
Chunk: &buffer, ForEachChunk: func(buffer []User) error {
ForEachChunk: func() error {
lengths = append(lengths, len(buffer)) lengths = append(lengths, len(buffer))
users = append(users, buffer...) users = append(users, buffer...)
return nil return nil
@ -735,14 +732,12 @@ func TestQueryChunks(t *testing.T) {
var lengths []int var lengths []int
var users []User var users []User
var buffer []User
err = c.QueryChunks(ctx, ChunkParser{ err = c.QueryChunks(ctx, ChunkParser{
Query: `select * from users where name like ? order by name asc;`, Query: `select * from users where name like ? order by name asc;`,
Params: []interface{}{"User%"}, Params: []interface{}{"User%"},
ChunkSize: 2, ChunkSize: 2,
Chunk: &buffer, ForEachChunk: func(buffer []User) error {
ForEachChunk: func() error {
lengths = append(lengths, len(buffer)) lengths = append(lengths, len(buffer))
users = append(users, buffer...) users = append(users, buffer...)
return ErrAbortIteration return ErrAbortIteration
@ -780,14 +775,12 @@ func TestQueryChunks(t *testing.T) {
returnVals := []error{nil, ErrAbortIteration} returnVals := []error{nil, ErrAbortIteration}
var lengths []int var lengths []int
var users []User var users []User
var buffer []User
err = c.QueryChunks(ctx, ChunkParser{ err = c.QueryChunks(ctx, ChunkParser{
Query: `select * from users where name like ? order by name asc;`, Query: `select * from users where name like ? order by name asc;`,
Params: []interface{}{"User%"}, Params: []interface{}{"User%"},
ChunkSize: 2, ChunkSize: 2,
Chunk: &buffer, ForEachChunk: func(buffer []User) error {
ForEachChunk: func() error {
lengths = append(lengths, len(buffer)) lengths = append(lengths, len(buffer))
users = append(users, buffer...) users = append(users, buffer...)