feat: [CDE-127]: Adding changes to include logging in container orchestration's start flow. (#2192)

* feat: [CDE-96]: Renaming reusable scanner as scanner doesn't support resetting the reader. Removing closec as return param in livelog.Tail method. CLosing the handler channel when the stream is closed, this ensures the consumer to know the channel is closed. Adding a check in the render.StreamSSE to close the stream when the events channel is closed and drained.
* feat: [CDE-127]: Changes to enable log stream API for gitspaces. Adding close channel to the logstream Tail method. Changes to use a default host and working dir from config for a running gitspace instance's URL. Change to append gitspaces to the bind mount of the gitspace container in the container orchestrator instead of in the config provider.
* Initial commit
* feat: [CDE-127]: Removing streams map from StatefulLogger. Flushing the stream every time the invoking function is closed.
* feat: [CDE-127]: Removing streams map from StatefulLogger. Flushing the stream every time the invoking function is closed.
* feat: [CDE-127]: Adding flush stream logic to delete gitspace in orchestrator. Linting.
* feat: [CDE-127]: Addressing review comments.
* feat: [CDE-127]: Adding changes to make logutil.StatefulLogger a wrapper on livelog.LogStream and adding initialisation and flush functions.
* feat: [CDE-127]: Adding changes to make logutil.StatefulLogger a wrapper on livelog.LogStream and adding initialisation and flush functions.
* feat: [CDE-127]: Adding changes to include logging in container orchestration's start flow.
This commit is contained in:
Dhruv Dhruv 2024-07-10 10:25:41 +00:00 committed by Harness
parent 6864710e8e
commit 87157de7fa
18 changed files with 224 additions and 34 deletions

View File

@ -17,6 +17,7 @@ package gitspace
import (
"github.com/harness/gitness/app/auth/authz"
gitspaceevents "github.com/harness/gitness/app/events/gitspace"
"github.com/harness/gitness/app/gitspace/logutil"
"github.com/harness/gitness/app/gitspace/orchestrator"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/store/database/dbtx"
@ -32,6 +33,7 @@ type Controller struct {
orchestrator orchestrator.Orchestrator
gitspaceEventStore store.GitspaceEventStore
tx dbtx.Transactor
statefulLogger *logutil.StatefulLogger
}
func NewController(
@ -44,6 +46,7 @@ func NewController(
eventReporter *gitspaceevents.Reporter,
orchestrator orchestrator.Orchestrator,
gitspaceEventStore store.GitspaceEventStore,
statefulLogger *logutil.StatefulLogger,
) *Controller {
return &Controller{
tx: tx,
@ -55,5 +58,6 @@ func NewController(
eventReporter: eventReporter,
orchestrator: orchestrator,
gitspaceEventStore: gitspaceEventStore,
statefulLogger: statefulLogger,
}
}

View File

@ -31,7 +31,7 @@ func init() {
eventMessageMap = eventsMessageMapping()
}
func (c *Controller) GetEvents(
func (c *Controller) Events(
ctx context.Context,
session *auth.Session,
spaceRef string,

View File

@ -0,0 +1,89 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gitspace
import (
"context"
"encoding/json"
"fmt"
apiauth "github.com/harness/gitness/app/api/auth"
"github.com/harness/gitness/app/auth"
"github.com/harness/gitness/app/sse"
"github.com/harness/gitness/livelog"
"github.com/harness/gitness/types/enum"
)
func (c *Controller) LogsStream(
ctx context.Context,
session *auth.Session,
spaceRef string,
identifier string,
) (<-chan *sse.Event, <-chan error, error) {
space, err := c.spaceStore.FindByRef(ctx, spaceRef)
if err != nil {
return nil, nil, fmt.Errorf("failed to find space: %w", err)
}
err = apiauth.CheckGitspace(ctx, c.authorizer, session, space.Path, identifier, enum.PermissionGitspaceView)
if err != nil {
return nil, nil, fmt.Errorf("failed to authorize: %w", err)
}
gitspaceConfig, err := c.gitspaceConfigStore.FindByIdentifier(ctx, space.ID, identifier)
if err != nil {
return nil, nil, fmt.Errorf("failed to find gitspace config: %w", err)
}
linec, errc := c.statefulLogger.TailLogStream(ctx, gitspaceConfig.ID)
if linec == nil {
return nil, nil, fmt.Errorf("log stream not present, failed to tail log stream")
}
evenc := make(chan *sse.Event)
errch := make(chan error)
go func() {
defer close(evenc)
for {
select {
case <-ctx.Done():
return
case line, ok := <-linec:
if !ok {
return
}
event := sse.Event{
Type: enum.SSETypeLogLineAppended,
Data: marshalLine(line),
}
evenc <- &event
case err = <-errc:
if err != nil {
errch <- err
return
}
}
}
}()
return evenc, errch, nil
}
func marshalLine(line *livelog.Line) []byte {
data, _ := json.Marshal(line)
return data
}

View File

@ -17,6 +17,7 @@ package gitspace
import (
"github.com/harness/gitness/app/auth/authz"
gitspaceevents "github.com/harness/gitness/app/events/gitspace"
"github.com/harness/gitness/app/gitspace/logutil"
"github.com/harness/gitness/app/gitspace/orchestrator"
"github.com/harness/gitness/app/store"
"github.com/harness/gitness/store/database/dbtx"
@ -39,6 +40,7 @@ func ProvideController(
reporter *gitspaceevents.Reporter,
orchestrator orchestrator.Orchestrator,
eventStore store.GitspaceEventStore,
statefulLogger *logutil.StatefulLogger,
) *Controller {
return NewController(
tx,
@ -49,5 +51,7 @@ func ProvideController(
spaceStore,
reporter,
orchestrator,
eventStore)
eventStore,
statefulLogger,
)
}

View File

@ -23,7 +23,7 @@ import (
"github.com/harness/gitness/app/paths"
)
func HandleGetEvents(gitspaceCtrl *gitspace.Controller) http.HandlerFunc {
func HandleEvents(gitspaceCtrl *gitspace.Controller) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
session, _ := request.AuthSessionFrom(ctx)
@ -41,7 +41,7 @@ func HandleGetEvents(gitspaceCtrl *gitspace.Controller) http.HandlerFunc {
page := request.ParsePage(r)
limit := request.ParseLimit(r)
events, count, err := gitspaceCtrl.GetEvents(ctx, session, spaceRef, gitspaceIdentifier, page, limit)
events, count, err := gitspaceCtrl.Events(ctx, session, spaceRef, gitspaceIdentifier, page, limit)
if err != nil {
render.TranslatedUserError(ctx, w, err)
return

View File

@ -0,0 +1,49 @@
// Copyright 2023 Harness, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gitspace
import (
"net/http"
"github.com/harness/gitness/app/api/controller/gitspace"
"github.com/harness/gitness/app/api/render"
"github.com/harness/gitness/app/api/request"
"github.com/harness/gitness/app/paths"
)
func HandleLogsStream(gitspaceCtrl *gitspace.Controller) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
session, _ := request.AuthSessionFrom(ctx)
gitspaceRefFromPath, err := request.GetGitspaceRefFromPath(r)
if err != nil {
render.TranslatedUserError(ctx, w, err)
return
}
spaceRef, gitspaceIdentifier, err := paths.DisectLeaf(gitspaceRefFromPath)
if err != nil {
render.TranslatedUserError(ctx, w, err)
return
}
linec, errc, err := gitspaceCtrl.LogsStream(ctx, session, spaceRef, gitspaceIdentifier)
if err != nil {
render.TranslatedUserError(ctx, w, err)
return
}
render.StreamSSE(ctx, w, nil, linec, errc)
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/harness/gitness/app/api/controller/gitspace"
"github.com/harness/gitness/app/api/usererror"
"github.com/harness/gitness/livelog"
"github.com/harness/gitness/types"
"github.com/swaggest/openapi-go/openapi3"
@ -125,4 +126,16 @@ func gitspaceOperations(reflector *openapi3.Reflector) {
_ = reflector.SetJSONResponse(&opEventList, new(usererror.Error), http.StatusInternalServerError)
_ = reflector.SetJSONResponse(&opEventList, new(usererror.Error), http.StatusNotFound)
_ = reflector.Spec.AddOperation(http.MethodGet, "/gitspaces/{gitspace_identifier}/events", opEventList)
opStreamLogs := openapi3.Operation{}
opStreamLogs.WithTags("gitspaces")
opEventList.WithSummary("Stream gitspace logs")
opStreamLogs.WithMapOfAnything(map[string]interface{}{"operationId": "opStreamLogs"})
_ = reflector.SetRequest(&opStreamLogs, new(gitspaceRequest), http.MethodGet)
_ = reflector.SetStringResponse(&opStreamLogs, http.StatusOK, "text/event-stream")
_ = reflector.SetJSONResponse(&opStreamLogs, []*livelog.Line{}, http.StatusOK)
_ = reflector.SetJSONResponse(&opStreamLogs, new(usererror.Error), http.StatusUnauthorized)
_ = reflector.SetJSONResponse(&opStreamLogs, new(usererror.Error), http.StatusInternalServerError)
_ = reflector.SetJSONResponse(&opStreamLogs, new(usererror.Error), http.StatusNotFound)
_ = reflector.Spec.AddOperation(http.MethodGet, "/gitspaces/{gitspace_identifier}/logs/stream", opStreamLogs)
}

View File

@ -100,7 +100,11 @@ func StreamSSE(
return
}
case event := <-chEvents:
case event, canProduce := <-chEvents:
if !canProduce {
log.Ctx(ctx).Debug().Msg("events channel is drained and closed.")
return
}
if err := stream.event(event); err != nil {
log.Ctx(ctx).Err(err).Msgf("failed to send SSE event: %s", event.Type)
return

View File

@ -20,29 +20,27 @@ import (
"strings"
)
type reusableScanner struct {
scanner *bufio.Scanner
reader *strings.Reader
type scanner struct {
reader *strings.Reader
}
func newReusableScanner() *reusableScanner {
func newScanner() *scanner {
reader := strings.NewReader("")
scanner := bufio.NewScanner(reader)
return &reusableScanner{
scanner: scanner,
reader: reader,
return &scanner{
reader: reader,
}
}
func (r *reusableScanner) scan(input string) ([]string, error) {
func (r *scanner) scan(input string) ([]string, error) {
r.reader.Reset(input)
scanner := bufio.NewScanner(r.reader)
var lines []string
for r.scanner.Scan() {
lines = append(lines, r.scanner.Text())
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
if err := r.scanner.Err(); err != nil {
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error reading string %s: %w", input, err)
}

View File

@ -35,7 +35,7 @@ type LogStreamInstance struct {
id int64
offsetID int64
position int
scanner *reusableScanner
scanner *scanner
logz livelog.LogStream
}
@ -62,13 +62,22 @@ func (s *StatefulLogger) CreateLogStream(ctx context.Context, id int64) (*LogStr
id: id,
offsetID: offsetID,
ctx: ctx,
scanner: newReusableScanner(),
scanner: newScanner(),
logz: s.logz,
}
return newStream, nil
}
// TailLogStream tails the underlying livelog.LogStream stream and returns the data and error channels.
func (s *StatefulLogger) TailLogStream(
ctx context.Context,
id int64,
) (<-chan *livelog.Line, <-chan error) {
offsetID := offset + id
return s.logz.Tail(ctx, offsetID)
}
// Write writes the msg into the underlying log stream.
func (l *LogStreamInstance) Write(msg string) error {
lines, err := l.scanner.scan(msg)
@ -76,6 +85,8 @@ func (l *LogStreamInstance) Write(msg string) error {
return fmt.Errorf("error parsing log lines %s: %w", msg, err)
}
now := time.Now().UnixMilli()
for _, line := range lines {
err = l.logz.Write(
l.ctx,
@ -83,7 +94,7 @@ func (l *LogStreamInstance) Write(msg string) error {
&livelog.Line{
Number: l.position,
Message: line,
Timestamp: time.Now().UnixMilli(),
Timestamp: now,
})
if err != nil {
return fmt.Errorf("could not write log %s for ID %d at pos %d: %w", line, l.id, l.position, err)

View File

@ -47,6 +47,7 @@ const (
containerStateRemoved = "removed"
templateCloneGit = "clone_git.sh"
templateSetupSSHServer = "setup_ssh_server.sh"
gitspacesDir = "gitspaces"
)
type Config struct {
@ -179,9 +180,10 @@ func (e *EmbeddedDockerOrchestrator) StartGitspace(
}
return &StartResponse{
ContainerID: containerID,
ContainerName: containerName,
PortsUsed: usedPorts,
ContainerID: containerID,
ContainerName: containerName,
WorkingDirectory: e.config.DefaultBindMountTargetPath,
PortsUsed: usedPorts,
}, nil
}
@ -498,6 +500,7 @@ func (e *EmbeddedDockerOrchestrator) createContainer(
bindMountSourcePath :=
filepath.Join(
e.config.DefaultBindMountSourceBasePath,
gitspacesDir,
gitspaceConfig.SpacePath,
gitspaceConfig.Identifier,
)

View File

@ -17,7 +17,8 @@ package container
import "github.com/harness/gitness/types/enum"
type StartResponse struct {
ContainerID string
ContainerName string
PortsUsed map[enum.IDEType]string
ContainerID string
ContainerName string
WorkingDirectory string
PortsUsed map[enum.IDEType]string
}

View File

@ -101,18 +101,31 @@ func (o orchestrator) StartGitspace(
var ideURL url.URL
if infra.Host == "" {
// TODO: This fix does not cover all use-cases. Ideally, we need to read the host name on which this docker is
// running and set it as the infra.Host. Remove once that change is done.
infra.Host = "localhost"
}
if gitspaceConfig.IDE == enum.IDETypeVSCodeWeb {
ideURL = url.URL{
Scheme: "http",
Host: infra.Host + ":" + port,
RawQuery: "folder=/gitspace/" + repoName,
RawQuery: "folder=/" + startResponse.WorkingDirectory + "/" + repoName,
}
} else if gitspaceConfig.IDE == enum.IDETypeVSCode {
// TODO: the following user ID is hard coded and should be changed.
ideURL = url.URL{
Scheme: "vscode-remote",
Host: "", // Empty since we include the host and port in the path
Path: fmt.Sprintf("ssh-remote+%s@%s:%s/gitspace/%s", "harness", infra.Host, port, repoName),
Path: fmt.Sprintf(
"ssh-remote+%s@%s:%s/%s/%s",
"harness",
infra.Host,
port,
startResponse.WorkingDirectory,
repoName,
),
}
}
ideURLString := ideURL.String()

View File

@ -715,7 +715,8 @@ func setupGitspaces(r chi.Router, gitspacesCtrl *gitspace.Controller) {
r.Post("/actions", handlergitspace.HandleAction(gitspacesCtrl))
r.Delete("/", handlergitspace.HandleDeleteConfig(gitspacesCtrl))
r.Patch("/", handlergitspace.HandleUpdateConfig(gitspacesCtrl))
r.Get("/events", handlergitspace.HandleGetEvents(gitspacesCtrl))
r.Get("/events", handlergitspace.HandleEvents(gitspacesCtrl))
r.Get("/logs/stream", handlergitspace.HandleLogsStream(gitspacesCtrl))
})
})
}

View File

@ -51,7 +51,6 @@ const (
schemeHTTPS = "https"
gitnessHomeDir = ".gitness"
blobDir = "blob"
gitspacesDir = "gitspaces"
)
// LoadConfig returns the system configuration from the
@ -405,9 +404,7 @@ func ProvideGitspaceContainerOrchestratorConfig(config *types.Config) (*containe
return nil, fmt.Errorf("unable to determine home directory: %w", err)
}
bindMountSourceBasePath = filepath.Join(homedir, gitnessHomeDir, gitspacesDir)
} else {
bindMountSourceBasePath = filepath.Join(config.Gitspace.DefaultBindMountSourceBasePath, gitspacesDir)
bindMountSourceBasePath = filepath.Join(homedir, gitnessHomeDir)
}
return &container.Config{

View File

@ -343,7 +343,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
containerOrchestrator := container.ProvideEmbeddedDockerOrchestrator(dockerClientFactory, vsCode, vsCodeWeb, containerConfig, statefulLogger)
orchestratorOrchestrator := orchestrator.ProvideOrchestrator(scmSCM, infraProviderResourceStore, infraProvisioner, containerOrchestrator)
gitspaceEventStore := database.ProvideGitspaceEventStore(db)
gitspaceController := gitspace.ProvideController(transactor, authorizer, infraProviderResourceStore, gitspaceConfigStore, gitspaceInstanceStore, spaceStore, reporter3, orchestratorOrchestrator, gitspaceEventStore)
gitspaceController := gitspace.ProvideController(transactor, authorizer, infraProviderResourceStore, gitspaceConfigStore, gitspaceInstanceStore, spaceStore, reporter3, orchestratorOrchestrator, gitspaceEventStore, statefulLogger)
migrateController := migrate.ProvideController(authorizer, principalStore)
apiHandler := router.ProvideAPIHandler(ctx, config, authenticator, repoController, reposettingsController, executionController, logsController, spaceController, pipelineController, secretController, triggerController, connectorController, templateController, pluginController, pullreqController, webhookController, githookController, gitInterface, serviceaccountController, controller, principalController, checkController, systemController, uploadController, keywordsearchController, infraproviderController, gitspaceController, migrateController)
gitHandler := router.ProvideGitHandler(provider, authenticator, repoController)

View File

@ -42,6 +42,7 @@ func (s *subscriber) close() {
s.Lock()
if !s.closed {
close(s.closec)
close(s.handler)
s.closed = true
}
s.Unlock()

View File

@ -28,4 +28,6 @@ const (
SSETypeRepositoryExportCompleted SSEType = "repository_export_completed"
SSETypePullRequestUpdated SSEType = "pullreq_updated"
SSETypeLogLineAppended SSEType = "log_line_appended"
)