[Python/Native] Support Python read pk table (#503)
* add pk table read support for python

Signed-off-by: chenxu <[email protected]>

* fix c bindings error of unknown symbol

Signed-off-by: chenxu <[email protected]>

* fix pytorch distributed initialization error

Signed-off-by: chenxu <[email protected]>

* upgrade pyarrow to 16

Signed-off-by: chenxu <[email protected]>

* upgrade python docs

Signed-off-by: chenxu <[email protected]>

* fix issues with pyarrow 16

Signed-off-by: chenxu <[email protected]>

* fix object store configs

Signed-off-by: chenxu <[email protected]>

* fix doc links for python packages

Signed-off-by: chenxu <[email protected]>

---------

Signed-off-by: chenxu <[email protected]>
Co-authored-by: chenxu <[email protected]>
xuchen-plus and dmetasoul01 authored Jul 5, 2024
1 parent 2f28639 commit 4b86110
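For context, a minimal sketch of what this change enables from the Python side. The entry point and arguments below are assumptions inferred from this PR's scope, not confirmed API:

```python
# Hedged sketch: reading a LakeSoul primary-key (pk) table from Python.
# `lakesoul_dataset` and its usage are assumptions inferred from this
# commit, not documented API; adjust to the actual package layout.
from lakesoul.arrow import lakesoul_dataset

ds = lakesoul_dataset("my_pk_table")  # hypothetical table name
table = ds.to_table()                 # materialize as a pyarrow.Table
print(table.num_rows)
```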
Showing 34 changed files with 529 additions and 226 deletions.
6 changes: 2 additions & 4 deletions cpp/CMakeLists.txt
@@ -134,13 +134,13 @@ add_library(libarrow SHARED IMPORTED)
set_target_properties(libarrow PROPERTIES
IMPORTED_LOCATION "${PYARROW_DIR}/libarrow.so.${PYARROW_ABI_TAG}"
INTERFACE_INCLUDE_DIRECTORIES "${PYARROW_DIR}/include"
- INTERFACE_COMPILE_DEFINITIONS "_GLIBCXX_USE_CXX11_ABI=0")
+ INTERFACE_COMPILE_DEFINITIONS "_GLIBCXX_USE_CXX11_ABI=1")

add_library(libarrow_dataset SHARED IMPORTED)
set_target_properties(libarrow_dataset PROPERTIES
IMPORTED_LOCATION "${PYARROW_DIR}/libarrow_dataset.so.${PYARROW_ABI_TAG}"
INTERFACE_INCLUDE_DIRECTORIES "${PYARROW_DIR}/include"
- INTERFACE_COMPILE_DEFINITIONS "_GLIBCXX_USE_CXX11_ABI=0"
+ INTERFACE_COMPILE_DEFINITIONS "_GLIBCXX_USE_CXX11_ABI=1"
INTERFACE_LINK_LIBRARIES "libarrow")

add_library(libarrow_python SHARED IMPORTED)
@@ -151,13 +151,11 @@ set_target_properties(libarrow_python PROPERTIES

# Generate .py from .proto
add_custom_command(OUTPUT ${PROJECT_BINARY_DIR}/python/lakesoul/metadata/generated/entity_pb2.py
- ${PROJECT_BINARY_DIR}/python/lakesoul/metadata/generated/entity_pb2_grpc.py
COMMAND ${CMAKE_COMMAND} -E make_directory
${PROJECT_BINARY_DIR}/python/lakesoul/metadata/generated
COMMAND ${VENV_PYTHON} -m grpc.tools.protoc
-I=${PROJECT_SOURCE_DIR}/../rust/proto/src
--python_out=${PROJECT_BINARY_DIR}/python/lakesoul/metadata/generated
- --grpc_python_out=${PROJECT_BINARY_DIR}/python/lakesoul/metadata/generated
${PROJECT_SOURCE_DIR}/../rust/proto/src/entity.proto
MAIN_DEPENDENCY ${PROJECT_SOURCE_DIR}/../rust/proto/src/entity.proto)

3 changes: 2 additions & 1 deletion cpp/build-requirements.txt
@@ -2,7 +2,8 @@
#
# SPDX-License-Identifier: Apache-2.0

- pyarrow~=12.0
+ pyarrow==16.1.0
+ numpy<2
cython>=0.29.31,<3
grpcio[protobuf]~=1.57
auditwheel~=5.4
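The pyarrow pin tightens from `~=12.0` to an exact `16.1.0`, and `numpy<2` is presumably there to keep the toolchain on the NumPy 1.x ABI that these pyarrow wheels were built against. A quick environment sanity check (plain Python, no project assumptions):

```python
# Verify a build environment against the pins above.
import numpy as np
import pyarrow as pa

assert pa.__version__ == "16.1.0", f"unexpected pyarrow {pa.__version__}"
assert int(np.__version__.split(".")[0]) < 2, f"unexpected numpy {np.__version__}"
print("build requirements satisfied")
```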
4 changes: 3 additions & 1 deletion cpp/compile.sh
@@ -16,5 +16,7 @@ rm -rf $(dirname ${BASH_SOURCE[0]})/build
cmake -S $(dirname ${BASH_SOURCE[0]}) \
-B $(dirname ${BASH_SOURCE[0]})/build \
-G Ninja -DCMAKE_BUILD_TYPE=Release \
- -DPython_EXECUTABLE=$(which python)
+ -DPython_EXECUTABLE=$(which python) \
+ -DCMAKE_VERBOSE_MAKEFILE:BOOL=ON \
+ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON
cmake --build $(dirname ${BASH_SOURCE[0]})/build
6 changes: 6 additions & 0 deletions cpp/include/lakesoul/lakesoul_data_reader.h
@@ -16,6 +16,7 @@ class LakeSoulDataReader : public std::enable_shared_from_this<LakeSoulDataReade
public:
LakeSoulDataReader(std::shared_ptr<arrow::Schema> schema,
const std::vector<std::string>& file_urls,
+ const std::vector<std::string>& primary_keys,
const std::vector<std::pair<std::string, std::string>>& partition_info);

int GetBatchSize() const;
@@ -28,18 +29,23 @@
bool IsFinished() const;
arrow::Future<std::shared_ptr<arrow::RecordBatch>> ReadRecordBatchAsync();

+ void SetRetainPartitionColumns();
+ void SetObjectStoreConfigs(const std::vector<std::pair<std::string, std::string>>& configs);
private:
lakesoul::IOConfig* CreateIOConfig();
lakesoul::TokioRuntime* CreateTokioRuntime();
std::shared_ptr<lakesoul::CResult<lakesoul::Reader>> CreateReader();

std::shared_ptr<arrow::Schema> schema_;
std::vector<std::string> file_urls_;
+ std::vector<std::string> primary_keys_;
std::vector<std::pair<std::string, std::string>> partition_info_;
+ std::vector<std::pair<std::string, std::string>> object_store_configs_;
int batch_size_ = 16;
int thread_num_ = 1;
std::shared_ptr<lakesoul::CResult<lakesoul::Reader>> reader_;
bool finished_ = false;
+ bool retain_partition_columns_ = false;
};

} // namespace lakesoul
13 changes: 11 additions & 2 deletions cpp/include/lakesoul/lakesoul_dataset.h
@@ -25,9 +25,10 @@ class LakeSoulDataset : public arrow::dataset::Dataset
arrow::Result<arrow::dataset::FragmentIterator>
GetFragmentsImpl(arrow::compute::Expression predicate) override;

- void AddFileUrl(const std::string& file_url);
void AddFileUrls(const std::vector<std::string>& file_urls);

+ void AddPrimaryKeys(const std::vector<std::string>& pks);

void AddPartitionKeyValue(const std::string& key, const std::string& value);
void AddPartitionKeyValues(const std::vector<std::pair<std::string, std::string>>& key_values);

@@ -37,11 +38,19 @@
int GetThreadNum() const;
void SetThreadNum(int thread_num);

+ void SetRetainPartitionColumns();

+ void SetObjectStoreConfig(const std::string& key, const std::string& value);

private:
- std::vector<std::string> file_urls_;
+ std::vector<std::vector<std::string>> file_urls_;
+ std::vector<std::vector<std::string>> primary_keys_;
std::vector<std::pair<std::string, std::string>> partition_info_;
+ std::vector<std::shared_ptr<arrow::dataset::Fragment>> fragments_;
+ std::vector<std::pair<std::string, std::string>> object_store_configs_;
int batch_size_ = 16;
int thread_num_ = 1;
+ bool retain_partition_columns_ = false;
};

} // namespace lakesoul
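The structural change in this header: `file_urls_` becomes a vector of file groups, parallel to the new `primary_keys_` vector, so group *i* of files is read with primary-key set *i*. A conceptual illustration of that parallel layout (paths and key names are made up):

```python
# Illustration only: the dataset now tracks file groups, not a flat file list.
# file_groups[i] pairs with primary_keys[i]; all values here are made up.
file_groups = [
    ["s3://bucket/tbl/part-0/base.parquet", "s3://bucket/tbl/part-0/delta.parquet"],
    ["s3://bucket/tbl/part-1/base.parquet"],
]
primary_keys = [["id"], ["id"]]

for files, pks in zip(file_groups, primary_keys):
    print(f"one fragment over {len(files)} file(s), merged on keys {pks}")
```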
8 changes: 8 additions & 0 deletions cpp/include/lakesoul/lakesoul_fragment.h
@@ -26,6 +26,8 @@ class LakeSoulFragment : public arrow::dataset::Fragment
void AddFileUrl(const std::string& file_url);
void AddFileUrls(const std::vector<std::string>& file_urls);

+ void AddPrimaryKeys(const std::vector<std::string>& pks);

void AddPartitionKeyValue(const std::string& key, const std::string& value);
void AddPartitionKeyValues(const std::vector<std::pair<std::string, std::string>>& key_values);

@@ -37,13 +39,19 @@

void CreateDataReader();

+ void SetRetainPartitionColumns();

+ void SetObjectStoreConfigs(const std::vector<std::pair<std::string, std::string>>& configs);
private:
std::shared_ptr<arrow::Schema> schema_;
std::vector<std::string> file_urls_;
+ std::vector<std::string> primary_keys_;
std::vector<std::pair<std::string, std::string>> partition_info_;
std::shared_ptr<lakesoul::LakeSoulDataReader> data_reader_;
+ std::vector<std::pair<std::string, std::string>> object_store_configs_;
int batch_size_ = 16;
int thread_num_ = 1;
+ bool retain_partition_columns_ = false;
};

} // namespace lakesoul
98 changes: 86 additions & 12 deletions cpp/src/lakesoul/lakesoul_data_reader.cpp
@@ -8,16 +8,20 @@
#include <filesystem>
#include <arrow/util/future.h>
#include <lakesoul/lakesoul_data_reader.h>
+ #include <arrow/record_batch.h>

namespace lakesoul {

LakeSoulDataReader::LakeSoulDataReader(std::shared_ptr<arrow::Schema> schema,
const std::vector<std::string>& file_urls,
+ const std::vector<std::string>& primary_keys,
const std::vector<std::pair<std::string, std::string>>& partition_info)
: schema_(std::move(schema))
, file_urls_(file_urls)
+ , primary_keys_(primary_keys)
, partition_info_(partition_info)
{
+ lakesoul::rust_logger_init();
}

int LakeSoulDataReader::GetBatchSize() const
@@ -42,24 +46,82 @@ void LakeSoulDataReader::SetThreadNum(int thread_num)

lakesoul::IOConfig* LakeSoulDataReader::CreateIOConfig()
{
+ lakesoul::IOConfigBuilder* builder = lakesoul::new_lakesoul_io_config_builder();
+ for (const std::string& file_url : file_urls_) {
+ builder = lakesoul::lakesoul_config_builder_add_single_file(builder, file_url.c_str());
+ }

+ for (const std::string& pk : primary_keys_) {
+ builder = lakesoul::lakesoul_config_builder_add_single_primary_key(builder, pk.c_str());
+ }

+ builder = lakesoul::lakesoul_config_builder_set_batch_size(builder, batch_size_);
+ builder = lakesoul::lakesoul_config_builder_set_thread_num(builder, thread_num_);

+ // create projected schema: keep partition columns if retain_partition_columns is true
+ arrow::FieldVector projected_fields;
+ arrow::FieldVector partition_fields;
+ projected_fields.reserve(schema_->num_fields());
+ partition_fields.reserve(partition_info_.size());
+ for (const auto &field : schema_->fields())
+ {
+ if (std::find_if(partition_info_.begin(), partition_info_.end(), [&](const std::pair<std::string, std::string> &partition_pair)
+ { return partition_pair.first == field->name(); }) != partition_info_.end())
+ {
+ partition_fields.push_back(field);
+ if (retain_partition_columns_)
+ {
+ projected_fields.push_back(field);
+ }
+ }
+ else
+ {
+ projected_fields.push_back(field);
+ }
+ }

+ arrow::Schema projected_schema(projected_fields);
ArrowSchema c_schema;
- auto status = arrow::ExportSchema(*schema_, &c_schema);
+ auto status = arrow::ExportSchema(projected_schema, &c_schema);
if (!status.ok())
{
std::ostringstream sout;
sout << "Fail to export schema: " << status.ToString();
sout << "Fail to export projected schema: " << status.ToString();
std::string message = sout.str();
std::cerr << message << std::endl;
throw std::runtime_error(message);
}
- lakesoul::IOConfigBuilder* builder = lakesoul::new_lakesoul_io_config_builder();
- for (const std::string& file_url : file_urls_)
- builder = lakesoul::lakesoul_config_builder_add_single_file(builder, file_url.c_str());
- builder = lakesoul::lakesoul_config_builder_set_batch_size(builder, batch_size_);
- builder = lakesoul::lakesoul_config_builder_set_thread_num(builder, thread_num_);
builder = lakesoul::lakesoul_config_builder_set_schema(builder, reinterpret_cast<lakesoul::c_ptrdiff_t>(&c_schema));
- for (auto&& [key, value] : partition_info_)

+ if (partition_fields.size() > 0 && retain_partition_columns_) {
+ arrow::Schema partition_schema(partition_fields);
+ auto status = arrow::ExportSchema(partition_schema, &c_schema);
+ if (!status.ok())
+ {
+ std::ostringstream sout;
+ sout << "Fail to export partition schema: " << status.ToString();
+ std::string message = sout.str();
+ std::cerr << message << std::endl;
+ throw std::runtime_error(message);
+ }
+ builder = lakesoul::lakesoul_config_builder_set_partition_schema(builder, reinterpret_cast<lakesoul::c_ptrdiff_t>(&c_schema));
+ }

+ for (auto&& [key, value] : partition_info_) {
builder = lakesoul::lakesoul_config_builder_set_default_column_value(builder, key.c_str(), value.c_str());
+ }
+ bool has_path_style_config = false;
+ for (const auto& [key, value] : object_store_configs_) {
+ if (key == "fs.s3a.path.style.access") {
+ has_path_style_config = true;
+ }
+ builder = lakesoul::lakesoul_config_builder_set_object_store_option(builder, key.c_str(), value.c_str());
+ }
+ if (!has_path_style_config) {
+ // if this config is not specified by user, we always set it to true
+ builder = lakesoul::lakesoul_config_builder_set_object_store_option(builder, "fs.s3a.path.style.access", "true");
+ }

lakesoul::IOConfig* io_config = lakesoul::create_lakesoul_io_config_from_builder(builder);
return io_config;
}
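The reworked `CreateIOConfig` above splits the table schema in two: a projected schema (partition columns included only when `retain_partition_columns_` is set) exported to the reader, and a separate partition schema passed via `lakesoul_config_builder_set_partition_schema`. The same split, sketched in pyarrow purely for illustration (this Python is not part of the PR):

```python
# Python rendering of the projection logic in CreateIOConfig (illustrative).
import pyarrow as pa

def split_schema(schema: pa.Schema, partition_cols: set,
                 retain_partition_columns: bool):
    projected, partition = [], []
    for field in schema:
        if field.name in partition_cols:
            partition.append(field)        # always tracked separately
            if retain_partition_columns:
                projected.append(field)    # optionally kept in the output
        else:
            projected.append(field)
    return pa.schema(projected), pa.schema(partition)
```

Note also the object-store handling at the end of the function: user-supplied options are forwarded verbatim, and `fs.s3a.path.style.access` is defaulted to `true` only when the caller has not set it.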
@@ -80,15 +142,13 @@ std::shared_ptr<lakesoul::CResult<lakesoul::Reader>> LakeSoulDataReader::CreateR
std::shared_ptr<lakesoul::CResult<lakesoul::Reader>> reader(result, [](lakesoul::CResult<lakesoul::Reader>* ptr)
{
lakesoul::free_lakesoul_reader(ptr);
- //std::cerr << "lakesoul::free_lakesoul_reader called\n";
});
const char* err = lakesoul::check_reader_created(result);
if (err != nullptr)
{
std::ostringstream sout;
sout << "Fail to create reader: " << err;
std::string message = sout.str();
- std::cerr << message << std::endl;
throw std::runtime_error(message);
}
return reader;
@@ -124,7 +184,12 @@ void LakeSoulDataReader::StartReader()
closure->future.MarkFinished(true);
}
});
- future.Wait();
+ const arrow::Status& status = future.status();
+ if (status != arrow::Status::OK())
+ {
+ std::string message = status.ToString();
+ throw std::runtime_error(std::move(message));
+ }
}

bool LakeSoulDataReader::IsFinished() const
@@ -161,12 +226,13 @@ arrow::Future<std::shared_ptr<arrow::RecordBatch>> LakeSoulDataReader::ReadRecor
if (err != nullptr)
sout << ": " << err;
std::string message = sout.str();
- std::cerr << message << std::endl;
closure->future.MarkFinished(arrow::Status::IOError(std::move(message)));
}
else if (n == 0)
{
- closure->future.MarkFinished(nullptr);
closure->reader->finished_ = true;
+ closure->future.MarkFinished(nullptr);
}
else
{
@@ -177,4 +243,12 @@ arrow::Future<std::shared_ptr<arrow::RecordBatch>> LakeSoulDataReor
return future;
}

+ void LakeSoulDataReader::SetRetainPartitionColumns() {
+ retain_partition_columns_ = true;
+ }

+ void LakeSoulDataReader::SetObjectStoreConfigs(const std::vector<std::pair<std::string, std::string>>& configs) {
+ object_store_configs_ = configs;
+ }

} // namespace lakesoul
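Two behavioral fixes in this file are easy to miss: `StartReader` now surfaces the future's error status as an exception instead of silently waiting, and on end-of-stream the callback sets `finished_` before marking the future finished, so a continuation that polls `IsFinished()` cannot observe a stale flag. A minimal Python analogue of that ordering concern (illustrative only):

```python
# Publish terminal state before completing the future, mirroring the C++
# reorder of `finished_ = true;` ahead of `MarkFinished(nullptr)`.
import concurrent.futures

class Reader:
    def __init__(self):
        self.finished = False
        self.future = concurrent.futures.Future()

    def on_end_of_stream(self):
        self.finished = True          # state first
        self.future.set_result(None)  # then wake waiters
```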
56 changes: 40 additions & 16 deletions cpp/src/lakesoul/lakesoul_dataset.cpp
@@ -23,7 +23,9 @@ arrow::Result<std::shared_ptr<arrow::dataset::Dataset>>
LakeSoulDataset::ReplaceSchema(std::shared_ptr<arrow::Schema> schema) const
{
auto dataset = std::make_shared<LakeSoulDataset>(std::move(schema));
- dataset->AddFileUrls(file_urls_);
+ for (const auto& files : file_urls_) {
+ dataset->AddFileUrls(files);
+ }
arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> result(std::move(dataset));
return result;
}
@@ -32,26 +34,40 @@ LakeSoulDataset::ReplaceSchema(std::shared_ptr<arrow::Schema> schema) const
arrow::Result<arrow::dataset::FragmentIterator>
LakeSoulDataset::GetFragmentsImpl(arrow::compute::Expression predicate)
{
- auto fragment = std::make_shared<LakeSoulFragment>(this->schema());
- fragment->AddFileUrls(file_urls_);
- fragment->AddPartitionKeyValues(partition_info_);
- fragment->SetBatchSize(batch_size_);
- fragment->SetThreadNum(thread_num_);
- fragment->CreateDataReader();
- std::vector<std::shared_ptr<arrow::dataset::Fragment>> fragments;
- fragments.push_back(fragment);
- arrow::Result<arrow::dataset::FragmentIterator> result(arrow::MakeVectorIterator(std::move(fragments)));
- return result;
+ try {
+ std::vector<std::shared_ptr<arrow::dataset::Fragment>> fragments;
+ fragments.reserve(file_urls_.size());
+ for (size_t i = 0; i < file_urls_.size(); ++i) {
+ const auto files = file_urls_.at(i);
+ const auto pks = primary_keys_.at(i);
+ auto fragment = std::make_shared<LakeSoulFragment>(this->schema());
+ fragment->AddFileUrls(files);
+ fragment->AddPrimaryKeys(pks);
+ fragment->AddPartitionKeyValues(partition_info_);
+ fragment->SetBatchSize(batch_size_);
+ fragment->SetThreadNum(thread_num_);
+ if (retain_partition_columns_) {
+ fragment->SetRetainPartitionColumns();
+ }
+ fragment->SetObjectStoreConfigs(object_store_configs_);
+ fragment->CreateDataReader();
+ fragments.push_back(fragment);
+ }
+ fragments_ = fragments;
+ arrow::Result<arrow::dataset::FragmentIterator> result(arrow::MakeVectorIterator(std::move(fragments)));
+ return result;
+ } catch (const std::exception& e) {
+ return arrow::Status::IOError(e.what());
+ }
}

- void LakeSoulDataset::AddFileUrl(const std::string& file_url)
+ void LakeSoulDataset::AddFileUrls(const std::vector<std::string>& file_urls)
{
- file_urls_.push_back(file_url);
+ file_urls_.push_back(file_urls);
}

- void LakeSoulDataset::AddFileUrls(const std::vector<std::string>& file_urls)
- {
- file_urls_.insert(file_urls_.end(), file_urls.begin(), file_urls.end());
+ void LakeSoulDataset::AddPrimaryKeys(const std::vector<std::string>& pks) {
+ primary_keys_.push_back(pks);
}

void LakeSoulDataset::AddPartitionKeyValue(const std::string& key, const std::string& value)
@@ -84,4 +100,12 @@ void LakeSoulDataset::SetThreadNum(int thread_num)
thread_num_ = thread_num >= 1 ? thread_num : 1;
}

+ void LakeSoulDataset::SetRetainPartitionColumns() {
+ retain_partition_columns_ = true;
+ }

+ void LakeSoulDataset::SetObjectStoreConfig(const std::string& key, const std::string& value) {
+ object_store_configs_.push_back(std::make_pair(key, value));
+ }

} // namespace lakesoul
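`GetFragmentsImpl` now emits one `LakeSoulFragment` per file group, each carrying its own primary keys, partition values, and object-store configs, and converts C++ exceptions into `arrow::Status::IOError` rather than letting them escape. On the Python side this should surface through the standard pyarrow dataset protocol; a small consumption sketch, assuming a dataset handle like the one in the earlier example:

```python
# Sketch: stream record batches fragment by fragment; after this change
# each fragment corresponds to one pk file group.
def stream_batches(dataset):
    for fragment in dataset.get_fragments():   # one per file group
        for batch in fragment.to_batches():    # pyarrow.RecordBatch
            yield batch
```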