diff --git a/drone/agent/agent.go b/drone/agent/agent.go index df0059521..7c35808fa 100644 --- a/drone/agent/agent.go +++ b/drone/agent/agent.go @@ -3,18 +3,17 @@ package agent import ( "os" "os/signal" + "strings" "sync" "syscall" "time" "github.com/drone/drone/queue" "github.com/drone/mq/stomp" - "github.com/samalba/dockerclient" - - "strings" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/samalba/dockerclient" ) // AgentCmd is the exported command for starting the drone agent. @@ -160,16 +159,6 @@ func start(c *cli.Context) { ) server := strings.TrimRight(c.String("drone-server"), "/") - client, err := stomp.Dial(server) - if err != nil { - logrus.Fatalf("Cannot connect to host %s. %s", server, err) - } - opts := []stomp.MessageOption{ - stomp.WithCredentials("x-token", accessToken), - } - if err = client.Connect(opts...); err != nil { - logrus.Fatalf("Cannot connect to host %s. %s", server, err) - } tls, err := dockerclient.TLSConfigFromCertPath(c.String("docker-cert-path")) if err == nil { @@ -180,6 +169,8 @@ func start(c *cli.Context) { logrus.Fatal(err) } + var client *stomp.Client + handler := func(m *stomp.Message) { running.Add(1) defer func() { @@ -205,24 +196,44 @@ func start(c *cli.Context) { r.run(work) } - _, err = client.Subscribe("/queue/pending", stomp.HandlerFunc(handler), - stomp.WithAck("client"), - stomp.WithPrefetch( - c.Int("docker-max-procs"), - ), - // stomp.WithSelector( - // fmt.Sprintf("platorm == '%s/%s'", - // c.String("drone-os"), - // c.String("drone-arch"), - // ), - // ), - ) - if err != nil { - logrus.Fatalf("Unable to connect to queue. %s", err) - } handleSignals() - <-client.Done() + backoff := c.Duration("backoff") + + for { + // dial the drone server to establish a TCP connection. + client, err = stomp.Dial(server) + if err != nil { + logrus.Errorf("Failed to establish server connection, %s, retry in %v", err, backoff) + <-time.After(backoff) + continue + } + opts := []stomp.MessageOption{ + stomp.WithCredentials("x-token", accessToken), + } + + // initialize the stomp session and authenticate. + if err = client.Connect(opts...); err != nil { + logrus.Errorf("Failed to establish server session, %s, retry in %v", err, backoff) + <-time.After(backoff) + continue + } + + // subscribe to the pending build queue. + client.Subscribe("/queue/pending", stomp.HandlerFunc(func(m *stomp.Message) { + go handler(m) // HACK until we a channel based Subscribe implementation + }), + stomp.WithAck("client"), + stomp.WithPrefetch( + c.Int("docker-max-procs"), + ), + ) + + logrus.Infof("Server connection establish, ready to process builds.") + <-client.Done() + + logrus.Warnf("Server connection interrupted, attempting to reconnect.") + } } // tracks running builds diff --git a/drone/agent/exec.go b/drone/agent/exec.go index 049485256..269a6754f 100644 --- a/drone/agent/exec.go +++ b/drone/agent/exec.go @@ -62,9 +62,9 @@ func (r *pipeline) run(w *queue.Work) { } // signal for canceling the build. - sub, err := r.drone.Subscribe("/topic/cancels", stomp.HandlerFunc(cancelFunc)) + sub, err := r.drone.Subscribe("/topic/cancel", stomp.HandlerFunc(cancelFunc)) if err != nil { - logrus.Errorf("Error subscribing to /topic/cancels. %s", err) + logrus.Errorf("Error subscribing to /topic/cancel. %s", err) } defer func() { r.drone.Unsubscribe(sub) diff --git a/drone/server.go b/drone/server.go index 390642fbc..94c7cfe01 100644 --- a/drone/server.go +++ b/drone/server.go @@ -6,10 +6,10 @@ import ( "github.com/drone/drone/router" "github.com/drone/drone/router/middleware" - "github.com/gin-gonic/contrib/ginrus" "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" + "github.com/gin-gonic/contrib/ginrus" ) var serverCmd = cli.Command{ diff --git a/server/build.go b/server/build.go index 3e9d7a4ac..3fdc81960 100644 --- a/server/build.go +++ b/server/build.go @@ -156,7 +156,7 @@ func DeleteBuild(c *gin.Context) { Repo: *repo, Build: *build, Job: *job, - }) + }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10))) c.String(204, "") } diff --git a/server/queue.go b/server/queue.go index 23acf8111..d9ca743c7 100644 --- a/server/queue.go +++ b/server/queue.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strconv" + "time" "golang.org/x/net/context" @@ -411,13 +412,21 @@ func HandleUpdate(c context.Context, message *stomp.Message) { logrus.Errorf("Unable to read logs from broker. %s", err) return } - <-done + + defer func() { + client.Unsubscribe(sub) + client.Send(dest, []byte{}, stomp.WithRetain("remove")) + }() + + select { + case <-done: + case <-time.After(30 * time.Second): + logrus.Errorf("Unable to read logs from broker. Timeout. %s", err) + return + } if err := store.WriteLog(c, job, &buf); err != nil { logrus.Errorf("Unable to write logs to store. %s", err) return } - - client.Unsubscribe(sub) - client.Send(dest, []byte{}, stomp.WithRetain("remove")) } diff --git a/server/stream.go b/server/stream.go index 9c88ebed9..d4a626a48 100644 --- a/server/stream.go +++ b/server/stream.go @@ -65,17 +65,17 @@ func LogStream(c *gin.Context) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() + logs := make(chan []byte) done := make(chan bool) dest := fmt.Sprintf("/topic/logs.%d", job.ID) client, _ := stomp.FromContext(c) sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { - defer m.Release() if m.Header.GetBool("eof") { done <- true - return + } else { + logs <- m.Body } - ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteMessage(websocket.TextMessage, m.Body) + m.Release() })) if err != nil { logrus.Errorf("Unable to read logs from broker. %s", err) @@ -83,10 +83,15 @@ func LogStream(c *gin.Context) { } defer func() { client.Unsubscribe(sub) + close(done) + close(logs) }() for { select { + case buf := <-logs: + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, buf) case <-done: return case <-ticker.C: @@ -139,9 +144,9 @@ func EventStream(c *gin.Context) { return } defer func() { + client.Unsubscribe(sub) close(quitc) close(eventc) - client.Unsubscribe(sub) }() go func() {