Skip to content

Commit

Permalink
fix graph db
Browse files Browse the repository at this point in the history
  • Loading branch information
liulx20 committed Jan 11, 2025
1 parent 19e2c90 commit d514b16
Show file tree
Hide file tree
Showing 14 changed files with 336 additions and 206 deletions.
50 changes: 31 additions & 19 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ GraphDB& GraphDB::get() {
return db;
}

QueryCache& GraphDB::getQueryCache() const { return query_cache_; }

Result<bool> GraphDB::Open(const Schema& schema, const std::string& data_dir,
int32_t thread_num, bool warmup, bool memory_only,
bool enable_auto_compaction) {
Expand Down Expand Up @@ -231,12 +229,17 @@ Result<bool> GraphDB::Open(const GraphDBConfig& config) {
timestamp_t ts = this->version_manager_.acquire_update_timestamp();
auto txn = CompactTransaction(this->graph_, this->contexts_[0].logger,
this->version_manager_, ts);
OutputCypherProfiles("./" + std::to_string(ts) + "_");
txn.Commit();
VLOG(10) << "Finish compaction";
}
}
});
}

unlink((work_dir_ + "/statistics.json").c_str());
unlink((work_dir_ + "/.compiler.yaml").c_str());
graph_.generateStatistics(work_dir_);
query_cache_.cache.clear();

return Result<bool>(true);
Expand Down Expand Up @@ -265,8 +268,9 @@ void GraphDB::Close() {
std::fill(app_factories_.begin(), app_factories_.end(), nullptr);
}

ReadTransaction GraphDB::GetReadTransaction(int thread_id) {
return contexts_[thread_id].session.GetReadTransaction();
ReadTransaction GraphDB::GetReadTransaction() {
uint32_t ts = version_manager_.acquire_read_timestamp();
return {graph_, version_manager_, ts};
}

InsertTransaction GraphDB::GetInsertTransaction(int thread_id) {
Expand Down Expand Up @@ -304,21 +308,6 @@ timestamp_t GraphDB::GetLastCompactionTimestamp() const {
return last_compaction_ts_;
}

const MutablePropertyFragment& GraphDB::graph() const { return graph_; }
MutablePropertyFragment& GraphDB::graph() { return graph_; }

const Schema& GraphDB::schema() const { return graph_.schema(); }

std::shared_ptr<ColumnBase> GraphDB::get_vertex_property_column(
uint8_t label, const std::string& col_name) const {
return graph_.get_vertex_property_column(label, col_name);
}

std::shared_ptr<RefColumnBase> GraphDB::get_vertex_id_column(
uint8_t label) const {
return graph_.get_vertex_id_column(label);
}

AppWrapper GraphDB::CreateApp(uint8_t app_type, int thread_id) {
if (app_factories_[app_type] == nullptr) {
LOG(ERROR) << "Stored procedure " << static_cast<int>(app_type)
Expand Down Expand Up @@ -516,4 +505,27 @@ size_t GraphDB::getExecutedQueryNum() const {
return ret;
}

QueryCache& GraphDB::getQueryCache() const { return query_cache_; }

void GraphDB::OutputCypherProfiles(const std::string& prefix) {
runtime::OprTimer read_timer, write_timer;
int session_num = SessionNum();
for (int i = 0; i < session_num; ++i) {
auto read_app_ptr = GetSession(i).GetApp(Schema::CYPHER_READ_PLUGIN_ID);
auto casted_read_app = dynamic_cast<CypherReadApp*>(read_app_ptr);
if (casted_read_app) {
read_timer += casted_read_app->timer();
}

auto write_app_ptr = GetSession(i).GetApp(Schema::CYPHER_WRITE_PLUGIN_ID);
auto casted_write_app = dynamic_cast<CypherWriteApp*>(write_app_ptr);
if (casted_write_app) {
write_timer += casted_write_app->timer();
}
}

read_timer.output(prefix + "read_profile.log");
write_timer.output(prefix + "write_profile.log");
}

} // namespace gs
30 changes: 19 additions & 11 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <map>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

Expand Down Expand Up @@ -93,8 +94,6 @@ class GraphDB {

static GraphDB& get();

QueryCache& getQueryCache() const;

/**
* @brief Load the graph from data directory.
* @param schema The schema of graph. It should be the same as the schema,
Expand All @@ -119,7 +118,7 @@ class GraphDB {
*
* @return graph_dir The directory of graph data.
*/
ReadTransaction GetReadTransaction(int thread_id = 0);
ReadTransaction GetReadTransaction();

/** @brief Create a transaction to insert vertices and edges with a default
* allocator.
Expand Down Expand Up @@ -150,15 +149,20 @@ class GraphDB {
*/
UpdateTransaction GetUpdateTransaction(int thread_id = 0);

const MutablePropertyFragment& graph() const;
MutablePropertyFragment& graph();
inline const MutablePropertyFragment& graph() const { return graph_; }
inline MutablePropertyFragment& graph() { return graph_; }

const Schema& schema() const;
inline const Schema& schema() const { return graph_.schema(); }

std::shared_ptr<ColumnBase> get_vertex_property_column(
uint8_t label, const std::string& col_name) const;
inline std::shared_ptr<ColumnBase> get_vertex_property_column(
uint8_t label, const std::string& col_name) const {
return graph_.get_vertex_table(label).get_column(col_name);
}

std::shared_ptr<RefColumnBase> get_vertex_id_column(uint8_t label) const;
inline std::shared_ptr<RefColumnBase> get_vertex_id_column(
uint8_t label) const {
return graph_.get_vertex_id_column(label);
}

AppWrapper CreateApp(uint8_t app_type, int thread_id);

Expand All @@ -172,8 +176,12 @@ class GraphDB {
void UpdateCompactionTimestamp(timestamp_t ts);
timestamp_t GetLastCompactionTimestamp() const;

QueryCache& getQueryCache() const;

std::string work_dir() const { return work_dir_; }

void OutputCypherProfiles(const std::string& prefix);

private:
bool registerApp(const std::string& path, uint8_t index = 0);

Expand All @@ -193,8 +201,6 @@ class GraphDB {

friend class GraphDBSession;

mutable QueryCache query_cache_;

std::string work_dir_;
SessionLocalContext* contexts_;

Expand All @@ -206,6 +212,8 @@ class GraphDB {
std::array<std::string, 256> app_paths_;
std::array<std::shared_ptr<AppFactoryBase>, 256> app_factories_;

mutable QueryCache query_cache_;

std::thread monitor_thread_;
bool monitor_thread_running_;

Expand Down
30 changes: 28 additions & 2 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace gs {

ReadTransaction GraphDBSession::GetReadTransaction() const {
uint32_t ts = db_.version_manager_.acquire_read_timestamp();
return ReadTransaction(*this, db_.graph_, db_.version_manager_, ts);
return ReadTransaction(db_.graph_, db_.version_manager_, ts);
}

InsertTransaction GraphDBSession::GetInsertTransaction() {
Expand Down Expand Up @@ -79,7 +79,33 @@ std::shared_ptr<ColumnBase> GraphDBSession::get_vertex_property_column(

std::shared_ptr<RefColumnBase> GraphDBSession::get_vertex_id_column(
uint8_t label) const {
return db_.get_vertex_id_column(label);
if (db_.graph().lf_indexers_[label].get_type() == PropertyType::kInt64) {
return std::make_shared<TypedRefColumn<int64_t>>(
dynamic_cast<const TypedColumn<int64_t>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else if (db_.graph().lf_indexers_[label].get_type() ==
PropertyType::kInt32) {
return std::make_shared<TypedRefColumn<int32_t>>(
dynamic_cast<const TypedColumn<int32_t>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else if (db_.graph().lf_indexers_[label].get_type() ==
PropertyType::kUInt64) {
return std::make_shared<TypedRefColumn<uint64_t>>(
dynamic_cast<const TypedColumn<uint64_t>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else if (db_.graph().lf_indexers_[label].get_type() ==
PropertyType::kUInt32) {
return std::make_shared<TypedRefColumn<uint32_t>>(
dynamic_cast<const TypedColumn<uint32_t>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else if (db_.graph().lf_indexers_[label].get_type() ==
PropertyType::kStringView) {
return std::make_shared<TypedRefColumn<std::string_view>>(
dynamic_cast<const TypedColumn<std::string_view>&>(
db_.graph().lf_indexers_[label].get_keys()));
} else {
return nullptr;
}
}

Result<std::vector<char>> GraphDBSession::Eval(const std::string& input) {
Expand Down
9 changes: 2 additions & 7 deletions flex/engines/graph_db/database/read_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

namespace gs {

ReadTransaction::ReadTransaction(const GraphDBSession& session,
const MutablePropertyFragment& graph,
ReadTransaction::ReadTransaction(const MutablePropertyFragment& graph,
VersionManager& vm, timestamp_t timestamp)
: session_(session), graph_(graph), vm_(vm), timestamp_(timestamp) {}
: graph_(graph), vm_(vm), timestamp_(timestamp) {}
ReadTransaction::~ReadTransaction() { release(); }

timestamp_t ReadTransaction::timestamp() const { return timestamp_; }
Expand Down Expand Up @@ -127,15 +126,11 @@ ReadTransaction::edge_iterator ReadTransaction::GetInEdgeIterator(
graph_.get_incoming_edges(label, u, neighbor_label, edge_label)};
}

const Schema& ReadTransaction::schema() const { return graph_.schema(); }

void ReadTransaction::release() {
if (timestamp_ != std::numeric_limits<timestamp_t>::max()) {
vm_.release_read_timestamp();
timestamp_ = std::numeric_limits<timestamp_t>::max();
}
}

const GraphDBSession& ReadTransaction::GetSession() const { return session_; }

} // namespace gs
Loading

0 comments on commit d514b16

Please sign in to comment.