Update prefork logic

pull/507/head
Fenny 2020-06-23 16:56:48 +02:00
parent 37ebde8b86
commit 6638efafa7
1 changed files with 105 additions and 35 deletions

140
app.go
View File

@ -7,6 +7,7 @@ package fiber
import (
"bufio"
"crypto/tls"
"errors"
"flag"
"fmt"
"log"
@ -14,6 +15,7 @@ import (
"net/http"
"net/http/httputil"
"os"
"os/exec"
"reflect"
"runtime"
"sort"
@ -26,7 +28,7 @@ import (
utils "github.com/gofiber/utils"
colorable "github.com/segrey/go-colorable"
fasthttp "github.com/valyala/fasthttp"
fprefork "github.com/valyala/fasthttp/prefork"
"github.com/valyala/fasthttp/reuseport"
)
// Version of current package
@ -210,17 +212,6 @@ var (
defaultCompressedFileSuffix = ".fiber.gz"
)
var (
preforkFlag = "-prefork"
preforkEnabled bool
)
func init() { //nolint:gochecknoinits
// Definition flag to not break the program when the user adds their own flags
// and runs `flag.Parse()`
flag.BoolVar(&preforkEnabled, preforkFlag[1:], false, "use prefork")
}
// New creates a new Fiber named instance.
// You can pass optional settings when creating a new instance.
func New(settings ...*Settings) *App {
@ -265,7 +256,7 @@ func New(settings ...*Settings) *App {
}
if !app.Settings.Prefork { // Default to -prefork flag if false
app.Settings.Prefork = utils.GetArgument(preforkFlag)
app.Settings.Prefork = utils.GetArgument(flagPrefork)
}
// Replace unsafe conversion functions
if app.Settings.Immutable {
@ -451,16 +442,7 @@ func (app *App) Listen(address interface{}, tlsconfig ...*tls.Config) error {
}
// Start prefork
if app.Settings.Prefork {
pf := fprefork.New(app.server) // fasthttp/prefork
pf.Reuseport = true
pf.Network = "tcp4"
pf.ServeFunc = func(lnn net.Listener) error {
if len(tlsconfig) > 0 {
lnn = tls.NewListener(lnn, tlsconfig[0])
}
return app.server.Serve(lnn)
}
return pf.ListenAndServe(addr)
return app.prefork(addr, tlsconfig...)
}
// Setup listener
ln, err := net.Listen("tcp4", addr)
@ -471,10 +453,6 @@ func (app *App) Listen(address interface{}, tlsconfig ...*tls.Config) error {
if len(tlsconfig) > 0 {
ln = tls.NewListener(ln, tlsconfig[0])
}
// // Print startup message
// if !app.Settings.DisableStartupMessage {
// app.startupMessage(ln.Addr().String())
// }
// Start listening
return app.server.Serve(ln)
}
@ -628,18 +606,110 @@ func (app *App) startupMessage(port string) {
// tabwriter makes sure the spacing are consistant across different values
// colorable handles the escape sequence for stdout using ascii color codes
out := tabwriter.NewWriter(colorable.NewColorableStdout(), 0, 8, 0, ' ', 0)
if fprefork.IsChild() {
fmt.Fprintf(out, "%sChild PID: %s#%v%s\n", cBlack, cGreen, os.Getpid(), cReset)
} else {
if !utils.GetArgument(flagChild) {
fmt.Fprintf(out, "%s ___ __ ___ __ \n|__ | |__) |__ |__)\n| | |__) |___ | \\", cGreen)
fmt.Fprintf(out, "%sv%s\n", cBlack, Version)
fmt.Fprintf(out, "PORT: %s%s%s \tRoutes: %s%v%s\n", cGreen, port, cBlack, cGreen, len(app.Routes()), cBlack)
fmt.Fprintf(out, "PORT: %s%s%s \tROUTES: %s%v%s\n", cGreen, port, cBlack, cGreen, len(app.Routes()), cBlack)
fmt.Fprintf(out, "PPID: %s%v%s \tPREFORK: %s%v%s\n", cGreen, os.Getppid(), cBlack, cGreen, app.Settings.Prefork, cBlack)
fmt.Fprintf(out, "OS: %s%v%s \tARCH: %s%v%s\n\n", cGreen, runtime.GOOS, cBlack, cGreen, runtime.GOARCH, cReset)
//fmt.Fprintf(out, "\n%sFiber v%s listening on %s%s", cGreen, Version, addr, cReset)
//fmt.Fprintf(out, "\n%sMain process #%v%s", cBlack, os.Getppid(), cReset)
//fmt.Printf(" _______ __\n ____ / ____(_) /_ ___ _____\n_____ / /_ / / __ \\/ _ \\/ ___/\n __ / __/ / / /_/ / __/ /\n /_/ /_/_.___/\\___/_/ v%s\n", Version)
// fmt.Printf("Started listening on %s\n", addr)
}
out.Flush()
}
var (
flagPrefork = "-prefork"
flagChild = "-prefork-child"
isPrefork bool
isChild bool
)
func init() { //nolint:gochecknoinits
// Definition flag to not break the program when the user adds their own flags
// and runs `flag.Parse()`
flag.BoolVar(&isPrefork, flagChild[1:], false, "use prefork")
flag.BoolVar(&isChild, flagPrefork[1:], false, "is child proc")
}
func (app *App) prefork(addr string, tlsconfig ...*tls.Config) (err error) {
var ln net.Listener
var network = "tcp4"
var threshold = runtime.GOMAXPROCS(0) / 2
// child process
if utils.GetArgument(flagChild) {
runtime.GOMAXPROCS(1)
ln, err = reuseport.Listen(network, addr)
if err != nil {
return err
}
if len(tlsconfig) > 0 {
ln = tls.NewListener(ln, tlsconfig[0])
}
return app.server.Serve(ln)
}
// master process
type procSig struct {
pid int
err error
}
goMaxProcs := runtime.GOMAXPROCS(0)
sigCh := make(chan procSig, goMaxProcs)
childProcs := make(map[int]*exec.Cmd)
defer func() {
for _, proc := range childProcs {
_ = proc.Process.Kill()
}
}()
out := tabwriter.NewWriter(colorable.NewColorableStdout(), 0, 8, 0, ' ', 0)
for i := 0; i < goMaxProcs; i++ {
/* #nosec G204 */
cmd := exec.Command(os.Args[0], append(os.Args[1:], flagChild)...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err = cmd.Start(); err != nil {
return fmt.Errorf("failed to start a child prefork process, error: %v\n", err)
}
childProcs[cmd.Process.Pid] = cmd
app.mutex.Lock()
fmt.Fprintf(out, "%sChild PID: %s#%v%s\n", cBlack, cGreen, cmd.Process.Pid, cReset)
app.mutex.Unlock()
out.Flush()
go func() {
sigCh <- procSig{cmd.Process.Pid, cmd.Wait()}
}()
}
var exitedProcs int
for sig := range sigCh {
delete(childProcs, sig.pid)
// fix this error handling
fmt.Printf("one of the child prefork processes exited with error: %v", sig.err)
if exitedProcs++; exitedProcs > threshold {
fmt.Printf("child prefork processes exit too many times, "+
"which exceeds the value of RecoverThreshold(%d), "+
"exiting the master process.\n", exitedProcs)
err = errors.New("exceeding the value of RecoverThreshold")
break
}
/* #nosec G204 */
cmd := exec.Command(os.Args[0], append(os.Args[1:], flagChild)...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err = cmd.Start(); err != nil {
break
}
childProcs[cmd.Process.Pid] = cmd
go func() {
sigCh <- procSig{cmd.Process.Pid, cmd.Wait()}
}()
}
return
}