Skip to content

Commit

Permalink
fix(interactive): add error message for file operations (#4126)
Browse files Browse the repository at this point in the history
as titled
  • Loading branch information
liulx20 authored Aug 12, 2024
1 parent 6e15ef7 commit 0f427ce
Show file tree
Hide file tree
Showing 15 changed files with 296 additions and 91 deletions.
8 changes: 7 additions & 1 deletion flex/bin/bulk_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,14 @@ int main(int argc, char** argv) {

auto loader = gs::LoaderFactory::CreateFragmentLoader(
data_dir_path.string(), schema_res.value(), loading_config_res.value());
loader->LoadFragment();

auto result = loader->LoadFragment();
if (!result.ok()) {
std::filesystem::remove_all(data_dir_path);
LOG(ERROR) << "Failed to load fragment: "
<< result.status().error_message();
return -1;
}
t += grape::GetCurrentTime();
LOG(INFO) << "Finished bulk loading in " << t << " seconds.";

Expand Down
6 changes: 4 additions & 2 deletions flex/engines/graph_db/grin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ add_custom_target(grin_clformat
COMMAND clang-format --style=file -i ${FILES_NEED_FORMAT}
COMMENT "Running clang-format."
VERBATIM)
file(GLOB_RECURSE SRC_SOURCES "src/*.cc")
file(GLOB_RECURSE UTILS_SOURCES "../../../utils/*.cc")
file(GLOB_RECURSE STORAGE_SOURCES "../../../storages/rt_mutable_graph/*.cc")
set(SOURCES ${SRC_SOURCES} ${UTILS_SOURCES} ${STORAGE_SOURCES})

file(GLOB SOURCES "src/*.cc" "src/topology/*.cc" "src/property/*.cc" "src/index/*.cc" "src/common/*.cc" "../../../utils/property/*.cc" "../../../utils/*.cc"
"../../../storages/rt_mutable_graph/*.cc" "../../../storages/rt_mutable_graph/loader/*.cc")
add_library(flex_grin SHARED ${SOURCES})
target_link_libraries(flex_grin ${LIBGRAPELITE_LIBRARIES} ${GFLAGS_LIBRARIES} ${CMAKE_DL_LIBS} ${YAML_CPP_LIBRARIES})
if (ARROW_SHARED_LIB)
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/grin/src/property/property.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ GRIN_VERTEX_PROPERTY_LIST grin_get_vertex_properties_by_name(GRIN_GRAPH g,
std::string prop_name(name);
auto vps = new GRIN_VERTEX_PROPERTY_LIST_T();
std::string _name = std::string(name);
for (auto idx = 0; idx < _g->g.vertex_label_num_; idx++) {
for (size_t idx = 0; idx < _g->g.vertex_label_num_; idx++) {
auto& table = _g->g.get_vertex_table(static_cast<GRIN_VERTEX_TYPE>(idx));

auto col = table.get_column(name);
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/grin/src/property/propertylist.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ GRIN_EDGE_PROPERTY grin_get_edge_property_from_list(GRIN_GRAPH g,
size_t idx) {
auto _epl = static_cast<GRIN_EDGE_PROPERTY_LIST_T*>(epl);
if (_epl->size() <= idx) {
return GRIN_NULL_EDGE_PROPERTY;
return static_cast<GRIN_EDGE_PROPERTY>(GRIN_NULL_EDGE_PROPERTY);
}
return (*_epl)[idx];
}
Expand Down
52 changes: 52 additions & 0 deletions flex/storages/rt_mutable_graph/csr/mutable_csr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,58 @@

namespace gs {

void read_file(const std::string& filename, void* buffer, size_t size,
size_t num) {
FILE* fin = fopen(filename.c_str(), "r");
if (fin == nullptr) {
std::stringstream ss;
ss << "Failed to open file " << filename << ", " << strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
size_t ret_len = 0;
if ((ret_len = fread(buffer, size, num, fin)) != num) {
std::stringstream ss;
ss << "Failed to read file " << filename << ", expected " << num << ", got "
<< ret_len << ", " << strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
int ret = 0;
if ((ret = fclose(fin)) != 0) {
std::stringstream ss;
ss << "Failed to close file " << filename << ", error code: " << ret << " "
<< strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
}

void write_file(const std::string& filename, const void* buffer, size_t size,
size_t num) {
FILE* fout = fopen(filename.c_str(), "wb");
if (fout == nullptr) {
std::stringstream ss;
ss << "Failed to open file " << filename << ", " << strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
size_t ret_len = 0;
if ((ret_len = fwrite(buffer, size, num, fout)) != num) {
std::stringstream ss;
ss << "Failed to write file " << filename << ", expected " << num
<< ", got " << ret_len << ", " << strerror(errno);
LOG(ERROR) << ss.str();
}
int ret = 0;
if ((ret = fclose(fout)) != 0) {
std::stringstream ss;
ss << "Failed to close file " << filename << ", error code: " << ret << " "
<< strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
}
template class SingleMutableCsr<grape::EmptyType>;
template class MutableCsr<grape::EmptyType>;

Expand Down
100 changes: 71 additions & 29 deletions flex/storages/rt_mutable_graph/csr/mutable_csr.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@

namespace gs {

void read_file(const std::string& filename, void* buffer, size_t size,
size_t num);

void write_file(const std::string& filename, const void* buffer, size_t size,
size_t num);

template <typename EDATA_T>
class MutableCsrConstEdgeIter : public CsrConstEdgeIterBase {
using const_nbr_ptr_t = typename MutableNbrSlice<EDATA_T>::const_nbr_ptr_t;
Expand Down Expand Up @@ -391,28 +397,63 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
}

if (need_cap_list) {
FILE* fcap_out =
fopen((new_snapshot_dir + "/" + name + ".cap").c_str(), "wb");
CHECK_EQ(fwrite(cap_list.data(), sizeof(int), cap_list.size(), fcap_out),
cap_list.size());
fflush(fcap_out);
fclose(fcap_out);
write_file(new_snapshot_dir + "/" + name + ".cap", cap_list.data(),
sizeof(int), cap_list.size());
}

if (reuse_nbr_list && !nbr_list_.filename().empty() &&
std::filesystem::exists(nbr_list_.filename())) {
std::error_code errorCode;
std::filesystem::create_hard_link(nbr_list_.filename(),
new_snapshot_dir + "/" + name + ".nbr");
new_snapshot_dir + "/" + name + ".nbr",
errorCode);
if (errorCode) {
std::stringstream ss;
ss << "Failed to create hard link from " << nbr_list_.filename()
<< " to " << new_snapshot_dir + "/" + name + ".snbr"
<< ", error code: " << errorCode << " " << errorCode.message();
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
} else {
FILE* fout =
fopen((new_snapshot_dir + "/" + name + ".nbr").c_str(), "wb");
std::string filename = new_snapshot_dir + "/" + name + ".nbr";
if (fout == nullptr) {
std::stringstream ss;
ss << "Failed to open nbr list " << filename << ", " << strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}

for (size_t i = 0; i < vnum; ++i) {
CHECK_EQ(fwrite(adj_lists_[i].data(), sizeof(nbr_t),
adj_lists_[i].capacity(), fout),
adj_lists_[i].capacity());
size_t ret{};
if ((ret = fwrite(adj_lists_[i].data(), sizeof(nbr_t),
adj_lists_[i].capacity(), fout)) !=
static_cast<size_t>(adj_lists_[i].capacity())) {
std::stringstream ss;
ss << "Failed to write nbr list " << filename << ", expected "
<< adj_lists_[i].capacity() << ", got " << ret << ", "
<< strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
}
int ret = 0;
if ((ret = fflush(fout)) != 0) {
std::stringstream ss;
ss << "Failed to flush nbr list " << filename << ", error code: " << ret
<< " " << strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
if ((ret = fclose(fout)) != 0) {
std::stringstream ss;
ss << "Failed to close nbr list " << filename << ", error code: " << ret
<< " " << strerror(errno);
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
fflush(fout);
fclose(fout);
}
}

Expand Down Expand Up @@ -510,21 +551,16 @@ class MutableCsr : public TypedMutableCsrBase<EDATA_T> {
void load_meta(const std::string& prefix) {
std::string meta_file_path = prefix + ".meta";
if (std::filesystem::exists(meta_file_path)) {
FILE* meta_file_fd = fopen(meta_file_path.c_str(), "r");
CHECK_EQ(fread(&unsorted_since_, sizeof(timestamp_t), 1, meta_file_fd),
1);
fclose(meta_file_fd);
read_file(meta_file_path, &unsorted_since_, sizeof(timestamp_t), 1);

} else {
unsorted_since_ = 0;
}
}

void dump_meta(const std::string& prefix) const {
std::string meta_file_path = prefix + ".meta";
FILE* meta_file_fd = fopen((prefix + ".meta").c_str(), "wb");
CHECK_EQ(fwrite(&unsorted_since_, sizeof(timestamp_t), 1, meta_file_fd), 1);
fflush(meta_file_fd);
fclose(meta_file_fd);
write_file(meta_file_path, &unsorted_since_, sizeof(timestamp_t), 1);
}

grape::SpinLock* locks_;
Expand Down Expand Up @@ -768,9 +804,7 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
size_t old_size = nbr_list_.size();
nbr_list_.reset();
nbr_list_.resize(v_cap);
FILE* fin = fopen((prefix + ".snbr").c_str(), "r");
CHECK_EQ(fread(nbr_list_.data(), sizeof(nbr_t), old_size, fin), old_size);
fclose(fin);
read_file(prefix + ".snbr", nbr_list_.data(), sizeof(nbr_t), old_size);
for (size_t k = old_size; k != v_cap; ++k) {
nbr_list_[k].timestamp.store(std::numeric_limits<timestamp_t>::max());
}
Expand All @@ -792,13 +826,21 @@ class SingleMutableCsr : public TypedMutableCsrBase<EDATA_T> {
const std::string& new_snapshot_dir) override {
if ((!nbr_list_.filename().empty() &&
std::filesystem::exists(nbr_list_.filename()))) {
std::filesystem::create_hard_link(
nbr_list_.filename(), new_snapshot_dir + "/" + name + ".snbr");
std::error_code errorCode;
std::filesystem::create_hard_link(nbr_list_.filename(),
new_snapshot_dir + "/" + name + ".snbr",
errorCode);
if (errorCode) {
std::stringstream ss;
ss << "Failed to create hard link from " << nbr_list_.filename()
<< " to " << new_snapshot_dir + "/" + name + ".snbr"
<< ", error code: " << errorCode << " " << errorCode.message();
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
} else {
FILE* fp = fopen((new_snapshot_dir + "/" + name + ".snbr").c_str(), "wb");
fwrite(nbr_list_.data(), sizeof(nbr_t), nbr_list_.size(), fp);
fflush(fp);
fclose(fp);
write_file(new_snapshot_dir + "/" + name + ".snbr", nbr_list_.data(),
sizeof(nbr_t), nbr_list_.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/statvfs.h>

#include "flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h"
#include "flex/engines/hqps_db/core/utils/hqps_utils.h"
#include "flex/storages/rt_mutable_graph/loader/abstract_arrow_fragment_loader.h"

namespace gs {

void printDiskRemaining(const std::string& path) {
struct statvfs buf;
if (statvfs(path.c_str(), &buf) == 0) {
LOG(INFO) << "Disk remaining: " << buf.f_bsize * buf.f_bavail / 1024 / 1024
<< "MB";
}
}

bool check_primary_key_type(std::shared_ptr<arrow::DataType> data_type) {
if (data_type->Equals(arrow::int64()) || data_type->Equals(arrow::uint64()) ||
data_type->Equals(arrow::int32()) || data_type->Equals(arrow::uint32()) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@

namespace gs {

void printDiskRemaining(const std::string& path);
// The interface providing visitor pattern for RecordBatch.

class IRecordBatchSupplier {
public:
virtual ~IRecordBatchSupplier() = default;
Expand Down
20 changes: 15 additions & 5 deletions flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -728,11 +728,21 @@ void CSVFragmentLoader::loadEdges() {
}
}

void CSVFragmentLoader::LoadFragment() {
loadVertices();
loadEdges();

basic_fragment_loader_.LoadFragment();
Result<bool> CSVFragmentLoader::LoadFragment() {
try {
loadVertices();
loadEdges();

basic_fragment_loader_.LoadFragment();
} catch (const std::exception& e) {
auto work_dir = basic_fragment_loader_.work_dir();
printDiskRemaining(work_dir);
LOG(ERROR) << "Load fragment failed: " << e.what();
return Result<bool>(StatusCode::InternalError,
"Load fragment failed: " + std::string(e.what()),
false);
}
return Result<bool>(true);
}

const bool CSVFragmentLoader::registered_ = LoaderFactory::Register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CSVFragmentLoader : public AbstractArrowFragmentLoader {

~CSVFragmentLoader() {}

void LoadFragment() override;
Result<bool> LoadFragment() override;

private:
void loadVertices();
Expand Down
2 changes: 1 addition & 1 deletion flex/storages/rt_mutable_graph/loader/i_fragment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace gs {
class IFragmentLoader {
public:
virtual ~IFragmentLoader() = default;
virtual void LoadFragment() = 0;
virtual Result<bool> LoadFragment() = 0;
};

} // namespace gs
Expand Down
22 changes: 16 additions & 6 deletions flex/storages/rt_mutable_graph/loader/odps_fragment_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,22 @@ std::shared_ptr<IFragmentLoader> ODPSFragmentLoader::Make(
}
void ODPSFragmentLoader::init() { odps_read_client_.init(); }

void ODPSFragmentLoader::LoadFragment() {
init();
loadVertices();
loadEdges();

basic_fragment_loader_.LoadFragment();
Result<bool> ODPSFragmentLoader::LoadFragment() {
try {
init();
loadVertices();
loadEdges();

basic_fragment_loader_.LoadFragment();
} catch (const std::exception& e) {
auto work_dir = basic_fragment_loader_.work_dir();
printDiskRemaining(work_dir);
LOG(ERROR) << "Failed to load fragment: " << e.what();
return Result<bool>(StatusCode::InternalError,
"Load fragment failed: " + std::string(e.what()),
false);
}
return Result<bool>(true);
}

// odps_table_path is like /project_name/table_name/partition_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ class ODPSFragmentLoader : public AbstractArrowFragmentLoader {

~ODPSFragmentLoader() {}

void LoadFragment() override;
Result<bool> LoadFragment() override;

private:
void init();
Expand Down
10 changes: 9 additions & 1 deletion flex/storages/rt_mutable_graph/mutable_property_fragment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,15 @@ void MutablePropertyFragment::Compact(uint32_t version) {
void MutablePropertyFragment::Dump(const std::string& work_dir,
uint32_t version) {
std::string snapshot_dir_path = snapshot_dir(work_dir, version);
std::filesystem::create_directories(snapshot_dir_path);
std::error_code errorCode;
std::filesystem::create_directories(snapshot_dir_path, errorCode);
if (errorCode) {
std::stringstream ss;
ss << "Failed to create snapshot directory: " << snapshot_dir_path << ", "
<< errorCode.message();
LOG(ERROR) << ss.str();
throw std::runtime_error(ss.str());
}
std::vector<size_t> vertex_num(vertex_label_num_, 0);
for (size_t i = 0; i < vertex_label_num_; ++i) {
vertex_num[i] = lf_indexers_[i].size();
Expand Down
Loading

0 comments on commit 0f427ce

Please sign in to comment.