Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Make use of all available cpus at startup #4343

Merged
merged 14 commits into from
Dec 19, 2024
6 changes: 5 additions & 1 deletion .github/workflows/pr-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ jobs:
python3 -m black --check --diff .
python3 -m flake8 .
popd
pushd flex/interactive/sdk/python
# we need to generate the code first
pushd flex/interactive/sdk
bash generate_sdk.sh -g python
pushd python
python3 -m isort --check --diff .
python3 -m black --check --diff .
python3 -m flake8 .
popd
popd

- name: Generate Docs
shell: bash
Expand Down
28 changes: 17 additions & 11 deletions flex/engines/graph_db/runtime/adhoc/expr_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,23 @@ class WithInExpr : public ExprBase {
WithInExpr(const ReadTransaction& txn, const Context& ctx,
std::unique_ptr<ExprBase>&& key, const common::Value& array)
: key_(std::move(key)) {
if constexpr (std::is_same_v<T, int64_t>) {
CHECK(array.item_case() == common::Value::kI64Array);
size_t len = array.i64_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i64_array().item(idx));
}
} else if constexpr (std::is_same_v<T, int32_t>) {
CHECK(array.item_case() == common::Value::kI32Array);
size_t len = array.i32_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i32_array().item(idx));
if constexpr ((std::is_same_v<T, int64_t>) ||
(std::is_same_v<T, int32_t>) ) {
// Implicitly convert to T
if (array.item_case() == common::Value::kI64Array) {
size_t len = array.i64_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i64_array().item(idx));
}
} else if (array.item_case() == common::Value::kI32Array) {
size_t len = array.i32_array().item_size();
for (size_t idx = 0; idx < len; ++idx) {
container_.push_back(array.i32_array().item(idx));
}
} else {
LOG(FATAL) << "Fail to construct WithInExpr of type "
<< typeid(T).name() << " with array of type "
<< array.item_case();
}
} else if constexpr (std::is_same_v<T, std::string>) {
CHECK(array.item_case() == common::Value::kStrArray);
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/http_server/handler/graph_db_http_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class stored_proc_handler : public StoppableHandler {

bool start() override {
if (get_executors()[StoppableHandler::shard_id()].size() > 0) {
LOG(ERROR) << "The actors have been already created!";
VLOG(10) << "The actors have been already created!";
return false;
}
return StoppableHandler::start_scope(
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/http_server/handler/graph_db_http_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class StoppableHandler : public seastar::httpd::handler_base {
} catch (const std::exception& e) {
// In case the scope is already cancelled, we should ignore the
// exception.
LOG(INFO) << "Failed to cancel IC scope: " << e.what();
VLOG(1) << "Failed to cancel IC scope: " << e.what();
}
func();
return seastar::make_ready_future<>();
Expand Down
3 changes: 3 additions & 0 deletions flex/interactive/sdk/python/gs_interactive/client/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from gs_interactive.exceptions import NotFoundException
from gs_interactive.exceptions import ServiceException
from urllib3.exceptions import MaxRetryError
from urllib3.exceptions import ProtocolError

from gs_interactive.client.generated.interactive_pb2 import Code as StatusCode
from gs_interactive.models.api_response_with_code import APIResponseWithCode
Expand Down Expand Up @@ -108,6 +109,8 @@ def from_exception(exception: ApiException):
return Status(StatusCode.INTERNAL_ERROR, exception.body)
elif isinstance(exception, MaxRetryError):
return Status(StatusCode.INTERNAL_ERROR, exception)
elif isinstance(exception, ProtocolError):
return Status(StatusCode.INTERNAL_ERROR, exception)
return Status(
StatusCode.UNKNOWN, "Unknown Error from exception " + exception.body
)
Expand Down
31 changes: 27 additions & 4 deletions flex/interactive/sdk/python/gs_interactive/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#

# get the directory of the current file
import copy
import os
import time

Expand All @@ -27,6 +28,7 @@
from gs_interactive.client.session import Session
from gs_interactive.models import CreateGraphRequest
from gs_interactive.models import CreateProcedureRequest
from gs_interactive.models import GetGraphSchemaResponse
from gs_interactive.models import SchemaMapping
from gs_interactive.models import StartServiceRequest
from gs_interactive.models import UpdateProcedureRequest
Expand All @@ -39,7 +41,7 @@


modern_graph_full = {
"name": "modern_graph",
"name": "full_graph",
"description": "This is a test graph",
"schema": {
"vertex_types": [
Expand Down Expand Up @@ -120,7 +122,7 @@
}

modern_graph_vertex_only = {
"name": "modern_graph",
"name": "vertex_only",
"description": "This is a test graph, only contains vertex",
"schema": {
"vertex_types": [
Expand Down Expand Up @@ -148,7 +150,7 @@
}

modern_graph_partial = {
"name": "modern_graph",
"name": "partial_graph",
"description": "This is a test graph",
"schema": {
"vertex_types": [
Expand Down Expand Up @@ -336,7 +338,7 @@ def create_partial_modern_graph(interactive_session):

@pytest.fixture(scope="function")
def create_graph_with_custom_pk_name(interactive_session):
modern_graph_custom_pk_name = modern_graph_full.copy()
modern_graph_custom_pk_name = copy.deepcopy(modern_graph_full)
for vertex_type in modern_graph_custom_pk_name["schema"]["vertex_types"]:
vertex_type["properties"][0]["property_name"] = "custom_id"
vertex_type["primary_keys"] = ["custom_id"]
Expand Down Expand Up @@ -479,3 +481,24 @@ def start_service_on_graph(interactive_session, graph_id: str):
assert resp.is_ok()
# wait three seconds to let the compiler pick up the new graph
time.sleep(3)


def ensure_compiler_schema_ready(
    interactive_session, neo4j_session: Neo4jSession, graph_id: str
):
    """Poll the compiler until its schema equals the schema of graph `graph_id`.

    The reference schema is fetched once through
    `interactive_session.get_graph_schema(graph_id)`. The compiler's view is
    then read with the `gs.procedure.meta.schema()` cypher procedure, at most
    10 times, sleeping one second after every unsuccessful attempt.

    Args:
        interactive_session: session used to fetch the reference graph schema.
        neo4j_session: session used to query the compiler's current schema.
        graph_id: identifier of the graph whose schema is expected.

    Raises:
        Exception: if the two schemas still differ after all attempts.
    """
    rel_graph_meta = interactive_session.get_graph_schema(graph_id).get_value()
    max_times = 10
    for _ in range(max_times):
        res = neo4j_session.run("CALL gs.procedure.meta.schema();")
        record = res.single()
        # `single()` yields None for an empty result; treat that as "not ready
        # yet" and retry instead of failing with AttributeError on `.value()`.
        if record is not None:
            compiler_graph_schema = GetGraphSchemaResponse.from_json(record.value())
            if compiler_graph_schema == rel_graph_meta:
                print("compiler schema is ready")
                return
        print("compiler schema is not ready, wait for 1 second")
        time.sleep(1)
    raise Exception("compiler schema is not ready")
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from gs_interactive.tests.conftest import call_procedure # noqa: E402
from gs_interactive.tests.conftest import create_procedure
from gs_interactive.tests.conftest import delete_procedure
from gs_interactive.tests.conftest import ensure_compiler_schema_ready
from gs_interactive.tests.conftest import import_data_to_full_modern_graph
from gs_interactive.tests.conftest import import_data_to_partial_modern_graph
from gs_interactive.tests.conftest import import_data_to_vertex_only_modern_graph
Expand Down Expand Up @@ -61,6 +62,9 @@ def test_query_on_vertex_only_graph(
"""
print("[Query on vertex only graph]")
start_service_on_graph(interactive_session, create_vertex_only_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_vertex_only_modern_graph
)
run_cypher_test_suite(
neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries
)
Expand All @@ -69,6 +73,10 @@ def test_query_on_vertex_only_graph(
import_data_to_vertex_only_modern_graph(
interactive_session, create_vertex_only_modern_graph
)
start_service_on_graph(interactive_session, create_vertex_only_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_vertex_only_modern_graph
)
run_cypher_test_suite(
neo4j_session, create_vertex_only_modern_graph, vertex_only_cypher_queries
)
Expand All @@ -83,12 +91,19 @@ def test_query_on_partial_graph(
print("[Query on partial graph]")
# start service on new graph
start_service_on_graph(interactive_session, create_partial_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_partial_modern_graph
)
# try to query on the graph
run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries)
start_service_on_graph(interactive_session, "1")
import_data_to_partial_modern_graph(
interactive_session, create_partial_modern_graph
)
start_service_on_graph(interactive_session, create_partial_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_partial_modern_graph
)
run_cypher_test_suite(neo4j_session, create_partial_modern_graph, cypher_queries)


Expand All @@ -100,10 +115,17 @@ def test_query_on_full_modern_graph(
"""
print("[Query on full modern graph]")
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
# try to query on the graph
run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries)
start_service_on_graph(interactive_session, "1")
import_data_to_full_modern_graph(interactive_session, create_modern_graph)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
run_cypher_test_suite(neo4j_session, create_modern_graph, cypher_queries)


Expand All @@ -129,6 +151,9 @@ def test_service_switching(
)
print("Procedure id: ", a_proc_id)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
call_procedure(neo4j_session, create_modern_graph, a_proc_id)

# create procedure on graph_b_id
Expand All @@ -139,6 +164,9 @@ def test_service_switching(
"MATCH(n: person) return count(n);",
)
start_service_on_graph(interactive_session, create_vertex_only_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_vertex_only_modern_graph
)
call_procedure(neo4j_session, create_vertex_only_modern_graph, b_proc_id)


Expand All @@ -156,6 +184,9 @@ def test_procedure_creation(interactive_session, neo4j_session, create_modern_gr
)
print("Procedure id: ", a_proc_id)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
call_procedure(neo4j_session, create_modern_graph, a_proc_id)

# create procedure with name containing space,
Expand Down Expand Up @@ -202,6 +233,9 @@ def test_builtin_procedure(interactive_session, neo4j_session, create_modern_gra
)
# Call the builtin procedure
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
call_procedure(
neo4j_session,
create_modern_graph,
Expand Down Expand Up @@ -259,6 +293,10 @@ def test_list_jobs(interactive_session, create_vertex_only_modern_graph):
def test_call_proc_in_cypher(interactive_session, neo4j_session, create_modern_graph):
print("[Test call procedure in cypher]")
import_data_to_full_modern_graph(interactive_session, create_modern_graph)
start_service_on_graph(interactive_session, create_modern_graph)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_modern_graph
)
result = neo4j_session.run(
'MATCH(p: person) with p.id as oid CALL k_neighbors("person", oid, 1) return label_name, vertex_oid;'
)
Expand All @@ -276,6 +314,9 @@ def test_custom_pk_name(
interactive_session, create_graph_with_custom_pk_name
)
start_service_on_graph(interactive_session, create_graph_with_custom_pk_name)
ensure_compiler_schema_ready(
interactive_session, neo4j_session, create_graph_with_custom_pk_name
)
result = neo4j_session.run(
"MATCH (n: person) where n.custom_id = 4 return n.custom_id;"
)
Expand All @@ -289,4 +330,3 @@ def test_custom_pk_name(
)
records = result.fetch(1)
assert len(records) == 1 and records[0]["$f0"] == 2
start_service_on_graph(interactive_session, "1")
2 changes: 1 addition & 1 deletion flex/interactive/sdk/python/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ profile = black
ensure_newline_before_comments = True
line_length = 88
force_single_line = True
skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,
skip = build/,dist/,gs_interactive/api/,gs_interactive/api_response.py,gs_interactive/configuration.py,gs_interactive/exceptions.py,gs_interactive/models/,gs_interactiverest.py,gs_interactive/api_client.py,gs_interactive/__init__.py,gs_interactive/rest.py
skip_glob = *_pb2.py,*_pb2_grpc.py,build/*

[flake8]
Expand Down
53 changes: 31 additions & 22 deletions flex/storages/metadata/local_file_metadata_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ Result<bool> LocalFileMetadataStore::UpdateMeta(const meta_kind_t& meta_kind,

Result<LocalFileMetadataStore::meta_key_t>
LocalFileMetadataStore::get_next_meta_key(
const LocalFileMetadataStore::meta_kind_t& meta_kind) const {
return std::to_string(get_max_id(meta_kind) + 1);
const LocalFileMetadataStore::meta_kind_t& meta_kind) {
return std::to_string(increase_and_get_id(meta_kind));
}

std::string LocalFileMetadataStore::get_root_meta_dir() const {
Expand All @@ -208,29 +208,38 @@ std::string LocalFileMetadataStore::get_meta_file(const meta_kind_t& meta_kind,
return ret;
}

int32_t LocalFileMetadataStore::get_max_id(const meta_kind_t& meta_kind) const {
// iterate all files in the directory, get the max id.
int max_id_ = 0;
// Not synchronized internally: the caller must already hold meta_mutex_.
int32_t LocalFileMetadataStore::increase_and_get_id(
const meta_kind_t& meta_kind) {
auto dir = get_meta_kind_dir(meta_kind);
for (auto& p : std::filesystem::directory_iterator(dir)) {
if (std::filesystem::is_directory(p)) {
continue;
}
auto file_name = p.path().filename().string();
if (file_name.find(META_FILE_PREFIX) != std::string::npos) {
auto id_str = file_name.substr(strlen(META_FILE_PREFIX));
int32_t id;
try {
id = std::stoi(id_str);
} catch (std::invalid_argument& e) {
LOG(ERROR) << "Invalid id: " << id_str;
continue;
}
if (id > max_id_) {
max_id_ = id;
}
int max_id_ = 0;
// In the directory, we expect a file with name CUR_ID_FILE_NAME.
// If the file does not exist, we will create one with content "0".
auto cur_id_file = dir + "/" + CUR_ID_FILE_NAME;
if (!std::filesystem::exists(cur_id_file)) {
std::ofstream out_file(cur_id_file);
if (!out_file.is_open()) {
LOG(ERROR) << "Failed to create file: " << cur_id_file;
return -1;
}
out_file << "0";
out_file.close();
}
std::ifstream in_file(cur_id_file);
if (!in_file.is_open()) {
LOG(ERROR) << "Failed to open file: " << cur_id_file;
return -1;
}
in_file >> max_id_;
in_file.close();
max_id_++;
std::ofstream out_file(cur_id_file);
if (!out_file.is_open()) {
LOG(ERROR) << "Failed to open file: " << cur_id_file;
return -1;
}
out_file << max_id_;
out_file.close();
return max_id_;
}

Expand Down
Loading
Loading