Commit: Impl dataloading interface
lidongze0629 committed Mar 6, 2024
1 parent 43d7f2d commit 3173f31
Showing 8 changed files with 681 additions and 313 deletions.
14 changes: 13 additions & 1 deletion flex/coordinator/gs_flex_coordinator/core/insight/graph.py
@@ -145,6 +145,10 @@ def schema(self):
self._schema = self._g.schema().to_dict()
return self._schema

@property
def conn(self):
return self._conn

def _fetch_endpoints_impl(self):
if CLUSTER_TYPE != "K8S":
return
@@ -189,6 +193,14 @@ def _fetch_endpoints_impl(self):
}
logger.info(f"Update frontend endpoints: {str(endpoints)}")

def get_vertex_primary_key(self, vertex_type: str) -> str:
for v in self._schema["vertices"]:
if vertex_type == v["label"]:
for p in v["properties"]:
if p["is_primary_key"]:
return p["name"]
raise RuntimeError(f"Vertex type {vertex_type} not exists")

def import_schema(self, data: dict):
schema = self._g.schema()
schema.from_dict(data)
@@ -273,7 +285,7 @@ def get_groot_graph_from_local():
).all().result()
print(
"DEBUG: ",
client.submit("g.with('evaluationTimeout', 5000).E().limit(10)")
client.submit("g.with('evaluationTimeout', 5000).V().valueMap().limit(10)")
.all()
.result(),
)
18 changes: 18 additions & 0 deletions flex/coordinator/gs_flex_coordinator/core/insight/groot.py
@@ -44,6 +44,8 @@ def __init__(self):
self._datasource_pickle_path = os.path.join(
self._workspace, "datasource.pickle"
)
# job
self._job_scheduler = {}
# job status
self._job_status = {}
# pickle path
@@ -261,10 +263,26 @@ def create_groot_dataloading_job(self, graph_name: str, job_config: dict) -> str
dataloading_job_scheduler = DataloadingJobScheduler(
job_config=job_config,
data_source=self._data_source,
job_scheduler=self._job_scheduler,
job_status=self._job_status,
graph=self._graph,
)
return dataloading_job_scheduler.schedulerid

def get_job_by_id(self, job_id: str) -> dict:
if job_id not in self._job_status:
raise RuntimeError(f"Job {job_id} not found")
return self._job_status[job_id].to_dict()

def delete_job_by_id(self, job_id: str) -> str:
if job_id not in self._job_status:
raise RuntimeError(f"Job {job_id} not found")
if job_id in self._job_scheduler:
# some processes may not exist if the coordinator has been restarted
self._job_scheduler[job_id].cancel()
return f"Submit cancellation job successfully"


def init_groot_client():
return GrootClient()
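For context, a hedged usage sketch (illustrative only, not part of this commit) of how the new job methods are meant to be chained from the coordinator; the graph name and config values are placeholders:

client = init_groot_client()
job_id = client.create_groot_dataloading_job(
    graph_name="movies",  # hypothetical graph name
    job_config={
        "vertices": ["person"],
        "edges": [],
        "schedule": "now",
        "repeat": "once",
    },
)
status = client.get_job_by_id(job_id)  # dict with job_id, type, status, detail, ...
client.delete_job_by_id(job_id)        # submits a cancellation request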
125 changes: 121 additions & 4 deletions flex/coordinator/gs_flex_coordinator/core/insight/job.py
@@ -16,17 +16,24 @@
# limitations under the License.
#

import pandas as pd
from graphscope.framework.record import EdgeRecordKey, VertexRecordKey

from gs_flex_coordinator.core.scheduler import Scheduler
from gs_flex_coordinator.core.utils import encode_datetime, get_current_time
from gs_flex_coordinator.models import JobStatus


class DataloadingJobScheduler(Scheduler):
def __init__(self, job_config, data_source, job_status):
# {'vertices': ['person', 'software'], 'edges': [{'type_name': 'knows', 'source_vertex': 'person', 'destination_vertex': 'person'}], 'schedule': 'now', 'repeat': 'once'}
def __init__(self, job_config, data_source, job_scheduler, job_status, graph):
super().__init__(job_config["schedule"], job_config["repeat"])
self._type = "Data Import"
self._job_config = job_config
self._data_source = data_source
# groot graph
self._graph = graph
# register job
self._job_scheduler = job_scheduler
# register the current status
self._job_status = job_status
# detailed information
@@ -44,7 +51,7 @@ def _construct_detailed_info(self):
e["type_name"], e["source_vertex"], e["destination_vertex"]
)
)
detail = {}
detail = {"graph_name": self._graph.name}
detail["label"] = ",".join(label_list)
return detail

@@ -53,6 +60,116 @@ def get_edge_full_label(
) -> str:
return f"{source_vertex_type}_{type_name}_{destination_vertex_type}"

def _import_data_from_local_file(self):
# construct data
vertices = []
for vlabel in self._job_config["vertices"]:
primary_key = self._graph.get_vertex_primary_key(vlabel)
datasource = self._data_source["vertices_datasource"][vlabel]
data = pd.read_csv(datasource["location"], sep=",|\|", engine="python")
for record in data.itertuples(index=False):
primary_key_dict = {}
property_mapping = {}
for k, v in datasource["property_mapping"].items():
if v == primary_key:
primary_key_dict[v] = record[int(k)]
else:
property_mapping[v] = record[int(k)]
vertices.append(
[VertexRecordKey(vlabel, primary_key_dict), property_mapping]
)
edges = []
for e in self._job_config["edges"]:
elabel = self.get_edge_full_label(
e["type_name"], e["source_vertex"], e["destination_vertex"]
)
datasource = self._data_source["edges_datasource"][elabel]
data = pd.read_csv(datasource["location"], sep=",|\|", engine="python")
for record in data.itertuples(index=False):
source_pk_column_map = {}
for k, v in datasource["source_pk_column_map"].items():
source_pk_column_map[v] = record[int(k)]
destination_pk_column_map = {}
for k, v in datasource["destination_pk_column_map"].items():
destination_pk_column_map[v] = record[int(k)]
property_mapping = {}
for k, v in datasource["property_mapping"].items():
property_mapping[v] = record[int(k)]
edges.append(
[
EdgeRecordKey(
e["type_name"],
VertexRecordKey(e["source_vertex"], source_pk_column_map),
VertexRecordKey(
e["destination_vertex"], destination_pk_column_map
),
),
property_mapping,
]
)
# insert
if not self.stopped():
conn = self._graph.conn
g = conn.g()
snapshot_id = g.update_vertex_properties_batch(vertices)
snapshot_id = g.update_edge_properties_batch(edges)
conn.remote_flush(snapshot_id, timeout_ms=600000)
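A minimal sketch (illustrative, not part of this commit) of the data_source layout the loader above assumes, inferred from the keys it reads; the paths and column indices are invented:

data_source = {
    "vertices_datasource": {
        "person": {
            "location": "/tmp/person.csv",                 # hypothetical CSV path
            "property_mapping": {"0": "id", "1": "name"},  # column index -> property name
        }
    },
    "edges_datasource": {
        "person_knows_person": {
            "location": "/tmp/person_knows_person.csv",
            "source_pk_column_map": {"0": "id"},           # column 0 -> source primary key
            "destination_pk_column_map": {"1": "id"},      # column 1 -> destination primary key
            "property_mapping": {"2": "weight"},
        }
    },
}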

def _set_and_update_job_status(
self, status: str, start_time: str, end_time=None, log=None
):
job_status = {
"job_id": self.jobid,
"type": self._type,
"status": status,
"start_time": start_time,
"end_time": end_time,
"log": log,
"detail": self._detail,
}
# remove None
job_status = {k: v for k, v in job_status.items() if v is not None}
# update
self._job_status[self.jobid] = JobStatus.from_dict(job_status)
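For illustration (field values invented), the dict handed to JobStatus.from_dict after the None-filtering above looks like:

{
    "job_id": "<scheduler id>",
    "type": "Data Import",
    "status": "RUNNING",
    "start_time": "2024-03-06 12:00:00",
    "detail": {"graph_name": "movies", "label": "person,person_knows_person"},
}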

def run(self):
"""This function needs to handle exception by itself"""
pass
start_time = encode_datetime(self.last_run)
try:
# register job
self._job_scheduler[self.jobid] = self
# init status
self._set_and_update_job_status("RUNNING", start_time)
load_from_odps = True
# check vertices
for vlabel in self._job_config["vertices"]:
if vlabel not in self._data_source["vertices_datasource"]:
raise RuntimeError(
f"Vertex type {vlabel} does not bind any data source"
)
location = self._data_source["vertices_datasource"][vlabel]["location"]
load_from_odps = load_from_odps and location.startswith("odps://")
# check edges
for e in self._job_config["edges"]:
elabel = self.get_edge_full_label(
e["type_name"], e["source_vertex"], e["destination_vertex"]
)
if elabel not in self._data_source["edges_datasource"]:
raise RuntimeError(
f"Edge type {elabel} does not bind any data source"
)
location = self._data_source["edges_datasource"][elabel]["location"]
load_from_odps = load_from_odps and location.startswith("odps://")
if load_from_odps:
self._import_data_from_odps()
else:
self._import_data_from_local_file()
except Exception as e:
end_time = encode_datetime(get_current_time())
self._set_and_update_job_status("FAILED", start_time, end_time, str(e))
else:
end_time = encode_datetime(get_current_time())
if not self.stopped():
self._set_and_update_job_status("SUCCESS", start_time, end_time)
else:
self._set_and_update_job_status("CANCELLED", start_time, end_time)
19 changes: 16 additions & 3 deletions flex/coordinator/gs_flex_coordinator/core/scheduler.py
@@ -24,9 +24,10 @@
from string import ascii_uppercase

import schedule
from schedule import CancelJob

from gs_flex_coordinator.core.stoppable_thread import StoppableThread
from gs_flex_coordinator.core.utils import decode_datetimestr
from schedule import CancelJob


class Schedule(object):
@@ -225,14 +226,26 @@ def start(self):
"""Submit and schedule the job."""
self.submit()

def cancel(self):
def cancel(self, wait=False):
"""
Set the running job thread stoppable and optionally wait for the
thread to exit properly by using the join() method.
Args:
wait: Whether to wait for the thread to exit properly.
"""
if self._running_thread is not None and self._running_thread.is_alive():
self._running_thread.stop()
self._running_thread.join()
if wait:
self._running_thread.join()

def stopped(self):
"""
Check the stoppable flag of the current thread.
"""
if self._running_thread is None:
return True
return self._running_thread.stopped()

@abstractmethod
def run(self):
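As an illustration (not part of the diff), a run() implementation is expected to poll self.stopped() so that cancel() can take effect cooperatively; the work items and helper below are hypothetical:

def run(self):
    for batch in self._batches:   # hypothetical work items
        if self.stopped():        # flag set by cancel() via StoppableThread.stop()
            return
        self._process(batch)      # hypothetical helper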
