Skip to content

Commit

Permalink
feature: 回滚功能支持 (#172)
Browse files Browse the repository at this point in the history
* feature: 回滚功能支持
  • Loading branch information
hanshuaikang authored Oct 18, 2023
1 parent 4471229 commit 17c21ec
Show file tree
Hide file tree
Showing 36 changed files with 2,435 additions and 544 deletions.
173 changes: 172 additions & 1 deletion bamboo_engine/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

from bamboo_engine.utils.string import unique_id

from ..validator.connection import validate_graph_without_circle
from .flow.data import Data, Params
from .flow.event import ExecutableEndEvent


__all__ = ["build_tree"]

__skeleton = {
Expand Down Expand Up @@ -94,6 +94,177 @@ def build_tree(start_elem, id=None, data=None):
return tree


def _get_next_node(node, pipeline_tree):
"""
获取当前节点的下一个节点
"""

out_goings = node["outgoing"]

# 当只有一个输出时,
if not isinstance(out_goings, list):
out_goings = [out_goings]

next_nodes = []
for out_going in out_goings:
target_id = pipeline_tree["flows"][out_going]["target"]
if target_id in pipeline_tree["activities"]:
next_nodes.append(pipeline_tree["activities"][target_id])
elif target_id in pipeline_tree["gateways"]:
next_nodes.append(pipeline_tree["gateways"][target_id])
elif target_id == pipeline_tree["end_event"]["id"]:
next_nodes.append(pipeline_tree["end_event"])

return next_nodes


def _get_all_nodes(pipeline_tree: dict, with_subprocess: bool = False) -> dict:
"""
获取 pipeline_tree 中所有 activity 的信息
:param pipeline_tree: pipeline web tree
:param with_subprocess: 是否是子流程的 tree
:return: 包含 pipeline_tree 中所有 activity 的字典(包括子流程的 acitivity)
"""
all_nodes = {}
all_nodes.update(pipeline_tree["activities"])
all_nodes.update(pipeline_tree["gateways"])
all_nodes.update(
{
pipeline_tree["start_event"]["id"]: pipeline_tree["start_event"],
pipeline_tree["end_event"]["id"]: pipeline_tree["end_event"],
}
)
if with_subprocess:
for act in pipeline_tree["activities"].values():
if act["type"] == "SubProcess":
all_nodes.update(_get_all_nodes(act["pipeline"], with_subprocess=True))
return all_nodes


def _delete_flow_id_from_node_io(node, flow_id, io_type):
"""
删除节点的某条连线,io_type(incoming or outgoing)
"""
if node[io_type] == flow_id:
node[io_type] = ""
elif isinstance(node[io_type], list):
if len(node[io_type]) == 1 and node[io_type][0] == flow_id:
node[io_type] = (
"" if node["type"] not in ["ExclusiveGateway", "ParallelGateway", "ConditionalParallelGateway"] else []
)
else:
node[io_type].pop(node[io_type].index(flow_id))

# recover to original format
if (
len(node[io_type]) == 1
and io_type == "outgoing"
and node["type"] in ["EmptyStartEvent", "ServiceActivity", "ConvergeGateway"]
):
node[io_type] = node[io_type][0]


def _acyclic(pipeline):
"""
@summary: 逆转反向边
@return:
"""

pipeline["all_nodes"] = _get_all_nodes(pipeline, with_subprocess=True)

deformed_flows = {
"{}.{}".format(flow["source"], flow["target"]): flow_id for flow_id, flow in pipeline["flows"].items()
}
while True:
no_circle = validate_graph_without_circle(pipeline)
if no_circle["result"]:
break
source = no_circle["error_data"][-2]
target = no_circle["error_data"][-1]
circle_flow_key = "{}.{}".format(source, target)
flow_id = deformed_flows[circle_flow_key]
pipeline["flows"][flow_id].update({"source": target, "target": source})

source_node = pipeline["all_nodes"][source]
_delete_flow_id_from_node_io(source_node, flow_id, "outgoing")

target_node = pipeline["all_nodes"][target]
_delete_flow_id_from_node_io(target_node, flow_id, "incoming")


def generate_pipeline_token(pipeline_tree):
tree = copy.deepcopy(pipeline_tree)
# 去环
_acyclic(tree)

start_node = tree["start_event"]
token = unique_id("t")
node_token_map = {start_node["id"]: token}
inject_pipeline_token(start_node, tree, node_token_map, token)
return node_token_map


# 需要处理子流程的问题
def inject_pipeline_token(node, pipeline_tree, node_token_map, token):
# 如果是网关
if node["type"] in ["ParallelGateway", "ExclusiveGateway", "ConditionalParallelGateway"]:
next_nodes = _get_next_node(node, pipeline_tree)
node_token = unique_id("t")
target_node = None
for next_node in next_nodes:
# 分支网关各个分支token相同
node_token_map[next_node["id"]] = node_token
# 并行网关token不同
if node["type"] in ["ParallelGateway", "ConditionalParallelGateway"]:
node_token = unique_id("t")
node_token_map[next_node["id"]] = node_token

# 如果是网关,沿着路径向内搜索,最终遇到对应的分支网关会返回
target_node = inject_pipeline_token(next_node, pipeline_tree, node_token_map, node_token)

if target_node is None:
return

# 汇聚网关可以直连结束节点,所以可能会存在找不到对应的汇聚网关的情况
if target_node["type"] == "EmptyEndEvent":
node_token_map[target_node["id"]] = token
return
# 汇聚网关的token等于对应的网关的token
node_token_map[target_node["id"]] = token
# 到汇聚网关之后,此时继续向下遍历
next_node = _get_next_node(target_node, pipeline_tree)[0]
# 汇聚网关只会有一个出度
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

# 如果是汇聚网关,并且id等于converge_id,说明此时遍历在某个单元
if node["type"] == "ConvergeGateway":
return node

# 如果是普通的节点,说明只有一个出度,此时直接向下遍历就好
if node["type"] in ["ServiceActivity", "EmptyStartEvent"]:
next_node = _get_next_node(node, pipeline_tree)[0]
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

# 如果遇到结束节点,直接返回
if node["type"] == "EmptyEndEvent":
return node

if node["type"] == "SubProcess":
subprocess_pipeline_tree = node["pipeline"]
subprocess_start_node = subprocess_pipeline_tree["start_event"]
subprocess_start_node_token = unique_id("t")
node_token_map[subprocess_start_node["id"]] = subprocess_start_node_token
inject_pipeline_token(
subprocess_start_node, subprocess_pipeline_tree, node_token_map, subprocess_start_node_token
)
next_node = _get_next_node(node, pipeline_tree)[0]
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)


def __update(tree, elem):
node_type = __node_type[elem.type()]
node = tree[node_type] if node_type == "end_event" else tree[node_type][elem.id]
Expand Down
2 changes: 2 additions & 0 deletions bamboo_engine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ class Settings:
PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func

PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value

PIPELINE_ENABLE_ROLLBACK = False
41 changes: 39 additions & 2 deletions bamboo_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
setup_gauge,
setup_histogram,
)
from .utils.constants import RuntimeSettings
from .utils.host import get_hostname
from .utils.string import get_lower_case_name

Expand Down Expand Up @@ -122,6 +123,7 @@ def run_pipeline(
process_id = self.runtime.prepare_run_pipeline(
pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options
)

# execute from start event
self.runtime.execute(
process_id=process_id,
Expand Down Expand Up @@ -912,7 +914,8 @@ def execute(
# 设置状态前检测
if node_state.name not in states.INVERTED_TRANSITION[states.RUNNING]:
logger.info(
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING for exist state", # noqa
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING "
"for exist state",
process_info.root_pipeline_id,
node_state.name,
)
Expand Down Expand Up @@ -1010,6 +1013,15 @@ def execute(
hook=HookType.NODE_FINISH,
node=node,
)
if node.type == NodeType.ServiceActivity and self.runtime.get_config(
RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value
):
self._set_snapshot(root_pipeline_id, node)
# 判断是否已经预约了回滚,如果已经预约,则kill掉当前的process,直接return
if node.reserve_rollback:
self.runtime.die(process_id)
self.runtime.start_rollback(root_pipeline_id, node_id)
return

# 进程是否要进入睡眠
if execute_result.should_sleep:
Expand Down Expand Up @@ -1177,7 +1189,9 @@ def schedule(
# only retry at multiple calback type
if schedule.type is not ScheduleType.MULTIPLE_CALLBACK:
logger.info(
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, will not retry to get lock", # noqa
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, "
"will not retry to get lock",
# noqa
root_pipeline_id,
schedule_id,
node_id,
Expand Down Expand Up @@ -1290,6 +1304,15 @@ def schedule(
node=node,
callback_data=callback_data,
)
if node.type == NodeType.ServiceActivity and self.runtime.get_config(
RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value
):
self._set_snapshot(root_pipeline_id, node)
# 判断是否已经预约了回滚,如果已经预约,启动回滚流程
if node.reserve_rollback:
self.runtime.start_rollback(root_pipeline_id, node_id)
return

self.runtime.execute(
process_id=process_id,
node_id=schedule_result.next_node_id,
Expand All @@ -1302,6 +1325,20 @@ def schedule(
time.time() - engine_post_schedule_start_at
)

def _set_snapshot(self, root_pipeline_id, node):
inputs = self.runtime.get_execution_data_inputs(node.id)
outputs = self.runtime.get_execution_data_outputs(node.id)
root_pipeline_input = {key: di.value for key, di in self.runtime.get_data_inputs(root_pipeline_id).items()}
self.runtime.set_node_snapshot(
root_pipeline_id=root_pipeline_id,
node_id=node.id,
code=node.code,
version=node.version,
context_values=root_pipeline_input,
inputs=inputs,
outputs=outputs,
)

def _add_history(
self,
node_id: str,
Expand Down
30 changes: 30 additions & 0 deletions bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -1577,6 +1577,35 @@ def get_config(self, name):
"""


class RollbackMixin:
@abstractmethod
def set_pipeline_token(self, pipeline_tree: dict):
"""
设置pipeline token
"""

@abstractmethod
def set_node_snapshot(
self,
root_pipeline_id: str,
node_id: str,
code: str,
version: str,
context_values: dict,
inputs: dict,
outputs: dict,
):
"""
创建一份节点快照
"""

@abstractmethod
def start_rollback(self, root_pipeline_id: str, node_id: str):
"""
开始回滚
"""


class EngineRuntimeInterface(
PluginManagerMixin,
EngineAPIHooksMixin,
Expand All @@ -1591,6 +1620,7 @@ class EngineRuntimeInterface(
ExecutionHistoryMixin,
InterruptMixin,
ConfigMixin,
RollbackMixin,
metaclass=ABCMeta,
):
@abstractmethod
Expand Down
6 changes: 4 additions & 2 deletions bamboo_engine/eri/models/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""

from enum import Enum
from typing import List, Dict
from typing import Dict, List

from bamboo_engine.utils.object import Representable

Expand Down Expand Up @@ -49,7 +49,8 @@ def __init__(
parent_pipeline_id: str,
can_skip: bool = True,
can_retry: bool = True,
name: str = None
name: str = None,
reserve_rollback: bool = False,
):
"""
Expand Down Expand Up @@ -82,6 +83,7 @@ def __init__(
self.can_skip = can_skip
self.can_retry = can_retry
self.name = name
self.reserve_rollback = reserve_rollback


class EmptyStartEvent(Node):
Expand Down
Loading

0 comments on commit 17c21ec

Please sign in to comment.