Implement more efficient shift processing for column sources #6161

Open
rcaudy opened this issue Oct 1, 2024 · 1 comment

Labels: core (Core development tasks), feature request (New feature or request), query engine

rcaudy (Member) commented Oct 1, 2024

(This is from a discussion in the community Slack.)

I think we can hybridize the approach used in update with the one used in update_by.

Something like this, maybe:

            if (upstream.shifted().nonempty()) {
                // Shift the non-redirected output sources now, after parallelPopulation.
                final int chunkSize = (int) Math.min(ArrayBackedColumnSource.BLOCK_SIZE, aggRecorder.getParent().size());
                try (final ChunkSource.FillContext srcContext = columnCopy.makeFillContext(chunkSize);
                     final ChunkSink.FillFromContext destContext = columnCopy.makeFillFromContext(chunkSize);
                     final WritableChunk<Values> chunk = columnCopy.getChunkType().makeWritableChunk(chunkSize);
                     final RowSet prevMinusRemoves = aggRecorder.getParent().getRowSet().prev().minus(upstream.removed())) {
                    upstream.shifted().apply((begin, end, delta) -> {
                        try (final RowSet preShiftKeys = prevMinusRemoves.subSetByKeyRange(begin, end);
                             final RowSequence.Iterator preShiftKeysIter = preShiftKeys.getRowSequenceIterator();
                             final RowSet postShiftKeys = preShiftKeys.shift(delta);
                             final RowSequence.Iterator postShiftKeysIter = postShiftKeys.getRowSequenceIterator()) {
                            while (preShiftKeysIter.hasMore()) {
                                final RowSequence preShiftKeysSlice = preShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
                                final RowSequence postShiftKeysSlice = postShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
                                columnCopy.fillPrevChunk(srcContext, chunk, preShiftKeysSlice);
                                columnCopy.fillFromChunk(destContext, chunk, postShiftKeysSlice);
                            }
                        }
                    });
                }
            }
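
The data movement in the loop above can be modeled without any Deephaven classes. The sketch below is a simplified, stdlib-only Java model, not engine code: a column is a dense long[] indexed by row key, a separate prev array stands in for fillPrevChunk, and Shift and applyShifts are hypothetical names. For each shift range it gathers previous values chunkSize rows at a time and scatters them to key + delta. Because every read comes from the previous values, overlapping ranges (such as keys 1 to 3 moving up by one) cannot clobber data that has not been read yet.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, stdlib-only model of the chunked shift loop; not the Deephaven API. */
public final class ShiftModel {
    /** Keys in [begin, end] move by delta (stand-in for one RowSetShiftData range). */
    record Shift(long begin, long end, long delta) {}

    /** Writes prev values of the surviving keys to their post-shift positions in current. */
    static void applyShifts(long[] prev, long[] current, long[] prevMinusRemoves,
                            List<Shift> shifts, int chunkSize) {
        long[] chunk = new long[chunkSize];
        for (Shift s : shifts) {
            // Models prevMinusRemoves.subSetByKeyRange(begin, end); input keys are sorted.
            long[] preShiftKeys = Arrays.stream(prevMinusRemoves)
                    .filter(k -> k >= s.begin() && k <= s.end())
                    .toArray();
            for (int off = 0; off < preShiftKeys.length; off += chunkSize) {
                int len = Math.min(chunkSize, preShiftKeys.length - off);
                // Models fillPrevChunk: gather previous values at the pre-shift keys.
                for (int i = 0; i < len; i++) {
                    chunk[i] = prev[(int) preShiftKeys[off + i]];
                }
                // Models fillFromChunk: scatter them to the post-shift keys (key + delta).
                for (int i = 0; i < len; i++) {
                    current[(int) (preShiftKeys[off + i] + s.delta())] = chunk[i];
                }
            }
        }
    }

    public static void main(String[] args) {
        long[] prev = {10, 20, 30, 40, 0};
        long[] current = prev.clone();
        applyShifts(prev, current, new long[] {1, 2, 3}, List.of(new Shift(1, 3, 1)), 2);
        System.out.println(Arrays.toString(current)); // prints [10, 20, 20, 30, 40]
    }
}
```

After the shift, key 1 still holds its old value; it is exactly the kind of stale slot that the null-filling discussed next has to clear.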

For whatever reason, we have two parallel tools for null-filling: WritableColumnSource.setNull(RowSequence) and io.deephaven.engine.table.impl.util.ChunkUtils#fillWithNullValue. I slightly prefer the implementation in the second, which of course suggests that it should become the implementation of the first.

It is also easier, and less work, to do the null filling once at the end. You don't want to null-fill as you shift, since you may be null-filling slots that are going to be overwritten anyway. Instead, just take the previous row set minus the current row set (final WritableRowSet toClear = resultRowSet.prev().minus(resultRowSet);) and null-fill that.
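
A minimal sketch of the deferred null-fill, again modeled in plain Java rather than with the Deephaven API. toClear mirrors resultRowSet.prev().minus(resultRowSet) using a TreeSet, fillWithNull is a hypothetical stand-in for ChunkUtils#fillWithNullValue, and NULL_LONG assumes a long column using Deephaven's Long.MIN_VALUE null sentinel. When keys 1 to 3 shift up by one, clearing pre-shift keys during the shift would null keys 1, 2, and 3 and then overwrite 2 and 3; deferring the fill touches only key 1.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Hypothetical model of null-filling once, after all shifts; not the Deephaven API. */
public final class DeferredNullFill {
    // Assumed stand-in for Deephaven's QueryConstants.NULL_LONG (Long.MIN_VALUE).
    static final long NULL_LONG = Long.MIN_VALUE;

    /** Keys present before the update but not after: models prev().minus(current). */
    static long[] toClear(long[] prevKeys, long[] currentKeys) {
        TreeSet<Long> clear = new TreeSet<>();
        for (long k : prevKeys) {
            clear.add(k);
        }
        for (long k : currentKeys) {
            clear.remove(k);
        }
        return clear.stream().mapToLong(Long::longValue).toArray();
    }

    /** A single null-filling pass over the keys that disappeared. */
    static void fillWithNull(long[] column, long[] keys) {
        for (long k : keys) {
            column[(int) k] = NULL_LONG;
        }
    }

    public static void main(String[] args) {
        // Keys {1, 2, 3} shift by +1: before = {0, 1, 2, 3}, after = {0, 2, 3, 4}.
        long[] column = {10, 20, 20, 30, 40}; // column state after the shift was applied
        long[] clear = toClear(new long[] {0, 1, 2, 3}, new long[] {0, 2, 3, 4});
        fillWithNull(column, clear);
        System.out.println(Arrays.toString(clear)); // prints [1]
        System.out.println(column[1] == NULL_LONG); // prints true
    }
}
```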

rcaudy added the feature request, query engine, and core labels Oct 1, 2024
rcaudy added this to the Backlog milestone Oct 1, 2024
rcaudy self-assigned this Oct 1, 2024
rcaudy (Member, Author) commented Oct 1, 2024

A user supplied this version of the same code:

    TableUpdate aggUpdates = this.aggRecorder.getUpdate();
    if (aggUpdates == null) {
      aggUpdates = new TableUpdateImpl(RowSetFactory.empty(), RowSetFactory.empty(), RowSetFactory.empty(),
          RowSetShiftData.EMPTY, ModifiedColumnSet.EMPTY);
    }

    // We need to ensure we have enough capacity (not size) to handle up to the very
    // last row key in our parent.
    this.columnCopy.ensureCapacity(this.aggRecorder.getParent().getRowSet().lastRowKey() + 1);

    // Null out any removed keys.
    final TrackingRowSet resultRowSet = aggRecorder.getParent().getRowSet();
    try (final WritableRowSet toClear = resultRowSet.prev().minus(resultRowSet)) {
      ChunkUtils.fillWithNullValue(this.columnCopy, toClear);
    }

    // Handle shifts
    if (aggUpdates.shifted().nonempty()) {
      // Shift the non-redirected output sources now, after parallelPopulation.
      final int chunkSize = (int) Math.min(ArrayBackedColumnSource.BLOCK_SIZE, aggRecorder.getParent().size());
      try (final ChunkSource.FillContext srcContext = columnCopy.makeFillContext(chunkSize);
          final ChunkSink.FillFromContext destContext = columnCopy.makeFillFromContext(chunkSize);
          final WritableChunk<Values> chunk = columnCopy.getChunkType().makeWritableChunk(chunkSize);
          final RowSet prevMinusRemoves = aggRecorder.getParent().getRowSet().prev().minus(aggUpdates.removed())) {
        aggUpdates.shifted().apply((begin, end, delta) -> {
          try (final RowSet preShiftKeys = prevMinusRemoves.subSetByKeyRange(begin, end);
              final RowSequence.Iterator preShiftKeysIter = preShiftKeys.getRowSequenceIterator();
              final RowSet postShiftKeys = preShiftKeys.shift(delta);
              final RowSequence.Iterator postShiftKeysIter = postShiftKeys.getRowSequenceIterator()) {
            while (preShiftKeysIter.hasMore()) {
              final RowSequence preShiftKeysSlice = preShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
              final RowSequence postShiftKeysSlice = postShiftKeysIter.getNextRowSequenceWithLength(chunkSize);
              columnCopy.fillPrevChunk(srcContext, chunk, preShiftKeysSlice);
              columnCopy.fillFromChunk(destContext, chunk, postShiftKeysSlice);
            }
          }
        });
      }
    }

    // Copy in any adds/modifies from our upstream to our custom column source.
    try (final RowSet added = aggUpdates.added().union(aggUpdates.modified())) {
      ChunkUtils.copyData(this.column, added, this.columnCopy, added, false);
    }

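This had two flaws:

  1. You need to do the null filling after you apply the shifts. toClear includes the pre-shift keys of rows that moved, so null-filling first writes into slots the shift still reads from.
  2. Oh, and the way you have this written, you can remove the modified rows from the row set to shift. Their values are overwritten anyway when the adds and modifies are copied in from upstream:
        try (final ChunkSource.FillContext srcContext = columnCopy.makeFillContext(chunkSize);
             final ChunkSink.FillFromContext destContext = columnCopy.makeFillFromContext(chunkSize);
             final WritableChunk<Values> chunk = columnCopy.getChunkType().makeWritableChunk(chunkSize);
             final RowSet prevMinusRemoves = aggRecorder.getParent().getRowSet().prev().minus(aggUpdates.removed());
             final RowSet prevMinusRemovesMinusMods = prevMinusRemoves.minus(aggUpdates.getModifiedPreShift())) {
          aggUpdates.shifted().apply((begin, end, delta) -> {
            try (final RowSet preShiftKeys = prevMinusRemovesMinusMods.subSetByKeyRange(begin, end);
                 // ... remainder of the shift loop unchanged ...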
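
Putting both corrections together, the order becomes: shift the surviving rows (optionally excluding modified rows), null-fill the previous keys that are no longer present, then copy adds and modifies from upstream. The sketch below models that sequence with sorted long[] arrays standing in for RowSets and a snapshot array standing in for previous values. CorrectedOrderModel, update, minus, and the skipMods flag are hypothetical; removed and modifiedPreShift are in pre-shift key space while added and modified are post-shift, matching the TableUpdate conventions used in the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.LongStream;

/** Hypothetical model of the corrected update order for a dense copy column. */
public final class CorrectedOrderModel {
    static final long NULL_LONG = Long.MIN_VALUE; // assumed QueryConstants.NULL_LONG

    record Shift(long begin, long end, long delta) {}

    /** Set difference a \ b on sorted key arrays (stand-in for RowSet.minus). */
    static long[] minus(long[] a, long[] b) {
        return Arrays.stream(a).filter(k -> Arrays.binarySearch(b, k) < 0).toArray();
    }

    static void update(long[] copy, long[] upstream, long[] prevKeys, long[] currentKeys,
                       long[] removed, long[] modifiedPreShift, long[] added, long[] modified,
                       List<Shift> shifts, boolean skipMods) {
        // Snapshot of previous values, standing in for fillPrevChunk.
        long[] prev = copy.clone();

        // 1. Shift surviving rows; modified rows may be skipped since step 3 rewrites them.
        long[] toShift = minus(prevKeys, removed);
        if (skipMods) {
            toShift = minus(toShift, modifiedPreShift);
        }
        for (Shift s : shifts) {
            for (long k : toShift) { // linear scan for clarity, not efficiency
                if (k >= s.begin() && k <= s.end()) {
                    copy[(int) (k + s.delta())] = prev[(int) k];
                }
            }
        }

        // 2. Null-fill once, after the shifts: keys present before but not after.
        for (long k : minus(prevKeys, currentKeys)) {
            copy[(int) k] = NULL_LONG;
        }

        // 3. Copy adds and modifies (post-shift keys) from the upstream column.
        LongStream.concat(Arrays.stream(added), Arrays.stream(modified))
                .forEach(k -> copy[(int) k] = upstream[(int) k]);
    }

    public static void main(String[] args) {
        long n = NULL_LONG;
        long[] upstream = {20, 300, 40, 0, 0, 99};
        for (boolean skipMods : new boolean[] {false, true}) {
            long[] copy = {10, 20, 30, 40, n, n};
            update(copy, upstream, new long[] {0, 1, 2, 3}, new long[] {0, 1, 2, 5},
                    new long[] {0}, new long[] {2}, new long[] {5}, new long[] {1},
                    List.of(new Shift(1, 3, -1)), skipMods);
            System.out.println(skipMods + " " + Arrays.toString(copy));
        }
    }
}
```

In main, key 0 is removed, keys 1 to 3 shift down by one, the row that started at key 2 is modified (landing at key 1), and key 5 is added. Both runs, with and without skipMods, produce the same column, which is why excluding the modified rows from the shift is a pure saving.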
