diff --git a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc index 4e865e8df4c5..415a8cd23688 100644 --- a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.cc @@ -190,8 +190,8 @@ void check_edge_invariant( void AbstractArrowFragmentLoader::AddVerticesRecordBatch( label_t v_label_id, const std::vector& v_files, - std::function( - label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, const std::string&, const LoadingConfig&, int)> supplier_creator) { auto primary_keys = schema_.get_vertex_primary_key(v_label_id); @@ -228,8 +228,9 @@ void AbstractArrowFragmentLoader::AddVerticesRecordBatch( void AbstractArrowFragmentLoader::AddEdgesRecordBatch( label_t src_label_i, label_t dst_label_i, label_t edge_label_i, const std::vector& filenames, - std::function( - label_t, label_t, label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, label_t, label_t, const std::string&, const LoadingConfig&, + int)> supplier_creator) { auto src_label_name = schema_.get_vertex_label_name(src_label_i); auto dst_label_name = schema_.get_vertex_label_name(dst_label_i); diff --git a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h index 938b007fcbf6..5e325b590274 100644 --- a/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h @@ -21,6 +21,7 @@ #include "flex/storages/rt_mutable_graph/loader/i_fragment_loader.h" #include "flex/storages/rt_mutable_graph/loading_config.h" #include "flex/storages/rt_mutable_graph/mutable_property_fragment.h" +#include "grape/utils/concurrent_queue.h" #include #include @@ -186,7 +187,7 @@ template void _append(bool is_dst, size_t cur_ind, std::shared_ptr col, const IndexerType& indexer, std::vector>& parsed_edges, - std::vector& degree) { + std::vector>& degree) { static constexpr auto invalid_vid = std::numeric_limits::max(); if constexpr (std::is_same_v) { if (col->type()->Equals(arrow::utf8())) { @@ -246,7 +247,8 @@ static void append_edges( std::vector>& edata_cols, const PropertyType& edge_prop, std::vector>& parsed_edges, - std::vector& ie_degree, std::vector& oe_degree) { + std::vector>& ie_degree, + std::vector>& oe_degree) { CHECK(src_col->length() == dst_col->length()); auto indexer_check_lambda = [](const IndexerType& cur_indexer, const std::shared_ptr& cur_col) { @@ -331,23 +333,30 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { basic_fragment_loader_(schema_, work_dir) { vertex_label_num_ = schema_.vertex_label_num(); edge_label_num_ = schema_.edge_label_num(); + mtxs_ = new std::mutex[vertex_label_num_]; } - ~AbstractArrowFragmentLoader() {} + ~AbstractArrowFragmentLoader() { + if (mtxs_) { + delete[] mtxs_; + } + } void AddVerticesRecordBatch( label_t v_label_id, const std::vector& input_paths, - std::function( - label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, const std::string&, const LoadingConfig&, int)> supplier_creator); // Add edges in record batch to output_parsed_edges, output_ie_degrees and // output_oe_degrees. + void AddEdgesRecordBatch( label_t src_label_id, label_t dst_label_id, label_t edge_label_id, const std::vector& input_paths, - std::function( - label_t, label_t, label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, label_t, label_t, const std::string&, const LoadingConfig&, + int)> supplier_creator); protected: @@ -364,9 +373,10 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { std::vector vids; vids.reserve(row_num); - - _add_vertex()(primary_key_col, indexer, vids); - + { + std::unique_lock lock(mtxs_[v_label_id]); + _add_vertex()(primary_key_col, indexer, vids); + } for (size_t j = 0; j < property_cols.size(); ++j) { auto array = property_cols[j]; auto chunked_array = std::make_shared(array); @@ -382,8 +392,8 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { template void addVertexRecordBatchImpl( label_t v_label_id, const std::vector& v_files, - std::function( - label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, const std::string&, const LoadingConfig&, int)> supplier_creator) { std::string v_label_name = schema_.get_vertex_label_name(v_label_id); VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label " @@ -393,37 +403,77 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { size_t primary_key_ind = std::get<2>(primary_key); IdIndexer indexer; + grape::BlockingQueue> queue; + queue.SetLimit(1024); + std::vector work_threads; + for (auto& v_file : v_files) { VLOG(10) << "Parsing vertex file:" << v_file << " for label " << v_label_name; - auto record_batch_supplier = - supplier_creator(v_label_id, v_file, loading_config_); - - bool first_batch = true; - while (true) { - auto batch = record_batch_supplier->GetNextBatch(); - if (!batch) { - break; - } - if (first_batch) { - auto header = batch->schema()->field_names(); - auto schema_column_names = - schema_.get_vertex_property_names(v_label_id); - CHECK(schema_column_names.size() + 1 == header.size()) - << "File header of size: " << header.size() - << " does not match schema column size: " - << schema_column_names.size() + 1; - first_batch = false; - } - auto columns = batch->columns(); - CHECK(primary_key_ind < columns.size()); - auto primary_key_column = columns[primary_key_ind]; - auto other_columns_array = columns; - other_columns_array.erase(other_columns_array.begin() + - primary_key_ind); - addVertexBatchFromArray(v_label_id, indexer, primary_key_column, - other_columns_array); + auto record_batch_supplier_vec = + supplier_creator(v_label_id, v_file, loading_config_, + std::thread::hardware_concurrency()); + queue.SetProducerNum(record_batch_supplier_vec.size()); + + for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) { + work_threads.emplace_back( + [&](int i) { + auto& record_batch_supplier = record_batch_supplier_vec[i]; + bool first_batch = true; + while (true) { + auto batch = record_batch_supplier->GetNextBatch(); + if (!batch) { + queue.DecProducerNum(); + break; + } + if (first_batch) { + auto header = batch->schema()->field_names(); + auto schema_column_names = + schema_.get_vertex_property_names(v_label_id); + CHECK(schema_column_names.size() + 1 == header.size()) + << "File header of size: " << header.size() + << " does not match schema column size: " + << schema_column_names.size() + 1; + first_batch = false; + } + queue.Put(batch); + } + }, + idx); + } + for (unsigned idx = 0; + idx < + std::min(static_cast(8 * record_batch_supplier_vec.size()), + std::thread::hardware_concurrency()); + ++idx) { + work_threads.emplace_back( + [&](int i) { + while (true) { + std::shared_ptr batch{nullptr}; + auto ret = queue.Get(batch); + if (!ret) { + break; + } + if (!batch) { + LOG(FATAL) << "get nullptr batch"; + } + auto columns = batch->columns(); + CHECK(primary_key_ind < columns.size()); + auto primary_key_column = columns[primary_key_ind]; + auto other_columns_array = columns; + other_columns_array.erase(other_columns_array.begin() + + primary_key_ind); + + addVertexBatchFromArray(v_label_id, indexer, primary_key_column, + other_columns_array); + } + }, + idx); } + for (auto& t : work_threads) { + t.join(); + } + work_threads.clear(); VLOG(10) << "Finish parsing vertex file:" << v_file << " for label " << v_label_name; } @@ -439,8 +489,8 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { template void addVertexRecordBatchImpl( label_t v_label_id, const std::vector& v_files, - std::function( - label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, const std::string&, const LoadingConfig&, int)> supplier_creator) { std::string v_label_name = schema_.get_vertex_label_name(v_label_id); VLOG(10) << "Parsing vertex file:" << v_files.size() << " for label " @@ -448,104 +498,151 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { auto primary_key = schema_.get_vertex_primary_key(v_label_id)[0]; auto primary_key_name = std::get<1>(primary_key); size_t primary_key_ind = std::get<2>(primary_key); + grape::BlockingQueue> queue; + queue.SetLimit(1024); PTIndexerBuilder indexer_builder; - std::vector> batchs; + std::vector>> batchs( + std::thread::hardware_concurrency()); + std::vector work_threads; for (auto& v_file : v_files) { VLOG(10) << "Parsing vertex file:" << v_file << " for label " << v_label_name; - auto record_batch_supplier = - supplier_creator(v_label_id, v_file, loading_config_); - - bool first_batch = true; - while (true) { - auto batch = record_batch_supplier->GetNextBatch(); - if (!batch) { - break; - } - batchs.emplace_back(batch); - if (first_batch) { - auto header = batch->schema()->field_names(); - auto schema_column_names = - schema_.get_vertex_property_names(v_label_id); - CHECK(schema_column_names.size() + 1 == header.size()) - << "File header of size: " << header.size() - << " does not match schema column size: " - << schema_column_names.size() + 1; - first_batch = false; - } - auto columns = batch->columns(); - CHECK(primary_key_ind < columns.size()); - auto primary_key_column = columns[primary_key_ind]; - _add_vertex()(primary_key_column, indexer_builder); + auto record_batch_supplier_vec = + supplier_creator(v_label_id, v_file, loading_config_, + std::thread::hardware_concurrency()); + + queue.SetProducerNum(record_batch_supplier_vec.size()); + + for (size_t idx = 0; idx < record_batch_supplier_vec.size(); ++idx) { + work_threads.emplace_back( + [&](int i) { + auto& record_batch_supplier = record_batch_supplier_vec[i]; + bool first_batch = true; + while (true) { + auto batch = record_batch_supplier->GetNextBatch(); + if (!batch) { + queue.DecProducerNum(); + break; + } + if (first_batch) { + auto header = batch->schema()->field_names(); + auto schema_column_names = + schema_.get_vertex_property_names(v_label_id); + CHECK(schema_column_names.size() + 1 == header.size()) + << "File header of size: " << header.size() + << " does not match schema column size: " + << schema_column_names.size() + 1; + first_batch = false; + } + queue.Put(batch); + } + }, + idx); } + + for (unsigned idx = 0; idx < std::thread::hardware_concurrency(); ++idx) { + work_threads.emplace_back( + [&](int i) { + while (true) { + std::shared_ptr batch{nullptr}; + auto ret = queue.Get(batch); + if (!ret) { + break; + } + if (!batch) { + LOG(FATAL) << "get nullptr batch"; + } + batchs[i].emplace_back(batch); + auto columns = batch->columns(); + CHECK(primary_key_ind < columns.size()); + auto primary_key_column = columns[primary_key_ind]; + { + std::unique_lock lock(mtxs_[v_label_id]); + _add_vertex()(primary_key_column, indexer_builder); + } + } + }, + idx); + } + for (auto& t : work_threads) { + t.join(); + } + work_threads.clear(); + VLOG(10) << "Finish parsing vertex file:" << v_file << " for label " << v_label_name; } basic_fragment_loader_.FinishAddingVertex(v_label_id, indexer_builder); const auto& indexer = basic_fragment_loader_.GetLFIndexer(v_label_id); - std::vector work_threads; std::atomic cur_batch_id(0); for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) { - work_threads.emplace_back([&]() { - while (true) { - auto id = cur_batch_id.fetch_add(1); - if (id >= batchs.size()) { - break; - } - auto batch = batchs[id]; - auto columns = batch->columns(); - auto other_columns_array = columns; - auto primary_key_column = columns[primary_key_ind]; - size_t row_num = primary_key_column->length(); - std::vector vids; - if constexpr (!std::is_same::value) { - using arrow_array_t = - typename gs::TypeConverter::ArrowArrayType; - auto casted_array = - std::static_pointer_cast(primary_key_column); - for (size_t i = 0; i < row_num; ++i) { - vids.emplace_back(indexer.get_index(casted_array->Value(i))); - } - } else { - if (primary_key_column->type()->Equals(arrow::utf8())) { - auto casted_array = std::static_pointer_cast( - primary_key_column); - for (size_t i = 0; i < row_num; ++i) { - auto str = casted_array->GetView(i); - std::string_view str_view(str.data(), str.size()); - vids.emplace_back(indexer.get_index(str_view)); + work_threads.emplace_back( + [&](int idx) { + for (size_t id = 0; id < batchs[idx].size(); ++id) { + auto batch = batchs[idx][id]; + auto columns = batch->columns(); + auto other_columns_array = columns; + auto primary_key_column = columns[primary_key_ind]; + size_t row_num = primary_key_column->length(); + std::vector vids; + if constexpr (!std::is_same::value) { + using arrow_array_t = + typename gs::TypeConverter::ArrowArrayType; + auto casted_array = + std::static_pointer_cast(primary_key_column); + for (size_t i = 0; i < row_num; ++i) { + vids.emplace_back(indexer.get_index(casted_array->Value(i))); + } + } else { + if (primary_key_column->type()->Equals(arrow::utf8())) { + auto casted_array = + std::static_pointer_cast( + primary_key_column); + for (size_t i = 0; i < row_num; ++i) { + auto str = casted_array->GetView(i); + std::string_view str_view(str.data(), str.size()); + vids.emplace_back(indexer.get_index(str_view)); + } + } else if (primary_key_column->type()->Equals( + arrow::large_utf8())) { + auto casted_array = + std::static_pointer_cast( + primary_key_column); + for (size_t i = 0; i < row_num; ++i) { + auto str = casted_array->GetView(i); + std::string_view str_view(str.data(), str.size()); + vids.emplace_back(indexer.get_index(str_view)); + } + } } - } else if (primary_key_column->type()->Equals( - arrow::large_utf8())) { - auto casted_array = - std::static_pointer_cast( - primary_key_column); - for (size_t i = 0; i < row_num; ++i) { - auto str = casted_array->GetView(i); - std::string_view str_view(str.data(), str.size()); - vids.emplace_back(indexer.get_index(str_view)); + other_columns_array.erase(other_columns_array.begin() + + primary_key_ind); + + for (size_t j = 0; j < other_columns_array.size(); ++j) { + auto array = other_columns_array[j]; + auto chunked_array = + std::make_shared(array); + set_vertex_properties( + basic_fragment_loader_.GetVertexTable(v_label_id) + .column_ptrs()[j], + chunked_array, vids); } } - } - other_columns_array.erase(other_columns_array.begin() + - primary_key_ind); - - for (size_t j = 0; j < other_columns_array.size(); ++j) { - auto array = other_columns_array[j]; - auto chunked_array = std::make_shared(array); - set_vertex_properties( - basic_fragment_loader_.GetVertexTable(v_label_id) - .column_ptrs()[j], - chunked_array, vids); - } - } - }); + }, + i); } for (auto& t : work_threads) { t.join(); } + auto& v_data = basic_fragment_loader_.GetVertexTable(v_label_id); + auto label_name = schema_.get_vertex_label_name(v_label_id); + + v_data.resize(indexer.size()); + v_data.dump(vertex_table_prefix(label_name), + snapshot_dir(basic_fragment_loader_.work_dir(), 0)); + VLOG(10) << "Finish parsing vertex file:" << v_files.size() << " for label " << v_label_name; } @@ -559,7 +656,8 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { std::vector>& property_cols, const PropertyType& edge_property, std::vector>& parsed_edges, - std::vector& ie_degree, std::vector& oe_degree) { + std::vector>& ie_degree, + std::vector>& oe_degree) { auto dst_col_type = dst_col->type(); if (dst_col_type->Equals(arrow::int64())) { append_edges( @@ -588,8 +686,9 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { void addEdgesRecordBatchImpl( label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::vector& e_files, - std::function( - label_t, label_t, label_t, const std::string&, const LoadingConfig&)> + std::function>( + label_t, label_t, label_t, const std::string&, const LoadingConfig&, + int)> supplier_creator) { auto src_label_name = schema_.get_vertex_label_name(src_label_id); auto dst_label_name = schema_.get_vertex_label_name(dst_label_id); @@ -610,88 +709,145 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { check_edge_invariant(schema_, edge_column_mappings, src_col_ind, dst_col_ind, src_label_id, dst_label_id, e_label_id); - std::vector> parsed_edges; - std::vector ie_degree, oe_degree; const auto& src_indexer = basic_fragment_loader_.GetLFIndexer(src_label_id); const auto& dst_indexer = basic_fragment_loader_.GetLFIndexer(dst_label_id); - ie_degree.resize(dst_indexer.size()); - oe_degree.resize(src_indexer.size()); + std::vector>> + parsed_edges_vec; + parsed_edges_vec.resize(std::thread::hardware_concurrency()); + + std::vector> ie_degree(dst_indexer.size()), + oe_degree(src_indexer.size()); + for (size_t idx = 0; idx < ie_degree.size(); ++idx) { + ie_degree[idx].store(0); + } + for (size_t idx = 0; idx < oe_degree.size(); ++idx) { + oe_degree[idx].store(0); + } VLOG(10) << "src indexer size: " << src_indexer.size() << " dst indexer size: " << dst_indexer.size(); - // use a dummy vector to store the string columns, to avoid the strings - // being released as record batch is released. + grape::BlockingQueue> queue; + queue.SetLimit(1024); + std::vector work_threads; + + std::vector>> string_columns( + std::thread::hardware_concurrency()); + + // use a dummy vector to store the string columns, to avoid the + // strings being released as record batch is released. std::vector> string_cols; for (auto filename : e_files) { - auto record_batch_supplier = supplier_creator( - src_label_id, dst_label_id, e_label_id, filename, loading_config_); - bool first_batch = true; - while (true) { - auto record_batch = record_batch_supplier->GetNextBatch(); - if (!record_batch) { - break; - } - if (first_batch) { - auto header = record_batch->schema()->field_names(); - auto schema_column_names = schema_.get_edge_property_names( - src_label_id, dst_label_id, e_label_id); - auto schema_column_types = schema_.get_edge_properties( - src_label_id, dst_label_id, e_label_id); - CHECK(schema_column_names.size() + 2 == header.size()) - << "schema size: " << schema_column_names.size() - << " neq header size: " << header.size(); - } - // copy the table to csr. - auto columns = record_batch->columns(); - // We assume the src_col and dst_col will always be put at front. - CHECK(columns.size() >= 2); - auto src_col = columns[0]; - auto dst_col = columns[1]; - auto src_col_type = src_col->type(); - auto dst_col_type = dst_col->type(); - CHECK(check_primary_key_type(src_col_type)) - << "unsupported src_col type: " << src_col_type->ToString(); - CHECK(check_primary_key_type(dst_col_type)) - << "unsupported dst_col type: " << dst_col_type->ToString(); - - std::vector> property_cols; - for (size_t i = 2; i < columns.size(); ++i) { - property_cols.emplace_back(columns[i]); - if (columns[i]->type()->Equals(arrow::utf8()) || - columns[i]->type()->Equals(arrow::large_utf8())) { - string_cols.emplace_back(columns[i]); - } - } - CHECK(property_cols.size() <= 1) - << "Currently only support at most one property on edge"; - auto edge_property = - schema_.get_edge_property(src_label_id, dst_label_id, e_label_id); - // add edges to vector - CHECK(src_col->length() == dst_col->length()); - if (src_col_type->Equals(arrow::int64())) { - _append_edges( - src_col, dst_col, src_indexer, dst_indexer, property_cols, - edge_property, parsed_edges, ie_degree, oe_degree); - } else if (src_col_type->Equals(arrow::uint64())) { - _append_edges( - src_col, dst_col, src_indexer, dst_indexer, property_cols, - edge_property, parsed_edges, ie_degree, oe_degree); - } else if (src_col_type->Equals(arrow::int32())) { - _append_edges( - src_col, dst_col, src_indexer, dst_indexer, property_cols, - edge_property, parsed_edges, ie_degree, oe_degree); - } else if (src_col_type->Equals(arrow::uint32())) { - _append_edges( - src_col, dst_col, src_indexer, dst_indexer, property_cols, - edge_property, parsed_edges, ie_degree, oe_degree); - } else { - // must be string - _append_edges( - src_col, dst_col, src_indexer, dst_indexer, property_cols, - edge_property, parsed_edges, ie_degree, oe_degree); - } - first_batch = false; + auto record_batch_supplier_vec = + supplier_creator(src_label_id, dst_label_id, e_label_id, filename, + loading_config_, parsed_edges_vec.size()); + + queue.SetProducerNum(record_batch_supplier_vec.size()); + + for (size_t i = 0; i < record_batch_supplier_vec.size(); ++i) { + work_threads.emplace_back( + [&](int idx) { + auto& string_column = string_columns[idx]; + bool first_batch = true; + auto& record_batch_supplier = record_batch_supplier_vec[idx]; + while (true) { + auto record_batch = record_batch_supplier->GetNextBatch(); + if (!record_batch) { + queue.DecProducerNum(); + break; + } + if (first_batch) { + auto header = record_batch->schema()->field_names(); + auto schema_column_names = schema_.get_edge_property_names( + src_label_id, dst_label_id, e_label_id); + auto schema_column_types = schema_.get_edge_properties( + src_label_id, dst_label_id, e_label_id); + CHECK(schema_column_names.size() + 2 == header.size()) + << "schema size: " << schema_column_names.size() + << " neq header size: " << header.size(); + first_batch = false; + } + for (auto i = 0; i < record_batch->num_columns(); ++i) { + if (record_batch->column(i)->type()->Equals(arrow::utf8()) || + record_batch->column(i)->type()->Equals( + arrow::large_utf8())) { + string_column.emplace_back(record_batch->column(i)); + } + } + + queue.Put(record_batch); + } + }, + i); + } + for (size_t i = 0; + i < + std::min(static_cast(8 * record_batch_supplier_vec.size()), + std::thread::hardware_concurrency()); + ++i) { + work_threads.emplace_back( + [&](int idx) { + // copy the table to csr. + auto& parsed_edges = parsed_edges_vec[idx]; + while (true) { + std::shared_ptr record_batch{nullptr}; + auto ret = queue.Get(record_batch); + if (!ret) { + break; + } + if (!record_batch) { + LOG(FATAL) << "get nullptr batch"; + } + auto columns = record_batch->columns(); + // We assume the src_col and dst_col will always be put + // at front. + CHECK(columns.size() >= 2); + auto src_col = columns[0]; + auto dst_col = columns[1]; + auto src_col_type = src_col->type(); + auto dst_col_type = dst_col->type(); + CHECK(check_primary_key_type(src_col_type)) + << "unsupported src_col type: " << src_col_type->ToString(); + CHECK(check_primary_key_type(dst_col_type)) + << "unsupported dst_col type: " << dst_col_type->ToString(); + + std::vector> property_cols; + for (size_t i = 2; i < columns.size(); ++i) { + property_cols.emplace_back(columns[i]); + } + auto edge_property = schema_.get_edge_property( + src_label_id, dst_label_id, e_label_id); + // add edges to vector + CHECK(src_col->length() == dst_col->length()); + if (src_col_type->Equals(arrow::int64())) { + _append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else if (src_col_type->Equals(arrow::uint64())) { + _append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else if (src_col_type->Equals(arrow::int32())) { + _append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else if (src_col_type->Equals(arrow::uint32())) { + _append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } else { + // must be string + _append_edges( + src_col, dst_col, src_indexer, dst_indexer, property_cols, + edge_property, parsed_edges, ie_degree, oe_degree); + } + } + }, + i); + } + + for (auto& t : work_threads) { + t.join(); } VLOG(10) << "Finish parsing edge file:" << filename << " for label " << src_label_name << " -> " << dst_label_name << " -> " @@ -700,18 +856,32 @@ class AbstractArrowFragmentLoader : public IFragmentLoader { VLOG(10) << "Finish parsing edge file:" << e_files.size() << " for label " << src_label_name << " -> " << dst_label_name << " -> " << edge_label_name; + std::vector ie_deg(ie_degree.size()); + std::vector oe_deg(oe_degree.size()); + for (size_t idx = 0; idx < ie_deg.size(); ++idx) { + ie_deg[idx] = ie_degree[idx]; + } + for (size_t idx = 0; idx < oe_deg.size(); ++idx) { + oe_deg[idx] = oe_degree[idx]; + } - basic_fragment_loader_.PutEdges(src_label_id, dst_label_id, e_label_id, - parsed_edges, ie_degree, oe_degree); + basic_fragment_loader_.PutEdges(src_label_id, dst_label_id, + e_label_id, parsed_edges_vec, + ie_deg, oe_deg); + string_columns.clear(); + size_t sum = 0; + for (const auto& edges : parsed_edges_vec) { + sum += edges.size(); + } - VLOG(10) << "Finish putting: " << parsed_edges.size() << " edges"; + VLOG(10) << "Finish putting: " << sum << " edges"; } const LoadingConfig& loading_config_; const Schema& schema_; size_t vertex_label_num_, edge_label_num_; int32_t thread_num_; - + std::mutex* mtxs_; mutable BasicFragmentLoader basic_fragment_loader_; }; diff --git a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h index a4aee9738b7a..e3fe47e55d05 100644 --- a/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/basic_fragment_loader.h @@ -135,11 +135,12 @@ class BasicFragmentLoader { } template - void PutEdges(label_t src_label_id, label_t dst_label_id, - label_t edge_label_id, - const std::vector>& edges, - const std::vector& ie_degree, - const std::vector& oe_degree) { + void PutEdges( + label_t src_label_id, label_t dst_label_id, label_t edge_label_id, + const std::vector>>& + edges_vec, + const std::vector& ie_degree, + const std::vector& oe_degree) { size_t index = src_label_id * vertex_label_num_ * edge_label_num_ + dst_label_id * edge_label_num_ + edge_label_id; auto& src_indexer = lf_indexers_[src_label_id]; @@ -153,6 +154,8 @@ class BasicFragmentLoader { src_label_name, dst_label_name, edge_label_name); EdgeStrategy ie_strategy = schema_.get_incoming_edge_strategy( src_label_name, dst_label_name, edge_label_name); + auto INVALID_VID = std::numeric_limits::max(); + std::atomic edge_count(0); if constexpr (std::is_same_v) { const auto& prop = schema_.get_edge_properties(src_label_id, dst_label_id, edge_label_id); @@ -168,9 +171,27 @@ class BasicFragmentLoader { ie_prefix(src_label_name, dst_label_name, edge_label_name), edata_prefix(src_label_name, dst_label_name, edge_label_name), tmp_dir(work_dir_), oe_degree, ie_degree); - for (auto& edge : edges) { - dual_csr->BatchPutEdge(std::get<0>(edge), std::get<1>(edge), - std::get<2>(edge)); + + std::vector work_threads; + for (size_t i = 0; i < edges_vec.size(); ++i) { + work_threads.emplace_back( + [&](int idx) { + edge_count.fetch_add(edges_vec[idx].size()); + for (auto& edge : edges_vec[idx]) { + if (std::get<1>(edge) == INVALID_VID || + std::get<0>(edge) == INVALID_VID) { + VLOG(10) << "Skip invalid edge:" << std::get<0>(edge) << "->" + << std::get<1>(edge); + continue; + } + dual_csr->BatchPutEdge(std::get<0>(edge), std::get<1>(edge), + std::get<2>(edge)); + } + }, + i); + } + for (auto& t : work_threads) { + t.join(); } } else { @@ -192,14 +213,31 @@ class BasicFragmentLoader { ie_prefix(src_label_name, dst_label_name, edge_label_name), edata_prefix(src_label_name, dst_label_name, edge_label_name), tmp_dir(work_dir_), oe_degree, ie_degree); - for (auto& edge : edges) { - dual_csr->BatchPutEdge(std::get<0>(edge), std::get<1>(edge), - std::get<2>(edge)); + std::vector work_threads; + for (size_t i = 0; i < edges_vec.size(); ++i) { + work_threads.emplace_back( + [&](int idx) { + edge_count.fetch_add(edges_vec[idx].size()); + for (auto& edge : edges_vec[idx]) { + if (std::get<1>(edge) == INVALID_VID || + std::get<0>(edge) == INVALID_VID) { + VLOG(10) << "Skip invalid edge:" << std::get<0>(edge) << "->" + << std::get<1>(edge); + continue; + } + dual_csr->BatchPutEdge(std::get<0>(edge), std::get<1>(edge), + std::get<2>(edge)); + } + }, + i); + } + for (auto& t : work_threads) { + t.join(); } } append_edge_loading_progress(src_label_name, dst_label_name, edge_label_name, LoadingStatus::kLoaded); - VLOG(10) << "Finish adding edge batch of size: " << edges.size(); + VLOG(10) << "Finish adding edge batch of size: " << edge_count.load(); } Table& GetVertexTable(size_t ind) { diff --git a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc index e55e44a6e3fd..ae71dfeac316 100644 --- a/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc @@ -239,21 +239,25 @@ void CSVFragmentLoader::addVertices(label_t v_label_id, const std::vector& v_files) { auto record_batch_supplier_creator = [this](label_t label_id, const std::string& v_file, - const LoadingConfig& loading_config) { + const LoadingConfig& loading_config, int) { arrow::csv::ConvertOptions convert_options; arrow::csv::ReadOptions read_options; arrow::csv::ParseOptions parse_options; fillVertexReaderMeta(read_options, parse_options, convert_options, v_file, label_id); + std::vector> suppliers; if (loading_config.GetIsBatchReader()) { auto res = std::make_shared( label_id, v_file, convert_options, read_options, parse_options); - return std::dynamic_pointer_cast(res); + suppliers.emplace_back( + std::dynamic_pointer_cast(res)); } else { auto res = std::make_shared( label_id, v_file, convert_options, read_options, parse_options); - return std::dynamic_pointer_cast(res); + suppliers.emplace_back( + std::dynamic_pointer_cast(res)); } + return suppliers; }; return AbstractArrowFragmentLoader::AddVerticesRecordBatch( v_label_id, v_files, record_batch_supplier_creator); @@ -264,21 +268,25 @@ void CSVFragmentLoader::addEdges(label_t src_label_i, label_t dst_label_i, const std::vector& filenames) { auto lambda = [this](label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::string& filename, - const LoadingConfig& loading_config) { + const LoadingConfig& loading_config, int) { arrow::csv::ConvertOptions convert_options; arrow::csv::ReadOptions read_options; arrow::csv::ParseOptions parse_options; fillEdgeReaderMeta(read_options, parse_options, convert_options, filename, src_label_id, dst_label_id, e_label_id); + std::vector> suppliers; if (loading_config.GetIsBatchReader()) { auto res = std::make_shared( e_label_id, filename, convert_options, read_options, parse_options); - return std::dynamic_pointer_cast(res); + suppliers.emplace_back( + std::dynamic_pointer_cast(res)); } else { auto res = std::make_shared( e_label_id, filename, convert_options, read_options, parse_options); - return std::dynamic_pointer_cast(res); + suppliers.emplace_back( + std::dynamic_pointer_cast(res)); } + return suppliers; }; AbstractArrowFragmentLoader::AddEdgesRecordBatch( src_label_i, dst_label_i, edge_label_i, filenames, lambda); @@ -307,8 +315,8 @@ void CSVFragmentLoader::loadVertices() { ++iter) { vertex_files.emplace_back(iter->first, iter->second); } - LOG(INFO) << "Parallel loading with " << thread_num_ << " threads, " - << " " << vertex_files.size() << " vertex files, "; + LOG(INFO) << "Parallel loading with " << thread_num_ << " threads, " << " " + << vertex_files.size() << " vertex files, "; std::atomic v_ind(0); std::vector threads(thread_num_); for (int i = 0; i < thread_num_; ++i) { @@ -389,20 +397,20 @@ void CSVFragmentLoader::fillVertexReaderMeta( } else { for (size_t i = 0; i < cur_label_col_mapping.size(); ++i) { auto& [col_id, col_name, property_name] = cur_label_col_mapping[i]; - if (col_name.empty()){ + if (col_name.empty()) { if (col_id >= read_options.column_names.size() || col_id < 0) { LOG(FATAL) << "The specified column index: " << col_id << " is out of range, please check your configuration"; } col_name = read_options.column_names[col_id]; } - // check whether index match to the name if col_id is valid + // check whether index match to the name if col_id is valid if (col_id >= 0 && col_id < read_options.column_names.size()) { - if (col_name != read_options.column_names[col_id]) { - LOG(FATAL) << "The specified column name: " << col_name - << " does not match the column name in the file: " - << read_options.column_names[col_id]; - } + if (col_name != read_options.column_names[col_id]) { + LOG(FATAL) << "The specified column name: " << col_name + << " does not match the column name in the file: " + << read_options.column_names[col_id]; + } } included_col_names.emplace_back(col_name); mapped_property_names.emplace_back(property_name); @@ -529,7 +537,7 @@ void CSVFragmentLoader::fillEdgeReaderMeta( for (size_t i = 0; i < cur_label_col_mapping.size(); ++i) { // TODO: make the property column's names are in same order with schema. auto& [col_id, col_name, property_name] = cur_label_col_mapping[i]; - if (col_name.empty()){ + if (col_name.empty()) { if (col_id >= read_options.column_names.size() || col_id < 0) { LOG(FATAL) << "The specified column index: " << col_id << " is out of range, please check your configuration"; @@ -538,11 +546,11 @@ void CSVFragmentLoader::fillEdgeReaderMeta( } // check whether index match to the name if col_id is valid if (col_id >= 0 && col_id < read_options.column_names.size()) { - if (col_name != read_options.column_names[col_id]) { - LOG(FATAL) << "The specified column name: " << col_name - << " does not match the column name in the file: " - << read_options.column_names[col_id]; - } + if (col_name != read_options.column_names[col_id]) { + LOG(FATAL) << "The specified column name: " << col_name + << " does not match the column name in the file: " + << read_options.column_names[col_id]; + } } included_col_names.emplace_back(col_name); mapped_property_names.emplace_back(property_name); diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc index 5167317c343d..d0f330336d4a 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc @@ -117,9 +117,11 @@ void ODPSReadClient::CreateReadSession( const std::vector& selected_partitions) { auto resp = createReadSession(table_identifier, selected_cols, partition_cols, selected_partitions); - if (resp.status_ != apsara::odps::sdk::storage_api::Status::OK && - resp.status_ != apsara::odps::sdk::storage_api::Status::WAIT) { - LOG(FATAL) << "CreateReadSession failed" << resp.error_message_; + while (resp.status_ != apsara::odps::sdk::storage_api::Status::OK && + resp.status_ != apsara::odps::sdk::storage_api::Status::WAIT) { + LOG(ERROR) << "CreateReadSession failed" << resp.error_message_; + resp = createReadSession(table_identifier, selected_cols, partition_cols, + selected_partitions); } *session_id = resp.session_id_; @@ -174,6 +176,7 @@ bool ODPSReadClient::readRows( req.table_identifier_ = table_identifier; req.session_id_ = session_id; req.split_index_ = split_index; + req.max_batch_rows_ = 20000; auto reader = arrow_client_ptr_->ReadRows(req); std::shared_ptr record_batch; @@ -216,7 +219,8 @@ std::shared_ptr ODPSReadClient::ReadTable( for (int32_t i = 0; i < cur_thread_num; ++i) { auto indices = split_indices(i, cur_thread_num, split_count); LOG(INFO) << "Thread " << i << " will read " << indices.size() - << " splits of " << split_count << " splits"; + << " splits of " << split_count + << " splits: " << gs::to_string(indices); producers.emplace_back(std::thread( &ODPSReadClient::producerRoutine, this, std::cref(session_id), std::cref(table_id), std::ref(all_batches), std::move(indices))); @@ -252,17 +256,19 @@ std::shared_ptr ODPSReadClient::ReadTable( ODPSStreamRecordBatchSupplier::ODPSStreamRecordBatchSupplier( label_t label_id, const std::string& file_path, const ODPSReadClient& odps_table_reader, const std::string& session_id, - int split_count, TableIdentifier table_identifier) + int split_count, TableIdentifier table_identifier, int worker_id, + int worker_num) : file_path_(file_path), odps_read_client_(odps_table_reader), session_id_(session_id), split_count_(split_count), table_identifier_(table_identifier), - cur_split_index_(0) { + cur_split_index_(worker_id), + worker_num_(worker_num) { read_rows_req_.table_identifier_ = table_identifier_; read_rows_req_.session_id_ = session_id_; read_rows_req_.split_index_ = cur_split_index_; - read_rows_req_.max_batch_rows_ = 32768; + read_rows_req_.max_batch_rows_ = 20000; cur_batch_reader_ = odps_read_client_.GetArrowClient()->ReadRows(read_rows_req_); } @@ -270,7 +276,7 @@ ODPSStreamRecordBatchSupplier::ODPSStreamRecordBatchSupplier( std::shared_ptr ODPSStreamRecordBatchSupplier::GetNextBatch() { std::shared_ptr record_batch; - if (!cur_batch_reader_) { + if (!cur_batch_reader_ || cur_split_index_ >= split_count_) { return record_batch; } while (true) { @@ -286,7 +292,7 @@ ODPSStreamRecordBatchSupplier::GetNextBatch() { } else { VLOG(1) << "Read split " << cur_split_index_ << " finished"; // move to next split - ++cur_split_index_; + cur_split_index_ += worker_num_; if (cur_split_index_ >= split_count_) { VLOG(1) << "Finish Read all splits"; cur_batch_reader_.reset(); @@ -382,37 +388,40 @@ void ODPSFragmentLoader::parseLocation( void ODPSFragmentLoader::addVertices(label_t v_label_id, const std::vector& v_files) { - auto record_batch_supplier_creator = - [this](label_t label_id, const std::string& v_file, - const LoadingConfig& loading_config) { - auto vertex_column_mappings = - loading_config_.GetVertexColumnMappings(label_id); - std::string session_id; - int split_count; - TableIdentifier table_identifier; - std::vector partition_cols; - std::vector selected_partitions; - parseLocation(v_file, table_identifier, partition_cols, - selected_partitions); - auto selected_cols = - columnMappingsToSelectedCols(vertex_column_mappings); - odps_read_client_.CreateReadSession( - &session_id, &split_count, table_identifier, selected_cols, - partition_cols, selected_partitions); - VLOG(1) << "Successfully got session_id: " << session_id - << ", split count: " << split_count; - if (loading_config.GetIsBatchReader()) { - auto res = std::make_shared( - label_id, v_file, odps_read_client_, session_id, split_count, - table_identifier); - return std::dynamic_pointer_cast(res); - } else { - auto res = std::make_shared( - label_id, v_file, odps_read_client_, session_id, split_count, - table_identifier, thread_num_); - return std::dynamic_pointer_cast(res); - } - }; + auto record_batch_supplier_creator = [this]( + label_t label_id, + const std::string& v_file, + const LoadingConfig& loading_config, + int worker_num) { + auto vertex_column_mappings = + loading_config_.GetVertexColumnMappings(label_id); + std::string session_id; + int split_count; + TableIdentifier table_identifier; + std::vector partition_cols; + std::vector selected_partitions; + std::vector> suppliers; + parseLocation(v_file, table_identifier, partition_cols, + selected_partitions); + auto selected_cols = columnMappingsToSelectedCols(vertex_column_mappings); + odps_read_client_.CreateReadSession(&session_id, &split_count, + table_identifier, selected_cols, + partition_cols, selected_partitions); + VLOG(1) << "Successfully got session_id: " << session_id + << ", split count: " << split_count; + if (loading_config.GetIsBatchReader()) { + for (int i = 0; i < worker_num; ++i) { + suppliers.emplace_back(std::make_shared( + label_id, v_file, odps_read_client_, session_id, split_count, + table_identifier, i, worker_num)); + } + } else { + suppliers.emplace_back(std::make_shared( + label_id, v_file, odps_read_client_, session_id, split_count, + table_identifier, thread_num_)); + } + return suppliers; + }; return AbstractArrowFragmentLoader::AddVerticesRecordBatch( v_label_id, v_files, record_batch_supplier_creator); } @@ -440,8 +449,8 @@ void ODPSFragmentLoader::loadVertices() { ++iter) { vertex_files.emplace_back(iter->first, iter->second); } - LOG(INFO) << "Parallel loading with " << thread_num_ << " threads, " - << " " << vertex_files.size() << " vertex files, "; + LOG(INFO) << "Parallel loading with " << thread_num_ << " threads, " << " " + << vertex_files.size() << " vertex files, "; std::atomic v_ind(0); std::vector threads(thread_num_); for (int i = 0; i < thread_num_; ++i) { @@ -471,12 +480,13 @@ void ODPSFragmentLoader::addEdges(label_t src_label_i, label_t dst_label_i, const std::vector& table_paths) { auto lambda = [this](label_t src_label_id, label_t dst_label_id, label_t e_label_id, const std::string& table_path, - const LoadingConfig& loading_config) { + const LoadingConfig& loading_config, int worker_num) { std::string session_id; int split_count; TableIdentifier table_identifier; std::vector partition_cols; std::vector selected_partitions; + std::vector> suppliers; parseLocation(table_path, table_identifier, partition_cols, selected_partitions); auto edge_column_mappings = loading_config_.GetEdgeColumnMappings( @@ -506,16 +516,18 @@ void ODPSFragmentLoader::addEdges(label_t src_label_i, label_t dst_label_i, VLOG(1) << "Successfully got session_id: " << session_id << ", split count: " << split_count; if (loading_config.GetIsBatchReader()) { - auto res = std::make_shared( - e_label_id, table_path, odps_read_client_, session_id, split_count, - table_identifier); - return std::dynamic_pointer_cast(res); + for (int i = 0; i < worker_num; ++i) { + suppliers.emplace_back(std::make_shared( + e_label_id, table_path, odps_read_client_, session_id, split_count, + table_identifier, i, worker_num)); + } + } else { - auto res = std::make_shared( + suppliers.emplace_back(std::make_shared( e_label_id, table_path, odps_read_client_, session_id, split_count, - table_identifier, thread_num_); - return std::dynamic_pointer_cast(res); + table_identifier, thread_num_)); } + return suppliers; }; AbstractArrowFragmentLoader::AddEdgesRecordBatch( diff --git a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h index a894be69b2bb..7bdc6f1f9847 100644 --- a/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h +++ b/flex/storages/rt_mutable_graph/loader/odps_fragment_loader.h @@ -120,7 +120,8 @@ class ODPSStreamRecordBatchSupplier : public IRecordBatchSupplier { ODPSStreamRecordBatchSupplier(label_t label_id, const std::string& file_path, const ODPSReadClient& odps_table_reader, const std::string& session_id, int split_count, - TableIdentifier table_identifier); + TableIdentifier table_identifier, int worker_id, + int worker_num); std::shared_ptr GetNextBatch() override; @@ -132,6 +133,7 @@ class ODPSStreamRecordBatchSupplier : public IRecordBatchSupplier { TableIdentifier table_identifier_; int32_t cur_split_index_; + int worker_num_; ReadRowsReq read_rows_req_; std::shared_ptr cur_batch_reader_; }; diff --git a/flex/utils/property/column.h b/flex/utils/property/column.h index 85d27e4f765e..67c4e87e29ea 100644 --- a/flex/utils/property/column.h +++ b/flex/utils/property/column.h @@ -18,6 +18,7 @@ #include #include +#include "grape/utils/concurrent_queue.h" #include "flex/utils/mmap_array.h" #include "flex/utils/property/types.h" @@ -504,6 +505,7 @@ class StringMapColumn : public ColumnBase { private: TypedColumn index_col_; LFIndexer* meta_map_; + grape::SpinLock lock_; }; template @@ -547,7 +549,11 @@ void StringMapColumn::set_value(size_t idx, const std::string_view& val) { INDEX_T lid; if (!meta_map_->get_index(val, lid)) { - lid = meta_map_->insert(val); + lock_.lock(); + if (!meta_map_->get_index(val, lid)) { + lid = meta_map_->insert(val); + } + lock_.unlock(); } index_col_.set_value(idx, lid); }