🔥 Feature (v3): Add buffered streaming support (#3131)

* 🔥 Feature: Add SendStreamWriter to Ctx

Create a new `*DefaultCtx` method called `SendStreamWriter()`
that maps to fasthttp's `Response.SetBodyStreamWriter()`

* 🚨 Test: Validate regular use of c.SendStreamWriter()

- Adds Test_Ctx_SendStreamWriter to ctx_test.go

* 🚨 Test: (WIP) Validate interrupted use of c.SendStreamWriter()

- Adds Test_Ctx_SendStreamWriter_Interrupted to ctx_test.go
    - (Work-In-Progress) This test verifies that some data is
      still sent before a client disconnects when using the method
      `c.SendStreamWriter()`.

**Note:** Running this test reports a race condition when using
the `-race` flag or running `make test`. The test uses a channel
and mutex to prevent race conditions, but still triggers a warning.

* 📚 Doc: Add `SendStreamWriter` to docs/api/ctx.md

* 🩹 Fix: Remove race condition in Test_Ctx_SendStreamWriter_Interrupted

* 🎨 Styles: Update ctx_test.go to respect golangci-lint

* 📚 Doc: Update /docs/api/ctx.md to show proper `w.Flush()` error handling

* 📚 Doc: Add SendStreamWriter details to docs/whats_new.md

* 🎨 Styles: Update /docs/whats_new.md to respect markdownlint-cli2

* 🩹 Fix: Fix Fprintf syntax error in docs/whats_new.md

---------

Co-authored-by: M. Efe Çetin <efectn@protonmail.com>
pull/3218/head^2
Giovanni Rivera 2024-11-27 02:11:56 -08:00 committed by GitHub
parent ff55cfd7c7
commit 31a503f699
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 174 additions and 0 deletions

8
ctx.go
View File

@ -5,6 +5,7 @@
package fiber
import (
"bufio"
"bytes"
"context"
"crypto/tls"
@ -1671,6 +1672,13 @@ func (c *DefaultCtx) SendStream(stream io.Reader, size ...int) error {
return nil
}
// SendStreamWriter sets response body stream writer
func (c *DefaultCtx) SendStreamWriter(streamWriter func(*bufio.Writer)) error {
c.fasthttp.Response.SetBodyStreamWriter(fasthttp.StreamWriter(streamWriter))
return nil
}
// Set sets the response's HTTP header field to the specified key, value.
func (c *DefaultCtx) Set(key, val string) {
c.fasthttp.Response.Header.Set(key, val)

View File

@ -3,6 +3,7 @@
package fiber
import (
"bufio"
"context"
"crypto/tls"
"io"
@ -283,6 +284,8 @@ type Ctx interface {
SendString(body string) error
// SendStream sets response body stream and optional body size.
SendStream(stream io.Reader, size ...int) error
// SendStreamWriter sets response body stream writer
SendStreamWriter(streamWriter func(*bufio.Writer)) error
// Set sets the response's HTTP header field to the specified key, value.
Set(key, val string)
setCanonical(key, val string)

View File

@ -4447,6 +4447,71 @@ func Test_Ctx_SendStream(t *testing.T) {
require.Equal(t, "Hello bufio", string(c.Response().Body()))
}
// go test -run Test_Ctx_SendStreamWriter
func Test_Ctx_SendStreamWriter(t *testing.T) {
t.Parallel()
app := New()
c := app.AcquireCtx(&fasthttp.RequestCtx{})
err := c.SendStreamWriter(func(w *bufio.Writer) {
w.WriteString("Don't crash please") //nolint:errcheck, revive // It is fine to ignore the error
})
require.NoError(t, err)
require.Equal(t, "Don't crash please", string(c.Response().Body()))
err = c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck, revive // It is fine to ignore the error
if err := w.Flush(); err != nil {
t.Errorf("unexpected error: %s", err)
return
}
}
})
require.NoError(t, err)
require.Equal(t, "Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n", string(c.Response().Body()))
err = c.SendStreamWriter(func(_ *bufio.Writer) {})
require.NoError(t, err)
require.Empty(t, c.Response().Body())
}
// go test -run Test_Ctx_SendStreamWriter_Interrupted
func Test_Ctx_SendStreamWriter_Interrupted(t *testing.T) {
t.Parallel()
app := New()
app.Get("/", func(c Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
for lineNum := 1; lineNum <= 5; lineNum++ {
fmt.Fprintf(w, "Line %d\n", lineNum) //nolint:errcheck // It is fine to ignore the error
if err := w.Flush(); err != nil {
if lineNum < 3 {
t.Errorf("unexpected error: %s", err)
}
return
}
time.Sleep(400 * time.Millisecond)
}
})
})
req := httptest.NewRequest(MethodGet, "/", nil)
testConfig := TestConfig{
Timeout: 1 * time.Second,
FailOnTimeout: false,
}
resp, err := app.Test(req, testConfig)
require.NoError(t, err)
body, err := io.ReadAll(resp.Body)
t.Logf("%v", err)
require.EqualError(t, err, "unexpected EOF")
require.Equal(t, "Line 1\nLine 2\nLine 3\n", string(body))
}
// go test -run Test_Ctx_Set
func Test_Ctx_Set(t *testing.T) {
t.Parallel()

View File

@ -1852,6 +1852,66 @@ app.Get("/", func(c fiber.Ctx) error {
})
```
## SendStreamWriter
Sets the response body stream writer.
:::note
The argument `streamWriter` represents a function that populates
the response body using a buffered stream writer.
:::
```go title="Signature"
func (c Ctx) SendStreamWriter(streamWriter func(*bufio.Writer)) error
```
```go title="Example"
app.Get("/", func (c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
fmt.Fprintf(w, "Hello, World!\n")
})
// => "Hello, World!"
})
```
:::info
To send data before `streamWriter` returns, you can call `w.Flush()`
on the provided writer. Otherwise, the buffered stream flushes after
`streamWriter` returns.
:::
:::note
`w.Flush()` will return an error if the client disconnects before `streamWriter` finishes writing a response.
:::
```go title="Example"
app.Get("/wait", func(c fiber.Ctx) error {
return c.SendStreamWriter(func(w *bufio.Writer) {
// Begin Work
fmt.Fprintf(w, "Please wait for 10 seconds\n")
if err := w.Flush(); err != nil {
log.Print("Client disconnected!")
return
}
// Send progress over time
time.Sleep(time.Second)
for i := 0; i < 9; i++ {
fmt.Fprintf(w, "Still waiting...\n")
if err := w.Flush(); err != nil {
// If client disconnected, cancel work and finish
log.Print("Client disconnected!")
return
}
time.Sleep(time.Second)
}
// Finish
fmt.Fprintf(w, "Done!\n")
})
})
```
## Set
Sets the responses HTTP header field to the specified `key`, `value`.

View File

@ -268,6 +268,7 @@ DRAFT section
- Reset
- Schema -> ExpressJs like
- SendStream -> ExpressJs like
- SendStreamWriter
- SendString -> ExpressJs like
- String -> ExpressJs like
- ViewBind -> instead of Bind
@ -296,6 +297,43 @@ DRAFT section
- UserContext has been renamed to Context which returns a context.Context object.
- SetUserContext has been renamed to SetContext.
### SendStreamWriter
In v3, we added support for buffered streaming by providing the new method `SendStreamWriter()`.
```go
func (c Ctx) SendStreamWriter(streamWriter func(w *bufio.Writer))
```
With this new method, you can implement:
- Server-Side Events (SSE)
- Large file downloads
- Live data streaming
```go
app.Get("/sse", func(c fiber.Ctx) {
c.Set("Content-Type", "text/event-stream")
c.Set("Cache-Control", "no-cache")
c.Set("Connection", "keep-alive")
c.Set("Transfer-Encoding", "chunked")
return c.SendStreamWriter(func(w *bufio.Writer) {
for {
fmt.Fprintf(w, "event: my-event\n")
fmt.Fprintf(w, "data: Hello SSE\n\n")
if err := w.Flush(); err != nil {
log.Print("Client disconnected!")
return
}
}
})
})
```
You can find more details about this feature in [/docs/api/ctx.md](./api/ctx.md).
---
## 🌎 Client package