diff --git a/pgconn/benchmark_test.go b/pgconn/benchmark_test.go index aff21216..bdc550cb 100644 --- a/pgconn/benchmark_test.go +++ b/pgconn/benchmark_test.go @@ -36,6 +36,19 @@ func BenchmarkConnect(b *testing.B) { } } +func BenchmarkExec(b *testing.B) { + conn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) + require.Nil(b, err) + defer closeConn(b, conn) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := conn.Exec(context.Background(), "select 'hello'::text as a, 42::int4 as b, '2019-01-01'::date") + require.Nil(b, err) + } +} + func BenchmarkExecPrepared(b *testing.B) { conn, err := pgconn.Connect(context.Background(), os.Getenv("PGX_TEST_DATABASE")) require.Nil(b, err) diff --git a/pgconn/pgconn.go b/pgconn/pgconn.go index 8511d5b9..d7a99676 100644 --- a/pgconn/pgconn.go +++ b/pgconn/pgconn.go @@ -14,7 +14,6 @@ import ( "strings" "time" - "github.com/jackc/pgx/pgio" "github.com/jackc/pgx/pgproto3" ) @@ -354,127 +353,10 @@ func (ct CommandTag) String() string { // Execution is implicitly wrapped in a transactions unless a transaction is already in progress or sql contains // transaction control statements. It is only sent to the PostgreSQL server when Flush is called. func (pgConn *PgConn) SendExec(sql string) { - pgConn.batchBuf = appendQuery(pgConn.batchBuf, sql) + pgConn.batchBuf = (&pgproto3.Query{String: sql}).Encode(pgConn.batchBuf) pgConn.batchCount += 1 } -// appendQuery appends a PostgreSQL wire protocol query message to buf and returns it. -func appendQuery(buf []byte, query string) []byte { - buf = append(buf, 'Q') - buf = pgio.AppendInt32(buf, int32(len(query)+5)) - buf = append(buf, query...) - buf = append(buf, 0) - return buf -} - -// appendParse appends a PostgreSQL wire protocol parse message to buf and returns it. -func appendParse(buf []byte, name string, query string, paramOIDs []uint32) []byte { - if len(paramOIDs) > 65535 { - panic(fmt.Sprintf("len(paramOIDs) must be between 0 and 65535, received %d", len(paramOIDs))) - } - - buf = append(buf, 'P') - sp := len(buf) - buf = pgio.AppendInt32(buf, -1) - buf = append(buf, name...) - buf = append(buf, 0) - buf = append(buf, query...) - buf = append(buf, 0) - - buf = pgio.AppendInt16(buf, int16(len(paramOIDs))) - for _, oid := range paramOIDs { - buf = pgio.AppendUint32(buf, oid) - } - pgio.SetInt32(buf[sp:], int32(len(buf[sp:]))) - - return buf -} - -// appendDescribe appends a PostgreSQL wire protocol describe message to buf and returns it. -func appendDescribe(buf []byte, objectType byte, name string) []byte { - buf = append(buf, 'D') - sp := len(buf) - buf = pgio.AppendInt32(buf, -1) - buf = append(buf, objectType) - buf = append(buf, name...) - buf = append(buf, 0) - pgio.SetInt32(buf[sp:], int32(len(buf[sp:]))) - - return buf -} - -// appendSync appends a PostgreSQL wire protocol sync message to buf and returns it. -func appendSync(buf []byte) []byte { - buf = append(buf, 'S') - buf = pgio.AppendInt32(buf, 4) - - return buf -} - -// appendBind appends a PostgreSQL wire protocol bind message to buf and returns it. -func appendBind( - buf []byte, - destinationPortal, - preparedStatement string, - paramFormats []int16, - paramValues [][]byte, - resultFormatCodes []int16, -) []byte { - if len(paramFormats) != 0 && len(paramFormats) != len(paramValues) && len(paramFormats) != len(paramValues) { - panic(fmt.Sprintf("len(paramFormats) must be 0, 1, or len(paramValues), received %d", len(paramFormats))) - } - if len(paramValues) > 65535 { - panic(fmt.Sprintf("len(paramValues) must be between 0 and 65535, received %d", len(paramValues))) - } - - buf = append(buf, 'B') - sp := len(buf) - buf = pgio.AppendInt32(buf, -1) - buf = append(buf, destinationPortal...) - buf = append(buf, 0) - buf = append(buf, preparedStatement...) - buf = append(buf, 0) - - buf = pgio.AppendInt16(buf, int16(len(paramFormats))) - for _, f := range paramFormats { - buf = pgio.AppendInt16(buf, f) - } - - buf = pgio.AppendInt16(buf, int16(len(paramValues))) - for _, p := range paramValues { - if p == nil { - buf = pgio.AppendInt32(buf, -1) - continue - } - - buf = pgio.AppendInt32(buf, int32(len(p))) - buf = append(buf, p...) - } - - buf = pgio.AppendInt16(buf, int16(len(resultFormatCodes))) - for _, fc := range resultFormatCodes { - buf = pgio.AppendInt16(buf, fc) - } - pgio.SetInt32(buf[sp:], int32(len(buf[sp:]))) - - return buf -} - -// appendExecute appends a PostgreSQL wire protocol execute message to buf and returns it. -func appendExecute(buf []byte, portal string, maxRows uint32) []byte { - buf = append(buf, 'E') - sp := len(buf) - buf = pgio.AppendInt32(buf, -1) - - buf = append(buf, portal...) - buf = append(buf, 0) - buf = pgio.AppendUint32(buf, maxRows) - - pgio.SetInt32(buf[sp:], int32(len(buf[sp:]))) - - return buf -} - // SendExecParams enqueues the execution of sql via the PostgreSQL extended query protocol. // // sql is a SQL command string. It may only contain one query. Parameter substitution is position using $1, $2, $3, etc. @@ -498,11 +380,8 @@ func (pgConn *PgConn) SendExecParams(sql string, paramValues [][]byte, paramOIDs panic(fmt.Sprintf("len(paramOIDs) must be 0, 1, or len(paramValues), received %d", len(paramOIDs))) } - pgConn.batchBuf = appendParse(pgConn.batchBuf, "", sql, paramOIDs) - pgConn.batchBuf = appendDescribe(pgConn.batchBuf, 'S', "") - pgConn.batchBuf = appendBind(pgConn.batchBuf, "", "", paramFormats, paramValues, resultFormats) - pgConn.batchBuf = appendExecute(pgConn.batchBuf, "", 0) - pgConn.batchBuf = appendSync(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Parse{Query: sql, ParameterOIDs: paramOIDs}).Encode(pgConn.batchBuf) + pgConn.SendExecPrepared("", paramValues, paramFormats, resultFormats) pgConn.batchCount += 1 } @@ -519,10 +398,10 @@ func (pgConn *PgConn) SendExecParams(sql string, paramValues [][]byte, paramOIDs // // Query is only sent to the PostgreSQL server when Flush is called. func (pgConn *PgConn) SendExecPrepared(stmtName string, paramValues [][]byte, paramFormats []int16, resultFormats []int16) { - pgConn.batchBuf = appendDescribe(pgConn.batchBuf, 'S', stmtName) - pgConn.batchBuf = appendBind(pgConn.batchBuf, "", stmtName, paramFormats, paramValues, resultFormats) - pgConn.batchBuf = appendExecute(pgConn.batchBuf, "", 0) - pgConn.batchBuf = appendSync(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Describe{ObjectType: 'S', Name: stmtName}).Encode(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}).Encode(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Execute{}).Encode(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Sync{}).Encode(pgConn.batchBuf) pgConn.batchCount += 1 } @@ -890,9 +769,9 @@ func (pgConn *PgConn) Prepare(ctx context.Context, name, sql string, paramOIDs [ cleanupContext := contextDoneToConnDeadline(ctx, pgConn.conn) defer cleanupContext() - pgConn.batchBuf = appendParse(pgConn.batchBuf, name, sql, paramOIDs) - pgConn.batchBuf = appendDescribe(pgConn.batchBuf, 'S', name) - pgConn.batchBuf = appendSync(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}).Encode(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Describe{ObjectType: 'S', Name: name}).Encode(pgConn.batchBuf) + pgConn.batchBuf = (&pgproto3.Sync{}).Encode(pgConn.batchBuf) pgConn.batchCount += 1 err := pgConn.Flush(context.Background()) if err != nil {