Skip to content

Commit

Permalink
Optimize readWithVisitor for RleEncoding and NullableEncoding (#58)
Browse files Browse the repository at this point in the history
Summary:
X-link: facebookincubator/velox#9896

Pull Request resolved: #58

- Add fash path for `RleEncoding::readWithVisitor`
- Use `materializeBoolsAsBits` in `NullableEncoding::readWithVisitor`
- Merge `ChunkedBoolsDecoder` with `ChunkedDecoder`
- Optimize the data type dispatch in `EncodingUtils.h` to improve compilation time

bypass-github-export-checks

Reviewed By: oerling

Differential Revision: D57675525

fbshipit-source-id: 419c24d81007b22d92a555e1648ac97077aaeecb
  • Loading branch information
Yuhta authored and facebook-github-bot committed May 28, 2024
1 parent d924245 commit 8c8e73d
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 159 deletions.
41 changes: 27 additions & 14 deletions dwio/nimble/encodings/DictionaryEncoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,34 +126,40 @@ void DictionaryEncoding<T>::materialize(uint32_t rowCount, void* buffer) {

namespace detail {

class ExtractDictionaryIndices {
class DictionaryIndicesHook : public velox::ValueHook {
public:
static constexpr bool kSkipNulls = true;
using HookType = velox::dwio::common::NoHook;

ExtractDictionaryIndices(uint32_t* indices, velox::vector_size_t offset)
DictionaryIndicesHook(uint32_t* indices, vector_size_t offset)
: indices_(indices), offset_(offset) {}

bool acceptsNulls() const {
bool acceptsNulls() const final {
return false;
}

void addValue(velox::vector_size_t i, uint32_t value) {
indices_[i - offset_] = value;
void addValue(vector_size_t i, const void* value) final {
indices_[i - offset_] = *reinterpret_cast<const uint32_t*>(value);
}

template <typename T>
void addNull(velox::vector_size_t /*i*/) {
NIMBLE_UNREACHABLE(__PRETTY_FUNCTION__);
void addValues(
const vector_size_t* rows,
const void* values,
vector_size_t size,
uint8_t valueWidth) final {
NIMBLE_DASSERT(valueWidth == sizeof(uint32_t), "");
auto* indices = reinterpret_cast<const uint32_t*>(values);
for (vector_size_t i = 0; i < size; ++i) {
indices_[rows[i] - offset_] = indices[i];
}
}

HookType& hook() {
return velox::dwio::common::noHook();
void addNull(vector_size_t /*i*/) final {
NIMBLE_UNREACHABLE(__PRETTY_FUNCTION__);
}

private:
uint32_t* const indices_;
const velox::vector_size_t offset_;
const vector_size_t offset_;
};

} // namespace detail
Expand All @@ -163,18 +169,25 @@ template <typename V>
void DictionaryEncoding<T>::readWithVisitor(
V& visitor,
ReadWithVisitorParams& params) {
if constexpr (sizeof(T) < sizeof(uint32_t)) {
// Column reader values buffer is not large enough to hold indices in this
// case.
NIMBLE_UNREACHABLE(typeid(T).name());
}
const auto startRowIndex = visitor.rowIndex();
buffer_.resize(visitor.numRows() - startRowIndex);
velox::common::AlwaysTrue indicesFilter;
detail::DictionaryIndicesHook indicesHook(buffer_.data(), startRowIndex);
auto indicesVisitor = DecoderVisitor<
int32_t,
velox::common::AlwaysTrue,
detail::ExtractDictionaryIndices,
velox::dwio::common::ExtractToHook<detail::DictionaryIndicesHook>,
V::dense>(
indicesFilter,
&visitor.reader(),
velox::RowSet(visitor.rows(), visitor.numRows()),
detail::ExtractDictionaryIndices(buffer_.data(), startRowIndex));
velox::dwio::common::ExtractToHook<detail::DictionaryIndicesHook>(
&indicesHook));
indicesVisitor.setRowIndex(startRowIndex);
callReadWithVisitor(*indicesEncoding_, indicesVisitor, params);
this->template readWithVisitorSlow<false>(visitor, params, [&] {
Expand Down
139 changes: 116 additions & 23 deletions dwio/nimble/encodings/Encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ template <typename T, typename Filter, typename ExtractValues, bool kIsDense>
using DecoderVisitor =
velox::dwio::common::ColumnVisitor<T, Filter, ExtractValues, kIsDense>;

using vector_size_t = velox::vector_size_t;

// Extra parameters that need to be persisted/used during a single call of
// readWithVisitor at column reader level, which might span multiple calls of
// readWithVisitor (one per chunk) in decoders.
Expand All @@ -75,14 +77,18 @@ struct ReadWithVisitorParams {
// across potential mutliple chunks.
std::function<uint64_t*()> makeReaderNulls;

// Initialize `SelectiveColumnReader::returnReaderNulls_' field. Need to be
// called after decoding nulls in `NullableEncoding'.
std::function<void()> initReturnReaderNulls;

// Create the result nulls if not already exists. Similar to
// `makeReaderNulls', we create one single buffer for all the results nulls
// across potential multiple chunks during one read.
std::function<void()> prepareResultNulls;

// Number of rows scanned so far. Contains rows scanned in previous chunks
// during this read call as well.
velox::vector_size_t numScanned;
vector_size_t numScanned;
};

class Encoding {
Expand Down Expand Up @@ -342,13 +348,13 @@ class BufferedEncoding {
std::array<T, BufferSize> buffer_;
};

template <typename Visitor1, typename Visitor2>
void checkCurrentRowEqual(const Visitor1& v1, const Visitor2& v2) {
if (v1.atEnd()) {
NIMBLE_DASSERT(v2.atEnd(), "");
template <typename T, typename PhysicalType>
T castFromPhysicalType(const PhysicalType& value) {
if constexpr (isFloatingPointType<T>()) {
static_assert(sizeof(T) == sizeof(PhysicalType));
return reinterpret_cast<const T&>(value);
} else {
NIMBLE_DASSERT(!v2.atEnd(), "");
NIMBLE_DASSERT(v1.currentRow() == v2.currentRow(), "");
return value;
}
}

Expand All @@ -362,10 +368,7 @@ void readWithVisitorSlow(
constexpr bool kExtractToReader = std::is_same_v<
typename DecoderVisitor::Extract,
velox::dwio::common::ExtractToReader>;
const uint64_t* nulls = nullptr;
if (auto& nullsBuf = visitor.reader().nullsInReadRange()) {
nulls = nullsBuf->template as<uint64_t>();
}
auto* nulls = visitor.reader().rawNullsInReadRange();
if constexpr (kExtractToReader) {
params.prepareResultNulls();
}
Expand All @@ -378,32 +381,122 @@ void readWithVisitorSlow(
numNonNulls -=
velox::bits::countNulls(nulls, numScanned, visitor.currentRow());
}
skip(numNonNulls);
if (numNonNulls > 0) {
skip(numNonNulls);
}
numScanned = visitor.currentRow() + 1;
}
if (nulls && velox::bits::isBitNull(nulls, visitor.currentRow())) {
if (!visitor.allowNulls()) {
visitor.setRowIndex(visitor.rowIndex() + 1);
visitor.addRowIndex(1);
atEnd = visitor.atEnd();
} else if (kExtractToReader && visitor.reader().returnReaderNulls()) {
visitor.setRowIndex(visitor.rowIndex() + 1);
visitor.setNumValues(visitor.reader().numValues() + 1);
visitor.addRowIndex(1);
visitor.addNumValues(1);
atEnd = visitor.atEnd();
} else {
visitor.processNull(atEnd);
}
} else {
auto value = decodeOne();
if constexpr (isFloatingPointType<T>()) {
if constexpr (sizeof(T) != sizeof(value)) {
NIMBLE_UNREACHABLE(typeid(decltype(value)).name());
}
visitor.process(reinterpret_cast<const T&>(value), atEnd);
} else {
visitor.process(value, atEnd);
visitor.process(castFromPhysicalType<T>(decodeOne()), atEnd);
}
}
}

template <typename TEncoding, typename V>
void readWithVisitorFast(
TEncoding& encoding,
V& visitor,
ReadWithVisitorParams& params,
const uint64_t* nulls) {
constexpr bool kOutputNulls = !V::kHasFilter && !V::kHasHook;
const auto numRows = visitor.numRows() - visitor.rowIndex();
auto& outerRows = visitor.outerNonNullRows();
if (!nulls) {
encoding.template bulkScan<false>(
visitor,
params.numScanned,
visitor.rows() + visitor.rowIndex(),
numRows,
velox::iota(visitor.numRows(), outerRows) + visitor.rowIndex());
return;
}
// TODO: Store last non null index and num non-nulls so far in decoder to
// accelerate multi-chunk decoding.
const auto numNonNullsSoFar =
velox::bits::countNonNulls(nulls, 0, params.numScanned);
if constexpr (V::dense) {
NIMBLE_DASSERT(
!visitor.reader().hasNulls() || visitor.reader().returnReaderNulls(),
"");
outerRows.resize(numRows);
auto numNonNulls = velox::simd::indicesOfSetBits(
nulls, visitor.rowIndex(), visitor.numRows(), outerRows.data());
outerRows.resize(numNonNulls);
if (outerRows.empty()) {
if constexpr (kOutputNulls) {
visitor.addNumValues(numRows);
}
visitor.addRowIndex(numRows);
} else {
encoding.template bulkScan<true>(
visitor,
numNonNullsSoFar,
visitor.rows() + numNonNullsSoFar,
numNonNulls,
outerRows.data());
}
return;
}
auto& innerRows = visitor.innerNonNullRows();
int32_t tailSkip = -1;
uint64_t* resultNulls = nullptr;
uint8_t* chunkResultNulls = nullptr;
if constexpr (kOutputNulls) {
params.prepareResultNulls();
resultNulls = visitor.reader().rawResultNulls();
chunkResultNulls = reinterpret_cast<uint8_t*>(resultNulls) +
velox::bits::nbytes(visitor.rowIndex());
}
bool anyNulls =
velox::dwio::common::nonNullRowsFromSparse<V::kHasFilter, kOutputNulls>(
nulls,
velox::RowSet(visitor.rows() + visitor.rowIndex(), numRows),
innerRows,
outerRows,
chunkResultNulls,
tailSkip);
if (anyNulls) {
visitor.setHasNulls();
}
if (kOutputNulls && visitor.rowIndex() % 8 != 0) {
velox::bits::copyBits(
resultNulls,
velox::bits::roundUp(visitor.rowIndex(), 8),
resultNulls,
visitor.rowIndex(),
numRows);
}
if (!V::kHasFilter && visitor.rowIndex() > 0) {
for (auto& row : outerRows) {
row += visitor.rowIndex();
}
}
if (innerRows.empty()) {
if constexpr (kOutputNulls) {
visitor.addNumValues(numRows);
}
visitor.addRowIndex(numRows);
encoding.skip(tailSkip - numNonNullsSoFar);
} else {
encoding.template bulkScan<true>(
visitor,
numNonNullsSoFar,
innerRows.data(),
innerRows.size(),
outerRows.data());
encoding.skip(tailSkip);
}
}

} // namespace detail
Expand Down
Loading

0 comments on commit 8c8e73d

Please sign in to comment.