Skip to content

Commit

Permalink
apacheGH-40078: [C++] Import/Export ArrowDeviceArrayStream (apache#40807
Browse files Browse the repository at this point in the history
)

### Rationale for this change
The original PRs for adding support for importing and exporting the new C Device interface (apache#36488 / apache#36489) only added support for the Arrays themselves, not for the stream structure. We should support both.

### What changes are included in this PR?
Adding parallel functions for Import/Export of streams that accept `ArrowDeviceArrayStream`.

### Are these changes tested?
Test writing in progress, wanted to get this up for review while I write tests.

### Are there any user-facing changes?
No, only new functions have been added.

* GitHub Issue: apache#40078

Lead-authored-by: Matt Topol <[email protected]>
Co-authored-by: Felipe Oliveira Carvalho <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
4 people authored May 21, 2024
1 parent f0678ec commit 8169d6e
Show file tree
Hide file tree
Showing 13 changed files with 1,051 additions and 99 deletions.
8 changes: 8 additions & 0 deletions cpp/src/arrow/array/array_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ class ARROW_EXPORT Array {
/// \return Status
Status ValidateFull() const;

/// \brief Return the device_type that this array's data is allocated on
///
/// This just delegates to calling device_type on the underlying ArrayData
/// object which backs this Array.
///
/// \return DeviceAllocationType
DeviceAllocationType device_type() const { return data_->device_type(); }

protected:
Array() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(Array);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/array/array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ TEST_F(TestArray, TestMakeArrayOfNull) {
ASSERT_EQ(array->type(), type);
ASSERT_OK(array->ValidateFull());
ASSERT_EQ(array->length(), length);
ASSERT_EQ(array->device_type(), DeviceAllocationType::kCPU);
if (is_union(type->id())) {
ASSERT_EQ(array->null_count(), 0);
ASSERT_EQ(array->ComputeLogicalNullCount(), length);
Expand Down Expand Up @@ -719,6 +720,7 @@ TEST_F(TestArray, TestMakeArrayFromScalar) {
ASSERT_OK(array->ValidateFull());
ASSERT_EQ(array->length(), length);
ASSERT_EQ(array->null_count(), 0);
ASSERT_EQ(array->device_type(), DeviceAllocationType::kCPU);

// test case for ARROW-13321
for (int64_t i : {int64_t{0}, length / 2, length - 1}) {
Expand All @@ -744,6 +746,7 @@ TEST_F(TestArray, TestMakeArrayFromScalarSliced) {
auto sliced = array->Slice(1, 4);
ASSERT_EQ(sliced->length(), 4);
ASSERT_EQ(sliced->null_count(), 0);
ASSERT_EQ(array->device_type(), DeviceAllocationType::kCPU);
ARROW_EXPECT_OK(sliced->ValidateFull());
}
}
Expand All @@ -758,6 +761,7 @@ TEST_F(TestArray, TestMakeArrayFromDictionaryScalar) {
ASSERT_OK(array->ValidateFull());
ASSERT_EQ(array->length(), 4);
ASSERT_EQ(array->null_count(), 0);
ASSERT_EQ(array->device_type(), DeviceAllocationType::kCPU);

for (int i = 0; i < 4; i++) {
ASSERT_OK_AND_ASSIGN(auto item, array->GetScalar(i));
Expand Down Expand Up @@ -797,6 +801,7 @@ TEST_F(TestArray, TestMakeEmptyArray) {
ASSERT_OK_AND_ASSIGN(auto array, MakeEmptyArray(type));
ASSERT_OK(array->ValidateFull());
ASSERT_EQ(array->length(), 0);

CheckSpanRoundTrip(*array);
}
}
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/arrow/array/data.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,42 @@ int64_t ArrayData::ComputeLogicalNullCount() const {
return ArraySpan(*this).ComputeLogicalNullCount();
}

DeviceAllocationType ArrayData::device_type() const {
// we're using 0 as a sentinel value for NOT YET ASSIGNED
// there is explicitly no constant DeviceAllocationType to represent
// the "UNASSIGNED" case as it is invalid for data to not have an
// assigned device type. If it's still 0 at the end, then we return
// CPU as the allocation device type
int type = 0;
for (const auto& buf : buffers) {
if (!buf) continue;
if (type == 0) {
type = static_cast<int>(buf->device_type());
} else {
DCHECK_EQ(type, static_cast<int>(buf->device_type()));
}
}

for (const auto& child : child_data) {
if (!child) continue;
if (type == 0) {
type = static_cast<int>(child->device_type());
} else {
DCHECK_EQ(type, static_cast<int>(child->device_type()));
}
}

if (dictionary) {
if (type == 0) {
type = static_cast<int>(dictionary->device_type());
} else {
DCHECK_EQ(type, static_cast<int>(dictionary->device_type()));
}
}

return type == 0 ? DeviceAllocationType::kCPU : static_cast<DeviceAllocationType>(type);
}

// ----------------------------------------------------------------------
// Methods for ArraySpan

Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/array/data.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ struct ARROW_EXPORT ArrayData {
int64_t null_count = kUnknownNullCount, int64_t offset = 0)
: ArrayData(std::move(type), length, null_count, offset) {
this->buffers = std::move(buffers);
#ifndef NDEBUG
// in debug mode, call the `device_type` function to trigger
// the DCHECKs that validate all the buffers are on the same device
ARROW_UNUSED(this->device_type());
#endif
}

ArrayData(std::shared_ptr<DataType> type, int64_t length,
Expand All @@ -110,6 +115,12 @@ struct ARROW_EXPORT ArrayData {
: ArrayData(std::move(type), length, null_count, offset) {
this->buffers = std::move(buffers);
this->child_data = std::move(child_data);
#ifndef NDEBUG
// in debug mode, call the `device_type` function to trigger
// the DCHECKs that validate all the buffers (including children)
// are on the same device
ARROW_UNUSED(this->device_type());
#endif
}

static std::shared_ptr<ArrayData> Make(std::shared_ptr<DataType> type, int64_t length,
Expand Down Expand Up @@ -358,6 +369,16 @@ struct ARROW_EXPORT ArrayData {
/// \see GetNullCount
int64_t ComputeLogicalNullCount() const;

/// \brief Returns the device_type of the underlying buffers and children
///
/// If there are no buffers in this ArrayData object, it just returns
/// DeviceAllocationType::kCPU as a default. We also assume that all buffers
/// should be allocated on the same device type and perform DCHECKs to confirm
/// this in debug mode.
///
/// \return DeviceAllocationType
DeviceAllocationType device_type() const;

std::shared_ptr<DataType> type;
int64_t length = 0;
mutable std::atomic<int64_t> null_count{0};
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/array/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class NullArrayFactory {
}

Status Visit(const StructType& type) {
for (int i = 0; i < type_->num_fields(); ++i) {
for (int i = 0; i < type.num_fields(); ++i) {
ARROW_ASSIGN_OR_RAISE(out_->child_data[i], CreateChild(type, i, length_));
}
return Status::OK();
Expand Down
Loading

0 comments on commit 8169d6e

Please sign in to comment.