Skip to content

Commit

Permalink
add buffering for runner output capture
Browse files Browse the repository at this point in the history
Updating the step execution for every line can be extremely costly for jobs
producting millions of lines.

While this is an edge case (jobs should really not use stdout/stderr to output
massive amount of data), the consequences are severe:

1. Job execution time is dramatically impacted, since each line causes a query
to the database.

2. Each update produces a new line in the database (since PostgreSQL uses
MVCC), which results in an insane amount of dead tuples. As an example, a jobs
producing 5 megabytes of output could easily result in 20+ GB of dead tuples
in PostgreSQL. While the vacuum process ultimately frees that space, it is
simply not fast enough when updates are performed hundreds or thousand of
times per second.

Buffering aggressively for at least one second solves the problem at the
expense of memory usage.

An even better solution would be to update after X seconds and Y bytes, but
this requires a more sophisticated buffered reader.
  • Loading branch information
galdor committed Nov 7, 2022
1 parent e021fd8 commit de62e01
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ _Work in progress._
- Truncate the output of all step executions on the web interface. The current
limit is hardcoded to 1MB. Very large outputs cause performance issues both
on the server and in the web browser.
- Add buffering to runner output capture to avoid overloading the database for
jobs producing massive amounts of data. See the associated commit for more
information.

# v1.0.6
### Bug fixes
Expand Down
20 changes: 16 additions & 4 deletions pkg/eventline/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,12 @@ func (r *Runner) readOutput(se *StepExecution, output io.ReadCloser, name string
defer wg.Done()

bufferedOutput := bufio.NewReader(output)

var outputSize int
var line []byte

lastUpdate := time.Now()
updatePeriod := time.Duration(1 * time.Second)

for {
data, isPrefix, err := bufferedOutput.ReadLine()
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
Expand All @@ -420,24 +423,33 @@ func (r *Runner) readOutput(se *StepExecution, output io.ReadCloser, name string
}
}

if len(line) > 0 && line[len(line)-1] != '\n' {
line = append(line, '\n')
}

// There is no point in updating se.Output because we are not going to
// read it in the runner, so we may as well avoid allocating and
// copying data. This only works because se.Update does not modify the
// output column, so it will not be erased when updating the step
// execution later.

if len(line) > 0 {
err = r.UpdateStepExecutionOutput(se, append(line, '\n'))
isEOF := errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe)

if len(line) > 0 && (time.Since(lastUpdate) >= updatePeriod || isEOF) {
err = r.UpdateStepExecutionOutput(se, line)
if err != nil {
errChan <- fmt.Errorf("cannot update step execution %q: %v",
se.Id, err)
return
}

outputSize += len(line)
line = nil

lastUpdate = time.Now()
}

if errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) {
if isEOF {
break
}
}
Expand Down

0 comments on commit de62e01

Please sign in to comment.