private const for pipelineRequestType

This commit is contained in:
zenkovev 2025-01-11 19:54:18 +03:00
parent de3f868c1d
commit c96a55f8c0

View File

@ -2013,13 +2013,13 @@ type CloseComplete struct{}
type pipelineRequestType int type pipelineRequestType int
const ( const (
PIPELINE_NIL pipelineRequestType = iota pipelineNil pipelineRequestType = iota
PIPELINE_PREPARE pipelinePrepare
PIPELINE_QUERY_PARAMS pipelineQueryParams
PIPELINE_QUERY_PREPARED pipelineQueryPrepared
PIPELINE_DEALLOCATE pipelineDeallocate
PIPELINE_SYNC_REQUEST pipelineSyncRequest
PIPELINE_FLUSH_REQUEST pipelineFlushRequest
) )
type pipelineRequestEvent struct { type pipelineRequestEvent struct {
@ -2037,7 +2037,7 @@ type pipelineState struct {
func (s *pipelineState) Init() { func (s *pipelineState) Init() {
s.requestEventQueue.Init() s.requestEventQueue.Init()
s.lastRequestType = PIPELINE_NIL s.lastRequestType = pipelineNil
} }
func (s *pipelineState) RegisterSendingToServer() { func (s *pipelineState) RegisterSendingToServer() {
@ -2063,19 +2063,19 @@ func (s *pipelineState) registerFlushingBufferOnServer() {
} }
func (s *pipelineState) PushBackRequestType(req pipelineRequestType) { func (s *pipelineState) PushBackRequestType(req pipelineRequestType) {
if req == PIPELINE_NIL { if req == pipelineNil {
return return
} }
if req != PIPELINE_FLUSH_REQUEST { if req != pipelineFlushRequest {
s.requestEventQueue.PushBack(pipelineRequestEvent{RequestType: req}) s.requestEventQueue.PushBack(pipelineRequestEvent{RequestType: req})
} }
if req == PIPELINE_FLUSH_REQUEST || req == PIPELINE_SYNC_REQUEST { if req == pipelineFlushRequest || req == pipelineSyncRequest {
s.registerFlushingBufferOnServer() s.registerFlushingBufferOnServer()
} }
s.lastRequestType = req s.lastRequestType = req
if req == PIPELINE_SYNC_REQUEST { if req == pipelineSyncRequest {
s.expectedReadyForQueryCount++ s.expectedReadyForQueryCount++
} }
} }
@ -2084,15 +2084,15 @@ func (s *pipelineState) ExtractFrontRequestType() pipelineRequestType {
for { for {
elem := s.requestEventQueue.Front() elem := s.requestEventQueue.Front()
if elem == nil { if elem == nil {
return PIPELINE_NIL return pipelineNil
} }
val := elem.Value.(pipelineRequestEvent) val := elem.Value.(pipelineRequestEvent)
if !(val.WasSentToServer && val.BeforeFlushOrSync) { if !(val.WasSentToServer && val.BeforeFlushOrSync) {
return PIPELINE_NIL return pipelineNil
} }
s.requestEventQueue.Remove(elem) s.requestEventQueue.Remove(elem)
if val.RequestType == PIPELINE_SYNC_REQUEST { if val.RequestType == pipelineSyncRequest {
s.pgErr = nil s.pgErr = nil
} }
if s.pgErr == nil { if s.pgErr == nil {
@ -2114,9 +2114,9 @@ func (s *pipelineState) PendingSync() bool {
if elem := s.requestEventQueue.Back(); elem != nil { if elem := s.requestEventQueue.Back(); elem != nil {
val := elem.Value.(pipelineRequestEvent) val := elem.Value.(pipelineRequestEvent)
notPendingSync = (val.RequestType == PIPELINE_SYNC_REQUEST) && val.WasSentToServer notPendingSync = (val.RequestType == pipelineSyncRequest) && val.WasSentToServer
} else { } else {
notPendingSync = (s.lastRequestType == PIPELINE_SYNC_REQUEST) || (s.lastRequestType == PIPELINE_NIL) notPendingSync = (s.lastRequestType == pipelineSyncRequest) || (s.lastRequestType == pipelineNil)
} }
return !notPendingSync return !notPendingSync
@ -2174,7 +2174,7 @@ func (p *Pipeline) SendPrepare(name, sql string, paramOIDs []uint32) {
p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs}) p.conn.frontend.SendParse(&pgproto3.Parse{Name: name, Query: sql, ParameterOIDs: paramOIDs})
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name}) p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'S', Name: name})
p.state.PushBackRequestType(PIPELINE_PREPARE) p.state.PushBackRequestType(pipelinePrepare)
} }
// SendDeallocate deallocates a prepared statement. // SendDeallocate deallocates a prepared statement.
@ -2184,7 +2184,7 @@ func (p *Pipeline) SendDeallocate(name string) {
} }
p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name}) p.conn.frontend.SendClose(&pgproto3.Close{ObjectType: 'S', Name: name})
p.state.PushBackRequestType(PIPELINE_DEALLOCATE) p.state.PushBackRequestType(pipelineDeallocate)
} }
// SendQueryParams is the pipeline version of *PgConn.QueryParams. // SendQueryParams is the pipeline version of *PgConn.QueryParams.
@ -2197,7 +2197,7 @@ func (p *Pipeline) SendQueryParams(sql string, paramValues [][]byte, paramOIDs [
p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) p.conn.frontend.SendBind(&pgproto3.Bind{ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
p.conn.frontend.SendExecute(&pgproto3.Execute{}) p.conn.frontend.SendExecute(&pgproto3.Execute{})
p.state.PushBackRequestType(PIPELINE_QUERY_PARAMS) p.state.PushBackRequestType(pipelineQueryParams)
} }
// SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared. // SendQueryPrepared is the pipeline version of *PgConn.QueryPrepared.
@ -2209,7 +2209,7 @@ func (p *Pipeline) SendQueryPrepared(stmtName string, paramValues [][]byte, para
p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats}) p.conn.frontend.SendBind(&pgproto3.Bind{PreparedStatement: stmtName, ParameterFormatCodes: paramFormats, Parameters: paramValues, ResultFormatCodes: resultFormats})
p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'}) p.conn.frontend.SendDescribe(&pgproto3.Describe{ObjectType: 'P'})
p.conn.frontend.SendExecute(&pgproto3.Execute{}) p.conn.frontend.SendExecute(&pgproto3.Execute{})
p.state.PushBackRequestType(PIPELINE_QUERY_PREPARED) p.state.PushBackRequestType(pipelineQueryPrepared)
} }
// SendFlushRequest sends a request for the server to flush its output buffer. // SendFlushRequest sends a request for the server to flush its output buffer.
@ -2225,7 +2225,7 @@ func (p *Pipeline) SendFlushRequest() {
} }
p.conn.frontend.Send(&pgproto3.Flush{}) p.conn.frontend.Send(&pgproto3.Flush{})
p.state.PushBackRequestType(PIPELINE_FLUSH_REQUEST) p.state.PushBackRequestType(pipelineFlushRequest)
} }
// SendPipelineSync marks a synchronization point in a pipeline by sending a sync message // SendPipelineSync marks a synchronization point in a pipeline by sending a sync message
@ -2240,7 +2240,7 @@ func (p *Pipeline) SendPipelineSync() {
} }
p.conn.frontend.SendSync(&pgproto3.Sync{}) p.conn.frontend.SendSync(&pgproto3.Sync{})
p.state.PushBackRequestType(PIPELINE_SYNC_REQUEST) p.state.PushBackRequestType(pipelineSyncRequest)
} }
// Flush flushes the queued requests without establishing a synchronization point. // Flush flushes the queued requests without establishing a synchronization point.
@ -2286,7 +2286,7 @@ func (p *Pipeline) GetResults() (results any, err error) {
return nil, errors.New("pipeline closed") return nil, errors.New("pipeline closed")
} }
if p.state.ExtractFrontRequestType() == PIPELINE_NIL { if p.state.ExtractFrontRequestType() == pipelineNil {
return nil, nil return nil, nil
} }