diff --git a/internal/nbconn/nbconn.go b/internal/nbconn/nbconn.go index 22050dc7..054e9610 100644 --- a/internal/nbconn/nbconn.go +++ b/internal/nbconn/nbconn.go @@ -26,7 +26,9 @@ import ( var errClosed = errors.New("closed") var ErrWouldBlock = new(wouldBlockError) -const fakeNonblockingWaitDuration = 100 * time.Millisecond +const fakeNonblockingWriteWaitDuration = 100 * time.Millisecond +const minNonblockingReadWaitDuration = time.Microsecond +const maxNonblockingReadWaitDuration = 100 * time.Millisecond // NonBlockingDeadline is a magic value that when passed to Set[Read]Deadline places the connection in non-blocking read // mode. @@ -86,9 +88,10 @@ type NetConn struct { nonblockReadErr error nonblockReadN int - readDeadlineLock sync.Mutex - readDeadline time.Time - readNonblocking bool + readDeadlineLock sync.Mutex + readDeadline time.Time + readNonblocking bool + fakeNonblockingReadWaitDuration time.Duration writeDeadlineLock sync.Mutex writeDeadline time.Time @@ -96,7 +99,8 @@ type NetConn struct { func NewNetConn(conn net.Conn, fakeNonBlockingIO bool) *NetConn { nc := &NetConn{ - conn: conn, + conn: conn, + fakeNonblockingReadWaitDuration: maxNonblockingReadWaitDuration, } if !fakeNonBlockingIO { @@ -379,7 +383,7 @@ func (c *NetConn) fakeNonblockingWrite(b []byte) (n int, err error) { c.writeDeadlineLock.Lock() defer c.writeDeadlineLock.Unlock() - deadline := time.Now().Add(fakeNonblockingWaitDuration) + deadline := time.Now().Add(fakeNonblockingWriteWaitDuration) if c.writeDeadline.IsZero() || deadline.Before(c.writeDeadline) { err = c.conn.SetWriteDeadline(deadline) if err != nil { @@ -412,13 +416,30 @@ func (c *NetConn) fakeNonblockingRead(b []byte) (n int, err error) { c.readDeadlineLock.Lock() defer c.readDeadlineLock.Unlock() - deadline := time.Now().Add(fakeNonblockingWaitDuration) + startTime := time.Now() + deadline := startTime.Add(c.fakeNonblockingReadWaitDuration) if c.readDeadline.IsZero() || deadline.Before(c.readDeadline) { err = c.conn.SetReadDeadline(deadline) if err != nil { return 0, err } defer func() { + // If the read was successful and the wait duration is not already the minimum + if err == nil && c.fakeNonblockingReadWaitDuration > minNonblockingReadWaitDuration { + endTime := time.Now() + + // The wait duration should be 2x the fastest read that has occurred. This should give reasonable assurance that + // a Read deadline will not block a read before it has a chance to read data already in Go or the OS's receive + // buffer. + proposedWait := endTime.Sub(startTime) * 2 + if proposedWait < c.fakeNonblockingReadWaitDuration { + if proposedWait < minNonblockingReadWaitDuration { + proposedWait = minNonblockingReadWaitDuration + } + c.fakeNonblockingReadWaitDuration = proposedWait + } + } + // Ignoring error resetting deadline as there is nothing that can reasonably be done if it fails. c.conn.SetReadDeadline(c.readDeadline)