From c7d779fb5fba3ffd1603127074458b6bdded2e61 Mon Sep 17 00:00:00 2001 From: hanshuaikang <1758504262@qq.com> Date: Mon, 18 Sep 2023 17:17:07 +0800 Subject: [PATCH] =?UTF-8?q?minor:=20=E9=87=8D=E6=9E=84=E5=9B=9E=E6=BB=9A?= =?UTF-8?q?=E9=83=A8=E5=88=86=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pipeline/contrib/rollback/api.py | 20 +- .../pipeline/contrib/rollback/handler.py | 331 ++++++++++++------ .../pipeline/tests/contrib/test_rollback.py | 2 +- 3 files changed, 230 insertions(+), 123 deletions(-) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py index e53ad2b2..f8deac76 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/api.py @@ -11,12 +11,14 @@ specific language governing permissions and limitations under the License. """ from pipeline.contrib.rollback.constants import TOKEN -from pipeline.contrib.rollback.handler import RollbackHandler +from pipeline.contrib.rollback.handler import RollbackDispatcher from pipeline.contrib.utils import ensure_return_pipeline_contrib_api_result @ensure_return_pipeline_contrib_api_result -def rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN): +def rollback( + root_pipeline_id: str, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None, mode: str = TOKEN +): """ :param root_pipeline_id: pipeline id :param start_node_id: 开始的 id @@ -24,24 +26,24 @@ def rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mod :param mode 回滚模式 :return: True or False """ - RollbackHandler(root_pipeline_id, mode).rollback(start_node_id, target_node_id) + RollbackDispatcher(root_pipeline_id, mode).rollback(start_node_id, target_node_id) @ensure_return_pipeline_contrib_api_result def reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN): - RollbackHandler(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id) + RollbackDispatcher(root_pipeline_id, mode).reserve_rollback(start_node_id, target_node_id) @ensure_return_pipeline_contrib_api_result -def retry_rollback_failed_node(root_pipeline_id: str, node_id: str, retry_data: dict = None, mode: str = TOKEN): - RollbackHandler(root_pipeline_id, mode).retry_rollback_failed_node(node_id, retry_data) +def cancel_reserved_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN): + RollbackDispatcher(root_pipeline_id, mode).cancel_reserved_rollback(start_node_id, target_node_id) @ensure_return_pipeline_contrib_api_result -def cancel_reserve_rollback(root_pipeline_id: str, start_node_id: str, target_node_id: str, mode: str = TOKEN): - RollbackHandler(root_pipeline_id, mode).cancel_reserved_rollback(start_node_id, target_node_id) +def retry_rollback_failed_node(root_pipeline_id: str, node_id: str, retry_data: dict = None, mode: str = TOKEN): + RollbackDispatcher(root_pipeline_id, mode).retry_rollback_failed_node(node_id, retry_data) @ensure_return_pipeline_contrib_api_result def get_allowed_rollback_node_id_list(root_pipeline_id: str, start_node_id: str, mode: str = TOKEN): - return RollbackHandler(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id) + return RollbackDispatcher(root_pipeline_id, mode).get_allowed_rollback_node_id_list(start_node_id) diff --git a/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py b/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py index 18032304..92be017e 100644 --- a/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py +++ b/runtime/bamboo-pipeline/pipeline/contrib/rollback/handler.py @@ -18,6 +18,7 @@ from pipeline.conf.default_settings import ROLLBACK_QUEUE from pipeline.contrib.exceptions import RollBackException from pipeline.contrib.rollback import constants +from pipeline.contrib.rollback.constants import ANY, TOKEN from pipeline.contrib.rollback.graph import RollbackGraphHandler from pipeline.contrib.rollback.models import ( RollbackPlan, @@ -92,112 +93,55 @@ def validate_token(root_pipeline_id, start_node_id, target_node_id): "rollback failed: start node token must equal target node, pipeline_id={}".format(root_pipeline_id) ) - -class RollbackHandler: - def __init__(self, root_pipeline_id, mode): - self.root_pipeline_id = root_pipeline_id - # 检查pipeline 回滚的合法性 - RollbackValidator.validate_pipeline(root_pipeline_id) - self.mode = mode - self.use_token = True if mode == constants.TOKEN else False - self.runtime = BambooDjangoRuntime() - - def _node_state_is_failed(self, node_id): + @staticmethod + def validate_node_state_by_token_mode(root_pipeline_id, start_node_id): """ - 判断该节点是不是失败的状态 + 使用token模式下的回滚,相同token的节点不允许有正在运行的节点 """ - node_state = State.objects.filter(node_id=node_id).first() - if node_state.name == states.FAILED: - return True - return False - - def _get_token_allowed_rollback_node_id_list(self, start_node_id): try: - rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id) + rollback_token = RollbackToken.objects.get(root_pipeline_id=root_pipeline_id) except RollbackToken.DoesNotExist: raise RollBackException( - "rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id) + "rollback failed: pipeline token not exist, pipeline_id={}".format(root_pipeline_id) ) - node_map = self._get_allowed_rollback_node_map() - service_activity_node_list = [ - node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity - ] tokens = json.loads(rollback_token.token) start_token = tokens.get(start_node_id) - if not start_token: - return [] - - nodes = [] - for node_id, token in tokens.items(): - if start_token == token and node_id != start_node_id and node_id in service_activity_node_list: - nodes.append(node_id) - - return nodes + if start_token is None: + raise RollBackException("rollback failed: can't find the not token, node_id={}".format(start_token)) - def _get_any_allowed_rollback_node_id_list(self, start_node_id): - node_map = self._get_allowed_rollback_node_map() - start_node_state = ( - State.objects.filter(root_id=self.root_pipeline_id) - .exclude(node_id=self.root_pipeline_id) - .order_by("created_time") - .first() - ) - target_node_id = start_node_state.node_id - rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) - graph, _ = rollback_graph.build_rollback_graph() + node_id_list = [] + for node_id, token in node_id_list: + if token == start_token: + node_id_list.append(node_id) - return list(set(graph.nodes) - {constants.START_FLAG, constants.END_FLAG, start_node_id}) + if State.objects.filter(node_id__in=node_id_list, name=states.RUNNING).exists(): + raise RollBackException( + "rollback failed: there is currently the same node that the same token is running, node_id={}".format( + start_token + ) + ) - def get_allowed_rollback_node_id_list(self, start_node_id): + @staticmethod + def validate_node_state_by_any_mode(root_pipeline_id): """ - 获取允许回滚的节点范围 - 规则:token 一致的节点允许回滚 + 使用any模式下的回滚,相同token的节点不允许有正在运行的节点 """ - if self.use_token: - return self._get_token_allowed_rollback_node_id_list(start_node_id) + if State.objects.filter(root_id=root_pipeline_id, name=states.RUNNING).exists(): + raise RollBackException("rollback failed: there is currently the some node is running") - return self._get_any_allowed_rollback_node_id_list(start_node_id) - def retry_rollback_failed_node(self, node_id, retry_data): - """ - 重试回滚失败的节点 - """ - pipeline_state = State.objects.filter(node_id=self.root_pipeline_id).first() - if pipeline_state.name != states.ROLL_BACK_FAILED: - raise RollBackException( - "rollback failed: only retry the failed pipeline, current_status={}".format(pipeline_state.name) - ) - node_state = State.objects.filter(node_id=node_id).first() - if node_state.name != states.ROLL_BACK_FAILED: - raise RollBackException( - "rollback failed: only retry the failed node, current_status={}".format(node_state.name) - ) - - # 获取镜像 - try: - rollback_snapshot = RollbackSnapshot.objects.get(root_pipeline_id=self.root_pipeline_id, is_expired=False) - except RollbackSnapshot.DoesNotExist: - raise RollBackException("rollback failed: the rollback snapshot is not exists, please check") - except RollbackSnapshot.MultipleObjectsReturned: - raise RollBackException("rollback failed: found multi not expired rollback snapshot, please check") +class BaseRollbackHandler: + mode = None - # 重置pipeline的状态为回滚中 - self.runtime.set_state( - node_id=self.root_pipeline_id, - to_state=states.ROLLING_BACK, - ) + def __init__(self, root_pipeline_id): + self.root_pipeline_id = root_pipeline_id + self.runtime = BambooDjangoRuntime() + # 检查pipeline 回滚的合法性 + RollbackValidator.validate_pipeline(root_pipeline_id) - # 驱动这个任务 - token_rollback.apply_async( - kwargs={ - "snapshot_id": rollback_snapshot.id, - "node_id": node_id, - "retry": True, - "retry_data": retry_data, - }, - queue=ROLLBACK_QUEUE, - ) + def get_allowed_rollback_node_id_list(self, start_node_id): + return [] def _get_allowed_rollback_node_map(self): # 不需要遍历整颗树,获取到现在已经执行成功和失败节点的所有列表 @@ -212,8 +156,6 @@ def _get_allowed_rollback_node_map(self): def _reserve(self, start_node_id, target_node_id, reserve_rollback=True): # 节点预约 需要在 Node 里面 插入 reserve_rollback = True, 为 True的节点执行完将暂停 - if self.use_token: - RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) RollbackValidator.validate_node(target_node_id) node = Node.objects.filter(node_id=start_node_id).first() if node is None: @@ -263,15 +205,158 @@ def reserve_rollback(self, start_node_id, target_node_id): """ 预约回滚 """ + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) self._reserve(start_node_id, target_node_id) def cancel_reserved_rollback(self, start_node_id, target_node_id): """ 取消预约回滚 """ + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) self._reserve(start_node_id, target_node_id, reserve_rollback=False) - def get_failed_skip_node_id_list(self, node_id_list): + +class AnyRollbackHandler(BaseRollbackHandler): + mode = ANY + + def get_allowed_rollback_node_id_list(self, start_node_id): + node_map = self._get_allowed_rollback_node_map() + start_node_state = ( + State.objects.filter(root_id=self.root_pipeline_id) + .exclude(node_id=self.root_pipeline_id) + .order_by("created_time") + .first() + ) + target_node_id = start_node_state.node_id + rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) + graph, _ = rollback_graph.build_rollback_graph() + + return list(set(graph.nodes) - {constants.START_FLAG, constants.END_FLAG, start_node_id}) + + def retry_rollback_failed_node(self, node_id, retry_data): + """ """ + raise RollBackException("rollback failed: when mode is any, not support retry") + + def reserve_rollback(self, start_node_id, target_node_id): + """ + 预约回滚 + """ + self._reserve(start_node_id, target_node_id) + + def cancel_reserved_rollback(self, start_node_id, target_node_id): + """ + 取消预约回滚 + """ + self._reserve(start_node_id, target_node_id, reserve_rollback=False) + + def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): + # 回滚的开始节点运行失败的情况 + RollbackValidator.validate_node(start_node_id, allow_failed=True) + RollbackValidator.validate_node(target_node_id) + + node_map = self._get_allowed_rollback_node_map() + rollback_graph = RollbackGraphHandler(node_map=node_map, start_id=start_node_id, target_id=target_node_id) + + graph, other_nodes = rollback_graph.build_rollback_graph() + node_access_record = {node: 0 for node in graph.nodes} + + rollback_snapshot = RollbackSnapshot.objects.create( + root_pipeline_id=self.root_pipeline_id, + graph=json.dumps(graph.as_dict()), + node_access_record=json.dumps(node_access_record), + start_node_id=start_node_id, + target_node_id=target_node_id, + other_nodes=json.dumps(other_nodes), + skip_rollback_nodes=json.dumps([]), + ) + + any_rollback.apply_async( + kwargs={"snapshot_id": rollback_snapshot.id}, + queue=ROLLBACK_QUEUE, + ) + + +class TokenRollbackHandler(BaseRollbackHandler): + mode = TOKEN + + def get_allowed_rollback_node_id_list(self, start_node_id): + """ + 获取允许回滚的节点范围 + 规则:token 一致的节点允许回滚 + """ + try: + rollback_token = RollbackToken.objects.get(root_pipeline_id=self.root_pipeline_id) + except RollbackToken.DoesNotExist: + raise RollBackException( + "rollback failed: pipeline token not exist, pipeline_id={}".format(self.root_pipeline_id) + ) + node_map = self._get_allowed_rollback_node_map() + service_activity_node_list = [ + node_id for node_id, node_detail in node_map.items() if node_detail["type"] == PE.ServiceActivity + ] + + tokens = json.loads(rollback_token.token) + start_token = tokens.get(start_node_id) + if not start_token: + return [] + + nodes = [] + for node_id, token in tokens.items(): + if start_token == token and node_id != start_node_id and node_id in service_activity_node_list: + nodes.append(node_id) + + return nodes + + def retry_rollback_failed_node(self, node_id, retry_data): + """ + 重试回滚失败的节点 + """ + pipeline_state = State.objects.filter(node_id=self.root_pipeline_id).first() + if pipeline_state.name != states.ROLL_BACK_FAILED: + raise RollBackException( + "rollback failed: only retry the failed pipeline, current_status={}".format(pipeline_state.name) + ) + node_state = State.objects.filter(node_id=node_id).first() + if node_state.name != states.ROLL_BACK_FAILED: + raise RollBackException( + "rollback failed: only retry the failed node, current_status={}".format(node_state.name) + ) + + # 获取镜像 + try: + rollback_snapshot = RollbackSnapshot.objects.get(root_pipeline_id=self.root_pipeline_id, is_expired=False) + except RollbackSnapshot.DoesNotExist: + raise RollBackException("rollback failed: the rollback snapshot is not exists, please check") + except RollbackSnapshot.MultipleObjectsReturned: + raise RollBackException("rollback failed: found multi not expired rollback snapshot, please check") + + # 重置pipeline的状态为回滚中 + self.runtime.set_state( + node_id=self.root_pipeline_id, + to_state=states.ROLLING_BACK, + ) + + # 驱动这个任务 + token_rollback.apply_async( + kwargs={ + "snapshot_id": rollback_snapshot.id, + "node_id": node_id, + "retry": True, + "retry_data": retry_data, + }, + queue=ROLLBACK_QUEUE, + ) + + def _node_state_is_failed(self, node_id): + """ + 判断该节点是不是失败的状态 + """ + node_state = State.objects.filter(node_id=node_id).first() + if node_state.name == states.FAILED: + return True + return False + + def _get_failed_skip_node_id_list(self, node_id_list): failed_skip_node_id_list = State.objects.filter( Q(Q(skip=True) | Q(error_ignored=True)) & Q(node_id__in=node_id_list) ).values_list("node_id", flat=True) @@ -283,11 +368,12 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): if skip_rollback_nodes is None: skip_rollback_nodes = [] + # 相同token回滚时,不允许有正在运行的节点 + # 回滚的开始节点运行失败的情况 RollbackValidator.validate_node(start_node_id, allow_failed=True) RollbackValidator.validate_node(target_node_id) - if self.use_token: - RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) + RollbackValidator.validate_token(self.root_pipeline_id, start_node_id, target_node_id) # 如果开始节点是失败的情况,则跳过该节点的回滚操作 if self._node_state_is_failed(start_node_id): @@ -302,7 +388,7 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): node_access_record = {node: 0 for node in graph.nodes} # 所有失败并跳过的节点不再参与回滚 - failed_skip_node_id_list = self.get_failed_skip_node_id_list(node_map.keys()) + failed_skip_node_id_list = self._get_failed_skip_node_id_list(node_map.keys()) skip_rollback_nodes.extend(list(failed_skip_node_id_list)) rollback_snapshot = RollbackSnapshot.objects.create( @@ -315,23 +401,42 @@ def rollback(self, start_node_id, target_node_id, skip_rollback_nodes=None): skip_rollback_nodes=json.dumps(skip_rollback_nodes), ) - if self.use_token: - runtime.set_state( - node_id=self.root_pipeline_id, - to_state=states.ROLLING_BACK, - ) - # 驱动这个任务 - token_rollback.apply_async( - kwargs={ - "snapshot_id": rollback_snapshot.id, - "node_id": constants.START_FLAG, - "retry": False, - "retry_data": None, - }, - queue=ROLLBACK_QUEUE, - ) + runtime.set_state( + node_id=self.root_pipeline_id, + to_state=states.ROLLING_BACK, + ) + # 驱动这个任务 + token_rollback.apply_async( + kwargs={ + "snapshot_id": rollback_snapshot.id, + "node_id": constants.START_FLAG, + "retry": False, + "retry_data": None, + }, + queue=ROLLBACK_QUEUE, + ) + + +class RollbackDispatcher: + def __init__(self, root_pipeline_id, mode): + if mode == ANY: + self.handler = AnyRollbackHandler(root_pipeline_id) + elif mode == TOKEN: + self.handler = TokenRollbackHandler(root_pipeline_id) else: - any_rollback.apply_async( - kwargs={"snapshot_id": rollback_snapshot.id}, - queue=ROLLBACK_QUEUE, - ) + raise RollBackException("rollback failed: not support this mode, please check") + + def rollback(self, start_node_id: str, target_node_id: str, skip_rollback_nodes: list = None): + self.handler.rollback(start_node_id, target_node_id, skip_rollback_nodes) + + def reserve_rollback(self, start_node_id: str, target_node_id: str): + self.handler.reserve_rollback(start_node_id, target_node_id) + + def retry_rollback_failed_node(self, node_id: str, retry_data: dict = None): + self.handler.retry_rollback_failed_node(node_id, retry_data) + + def cancel_reserved_rollback(self, start_node_id: str, target_node_id: str): + self.handler.cancel_reserved_rollback(start_node_id, target_node_id) + + def get_allowed_rollback_node_id_list(self, start_node_id: str): + return self.handler.get_allowed_rollback_node_id_list(start_node_id) diff --git a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py index 0beea3d1..27acecf3 100644 --- a/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py +++ b/runtime/bamboo-pipeline/pipeline/tests/contrib/test_rollback.py @@ -206,7 +206,7 @@ def test_reserve_rollback(self): Node.objects.create(node_id=target_node_id, detail=json.dumps(target_node_detail)) Node.objects.create(node_id=start_node_id, detail=json.dumps(start_node_detail)) - result = api.reserve_rollback(pipeline_id, start_node_id, target_node_id) + result = api.reserved_rollback(pipeline_id, start_node_id, target_node_id) self.assertFalse(result.result) message = "reserve rollback failed, the node state is not Running, current state=FINISHED, node_id={}".format( # noqa