mirror of https://github.com/harness/drone.git
fix: [CDE-150]: In logstream, adding locking and panic recovery around a subscriber's publish method. Adding check in the stream's publish method to not publish if the sub is closed. Closing the err channel in the log stream API handler. (#2218)
* Formatting. * fix: [CDE-150]: In logstream, adding locking and panic recovery around a subscriber's publish method. Adding check in the stream's publish method to not publish if the sub is closed. Closing the err channel in the log stream API handler.unified-ui
parent
56df9da6b8
commit
2785206535
|
@ -58,6 +58,8 @@ func (c *Controller) LogsStream(
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(evenc)
|
defer close(evenc)
|
||||||
|
defer close(errch)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
|
|
@ -175,7 +175,7 @@ func (e *EmbeddedDockerOrchestrator) StartGitspace(
|
||||||
usedPorts = ports
|
usedPorts = ports
|
||||||
|
|
||||||
// TODO: Add gitspace status reporting.
|
// TODO: Add gitspace status reporting.
|
||||||
log.Debug().Msg("started gitspace")
|
log.Debug().Msgf("started gitspace: %s", gitspaceConfig.Identifier)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("gitspace %s is in a bad state: %s", containerName, state)
|
return nil, fmt.Errorf("gitspace %s is in a bad state: %s", containerName, state)
|
||||||
|
|
|
@ -42,8 +42,10 @@ func (s *stream) write(line *Line) error {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.hist = append(s.hist, line)
|
s.hist = append(s.hist, line)
|
||||||
for l := range s.list {
|
for l := range s.list {
|
||||||
|
if !l.closed {
|
||||||
l.publish(line)
|
l.publish(line)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// the history should not be unbounded. The history
|
// the history should not be unbounded. The history
|
||||||
// slice is capped and items are removed in a FIFO
|
// slice is capped and items are removed in a FIFO
|
||||||
// ordering when capacity is reached.
|
// ordering when capacity is reached.
|
||||||
|
|
|
@ -16,6 +16,8 @@ package livelog
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type subscriber struct {
|
type subscriber struct {
|
||||||
|
@ -27,6 +29,16 @@ type subscriber struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) publish(line *Line) {
|
func (s *subscriber) publish(line *Line) {
|
||||||
|
defer func() {
|
||||||
|
r := recover()
|
||||||
|
if r != nil {
|
||||||
|
log.Debug().Msgf("publishing to closed subscriber")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
s.Lock()
|
||||||
|
defer s.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-s.closec:
|
case <-s.closec:
|
||||||
case s.handler <- line:
|
case s.handler <- line:
|
||||||
|
|
Loading…
Reference in New Issue