Use legacySizeOfNull argument to determine the behavior of Spark size function (facebookincubator#10100)

Summary:
1) The Spark `size` function's `legacySizeOfNull` behavior is determined either by
other functions that delegate to it (e.g., the `array_size` function) or by
configuration. In the existing implementation, however, it depends only on the
configuration property. This PR removes the configuration property and instead uses
the `legacySizeOfNull` argument passed in by frameworks such as Gluten.
2) At Spark's analysis time, `array_size` is rewritten into the `size` function, and
their implementations are effectively the same. This PR removes the duplicated code.
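
As a minimal illustrative sketch (not part of this patch; the exact literal that
Spark's analyzer substitutes for `array_size` is an assumption), the change in
calling convention looks like this in Spark SQL:

-- Before: null handling was read from the spark.legacy_size_of_null config property.
SELECT size(array(1, 2, 3));        -- 3
SELECT size(NULL);                  -- -1 in legacy mode, NULL otherwise

-- After: the caller passes legacySizeOfNull as a constant second argument.
SELECT size(array(1, 2, 3), true);  -- 3
SELECT size(NULL, true);            -- -1
SELECT size(NULL, false);           -- NULL

-- array_size(x) is assumed to be rewritten by Spark's analyzer into size(x, false),
-- so it returns NULL for NULL input.
SELECT array_size(NULL);            -- NULL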

Pull Request resolved: facebookincubator#10100

Reviewed By: xiaoxmeng

Differential Revision: D58825758

Pulled By: bikramSingh91

fbshipit-source-id: 5e1c9679832dfb6ac7dd15e3c0c1d979d3219b93
PHILO-HE committed Jul 1, 2024
1 parent 1cddb86 commit b33cdc0
Showing 9 changed files with 94 additions and 170 deletions.
9 changes: 0 additions & 9 deletions velox/core/QueryConfig.h
@@ -294,10 +294,6 @@ class QueryConfig {
static constexpr const char* kPrestoArrayAggIgnoreNulls =
"presto.array_agg.ignore_nulls";

/// If false, size function returns null for null input.
static constexpr const char* kSparkLegacySizeOfNull =
"spark.legacy_size_of_null";

// The default number of expected items for the bloomfilter.
static constexpr const char* kSparkBloomFilterExpectedNumItems =
"spark.bloom_filter.expected_num_items";
@@ -632,11 +628,6 @@ class QueryConfig {
return get<int32_t>(kSpillableReservationGrowthPct, kDefaultPct);
}

bool sparkLegacySizeOfNull() const {
constexpr bool kDefault{true};
return get<bool>(kSparkLegacySizeOfNull, kDefault);
}

bool prestoArrayAggIgnoreNulls() const {
return get<bool>(kPrestoArrayAggIgnoreNulls, false);
}
17 changes: 7 additions & 10 deletions velox/docs/functions/spark/array.rst
@@ -101,12 +101,6 @@ Array Functions
SELECT array_repeat(100, 0); -- []
SELECT array_repeat(100, -1); -- []

.. spark:function:: array_size(array(E)) -> integer
Returns the size of the array. ::
SELECT array_size(array(1, 2, 3)); -- 3
.. spark:function:: array_sort(array(E)) -> array(E)
Returns an array which has the sorted order of the input array(E). The elements of array(E) must
@@ -193,11 +187,14 @@ Array Functions
SELECT shuffle(array(0, 0, 0), 0); -- [0, 0, 0]
SELECT shuffle(array(1, NULL, 1, NULL, 2), 0); -- [2, 1, NULL, NULL, 1]

.. spark:function:: size(array(E)) -> bigint
.. spark:function:: size(array(E), legacySizeOfNull) -> integer
Returns the size of the array. Returns null for null input if `legacySizeOfNull`
is set to false. Otherwise, returns -1 for null input. ::

Returns the size of the array. Returns null for null input
if :doc:`spark.legacy_size_of_null <../../configs>` is set to false.
Otherwise, returns -1 for null input.
SELECT size(array(1, 2, 3), true); -- 3
SELECT size(NULL, true); -- -1
SELECT size(NULL, false); -- NULL

.. spark:function:: sort_array(array(E)) -> array(E)
11 changes: 7 additions & 4 deletions velox/docs/functions/spark/map.rst
@@ -56,9 +56,12 @@ Map Functions
MAP(ARRAY['a', 'b', 'c'], ARRAY[1, 2, 3]),
(k, v1, v2) -> k || CAST(v1/v2 AS VARCHAR));

.. spark:function:: size(map(K,V)) -> bigint
.. spark:function:: size(map(K,V), legacySizeOfNull) -> integer
:noindex:

Returns the size of the input map. Returns null for null input
if :doc:`spark.legacy_size_of_null <../../configs>` is set to false.
Otherwise, returns -1 for null input.
Returns the size of the input map. Returns null for null input if ``legacySizeOfNull``
is set to false. Otherwise, returns -1 for null input. ::

SELECT size(map(array(1, 2), array(3, 4)), true); -- 2
SELECT size(NULL, true); -- -1
SELECT size(NULL, false); -- NULL
34 changes: 0 additions & 34 deletions velox/functions/sparksql/ArraySizeFunction.h

This file was deleted.

4 changes: 0 additions & 4 deletions velox/functions/sparksql/Register.cpp
@@ -30,7 +30,6 @@
#include "velox/functions/prestosql/URLFunctions.h"
#include "velox/functions/sparksql/ArrayFlattenFunction.h"
#include "velox/functions/sparksql/ArrayMinMaxFunction.h"
#include "velox/functions/sparksql/ArraySizeFunction.h"
#include "velox/functions/sparksql/ArraySort.h"
#include "velox/functions/sparksql/Bitwise.h"
#include "velox/functions/sparksql/DateTimeFunctions.h"
@@ -171,9 +170,6 @@ inline void registerArrayMinMaxFunctions(const std::string& prefix) {
void registerFunctions(const std::string& prefix) {
registerAllSpecialFormGeneralFunctions();

registerFunction<sparksql::ArraySizeFunction, int32_t, Array<Any>>(
{prefix + "array_size"});

// Register size functions
registerSize(prefix + "size");

23 changes: 15 additions & 8 deletions velox/functions/sparksql/Size.cpp
@@ -30,33 +30,40 @@ struct Size {
template <typename TInput>
FOLLY_ALWAYS_INLINE void initialize(
const std::vector<TypePtr>& /*inputTypes*/,
const core::QueryConfig& config,
const TInput* input /*input*/) {
legacySizeOfNull_ = config.sparkLegacySizeOfNull();
const core::QueryConfig& /*config*/,
const TInput* /*input*/,
const bool* legacySizeOfNull) {
if (legacySizeOfNull == nullptr) {
VELOX_USER_FAIL("Constant legacySizeOfNull is expected.");
}
legacySizeOfNull_ = *legacySizeOfNull;
}

template <typename TInput>
FOLLY_ALWAYS_INLINE bool callNullable(int32_t& out, const TInput* input) {
FOLLY_ALWAYS_INLINE bool callNullable(
int32_t& out,
const TInput* input,
const bool* /*legacySizeOfNull*/) {
if (input == nullptr) {
if (legacySizeOfNull_) {
out = -1;
return true;
} else {
return false;
}
return false;
}
out = input->size();
return true;
}

private:
// If true, returns -1 for null input. Otherwise, returns null.
bool legacySizeOfNull_;
};
} // namespace

void registerSize(const std::string& prefix) {
registerFunction<Size, int32_t, Array<Any>>({prefix});
registerFunction<Size, int32_t, Map<Any, Any>>({prefix});
registerFunction<Size, int32_t, Array<Any>, bool>({prefix});
registerFunction<Size, int32_t, Map<Any, Any>, bool>({prefix});
}

} // namespace facebook::velox::functions::sparksql
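
Usage note (a sketch based on the initialize() check above, not part of the patch;
c0 and c1 are hypothetical columns, c0 an array or map and c1 a boolean):

SELECT size(c0, true);   -- OK: the constant true is folded into initialize()
SELECT size(c0, c1);     -- rejected with "Constant legacySizeOfNull is expected."
                         -- because the second argument is not a constant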
80 changes: 0 additions & 80 deletions velox/functions/sparksql/tests/ArraySizeTest.cpp

This file was deleted.

1 change: 0 additions & 1 deletion velox/functions/sparksql/tests/CMakeLists.txt
@@ -19,7 +19,6 @@ add_executable(
ArrayGetTest.cpp
ArrayMaxTest.cpp
ArrayMinTest.cpp
ArraySizeTest.cpp
ArraySortTest.cpp
ArrayShuffleTest.cpp
ArgGeneratorTest.cpp
85 changes: 65 additions & 20 deletions velox/functions/sparksql/tests/SizeTest.cpp
@@ -15,8 +15,11 @@
*/
#include <string>

#include <gtest/gtest.h>
#include <velox/core/QueryConfig.h>
#include <optional>
#include "velox/functions/sparksql/tests/SparkFunctionBaseTest.h"
#include "velox/type/Timestamp.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
@@ -28,9 +31,9 @@ class SizeTest : public SparkFunctionBaseTest {
std::function<vector_size_t(vector_size_t /* row */)> sizeAt =
[](vector_size_t row) { return 1 + row % 7; };

void testSize(VectorPtr vector, vector_size_t numRows) {
auto result =
evaluate<SimpleVector<int32_t>>("size(c0)", makeRowVector({vector}));
void testLegacySizeOfNull(VectorPtr vector, vector_size_t numRows) {
auto result = evaluate<SimpleVector<int32_t>>(
"size(c0, true)", makeRowVector({vector}));
for (vector_size_t i = 0; i < numRows; ++i) {
if (vector->isNullAt(i)) {
EXPECT_EQ(result->valueAt(i), -1) << "at " << i;
@@ -40,51 +43,93 @@
}
}

void testSizeLegacyNull(VectorPtr vector, vector_size_t numRows) {
auto result =
evaluate<SimpleVector<int32_t>>("size(c0)", makeRowVector({vector}));
void testSize(VectorPtr vector, vector_size_t numRows) {
auto result = evaluate<SimpleVector<int32_t>>(
"size(c0, false)", makeRowVector({vector}));
for (vector_size_t i = 0; i < numRows; ++i) {
EXPECT_EQ(result->isNullAt(i), vector->isNullAt(i)) << "at " << i;
}
}

void setConfig(std::string configStr, bool value) {
execCtx_.queryCtx()->testingOverrideConfigUnsafe({
{configStr, std::to_string(value)},
});
template <typename T>
int32_t testArraySize(const std::vector<std::optional<T>>& input) {
auto row = makeRowVector({makeNullableArrayVector(
std::vector<std::vector<std::optional<T>>>{input})});
return evaluateOnce<int32_t>("size(c0, false)", row).value();
}

static inline vector_size_t valueAt(vector_size_t idx) {
return idx + 1;
}
};

TEST_F(SizeTest, sizetest) {
// Ensure that out is set to -1 for null input if legacySizeOfNull = true.
TEST_F(SizeTest, legacySizeOfNull) {
vector_size_t numRows = 100;
auto arrayVector =
makeArrayVector<int64_t>(numRows, sizeAt, valueAt, nullptr);
testSize(arrayVector, numRows);
testLegacySizeOfNull(arrayVector, numRows);
arrayVector =
makeArrayVector<int64_t>(numRows, sizeAt, valueAt, nullEvery(5));
testSize(arrayVector, numRows);
testLegacySizeOfNull(arrayVector, numRows);
auto mapVector = makeMapVector<int64_t, int64_t>(
numRows, sizeAt, valueAt, valueAt, nullptr);
testSize(mapVector, numRows);
testLegacySizeOfNull(mapVector, numRows);
mapVector = makeMapVector<int64_t, int64_t>(
numRows, sizeAt, valueAt, valueAt, nullEvery(5));
testSize(mapVector, numRows);
testLegacySizeOfNull(mapVector, numRows);
}

// Ensure that out if set to -1 if SparkLegacySizeOfNull is specified.
TEST_F(SizeTest, legacySizeOfNull) {
// Ensure that out is set to null for null input if legacySizeOfNull = false.
TEST_F(SizeTest, size) {
vector_size_t numRows = 100;
setConfig(core::QueryConfig::kSparkLegacySizeOfNull, false);
auto arrayVector =
makeArrayVector<int64_t>(numRows, sizeAt, valueAt, nullEvery(1));
testSizeLegacyNull(arrayVector, numRows);
testSize(arrayVector, numRows);
auto mapVector = makeMapVector<int64_t, int64_t>(
numRows, sizeAt, valueAt, valueAt, nullEvery(1));
testSizeLegacyNull(mapVector, numRows);
testSize(mapVector, numRows);
}

TEST_F(SizeTest, boolean) {
EXPECT_EQ(testArraySize<bool>({true, false}), 2);
EXPECT_EQ(testArraySize<bool>({true}), 1);
EXPECT_EQ(testArraySize<bool>({}), 0);
EXPECT_EQ(testArraySize<bool>({true, false, true, std::nullopt}), 4);
}

TEST_F(SizeTest, smallint) {
EXPECT_EQ(testArraySize<int8_t>({}), 0);
EXPECT_EQ(testArraySize<int8_t>({1}), 1);
EXPECT_EQ(testArraySize<int8_t>({std::nullopt}), 1);
EXPECT_EQ(testArraySize<int8_t>({std::nullopt, 1}), 2);
}

TEST_F(SizeTest, real) {
EXPECT_EQ(testArraySize<float>({}), 0);
EXPECT_EQ(testArraySize<float>({1.1}), 1);
EXPECT_EQ(testArraySize<float>({std::nullopt}), 1);
EXPECT_EQ(testArraySize<float>({std::nullopt, 1.1}), 2);
}

TEST_F(SizeTest, varchar) {
EXPECT_EQ(testArraySize<std::string>({"red", "blue"}), 2);
EXPECT_EQ(
testArraySize<std::string>({std::nullopt, "blue", "yellow", "orange"}),
4);
EXPECT_EQ(testArraySize<std::string>({}), 0);
EXPECT_EQ(testArraySize<std::string>({std::nullopt}), 1);
}

TEST_F(SizeTest, integer) {
EXPECT_EQ(testArraySize<int32_t>({1, 2}), 2);
}

TEST_F(SizeTest, timestamp) {
auto ts = [](int64_t micros) { return Timestamp::fromMicros(micros); };
EXPECT_EQ(testArraySize<Timestamp>({}), 0);
EXPECT_EQ(testArraySize<Timestamp>({std::nullopt}), 1);
EXPECT_EQ(testArraySize<Timestamp>({ts(0), ts(1)}), 2);
}

} // namespace facebook::velox::functions::sparksql::test
