add release tracer

pull/2017/head
ngavinsir 2024-05-11 11:25:40 +07:00 committed by Jack Christensen
parent 19fcb54564
commit 25914e21f3
4 changed files with 82 additions and 2 deletions

View File

@ -26,6 +26,13 @@ func (c *Conn) Release() {
res := c.res
c.res = nil
if c.p.releaseTracer != nil {
ctx := c.p.releaseTracer.TraceReleaseStart(context.Background(), c.p, TraceReleaseStartData{Conn: conn})
defer func() {
c.p.releaseTracer.TraceReleaseEnd(ctx, c.p, TraceReleaseEndData{})
}()
}
if conn.IsClosed() || conn.PgConn().IsBusy() || conn.PgConn().TxStatus() != 'I' {
res.Destroy()
// Signal to the health check to run since we just destroyed a connections

View File

@ -96,6 +96,7 @@ type Pool struct {
healthCheckChan chan struct{}
acquireTracer AcquireTracer
releaseTracer ReleaseTracer
closeOnce sync.Once
closeChan chan struct{}
@ -201,6 +202,10 @@ func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
p.acquireTracer = t
}
if t, ok := config.ConnConfig.Tracer.(ReleaseTracer); ok {
p.releaseTracer = t
}
var err error
p.p, err = puddle.NewPool(
&puddle.Config[*connResource]{

View File

@ -11,7 +11,7 @@ type AcquireTracer interface {
// TraceAcquireStart is called at the beginning of Acquire.
// The returned context is used for the rest of the call and will be passed to the TraceAcquireEnd.
TraceAcquireStart(ctx context.Context, pool *Pool, data TraceAcquireStartData) context.Context
// TraceAcquireEnd is called when a connection has been acquired
// TraceAcquireEnd is called when a connection has been acquired.
TraceAcquireEnd(ctx context.Context, pool *Pool, data TraceAcquireEndData)
}
@ -21,3 +21,18 @@ type TraceAcquireEndData struct {
Conn *pgx.Conn
Err error
}
// ReleaseTracer traces Release.
type ReleaseTracer interface {
// TraceReleaseStart is called at the beginning of Release.
// The returned context will be passed to the TraceReleaseEnd.
TraceReleaseStart(ctx context.Context, pool *Pool, data TraceReleaseStartData) context.Context
// TraceReleaseEnd is called when a connection has been released.
TraceReleaseEnd(ctx context.Context, pool *Pool, data TraceReleaseEndData)
}
type TraceReleaseStartData struct {
Conn *pgx.Conn
}
type TraceReleaseEndData struct{}

View File

@ -14,6 +14,8 @@ import (
type testTracer struct {
traceAcquireStart func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireStartData) context.Context
traceAcquireEnd func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireEndData)
traceReleaseStart func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceReleaseStartData) context.Context
traceReleaseEnd func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceReleaseEndData)
}
type ctxKey string
@ -31,6 +33,19 @@ func (tt *testTracer) TraceAcquireEnd(ctx context.Context, pool *pgxpool.Pool, d
}
}
func (tt *testTracer) TraceReleaseStart(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceReleaseStartData) context.Context {
if tt.traceReleaseStart != nil {
return tt.traceReleaseStart(ctx, pool, data)
}
return ctx
}
func (tt *testTracer) TraceReleaseEnd(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceReleaseEndData) {
if tt.traceReleaseEnd != nil {
tt.traceReleaseEnd(ctx, pool, data)
}
}
func (tt *testTracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
return ctx
}
@ -64,7 +79,7 @@ func TestTraceAcquire(t *testing.T) {
traceAcquireEndCalled := false
tracer.traceAcquireEnd = func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceAcquireEndData) {
traceAcquireEndCalled = true
require.Equal(t, "foo", ctx.Value(ctxKey(ctxKey("fromTraceAcquireStart"))))
require.Equal(t, "foo", ctx.Value(ctxKey("fromTraceAcquireStart")))
require.NotNil(t, pool)
require.NotNil(t, data.Conn)
require.NoError(t, data.Err)
@ -92,3 +107,41 @@ func TestTraceAcquire(t *testing.T) {
require.True(t, traceAcquireStartCalled)
require.True(t, traceAcquireEndCalled)
}
func TestTraceRelease(t *testing.T) {
t.Parallel()
tracer := &testTracer{}
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
config, err := pgxpool.ParseConfig(os.Getenv("PGX_TEST_DATABASE"))
require.NoError(t, err)
config.ConnConfig.Tracer = tracer
pool, err := pgxpool.NewWithConfig(ctx, config)
require.NoError(t, err)
defer pool.Close()
traceReleaseStartCalled := false
tracer.traceReleaseStart = func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceReleaseStartData) context.Context {
traceReleaseStartCalled = true
require.NotNil(t, pool)
require.NotNil(t, data.Conn)
return context.WithValue(ctx, ctxKey("fromTraceReleaseStart"), "foo")
}
traceReleaseEndCalled := false
tracer.traceReleaseEnd = func(ctx context.Context, pool *pgxpool.Pool, data pgxpool.TraceReleaseEndData) {
traceReleaseEndCalled = true
require.Equal(t, "foo", ctx.Value(ctxKey("fromTraceReleaseStart")))
require.NotNil(t, pool)
}
c, err := pool.Acquire(ctx)
require.NoError(t, err)
c.Release()
require.True(t, traceReleaseStartCalled)
require.True(t, traceReleaseEndCalled)
}