mirror of https://github.com/harness/drone.git
feat: [CDE-472]: handle stream closing gracefully, and simplify sync cmd execution (#3102)
* feat: [CDE-472]: handle stream closing gracefully, and simplify sync cmd execution * feat: [CDE-472]: handle stream closing gracefully, and simplify sync cmd executionpull/3597/head
parent
6f7e203251
commit
26408533e1
|
@ -60,39 +60,34 @@ func (e *Exec) ExecuteCommand(
|
|||
) (string, error) {
|
||||
containerExecCreate, err := e.createExecution(ctx, command, root, workingDir, false)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return "", fmt.Errorf("failed to create exec instance: %w", err)
|
||||
}
|
||||
// Attach and inspect exec session to get the output
|
||||
inspectExec, err := e.attachAndInspectExec(ctx, containerExecCreate.ID, false)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to start docker exec for container %s: %w", e.ContainerName, err)
|
||||
}
|
||||
var stdoutBuf bytes.Buffer
|
||||
var stderrBuf bytes.Buffer
|
||||
|
||||
stdoutData, err := io.ReadAll(inspectExec.StdOut)
|
||||
resp, err := e.DockerClient.ContainerExecAttach(
|
||||
ctx, containerExecCreate.ID, container.ExecStartOptions{Detach: false})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error reading stdout: %w", err)
|
||||
return "", fmt.Errorf("failed to attach to exec session: %w", err)
|
||||
}
|
||||
stdoutBuf.Write(stdoutData)
|
||||
stderrData, err := io.ReadAll(inspectExec.StdErr)
|
||||
defer resp.Close()
|
||||
|
||||
// Prepare buffers for stdout and stderr
|
||||
var stdoutBuf, stderrBuf bytes.Buffer
|
||||
|
||||
// Use stdcopy to demultiplex output
|
||||
_, err = stdcopy.StdCopy(&stdoutBuf, &stderrBuf, resp.Reader)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("error reading stderr: %w", err)
|
||||
return "", fmt.Errorf("error during stdcopy: %w", err)
|
||||
}
|
||||
stderrBuf.Write(stderrData)
|
||||
inspect, err := e.DockerClient.ContainerExecInspect(ctx, containerExecCreate.ID)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to inspect exec session: %w", err)
|
||||
}
|
||||
|
||||
// If the exit code is non-zero, return both stdout and stderr
|
||||
// Handle non-zero exit codes
|
||||
if inspect.ExitCode != 0 {
|
||||
// Combine stdout and stderr
|
||||
return fmt.Sprintf(
|
||||
"STDOUT:\n%s\nSTDERR:\n%s", stdoutBuf.String(), stderrBuf.String()),
|
||||
fmt.Errorf("command exited with non-zero status: %d", inspect.ExitCode)
|
||||
"STDOUT:\n%s\nSTDERR:\n%s", stdoutBuf.String(), stderrBuf.String(),
|
||||
), fmt.Errorf("command exited with non-zero status: %d", inspect.ExitCode)
|
||||
}
|
||||
// If the exit code is zero, only return stdout
|
||||
return stdoutBuf.String(), nil
|
||||
}
|
||||
|
||||
|
@ -179,7 +174,7 @@ func (e *Exec) attachAndInspectExec(ctx context.Context, id string, detach bool)
|
|||
stdoutPipe, stdoutWriter := io.Pipe()
|
||||
stderrPipe, stderrWriter := io.Pipe()
|
||||
|
||||
go e.copyOutput(resp.Reader, stdoutWriter, stderrWriter)
|
||||
go e.copyOutput(resp, stdoutWriter, stderrWriter)
|
||||
|
||||
// Return the output streams and the response
|
||||
return &execResult{
|
||||
|
@ -224,12 +219,17 @@ func (e *Exec) streamResponse(resp *execResult, outputCh chan []byte) {
|
|||
}()
|
||||
}
|
||||
|
||||
func (e *Exec) copyOutput(reader io.Reader, stdoutWriter, stderrWriter io.WriteCloser) {
|
||||
func (e *Exec) copyOutput(response dockerTypes.HijackedResponse, stdoutWriter, stderrWriter io.WriteCloser) {
|
||||
defer func() {
|
||||
stdoutWriter.Close()
|
||||
stderrWriter.Close()
|
||||
if err := stdoutWriter.Close(); err != nil {
|
||||
log.Error().Err(err).Msg("Error closing stdoutWriter")
|
||||
}
|
||||
if err := stderrWriter.Close(); err != nil {
|
||||
log.Error().Err(err).Msg("Error closing stderrWriter")
|
||||
}
|
||||
response.Close()
|
||||
}()
|
||||
_, err := stdcopy.StdCopy(stdoutWriter, stderrWriter, reader)
|
||||
_, err := stdcopy.StdCopy(stdoutWriter, stderrWriter, response.Reader)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Error in stdcopy.StdCopy " + err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue