From 79ffabd3607406f1cc494b024b44f4dd1a753095 Mon Sep 17 00:00:00 2001 From: yankovs <49753115+yankovs@users.noreply.github.com> Date: Mon, 4 Apr 2022 16:14:33 +0300 Subject: [PATCH] Added get_outputs method (#152) --- karton/core/backend.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/karton/core/backend.py b/karton/core/backend.py index b8fe7530..ad2f2d8f 100644 --- a/karton/core/backend.py +++ b/karton/core/backend.py @@ -3,7 +3,7 @@ import time from collections import defaultdict, namedtuple from io import BytesIO -from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Tuple, Union +from typing import Any, BinaryIO, Dict, Iterator, List, Optional, Set, Tuple, Union from minio import Minio from minio.deleteobjects import DeleteObject @@ -27,6 +27,9 @@ ) +KartonOutputs = namedtuple("KartonOutputs", ["identity", "outputs"]) + + class KartonMetrics(enum.Enum): TASK_PRODUCED = "karton.metrics.produced" TASK_CONSUMED = "karton.metrics.consumed" @@ -151,6 +154,18 @@ def unserialize_bind(identity: str, bind_data: str) -> KartonBind: service_version=bind.get("service_version"), ) + @staticmethod + def unserialize_output(identity: str, output_data: Set[str]) -> KartonOutputs: + """ + Deserialize KartonOutputs object for given identity. + + :param identity: Karton service identity + :param output_data: Serialized output data + :return: KartonOutputs object with outputs definition + """ + output = [json.loads(output_type) for output_type in output_data] + return KartonOutputs(identity=identity, outputs=output) + def get_bind(self, identity: str) -> KartonBind: """ Get bind object for given identity @@ -643,5 +658,20 @@ def log_identity_output(self, identity: str, headers: Dict[str, Any]) -> None: self.redis.sadd(f"{KARTON_OUTPUTS_NAMESPACE}:{identity}", json.dumps(headers)) self.redis.expire(f"{KARTON_OUTPUTS_NAMESPACE}:{identity}", 60 * 60) + def get_outputs(self) -> List[KartonOutputs]: + """ + Get a list of the output types for each karton. + + :return: List of KartonOutputs + """ + + output_keys = self.redis.keys(f"{KARTON_OUTPUTS_NAMESPACE}:*") + return [ + self.unserialize_output( + identity.split(":")[1], self.redis.smembers(identity) + ) + for identity in output_keys + ] + def make_pipeline(self, transaction: bool = False) -> Pipeline: return self.redis.pipeline(transaction=transaction)