mirror of https://github.com/harness/drone.git
add livelog and move logs to log controller
parent
9ea22140aa
commit
b6304683d4
|
@ -20,6 +20,7 @@ import (
|
|||
checkcontroller "github.com/harness/gitness/internal/api/controller/check"
|
||||
"github.com/harness/gitness/internal/api/controller/execution"
|
||||
"github.com/harness/gitness/internal/api/controller/githook"
|
||||
controllerlogs "github.com/harness/gitness/internal/api/controller/logs"
|
||||
"github.com/harness/gitness/internal/api/controller/pipeline"
|
||||
"github.com/harness/gitness/internal/api/controller/principal"
|
||||
"github.com/harness/gitness/internal/api/controller/pullreq"
|
||||
|
@ -46,6 +47,7 @@ import (
|
|||
"github.com/harness/gitness/internal/store/database"
|
||||
"github.com/harness/gitness/internal/store/logs"
|
||||
"github.com/harness/gitness/internal/url"
|
||||
"github.com/harness/gitness/livelog"
|
||||
"github.com/harness/gitness/lock"
|
||||
"github.com/harness/gitness/pubsub"
|
||||
"github.com/harness/gitness/types"
|
||||
|
@ -101,6 +103,8 @@ func initSystem(ctx context.Context, config *types.Config) (*cliserver.System, e
|
|||
execution.WireSet,
|
||||
pipeline.WireSet,
|
||||
logs.WireSet,
|
||||
livelog.WireSet,
|
||||
controllerlogs.WireSet,
|
||||
secret.WireSet,
|
||||
)
|
||||
return &cliserver.System{}, nil
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
check2 "github.com/harness/gitness/internal/api/controller/check"
|
||||
"github.com/harness/gitness/internal/api/controller/execution"
|
||||
"github.com/harness/gitness/internal/api/controller/githook"
|
||||
logs2 "github.com/harness/gitness/internal/api/controller/logs"
|
||||
"github.com/harness/gitness/internal/api/controller/pipeline"
|
||||
"github.com/harness/gitness/internal/api/controller/principal"
|
||||
"github.com/harness/gitness/internal/api/controller/pullreq"
|
||||
|
@ -45,6 +46,7 @@ import (
|
|||
"github.com/harness/gitness/internal/store/database"
|
||||
"github.com/harness/gitness/internal/store/logs"
|
||||
"github.com/harness/gitness/internal/url"
|
||||
"github.com/harness/gitness/livelog"
|
||||
"github.com/harness/gitness/lock"
|
||||
"github.com/harness/gitness/pubsub"
|
||||
"github.com/harness/gitness/types"
|
||||
|
@ -92,9 +94,11 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
|
|||
}
|
||||
repoController := repo.ProvideController(config, db, provider, pathUID, authorizer, pathStore, repoStore, spaceStore, principalStore, gitrpcInterface)
|
||||
executionStore := database.ProvideExecutionStore(db)
|
||||
logStore := logs.ProvideLogStore(db, config)
|
||||
pipelineStore := database.ProvidePipelineStore(db)
|
||||
executionController := execution.ProvideController(db, authorizer, executionStore, logStore, pipelineStore, spaceStore)
|
||||
executionController := execution.ProvideController(db, authorizer, executionStore, pipelineStore, spaceStore)
|
||||
logStore := logs.ProvideLogStore(db, config)
|
||||
logStream := livelog.ProvideLogStream(config)
|
||||
logsController := logs2.ProvideController(db, authorizer, executionStore, pipelineStore, logStore, logStream, spaceStore)
|
||||
secretStore := database.ProvideSecretStore(db)
|
||||
spaceController := space.ProvideController(db, provider, pathUID, authorizer, pathStore, pipelineStore, secretStore, spaceStore, repoStore, principalStore, repoController, membershipStore)
|
||||
pipelineController := pipeline.ProvideController(db, pathUID, pathStore, repoStore, authorizer, pipelineStore, spaceStore)
|
||||
|
@ -157,7 +161,7 @@ func initSystem(ctx context.Context, config *types.Config) (*server.System, erro
|
|||
checkStore := database.ProvideCheckStore(db, principalInfoCache)
|
||||
checkController := check2.ProvideController(db, authorizer, repoStore, checkStore, gitrpcInterface)
|
||||
systemController := system.NewController(principalStore, config)
|
||||
apiHandler := router.ProvideAPIHandler(config, authenticator, repoController, executionController, spaceController, pipelineController, secretController, pullreqController, webhookController, githookController, serviceaccountController, controller, principalController, checkController, systemController)
|
||||
apiHandler := router.ProvideAPIHandler(config, authenticator, repoController, executionController, logsController, spaceController, pipelineController, secretController, pullreqController, webhookController, githookController, serviceaccountController, controller, principalController, checkController, systemController)
|
||||
gitHandler := router.ProvideGitHandler(config, provider, repoStore, authenticator, authorizer, gitrpcInterface)
|
||||
webHandler := router.ProvideWebHandler(config)
|
||||
routerRouter := router.ProvideRouter(config, apiHandler, gitHandler, webHandler)
|
||||
|
|
|
@ -7,7 +7,6 @@ package execution
|
|||
import (
|
||||
"github.com/harness/gitness/internal/auth/authz"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/internal/store/logs"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
@ -16,7 +15,6 @@ type Controller struct {
|
|||
db *sqlx.DB
|
||||
authorizer authz.Authorizer
|
||||
executionStore store.ExecutionStore
|
||||
logStore logs.LogStore
|
||||
pipelineStore store.PipelineStore
|
||||
spaceStore store.SpaceStore
|
||||
}
|
||||
|
@ -25,7 +23,6 @@ func NewController(
|
|||
db *sqlx.DB,
|
||||
authorizer authz.Authorizer,
|
||||
executionStore store.ExecutionStore,
|
||||
logStore logs.LogStore,
|
||||
pipelineStore store.PipelineStore,
|
||||
spaceStore store.SpaceStore,
|
||||
) *Controller {
|
||||
|
@ -33,7 +30,6 @@ func NewController(
|
|||
db: db,
|
||||
authorizer: authorizer,
|
||||
executionStore: executionStore,
|
||||
logStore: logStore,
|
||||
pipelineStore: pipelineStore,
|
||||
spaceStore: spaceStore,
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ package execution
|
|||
import (
|
||||
"github.com/harness/gitness/internal/auth/authz"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/internal/store/logs"
|
||||
|
||||
"github.com/google/wire"
|
||||
"github.com/jmoiron/sqlx"
|
||||
|
@ -21,9 +20,8 @@ var WireSet = wire.NewSet(
|
|||
func ProvideController(db *sqlx.DB,
|
||||
authorizer authz.Authorizer,
|
||||
executionStore store.ExecutionStore,
|
||||
logStore logs.LogStore,
|
||||
pipelineStore store.PipelineStore,
|
||||
spaceStore store.SpaceStore,
|
||||
) *Controller {
|
||||
return NewController(db, authorizer, executionStore, logStore, pipelineStore, spaceStore)
|
||||
return NewController(db, authorizer, executionStore, pipelineStore, spaceStore)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package logs
|
||||
|
||||
import (
|
||||
"github.com/harness/gitness/internal/auth/authz"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/livelog"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type Controller struct {
|
||||
db *sqlx.DB
|
||||
authorizer authz.Authorizer
|
||||
executionStore store.ExecutionStore
|
||||
pipelineStore store.PipelineStore
|
||||
logStore store.LogStore
|
||||
logStream livelog.LogStream
|
||||
spaceStore store.SpaceStore
|
||||
}
|
||||
|
||||
func NewController(
|
||||
db *sqlx.DB,
|
||||
authorizer authz.Authorizer,
|
||||
executionStore store.ExecutionStore,
|
||||
pipelineStore store.PipelineStore,
|
||||
logStore store.LogStore,
|
||||
logStream livelog.LogStream,
|
||||
spaceStore store.SpaceStore,
|
||||
) *Controller {
|
||||
return &Controller{
|
||||
db: db,
|
||||
authorizer: authorizer,
|
||||
executionStore: executionStore,
|
||||
pipelineStore: pipelineStore,
|
||||
logStore: logStore,
|
||||
logStream: logStream,
|
||||
spaceStore: spaceStore,
|
||||
}
|
||||
}
|
|
@ -2,7 +2,7 @@
|
|||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package execution
|
||||
package logs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/harness/gitness/types/enum"
|
||||
)
|
||||
|
||||
func (c *Controller) FindLogs(
|
||||
func (c *Controller) Find(
|
||||
ctx context.Context,
|
||||
session *auth.Session,
|
||||
spaceRef string,
|
|
@ -0,0 +1,30 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package logs
|
||||
|
||||
import (
|
||||
"github.com/harness/gitness/internal/auth/authz"
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/livelog"
|
||||
|
||||
"github.com/google/wire"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
// WireSet provides a wire set for this package.
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideController,
|
||||
)
|
||||
|
||||
func ProvideController(db *sqlx.DB,
|
||||
authorizer authz.Authorizer,
|
||||
executionStore store.ExecutionStore,
|
||||
pipelineStore store.PipelineStore,
|
||||
logStore store.LogStore,
|
||||
logStream livelog.LogStream,
|
||||
spaceStore store.SpaceStore,
|
||||
) *Controller {
|
||||
return NewController(db, authorizer, executionStore, pipelineStore, logStore, logStream, spaceStore)
|
||||
}
|
|
@ -8,13 +8,13 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/harness/gitness/internal/api/controller/execution"
|
||||
"github.com/harness/gitness/internal/api/controller/logs"
|
||||
"github.com/harness/gitness/internal/api/render"
|
||||
"github.com/harness/gitness/internal/api/request"
|
||||
"github.com/harness/gitness/internal/paths"
|
||||
)
|
||||
|
||||
func HandleFindLogs(executionCtrl *execution.Controller) http.HandlerFunc {
|
||||
func HandleFind(logCtrl *logs.Controller) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
session, _ := request.AuthSessionFrom(ctx)
|
||||
|
@ -44,7 +44,7 @@ func HandleFindLogs(executionCtrl *execution.Controller) http.HandlerFunc {
|
|||
return
|
||||
}
|
||||
|
||||
rc, err := executionCtrl.FindLogs(
|
||||
rc, err := logCtrl.Find(
|
||||
ctx, session, spaceRef, pipelineUID,
|
||||
executionNum, stageNum, stepNum)
|
||||
if err != nil {
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/harness/gitness/internal/api/controller/check"
|
||||
"github.com/harness/gitness/internal/api/controller/execution"
|
||||
controllergithook "github.com/harness/gitness/internal/api/controller/githook"
|
||||
"github.com/harness/gitness/internal/api/controller/logs"
|
||||
"github.com/harness/gitness/internal/api/controller/pipeline"
|
||||
"github.com/harness/gitness/internal/api/controller/principal"
|
||||
"github.com/harness/gitness/internal/api/controller/pullreq"
|
||||
|
@ -26,6 +27,7 @@ import (
|
|||
handlercheck "github.com/harness/gitness/internal/api/handler/check"
|
||||
handlerexecution "github.com/harness/gitness/internal/api/handler/execution"
|
||||
handlergithook "github.com/harness/gitness/internal/api/handler/githook"
|
||||
handlerlogs "github.com/harness/gitness/internal/api/handler/logs"
|
||||
handlerpipeline "github.com/harness/gitness/internal/api/handler/pipeline"
|
||||
handlerprincipal "github.com/harness/gitness/internal/api/handler/principal"
|
||||
handlerpullreq "github.com/harness/gitness/internal/api/handler/pullreq"
|
||||
|
@ -70,6 +72,7 @@ func NewAPIHandler(
|
|||
authenticator authn.Authenticator,
|
||||
repoCtrl *repo.Controller,
|
||||
executionCtrl *execution.Controller,
|
||||
logCtrl *logs.Controller,
|
||||
spaceCtrl *space.Controller,
|
||||
pipelineCtrl *pipeline.Controller,
|
||||
secretCtrl *secret.Controller,
|
||||
|
@ -103,7 +106,7 @@ func NewAPIHandler(
|
|||
r.Use(middlewareauthn.Attempt(authenticator, authn.SourceRouterAPI))
|
||||
|
||||
r.Route("/v1", func(r chi.Router) {
|
||||
setupRoutesV1(r, repoCtrl, executionCtrl, pipelineCtrl, secretCtrl, spaceCtrl, pullreqCtrl, webhookCtrl, githookCtrl,
|
||||
setupRoutesV1(r, repoCtrl, executionCtrl, logCtrl, pipelineCtrl, secretCtrl, spaceCtrl, pullreqCtrl, webhookCtrl, githookCtrl,
|
||||
saCtrl, userCtrl, principalCtrl, checkCtrl, sysCtrl)
|
||||
})
|
||||
|
||||
|
@ -127,6 +130,7 @@ func corsHandler(config *types.Config) func(http.Handler) http.Handler {
|
|||
func setupRoutesV1(r chi.Router,
|
||||
repoCtrl *repo.Controller,
|
||||
executionCtrl *execution.Controller,
|
||||
logCtrl *logs.Controller,
|
||||
pipelineCtrl *pipeline.Controller,
|
||||
secretCtrl *secret.Controller,
|
||||
spaceCtrl *space.Controller,
|
||||
|
@ -141,7 +145,7 @@ func setupRoutesV1(r chi.Router,
|
|||
) {
|
||||
setupSpaces(r, spaceCtrl)
|
||||
setupRepos(r, repoCtrl, pullreqCtrl, webhookCtrl, checkCtrl)
|
||||
setupPipelines(r, pipelineCtrl, executionCtrl)
|
||||
setupPipelines(r, pipelineCtrl, executionCtrl, logCtrl)
|
||||
setupSecrets(r, secretCtrl)
|
||||
setupUser(r, userCtrl)
|
||||
setupServiceAccounts(r, saCtrl)
|
||||
|
@ -290,7 +294,11 @@ func setupRepos(r chi.Router,
|
|||
})
|
||||
}
|
||||
|
||||
func setupPipelines(r chi.Router, pipelineCtrl *pipeline.Controller, executionCtrl *execution.Controller) {
|
||||
func setupPipelines(
|
||||
r chi.Router,
|
||||
pipelineCtrl *pipeline.Controller,
|
||||
executionCtrl *execution.Controller,
|
||||
logCtrl *logs.Controller) {
|
||||
r.Route("/pipelines", func(r chi.Router) {
|
||||
// Create takes path and parentId via body, not uri
|
||||
r.Post("/", handlerpipeline.HandleCreate(pipelineCtrl))
|
||||
|
@ -298,7 +306,7 @@ func setupPipelines(r chi.Router, pipelineCtrl *pipeline.Controller, executionCt
|
|||
r.Get("/", handlerpipeline.HandleFind(pipelineCtrl))
|
||||
r.Patch("/", handlerpipeline.HandleUpdate(pipelineCtrl))
|
||||
r.Delete("/", handlerpipeline.HandleDelete(pipelineCtrl))
|
||||
setupExecutions(r, pipelineCtrl, executionCtrl)
|
||||
setupExecutions(r, pipelineCtrl, executionCtrl, logCtrl)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -315,7 +323,12 @@ func setupSecrets(r chi.Router, secretCtrl *secret.Controller) {
|
|||
})
|
||||
}
|
||||
|
||||
func setupExecutions(r chi.Router, pipelineCtrl *pipeline.Controller, executionCtrl *execution.Controller) {
|
||||
func setupExecutions(
|
||||
r chi.Router,
|
||||
pipelineCtrl *pipeline.Controller,
|
||||
executionCtrl *execution.Controller,
|
||||
logCtrl *logs.Controller,
|
||||
) {
|
||||
r.Route("/executions", func(r chi.Router) {
|
||||
r.Get("/", handlerexecution.HandleList(executionCtrl))
|
||||
r.Post("/", handlerexecution.HandleCreate(executionCtrl))
|
||||
|
@ -327,7 +340,7 @@ func setupExecutions(r chi.Router, pipelineCtrl *pipeline.Controller, executionC
|
|||
fmt.Sprintf("/logs/{%s}/{%s}",
|
||||
request.PathParamStageNumber,
|
||||
request.PathParamStepNumber,
|
||||
), handlerexecution.HandleFindLogs(executionCtrl))
|
||||
), handlerlogs.HandleFind(logCtrl))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/harness/gitness/internal/api/controller/check"
|
||||
"github.com/harness/gitness/internal/api/controller/execution"
|
||||
"github.com/harness/gitness/internal/api/controller/githook"
|
||||
"github.com/harness/gitness/internal/api/controller/logs"
|
||||
"github.com/harness/gitness/internal/api/controller/pipeline"
|
||||
"github.com/harness/gitness/internal/api/controller/principal"
|
||||
"github.com/harness/gitness/internal/api/controller/pullreq"
|
||||
|
@ -62,6 +63,7 @@ func ProvideAPIHandler(
|
|||
authenticator authn.Authenticator,
|
||||
repoCtrl *repo.Controller,
|
||||
executionCtrl *execution.Controller,
|
||||
logCtrl *logs.Controller,
|
||||
spaceCtrl *space.Controller,
|
||||
pipelineCtrl *pipeline.Controller,
|
||||
secretCtrl *secret.Controller,
|
||||
|
@ -74,7 +76,7 @@ func ProvideAPIHandler(
|
|||
checkCtrl *check.Controller,
|
||||
sysCtrl *system.Controller,
|
||||
) APIHandler {
|
||||
return NewAPIHandler(config, authenticator, repoCtrl, executionCtrl, spaceCtrl, pipelineCtrl, secretCtrl,
|
||||
return NewAPIHandler(config, authenticator, repoCtrl, executionCtrl, logCtrl, spaceCtrl, pipelineCtrl, secretCtrl,
|
||||
pullreqCtrl, webhookCtrl, githookCtrl, saCtrl, userCtrl, principalCtrl, checkCtrl, sysCtrl)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ package store
|
|||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/harness/gitness/types"
|
||||
|
@ -526,18 +525,4 @@ type (
|
|||
// Count the number of executions in a space
|
||||
Count(ctx context.Context, parentID int64) (int64, error)
|
||||
}
|
||||
|
||||
LogStore interface {
|
||||
// Find returns a log stream from the datastore.
|
||||
Find(ctx context.Context, stepID int64) (io.ReadCloser, error)
|
||||
|
||||
// Create writes copies the log stream from Reader r to the datastore.
|
||||
Create(ctx context.Context, stepID int64, r io.Reader) error
|
||||
|
||||
// Update copies the log stream from Reader r to the datastore.
|
||||
Update(ctx context.Context, stepID int64, r io.Reader) error
|
||||
|
||||
// Delete purges the log stream from the datastore.
|
||||
Delete(ctx context.Context, stepID int64) error
|
||||
}
|
||||
)
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package logs
|
||||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
// LogStore provides an interface for the persistent log store backend
|
||||
type LogStore interface {
|
||||
// Find returns a log stream from the datastore.
|
||||
Find(ctx context.Context, stepID int64) (io.ReadCloser, error)
|
|
@ -7,13 +7,15 @@ package logs
|
|||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/harness/gitness/internal/store"
|
||||
)
|
||||
|
||||
// NewCombined returns a new combined log store that will fallback
|
||||
// to a secondary log store when necessary. This can be useful when
|
||||
// migrating from database logs to s3, where logs for older builds
|
||||
// are still being stored in the database, and newer logs in s3.
|
||||
func NewCombined(primary, secondary LogStore) LogStore {
|
||||
func NewCombined(primary, secondary store.LogStore) store.LogStore {
|
||||
return &combined{
|
||||
primary: primary,
|
||||
secondary: secondary,
|
||||
|
@ -21,7 +23,7 @@ func NewCombined(primary, secondary LogStore) LogStore {
|
|||
}
|
||||
|
||||
type combined struct {
|
||||
primary, secondary LogStore
|
||||
primary, secondary store.LogStore
|
||||
}
|
||||
|
||||
func (s *combined) Find(ctx context.Context, step int64) (io.ReadCloser, error) {
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/harness/gitness/internal/store"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
|
@ -18,7 +20,7 @@ import (
|
|||
)
|
||||
|
||||
// NewS3Env returns a new S3 log store.
|
||||
func NewS3LogStore(bucket, prefix, endpoint string, pathStyle bool) LogStore {
|
||||
func NewS3LogStore(bucket, prefix, endpoint string, pathStyle bool) store.LogStore {
|
||||
disableSSL := false
|
||||
|
||||
if endpoint != "" {
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
package logs
|
||||
|
||||
import (
|
||||
"github.com/harness/gitness/internal/store"
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/google/wire"
|
||||
|
@ -16,7 +17,7 @@ var WireSet = wire.NewSet(
|
|||
ProvideLogStore,
|
||||
)
|
||||
|
||||
func ProvideLogStore(db *sqlx.DB, config *types.Config) LogStore {
|
||||
func ProvideLogStore(db *sqlx.DB, config *types.Config) store.LogStore {
|
||||
s := NewDatabaseLogStore(db)
|
||||
if config.S3.Bucket != "" {
|
||||
p := NewS3LogStore(
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package livelog
|
||||
|
||||
import "context"
|
||||
|
||||
// Line represents a line in the logs.
|
||||
type Line struct {
|
||||
Number int `json:"pos"`
|
||||
Message string `json:"out"`
|
||||
Timestamp int64 `json:"time"`
|
||||
}
|
||||
|
||||
// LogStreamInfo provides internal stream information. This can
|
||||
// be used to monitor the number of registered streams and
|
||||
// subscribers.
|
||||
type LogStreamInfo struct {
|
||||
// Streams is a key-value pair where the key is the step
|
||||
// identifier, and the value is the count of subscribers
|
||||
// streaming the logs.
|
||||
Streams map[int64]int `json:"streams"`
|
||||
}
|
||||
|
||||
// LogStream manages a live stream of logs.
|
||||
type LogStream interface {
|
||||
// Create creates the log stream for the step ID.
|
||||
Create(context.Context, int64) error
|
||||
|
||||
// Delete deletes the log stream for the step ID.
|
||||
Delete(context.Context, int64) error
|
||||
|
||||
// Writes writes to the log stream.
|
||||
Write(context.Context, int64, *Line) error
|
||||
|
||||
// Tail tails the log stream.
|
||||
Tail(context.Context, int64) (<-chan *Line, <-chan error)
|
||||
|
||||
// Info returns internal stream information.
|
||||
Info(context.Context) *LogStreamInfo
|
||||
}
|
|
@ -0,0 +1,82 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package livelog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// error returned when a stream is not registered with
|
||||
// the streamer.
|
||||
var errStreamNotFound = errors.New("stream: not found")
|
||||
|
||||
type streamer struct {
|
||||
sync.Mutex
|
||||
|
||||
streams map[int64]*stream
|
||||
}
|
||||
|
||||
// New returns a new in-memory log streamer.
|
||||
func NewMemory() LogStream {
|
||||
return &streamer{
|
||||
streams: make(map[int64]*stream),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamer) Create(ctx context.Context, id int64) error {
|
||||
s.Lock()
|
||||
s.streams[id] = newStream()
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamer) Delete(ctx context.Context, id int64) error {
|
||||
s.Lock()
|
||||
stream, ok := s.streams[id]
|
||||
if ok {
|
||||
delete(s.streams, id)
|
||||
}
|
||||
s.Unlock()
|
||||
if !ok {
|
||||
return errStreamNotFound
|
||||
}
|
||||
return stream.close()
|
||||
}
|
||||
|
||||
func (s *streamer) Write(ctx context.Context, id int64, line *Line) error {
|
||||
s.Lock()
|
||||
stream, ok := s.streams[id]
|
||||
s.Unlock()
|
||||
if !ok {
|
||||
return errStreamNotFound
|
||||
}
|
||||
return stream.write(line)
|
||||
}
|
||||
|
||||
func (s *streamer) Tail(ctx context.Context, id int64) (<-chan *Line, <-chan error) {
|
||||
s.Lock()
|
||||
stream, ok := s.streams[id]
|
||||
s.Unlock()
|
||||
if !ok {
|
||||
return nil, nil
|
||||
}
|
||||
return stream.subscribe(ctx)
|
||||
}
|
||||
|
||||
func (s *streamer) Info(ctx context.Context) *LogStreamInfo {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
info := &LogStreamInfo{
|
||||
Streams: map[int64]int{},
|
||||
}
|
||||
for id, stream := range s.streams {
|
||||
stream.Lock()
|
||||
info.Streams[id] = len(stream.list)
|
||||
stream.Unlock()
|
||||
}
|
||||
return info
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package livelog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// this is the amount of items that are stored in memory
|
||||
// in the buffer. This should result in approximately 10kb
|
||||
// of memory allocated per-stream and per-subscriber, not
|
||||
// including any logdata stored in these structures.
|
||||
const bufferSize = 5000
|
||||
|
||||
type stream struct {
|
||||
sync.Mutex
|
||||
|
||||
hist []*Line
|
||||
list map[*subscriber]struct{}
|
||||
}
|
||||
|
||||
func newStream() *stream {
|
||||
return &stream{
|
||||
list: map[*subscriber]struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *stream) write(line *Line) error {
|
||||
s.Lock()
|
||||
s.hist = append(s.hist, line)
|
||||
for l := range s.list {
|
||||
l.publish(line)
|
||||
}
|
||||
// the history should not be unbounded. The history
|
||||
// slice is capped and items are removed in a FIFO
|
||||
// ordering when capacity is reached.
|
||||
if size := len(s.hist); size >= bufferSize {
|
||||
s.hist = s.hist[size-bufferSize:]
|
||||
}
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stream) subscribe(ctx context.Context) (<-chan *Line, <-chan error) {
|
||||
sub := &subscriber{
|
||||
handler: make(chan *Line, bufferSize),
|
||||
closec: make(chan struct{}),
|
||||
}
|
||||
err := make(chan error)
|
||||
|
||||
s.Lock()
|
||||
for _, line := range s.hist {
|
||||
sub.publish(line)
|
||||
}
|
||||
s.list[sub] = struct{}{}
|
||||
s.Unlock()
|
||||
|
||||
go func() {
|
||||
defer close(err)
|
||||
select {
|
||||
case <-sub.closec:
|
||||
case <-ctx.Done():
|
||||
sub.close()
|
||||
}
|
||||
}()
|
||||
return sub.handler, err
|
||||
}
|
||||
|
||||
func (s *stream) close() error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for sub := range s.list {
|
||||
delete(s.list, sub)
|
||||
sub.close()
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package livelog
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type subscriber struct {
|
||||
sync.Mutex
|
||||
|
||||
handler chan *Line
|
||||
closec chan struct{}
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (s *subscriber) publish(line *Line) {
|
||||
select {
|
||||
case <-s.closec:
|
||||
case s.handler <- line:
|
||||
default:
|
||||
// lines are sent on a buffered channel. If there
|
||||
// is a slow consumer that is not processing events,
|
||||
// the buffered channel will fill and newer messages
|
||||
// are ignored.
|
||||
}
|
||||
}
|
||||
|
||||
func (s *subscriber) close() {
|
||||
s.Lock()
|
||||
if !s.closed {
|
||||
close(s.closec)
|
||||
s.closed = true
|
||||
}
|
||||
s.Unlock()
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
// Copyright 2022 Harness Inc. All rights reserved.
|
||||
// Use of this source code is governed by the Polyform Free Trial License
|
||||
// that can be found in the LICENSE.md file for this repository.
|
||||
|
||||
package livelog
|
||||
|
||||
import (
|
||||
"github.com/harness/gitness/types"
|
||||
|
||||
"github.com/google/wire"
|
||||
)
|
||||
|
||||
// WireSet provides a wire set for this package.
|
||||
var WireSet = wire.NewSet(
|
||||
ProvideLogStream,
|
||||
)
|
||||
|
||||
// ProvideLogStream provides an implementation of a logs streamer
|
||||
// TODO: Implement Redis backend once implemented and add the check in config
|
||||
func ProvideLogStream(config *types.Config) LogStream {
|
||||
return NewMemory()
|
||||
}
|
Loading…
Reference in New Issue