From e417cc2f155eebfa0e6026991a4bf5bc21862313 Mon Sep 17 00:00:00 2001 From: William King Date: Wed, 18 May 2016 13:15:15 -0700 Subject: [PATCH] Preparex --- conn.go | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++ conn_pool.go | 44 +++++++++++++++++++++ 2 files changed, 150 insertions(+) diff --git a/conn.go b/conn.go index b9a6f356..35ad3049 100644 --- a/conn.go +++ b/conn.go @@ -78,6 +78,11 @@ type PreparedStatement struct { ParameterOids []Oid } +// PreparexOptions is an option struct that can be passed to Preparex +type PreparexOptions struct { + ParameterOids []Oid +} + // Notification is a message received from the PostgreSQL LISTEN/NOTIFY system type Notification struct { Pid int32 // backend pid that sent the notification @@ -661,6 +666,106 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) { } } +// Preparex creates a prepared statement with name and sql. sql can contain placeholders +// for bound parameters. These placeholders are referenced positional as $1, $2, etc. +// It defers from Prepare as it allows additional options (such as parameter OIDs) to be passed via struct +// +// Preparex is idempotent; i.e. it is safe to call Preparex multiple times with the same +// name and sql arguments. This allows a code path to Preparex and Query/Exec without +// concern for if the statement has already been prepared. +func (c *Conn) Preparex(name, sql string, opts PreparexOptions) (ps *PreparedStatement, err error) { + if name != "" { + if ps, ok := c.preparedStatements[name]; ok && ps.SQL == sql { + return ps, nil + } + } + + if c.shouldLog(LogLevelError) { + defer func() { + if err != nil { + c.log(LogLevelError, fmt.Sprintf("Prepare `%s` as `%s` failed: %v", name, sql, err)) + } + }() + } + + // parse + wbuf := newWriteBuf(c, 'P') + wbuf.WriteCString(name) + wbuf.WriteCString(sql) + + if len(opts.ParameterOids) > 65535 { + return nil, errors.New(fmt.Sprintf("Number of PreparexOptions ParameterOids must be between 0 and 65535, received %d", len(opts.ParameterOids))) + } + + if len(opts.ParameterOids) > 0 { + wbuf.WriteInt16(int16(len(opts.ParameterOids))) + for _, oid := range opts.ParameterOids { + wbuf.WriteInt32(int32(oid)) + } + + } else { + wbuf.WriteInt16(0) + } + + // describe + wbuf.startMsg('D') + wbuf.WriteByte('S') + wbuf.WriteCString(name) + + // sync + wbuf.startMsg('S') + wbuf.closeMsg() + + _, err = c.conn.Write(wbuf.buf) + if err != nil { + c.die(err) + return nil, err + } + + ps = &PreparedStatement{Name: name, SQL: sql} + + var softErr error + + for { + var t byte + var r *msgReader + t, r, err := c.rxMsg() + if err != nil { + return nil, err + } + + switch t { + case parseComplete: + case parameterDescription: + ps.ParameterOids = c.rxParameterDescription(r) + + if len(ps.ParameterOids) > 65535 && softErr == nil { + softErr = fmt.Errorf("PostgreSQL supports maximum of 65535 parameters, received %d", len(ps.ParameterOids)) + } + case rowDescription: + ps.FieldDescriptions = c.rxRowDescription(r) + for i := range ps.FieldDescriptions { + t, _ := c.PgTypes[ps.FieldDescriptions[i].DataType] + ps.FieldDescriptions[i].DataTypeName = t.Name + ps.FieldDescriptions[i].FormatCode = t.DefaultFormat + } + case noData: + case readyForQuery: + c.rxReadyForQuery(r) + + if softErr == nil { + c.preparedStatements[name] = ps + } + + return ps, softErr + default: + if e := c.processContextFreeMsg(t, r); e != nil && softErr == nil { + softErr = e + } + } + } +} + // Deallocate released a prepared statement func (c *Conn) Deallocate(name string) (err error) { delete(c.preparedStatements, name) @@ -840,6 +945,7 @@ func (c *Conn) sendQuery(sql string, arguments ...interface{}) (err error) { } func (c *Conn) sendSimpleQuery(sql string, args ...interface{}) error { + if len(args) == 0 { wbuf := newWriteBuf(c, 'Q') wbuf.WriteCString(sql) diff --git a/conn_pool.go b/conn_pool.go index 6695e0e3..ea2a35f3 100644 --- a/conn_pool.go +++ b/conn_pool.go @@ -361,6 +361,50 @@ func (p *ConnPool) Prepare(name, sql string) (*PreparedStatement, error) { return ps, err } +// Preparex creates a prepared statement on a connection in the pool to test the +// statement is valid. If it succeeds all connections accessed through the pool +// will have the statement available. +// +// Preparex creates a prepared statement with name and sql. sql can contain placeholders +// for bound parameters. These placeholders are referenced positional as $1, $2, etc. +// It defers from Prepare as it allows additional options (such as parameter OIDs) to be passed via struct +// +// Preparex is idempotent; i.e. it is safe to call Preparex multiple times with the same +// name and sql arguments. This allows a code path to Preparex and Query/Exec without +// concern for if the statement has already been prepared. +func (p *ConnPool) Preparex(name, sql string, opts PreparexOptions) (*PreparedStatement, error) { + p.cond.L.Lock() + defer p.cond.L.Unlock() + + if ps, ok := p.preparedStatements[name]; ok && ps.SQL == sql { + return ps, nil + } + + c, err := p.acquire(nil) + if err != nil { + return nil, err + } + + ps, err := c.Preparex(name, sql, opts) + + p.availableConnections = append(p.availableConnections, c) + if err != nil { + return nil, err + } + + for _, c := range p.availableConnections { + _, err := c.Preparex(name, sql, opts) + if err != nil { + return nil, err + } + } + + p.invalidateAcquired() + p.preparedStatements[name] = ps + + return ps, err +} + // Deallocate releases a prepared statement from all connections in the pool. func (p *ConnPool) Deallocate(name string) (err error) { p.cond.L.Lock()