mirror of https://github.com/jackc/pgx.git
Preparex
parent
90f22c1717
commit
e417cc2f15
106
conn.go
106
conn.go
|
@ -78,6 +78,11 @@ type PreparedStatement struct {
|
||||||
ParameterOids []Oid
|
ParameterOids []Oid
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PreparexOptions is an option struct that can be passed to Preparex
|
||||||
|
type PreparexOptions struct {
|
||||||
|
ParameterOids []Oid
|
||||||
|
}
|
||||||
|
|
||||||
// Notification is a message received from the PostgreSQL LISTEN/NOTIFY system
|
// Notification is a message received from the PostgreSQL LISTEN/NOTIFY system
|
||||||
type Notification struct {
|
type Notification struct {
|
||||||
Pid int32 // backend pid that sent the notification
|
Pid int32 // backend pid that sent the notification
|
||||||
|
@ -661,6 +666,106 @@ func (c *Conn) Prepare(name, sql string) (ps *PreparedStatement, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preparex creates a prepared statement with name and sql. sql can contain placeholders
|
||||||
|
// for bound parameters. These placeholders are referenced positional as $1, $2, etc.
|
||||||
|
// It defers from Prepare as it allows additional options (such as parameter OIDs) to be passed via struct
|
||||||
|
//
|
||||||
|
// Preparex is idempotent; i.e. it is safe to call Preparex multiple times with the same
|
||||||
|
// name and sql arguments. This allows a code path to Preparex and Query/Exec without
|
||||||
|
// concern for if the statement has already been prepared.
|
||||||
|
func (c *Conn) Preparex(name, sql string, opts PreparexOptions) (ps *PreparedStatement, err error) {
|
||||||
|
if name != "" {
|
||||||
|
if ps, ok := c.preparedStatements[name]; ok && ps.SQL == sql {
|
||||||
|
return ps, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.shouldLog(LogLevelError) {
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
c.log(LogLevelError, fmt.Sprintf("Prepare `%s` as `%s` failed: %v", name, sql, err))
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse
|
||||||
|
wbuf := newWriteBuf(c, 'P')
|
||||||
|
wbuf.WriteCString(name)
|
||||||
|
wbuf.WriteCString(sql)
|
||||||
|
|
||||||
|
if len(opts.ParameterOids) > 65535 {
|
||||||
|
return nil, errors.New(fmt.Sprintf("Number of PreparexOptions ParameterOids must be between 0 and 65535, received %d", len(opts.ParameterOids)))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(opts.ParameterOids) > 0 {
|
||||||
|
wbuf.WriteInt16(int16(len(opts.ParameterOids)))
|
||||||
|
for _, oid := range opts.ParameterOids {
|
||||||
|
wbuf.WriteInt32(int32(oid))
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
wbuf.WriteInt16(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// describe
|
||||||
|
wbuf.startMsg('D')
|
||||||
|
wbuf.WriteByte('S')
|
||||||
|
wbuf.WriteCString(name)
|
||||||
|
|
||||||
|
// sync
|
||||||
|
wbuf.startMsg('S')
|
||||||
|
wbuf.closeMsg()
|
||||||
|
|
||||||
|
_, err = c.conn.Write(wbuf.buf)
|
||||||
|
if err != nil {
|
||||||
|
c.die(err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ps = &PreparedStatement{Name: name, SQL: sql}
|
||||||
|
|
||||||
|
var softErr error
|
||||||
|
|
||||||
|
for {
|
||||||
|
var t byte
|
||||||
|
var r *msgReader
|
||||||
|
t, r, err := c.rxMsg()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch t {
|
||||||
|
case parseComplete:
|
||||||
|
case parameterDescription:
|
||||||
|
ps.ParameterOids = c.rxParameterDescription(r)
|
||||||
|
|
||||||
|
if len(ps.ParameterOids) > 65535 && softErr == nil {
|
||||||
|
softErr = fmt.Errorf("PostgreSQL supports maximum of 65535 parameters, received %d", len(ps.ParameterOids))
|
||||||
|
}
|
||||||
|
case rowDescription:
|
||||||
|
ps.FieldDescriptions = c.rxRowDescription(r)
|
||||||
|
for i := range ps.FieldDescriptions {
|
||||||
|
t, _ := c.PgTypes[ps.FieldDescriptions[i].DataType]
|
||||||
|
ps.FieldDescriptions[i].DataTypeName = t.Name
|
||||||
|
ps.FieldDescriptions[i].FormatCode = t.DefaultFormat
|
||||||
|
}
|
||||||
|
case noData:
|
||||||
|
case readyForQuery:
|
||||||
|
c.rxReadyForQuery(r)
|
||||||
|
|
||||||
|
if softErr == nil {
|
||||||
|
c.preparedStatements[name] = ps
|
||||||
|
}
|
||||||
|
|
||||||
|
return ps, softErr
|
||||||
|
default:
|
||||||
|
if e := c.processContextFreeMsg(t, r); e != nil && softErr == nil {
|
||||||
|
softErr = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Deallocate released a prepared statement
|
// Deallocate released a prepared statement
|
||||||
func (c *Conn) Deallocate(name string) (err error) {
|
func (c *Conn) Deallocate(name string) (err error) {
|
||||||
delete(c.preparedStatements, name)
|
delete(c.preparedStatements, name)
|
||||||
|
@ -840,6 +945,7 @@ func (c *Conn) sendQuery(sql string, arguments ...interface{}) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Conn) sendSimpleQuery(sql string, args ...interface{}) error {
|
func (c *Conn) sendSimpleQuery(sql string, args ...interface{}) error {
|
||||||
|
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
wbuf := newWriteBuf(c, 'Q')
|
wbuf := newWriteBuf(c, 'Q')
|
||||||
wbuf.WriteCString(sql)
|
wbuf.WriteCString(sql)
|
||||||
|
|
44
conn_pool.go
44
conn_pool.go
|
@ -361,6 +361,50 @@ func (p *ConnPool) Prepare(name, sql string) (*PreparedStatement, error) {
|
||||||
return ps, err
|
return ps, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preparex creates a prepared statement on a connection in the pool to test the
|
||||||
|
// statement is valid. If it succeeds all connections accessed through the pool
|
||||||
|
// will have the statement available.
|
||||||
|
//
|
||||||
|
// Preparex creates a prepared statement with name and sql. sql can contain placeholders
|
||||||
|
// for bound parameters. These placeholders are referenced positional as $1, $2, etc.
|
||||||
|
// It defers from Prepare as it allows additional options (such as parameter OIDs) to be passed via struct
|
||||||
|
//
|
||||||
|
// Preparex is idempotent; i.e. it is safe to call Preparex multiple times with the same
|
||||||
|
// name and sql arguments. This allows a code path to Preparex and Query/Exec without
|
||||||
|
// concern for if the statement has already been prepared.
|
||||||
|
func (p *ConnPool) Preparex(name, sql string, opts PreparexOptions) (*PreparedStatement, error) {
|
||||||
|
p.cond.L.Lock()
|
||||||
|
defer p.cond.L.Unlock()
|
||||||
|
|
||||||
|
if ps, ok := p.preparedStatements[name]; ok && ps.SQL == sql {
|
||||||
|
return ps, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c, err := p.acquire(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ps, err := c.Preparex(name, sql, opts)
|
||||||
|
|
||||||
|
p.availableConnections = append(p.availableConnections, c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, c := range p.availableConnections {
|
||||||
|
_, err := c.Preparex(name, sql, opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.invalidateAcquired()
|
||||||
|
p.preparedStatements[name] = ps
|
||||||
|
|
||||||
|
return ps, err
|
||||||
|
}
|
||||||
|
|
||||||
// Deallocate releases a prepared statement from all connections in the pool.
|
// Deallocate releases a prepared statement from all connections in the pool.
|
||||||
func (p *ConnPool) Deallocate(name string) (err error) {
|
func (p *ConnPool) Deallocate(name string) (err error) {
|
||||||
p.cond.L.Lock()
|
p.cond.L.Lock()
|
||||||
|
|
Loading…
Reference in New Issue