diff --git a/conn.go b/conn.go index d938edc8..1609344e 100644 --- a/conn.go +++ b/conn.go @@ -614,23 +614,6 @@ func (c *Conn) CauseOfDeath() error { return c.causeOfDeath } -func (c *Conn) sendSimpleQuery(sql string) error { - if err := c.ensureConnectionReadyForQuery(); err != nil { - return err - } - - buf := appendQuery(c.wbuf, sql) - - _, err := c.pgConn.Conn().Write(buf) - if err != nil { - c.die(err) - return err - } - c.pendingReadyForQueryCount++ - - return nil -} - // fatalWriteError takes the response of a net.Conn.Write and determines if it is fatal func fatalWriteErr(bytesWritten int, err error) bool { // Partial writes break the connection diff --git a/query.go b/query.go index 1c105df5..ec48db9b 100644 --- a/query.go +++ b/query.go @@ -9,7 +9,6 @@ import ( "github.com/pkg/errors" - "github.com/jackc/pgx/internal/sanitize" "github.com/jackc/pgx/pgconn" "github.com/jackc/pgx/pgtype" ) @@ -519,30 +518,30 @@ func (c *Conn) buildOneRoundTripQueryEx(buf []byte, sql string, options *QueryEx return buf, nil } -func (c *Conn) sanitizeAndSendSimpleQuery(sql string, args ...interface{}) (err error) { - if c.pgConn.ParameterStatus("standard_conforming_strings") != "on" { - return errors.New("simple protocol queries must be run with standard_conforming_strings=on") - } +// func (c *Conn) sanitizeAndSendSimpleQuery(sql string, args ...interface{}) (err error) { +// if c.pgConn.ParameterStatus("standard_conforming_strings") != "on" { +// return errors.New("simple protocol queries must be run with standard_conforming_strings=on") +// } - if c.pgConn.ParameterStatus("client_encoding") != "UTF8" { - return errors.New("simple protocol queries must be run with client_encoding=UTF8") - } +// if c.pgConn.ParameterStatus("client_encoding") != "UTF8" { +// return errors.New("simple protocol queries must be run with client_encoding=UTF8") +// } - valueArgs := make([]interface{}, len(args)) - for i, a := range args { - valueArgs[i], err = convertSimpleArgument(c.ConnInfo, a) - if err != nil { - return err - } - } +// valueArgs := make([]interface{}, len(args)) +// for i, a := range args { +// valueArgs[i], err = convertSimpleArgument(c.ConnInfo, a) +// if err != nil { +// return err +// } +// } - sql, err = sanitize.SanitizeSQL(sql, valueArgs...) - if err != nil { - return err - } +// sql, err = sanitize.SanitizeSQL(sql, valueArgs...) +// if err != nil { +// return err +// } - return c.sendSimpleQuery(sql) -} +// return c.sendSimpleQuery(sql) +// } func (c *Conn) QueryRowEx(ctx context.Context, sql string, options *QueryExOptions, args ...interface{}) *Row { rows, _ := c.QueryEx(ctx, sql, options, args...) diff --git a/replication.go b/replication.go index 8d67db85..21d9a3d8 100644 --- a/replication.go +++ b/replication.go @@ -330,41 +330,6 @@ func (rc *ReplicationConn) WaitForReplicationMessage(ctx context.Context) (*Repl return r, err } -func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) { - rows := rc.c.getRows(sql, nil) - - if err := rc.c.lock(); err != nil { - rows.fatal(err) - return rows, err - } - rows.unlockConn = true - - err := rc.c.sendSimpleQuery(sql) - if err != nil { - rows.fatal(err) - } - - msg, err := rc.c.pgConn.ReceiveMessage() - if err != nil { - return nil, err - } - - switch msg := msg.(type) { - case *pgproto3.RowDescription: - rows.fields = rc.c.rxRowDescription(msg) - // We don't have c.PgTypes here because we're a replication - // connection. This means the field descriptions will have - // only OIDs. Not much we can do about this. - default: - if e := rc.c.processContextFreeMsg(msg); e != nil { - rows.fatal(e) - return rows, e - } - } - - return rows, rows.err -} - // Execute the "IDENTIFY_SYSTEM" command as documented here: // https://www.postgresql.org/docs/9.5/static/protocol-replication.html // @@ -376,7 +341,8 @@ func (rc *ReplicationConn) sendReplicationModeQuery(sql string) (*Rows, error) { // type names, so the field descriptions in the result will have only // OIDs and no DataTypeName values func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) { - return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") + return nil, errors.New("TODO") + // return rc.sendReplicationModeQuery("IDENTIFY_SYSTEM") } // Execute the "TIMELINE_HISTORY" command as documented here: @@ -391,7 +357,8 @@ func (rc *ReplicationConn) IdentifySystem() (r *Rows, err error) { // type names, so the field descriptions in the result will have only // OIDs and no DataTypeName values func (rc *ReplicationConn) TimelineHistory(timeline int) (r *Rows, err error) { - return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) + return nil, errors.New("TODO") + // return rc.sendReplicationModeQuery(fmt.Sprintf("TIMELINE_HISTORY %d", timeline)) } // Start a replication connection, sending WAL data to the given replication @@ -416,8 +383,12 @@ func (rc *ReplicationConn) StartReplication(slotName string, startLsn uint64, ti queryString += fmt.Sprintf(" ( %s )", strings.Join(pluginArguments, ", ")) } - if err = rc.c.sendSimpleQuery(queryString); err != nil { - return + buf := appendQuery(rc.c.wbuf, queryString) + + _, err = rc.c.pgConn.Conn().Write(buf) + if err != nil { + rc.c.die(err) + return err } ctx, cancelFn := context.WithTimeout(context.Background(), initialReplicationResponseTimeout)