Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: 回滚功能支持 #172

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7d25ba5
feature: 支持token注入
hanshuaikang Aug 21, 2023
48690c0
minor: 修复单元测试
hanshuaikang Aug 21, 2023
073ba40
feature: 支持回滚相关的操作和边界条件
hanshuaikang Sep 12, 2023
3348d13
test: 修复单元测试
hanshuaikang Sep 12, 2023
1a04ede
minor: 支持任意节点跳转
hanshuaikang Sep 12, 2023
81e083e
minor: 回退测试修改
hanshuaikang Sep 12, 2023
331131c
minor: 注释test_rollback, 调试github action
hanshuaikang Sep 13, 2023
8b874fc
bugfix: 修复单元测试卡住的问题
hanshuaikang Sep 13, 2023
30a747f
bugfix: 单元测试修复
hanshuaikang Sep 13, 2023
aed377c
test: 补充单元测试
hanshuaikang Sep 13, 2023
ae798d0
minor: 支持子流程回滚
hanshuaikang Sep 15, 2023
c7d779f
minor: 重构回滚部分的实现
hanshuaikang Sep 18, 2023
50eceae
test: 修复单元测试
hanshuaikang Sep 19, 2023
75aa7a2
test: 修复单元测试
hanshuaikang Sep 19, 2023
ac6b6c6
minor: 优化异常处理,支持回滚之后节点自动暂停
hanshuaikang Sep 21, 2023
1c8ec82
minor: 修复单元测试
hanshuaikang Sep 25, 2023
8a1ff76
bugfix: 去除_get_converge_gateway函数
hanshuaikang Oct 11, 2023
a7aec42
minor: code review
hanshuaikang Oct 16, 2023
a23e194
docs: 补充回滚文档
hanshuaikang Oct 16, 2023
c88b72f
minor: 补充删除逻辑
hanshuaikang Oct 16, 2023
743554e
minor: 修改 any 模式下流程的回滚逻辑
hanshuaikang Oct 16, 2023
604dacb
minor: 更新ERI_SUPPORT_VERSION版本号
hanshuaikang Oct 18, 2023
4a62eac
minor: 解决冲突
hanshuaikang Oct 18, 2023
9746c08
minor: 修复单元测试
hanshuaikang Oct 18, 2023
c3d8cd3
minor: 修复单元测试
hanshuaikang Oct 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 174 additions & 1 deletion bamboo_engine/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from .flow.data import Data, Params
from .flow.event import ExecutableEndEvent


__all__ = ["build_tree"]

from ..validator.connection import validate_graph_without_circle
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved

__skeleton = {
"id": None,
"start_event": None,
Expand Down Expand Up @@ -94,6 +95,178 @@ def build_tree(start_elem, id=None, data=None):
return tree


def _get_next_node(node, pipeline_tree):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

都是 token分发的代码,写在一个类中

"""
获取当前节点的下一个节点
"""

out_goings = node["outgoing"]

# 当只有一个输出时,
if not isinstance(out_goings, list):
out_goings = [out_goings]

next_nodes = []
for out_going in out_goings:
target_id = pipeline_tree["flows"][out_going]["target"]
if target_id in pipeline_tree["activities"]:
next_nodes.append(pipeline_tree["activities"][target_id])
elif target_id in pipeline_tree["gateways"]:
next_nodes.append(pipeline_tree["gateways"][target_id])
elif target_id == pipeline_tree["end_event"]["id"]:
next_nodes.append(pipeline_tree["end_event"])

return next_nodes


def _get_converge_gateway(pipeline_tree, converge_id):
return pipeline_tree["gateways"][converge_id]


def _get_all_nodes(pipeline_tree: dict, with_subprocess: bool = False) -> dict:
"""
获取 pipeline_tree 中所有 activity 的信息

:param pipeline_tree: pipeline web tree
:param with_subprocess: 是否是子流程的 tree
:return: 包含 pipeline_tree 中所有 activity 的字典(包括子流程的 acitivity)
"""
all_nodes = {}
all_nodes.update(pipeline_tree["activities"])
all_nodes.update(pipeline_tree["gateways"])
all_nodes.update(
{
pipeline_tree["start_event"]["id"]: pipeline_tree["start_event"],
pipeline_tree["end_event"]["id"]: pipeline_tree["end_event"],
}
)
if with_subprocess:
for act in pipeline_tree["activities"].values():
if act["type"] == "SubProcess":
all_nodes.update(_get_all_nodes(act["pipeline"], with_subprocess=True))
return all_nodes


def _delete_flow_id_from_node_io(node, flow_id, io_type):
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
if node[io_type] == flow_id:
node[io_type] = ""
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(node[io_type], list):
if len(node[io_type]) == 1 and node[io_type][0] == flow_id:
node[io_type] = (
"" if node["type"] not in ["ExclusiveGateway", "ParallelGateway", "ConditionalParallelGateway"] else []
)
else:
node[io_type].pop(node[io_type].index(flow_id))

# recover to original format
if (
len(node[io_type]) == 1
and io_type == "outgoing"
and node["type"] in ["EmptyStartEvent", "ServiceActivity", "ConvergeGateway"]
):
node[io_type] = node[io_type][0]


def _acyclic(pipeline):
"""
@summary: 逆转反向边
@return:
"""

pipeline["all_nodes"] = _get_all_nodes(pipeline, with_subprocess=True)

deformed_flows = {
"{}.{}".format(flow["source"], flow["target"]): flow_id for flow_id, flow in pipeline["flows"].items()
}
while True:
no_circle = validate_graph_without_circle(pipeline)
if no_circle["result"]:
break
source = no_circle["error_data"][-2]
target = no_circle["error_data"][-1]
circle_flow_key = "{}.{}".format(source, target)
flow_id = deformed_flows[circle_flow_key]
pipeline["flows"][flow_id].update({"source": target, "target": source})

source_node = pipeline["all_nodes"][source]
_delete_flow_id_from_node_io(source_node, flow_id, "outgoing")

target_node = pipeline["all_nodes"][target]
_delete_flow_id_from_node_io(target_node, flow_id, "incoming")


def generate_pipeline_token(pipeline_tree):
tree = copy.deepcopy(pipeline_tree)
# 去环
_acyclic(tree)

start_node = tree["start_event"]
token = unique_id("t")
node_token_map = {start_node["id"]: token}
inject_pipeline_token(start_node, tree, node_token_map, token)
return node_token_map


# 需要处理子流程的问题
def inject_pipeline_token(node, pipeline_tree, node_token_map, token):
# 如果是网关
if node["type"] in ["ParallelGateway", "ExclusiveGateway", "ConditionalParallelGateway"]:
next_nodes = _get_next_node(node, pipeline_tree)
node_token = unique_id("t")
target_node = None
for next_node in next_nodes:
# 分支网关各个分支token相同
node_token_map[next_node["id"]] = node_token
# 并行网关token不同
if node["type"] in ["ParallelGateway", "ConditionalParallelGateway"]:
node_token = unique_id("t")
node_token_map[next_node["id"]] = node_token

# 如果是网关,沿着路径向内搜索,最终遇到对应的分支网关会返回
target_node = inject_pipeline_token(next_node, pipeline_tree, node_token_map, node_token)

if target_node is None:
return

# 汇聚网关可以直连结束节点,所以可能会存在找不到对应的汇聚网关的情况
if target_node["type"] == "EmptyEndEvent":
node_token_map[target_node["id"]] = token
return
# 汇聚网关的token等于对应的网关的token
node_token_map[target_node["id"]] = token
# 到汇聚网关之后,此时继续向下遍历
next_node = _get_next_node(target_node, pipeline_tree)[0]
# 汇聚网关只会有一个出度
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

# 如果是汇聚网关,并且id等于converge_id,说明此时遍历在某个单元
if node["type"] == "ConvergeGateway":
return node

# 如果是普通的节点,说明只有一个出度,此时直接向下遍历就好
if node["type"] in ["ServiceActivity", "EmptyStartEvent"]:
next_node = _get_next_node(node, pipeline_tree)[0]
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)

# 如果遇到结束节点,直接返回
if node["type"] == "EmptyEndEvent":
return node

if node["type"] == "SubProcess":
subprocess_pipeline_tree = node["pipeline"]
subprocess_start_node = subprocess_pipeline_tree["start_event"]
subprocess_start_node_token = unique_id("t")
node_token_map[subprocess_start_node["id"]] = subprocess_start_node_token
inject_pipeline_token(
subprocess_start_node, subprocess_pipeline_tree, node_token_map, subprocess_start_node_token
)
next_node = _get_next_node(node, pipeline_tree)[0]
node_token_map[next_node["id"]] = token
return inject_pipeline_token(next_node, pipeline_tree, node_token_map, token)


def __update(tree, elem):
node_type = __node_type[elem.type()]
node = tree[node_type] if node_type == "end_event" else tree[node_type][elem.id]
Expand Down
2 changes: 2 additions & 0 deletions bamboo_engine/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,5 @@ class Settings:
PIPELINE_EXCLUSIVE_GATEWAY_EXPR_FUNC = default_expr_func

PIPELINE_EXCLUSIVE_GATEWAY_STRATEGY = ExclusiveGatewayStrategy.ONLY.value

PIPELINE_ENABLE_ROLLBACK = False
55 changes: 53 additions & 2 deletions bamboo_engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
setup_gauge,
setup_histogram,
)
from .utils.constants import RuntimeSettings
from .utils.host import get_hostname
from .utils.string import get_lower_case_name

Expand Down Expand Up @@ -118,6 +119,10 @@ def run_pipeline(
process_id = self.runtime.prepare_run_pipeline(
pipeline, root_pipeline_data, root_pipeline_context, subprocess_context, **options
)

if self.runtime.get_config(RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value):
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
self.runtime.set_pipeline_token(pipeline)

# execute from start event
self.runtime.execute(
process_id=process_id,
Expand Down Expand Up @@ -743,7 +748,9 @@ def execute(
# 设置状态前检测
if node_state.name not in states.INVERTED_TRANSITION[states.RUNNING]:
logger.info(
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING for exist state", # noqa
"[pipeline-trace](root_pipeline: %s) can not transit state from %s to RUNNING "
"for exist state",
# noqa
process_info.root_pipeline_id,
node_state.name,
)
Expand Down Expand Up @@ -835,6 +842,27 @@ def execute(
# 节点运行成功并且不需要进行调度
if not execute_result.should_sleep and execute_result.next_node_id != node.id:
self.runtime.node_finish(root_pipeline_id=root_pipeline_id, node_id=node.id)
if node.type == NodeType.ServiceActivity:
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
if self.runtime.get_config(RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value):
inputs = self.runtime.get_execution_data_inputs(node.id)
outputs = self.runtime.get_execution_data_outputs(node.id)
root_pipeline_input = {
key: di.value for key, di in self.runtime.get_data_inputs(root_pipeline_id).items()
}
self.runtime.set_node_snapshot(
root_pipeline_id=root_pipeline_id,
node_id=node_id,
code=node.code,
version=node.version,
context_values=root_pipeline_input,
inputs=inputs,
outputs=outputs,
)
# 判断是否已经预约了回滚,如果已经预约,则kill掉当前的process,直接return
if node.reserve_rollback:
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
self.runtime.die(process_id)
self.runtime.start_rollback(root_pipeline_id, node_id)
return

# 进程是否要进入睡眠
if execute_result.should_sleep:
Expand Down Expand Up @@ -1002,7 +1030,9 @@ def schedule(
# only retry at multiple calback type
if schedule.type is not ScheduleType.MULTIPLE_CALLBACK:
logger.info(
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, will not retry to get lock", # noqa
"root pipeline[%s] schedule(%s) %s with version %s is not multiple callback type, "
"will not retry to get lock",
# noqa
root_pipeline_id,
schedule_id,
node_id,
Expand Down Expand Up @@ -1107,6 +1137,27 @@ def schedule(
if schedule_result.schedule_done:
self.runtime.finish_schedule(schedule_id)
self.runtime.node_finish(root_pipeline_id, node.id)
if node.type == NodeType.ServiceActivity:
hanshuaikang marked this conversation as resolved.
Show resolved Hide resolved
if self.runtime.get_config(RuntimeSettings.PIPELINE_ENABLE_ROLLBACK.value):
inputs = self.runtime.get_execution_data_inputs(node.id)
outputs = self.runtime.get_execution_data_outputs(node.id)
root_pipeline_input = {
key: di.value for key, di in self.runtime.get_data_inputs(root_pipeline_id).items()
}
self.runtime.set_node_snapshot(
root_pipeline_id=root_pipeline_id,
node_id=node_id,
code=node.code,
version=node.version,
context_values=root_pipeline_input,
inputs=inputs,
outputs=outputs,
)
# 判断是否已经预约了回滚,如果已经预约,启动回滚流程
if node.reserve_rollback:
self.runtime.start_rollback(root_pipeline_id, node_id)
return

self.runtime.execute(
process_id=process_id,
node_id=schedule_result.next_node_id,
Expand Down
30 changes: 30 additions & 0 deletions bamboo_engine/eri/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,35 @@ def get_config(self, name):
"""


class RollbackMixin:
@abstractmethod
def set_pipeline_token(self, pipeline_tree: dict):
"""
设置pipeline token
"""

@abstractmethod
def set_node_snapshot(
self,
root_pipeline_id: str,
node_id: str,
code: str,
version: str,
context_values: dict,
inputs: dict,
outputs: dict,
):
"""
创建一份节点快照
"""

@abstractmethod
def start_rollback(self, root_pipeline_id: str, node_id: str):
"""
开始回滚
"""


class EngineRuntimeInterface(
PluginManagerMixin,
EngineAPIHooksMixin,
Expand All @@ -1543,6 +1572,7 @@ class EngineRuntimeInterface(
ExecutionHistoryMixin,
InterruptMixin,
ConfigMixin,
RollbackMixin,
metaclass=ABCMeta,
):
@abstractmethod
Expand Down
6 changes: 4 additions & 2 deletions bamboo_engine/eri/models/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""

from enum import Enum
from typing import List, Dict
from typing import Dict, List

from bamboo_engine.utils.object import Representable

Expand Down Expand Up @@ -49,7 +49,8 @@ def __init__(
parent_pipeline_id: str,
can_skip: bool = True,
can_retry: bool = True,
name: str = None
name: str = None,
reserve_rollback: bool = False,
):
"""

Expand Down Expand Up @@ -82,6 +83,7 @@ def __init__(
self.can_skip = can_skip
self.can_retry = can_retry
self.name = name
self.reserve_rollback = reserve_rollback


class EmptyStartEvent(Node):
Expand Down
Loading
Loading