diff --git a/reapply/conflict_resolution.py b/reapply/conflict_resolution.py
new file mode 100644
index 00000000..2722de08
--- /dev/null
+++ b/reapply/conflict_resolution.py
@@ -0,0 +1,95 @@
+import asyncio
+import sys
+from datetime import timedelta
+
+import temporalio.api.common.v1
+import temporalio.api.enums.v1
+import temporalio.api.history.v1
+import temporalio.api.workflowservice.v1
+import temporalio.common
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+try:
+    from rich import print
+except ImportError:
+    pass
+
+RunId = str
+
+NAMESPACE = "ns1"
+WORKFLOW_ID = "my-workflow-id"
+TASK_QUEUE = "my-task-queue"
+N_SIGNALS = 1
+N_UPDATES = 0
+REPLAY = False
+
+
+@activity.defn
+async def my_activity(arg: str) -> str:
+    return f"activity-result-{arg}"
+
+
+@workflow.defn(name="my-workflow", sandboxed=False)
+class MyWorkflow:
+    def __init__(self) -> None:
+        self.done = False
+        self.signal_results = []
+        self.update_results = []
+
+    @workflow.signal(name="my-signal")
+    async def my_signal(self, arg: str):
+        if arg == "done":
+            self.done = True
+        else:
+            self.signal_results.append(arg)
+
+    @workflow.update(name="my-update")
+    async def my_update(self, arg: str):
+        r = await workflow.execute_activity(
+            my_activity, arg, start_to_close_timeout=timedelta(seconds=10)
+        )
+        self.update_results.append(r)
+        return self.update_results
+
+    @workflow.run
+    async def run(self):
+        await workflow.wait_condition(lambda: self.done)
+        return {
+            "signal_results": self.signal_results,
+            "update_results": self.update_results,
+        }
+
+
+async def execute_workflow(client: Client):
+    handle = await client.start_workflow(
+        MyWorkflow.run,
+        id=WORKFLOW_ID,
+        task_queue=TASK_QUEUE,
+        id_reuse_policy=temporalio.common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
+    )
+
+    print(
+        f"started workflow: http://localhost:XXXX/namespaces/default/workflows/{WORKFLOW_ID}"
+    )
+    wf_result = await handle.result()
+    print(f"wf result: {wf_result}")
+
+
+async def main(address: str):
+    client = await Client.connect(address, namespace=NAMESPACE)
+    async with Worker(
+        client,
+        task_queue=TASK_QUEUE,
+        workflows=[MyWorkflow],
+        activities=[my_activity],
+        sticky_queue_schedule_to_start_timeout=timedelta(hours=1),
+        max_cached_workflows=0 if REPLAY else 100,
+    ):
+        await asyncio.sleep(0xFFFFFF)
+
+
+if __name__ == "__main__":
+    [address] = sys.argv[1:]
+    asyncio.run(main(address))
diff --git a/reapply/conflict_resolution.sh b/reapply/conflict_resolution.sh
new file mode 100644
index 00000000..d1997afb
--- /dev/null
+++ b/reapply/conflict_resolution.sh
@@ -0,0 +1,95 @@
+NS=ns1
+A=localhost:7233
+B=localhost:8233
+WID=my-workflow-id
+
+start-workflow() {
+    local addr=$1
+    temporal workflow -n $NS --address $addr start --task-queue my-task-queue -w $WID --type my-workflow
+}
+
+terminate-workflow() {
+    local addr=$1
+    temporal workflow -n $NS --address $addr terminate -w $WID
+}
+
+send-signal() {
+    local addr=$1
+    local input=$2
+    temporal workflow -n $NS --address $addr signal -w $WID --name my-signal --input "\"$input\""
+}
+
+run-worker() {
+    local addr=$1
+    ../sdk-python/.venv/bin/python ../samples-python/reapply/conflict_resolution.py $addr
+}
+
+send-update() {
+    local addr=$1
+    local input=$2
+    temporal workflow -n $NS --address $addr update -w $WID --name my-update --input "\"$input\""
+}
+
+failover() {
+    local cluster=$1
+    tctl --ns $NS namespace update --active_cluster $cluster
+}
+
+enable-replication() {
+    dc-set-replication-max-id 0
+}
+
+dc-set-replication-max-id() {
+    local id=$1
+    sed -i '/history.ReplicationMaxEventId:/,/value:/ s/value: .*/value: '$id'/' config/dynamicconfig/development-cass.yaml
+    echo -n "Waiting 10s for dynamic config change..."
+    sleep 10
+    echo
+}
+
+list-events() {
+    local addr=$1
+    temporal workflow -n $NS --address $addr show --output json -w $WID |
+        jq -r '.events[] | "\(.eventId) \(.eventType) \(.workflowExecutionSignaledEventAttributes.input[0])"'
+}
+
+list-events-both-clusters() {
+    echo "cluster-a events:"
+    list-events $A
+    echo
+    echo "cluster-b events:"
+    list-events $B
+}
+
+if false; then
+    make start-dependencies-cdc
+    make install-schema-xdc
+
+    # Start two unconnected clusters (see config/development-cluster-*.yaml)
+    make start-xdc-cluster-a
+    make start-xdc-cluster-b
+
+    # Add cluster b as a remote of a
+    # Add cluster a as a remote of b
+    tctl --address $A admin cluster upsert-remote-cluster --frontend_address $B
+    tctl --address $B admin cluster upsert-remote-cluster --frontend_address $A
+    sleep 30
+    # Register a multi-region namespace
+    tctl --ns $NS namespace register --global_namespace true --active_cluster cluster-a --clusters cluster-a cluster-b
+fi
+
+if false; then
+
+    start-workflow $A
+    dc-set-replication-max-id 2
+    send-signal $A A1
+    list-events-both-clusters
+    failover cluster-b
+    dc-set-replication-max-id 3
+
+    # failover
+
+    # simulate conflict
+    # re-enable replication
+    send-signal $B 2
+fi
diff --git a/reapply/reset_reapply_updates.py b/reapply/reset_reapply_updates.py
new file mode 100644
index 00000000..ec485fd3
--- /dev/null
+++ b/reapply/reset_reapply_updates.py
@@ -0,0 +1,202 @@
+import asyncio
+import socket
+from datetime import datetime, timedelta
+from typing import Iterator, List
+from uuid import uuid4
+
+import temporalio.api.common.v1
+import temporalio.api.enums.v1
+import temporalio.api.history.v1
+import temporalio.api.workflowservice.v1
+import temporalio.common
+from temporalio import activity, workflow
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+try:
+    from rich import print
+except ImportError:
+    pass
+
+RunId = str
+
+WORKFLOW_ID = uuid4().hex
+TASK_QUEUE = __file__
+
+N_UPDATES = 2
+REPLAY = False
+
+
+@activity.defn
+async def my_activity(arg: int) -> str:
+    return f"activity-result-{arg}"
+
+
+@workflow.defn(sandboxed=False)
+class WorkflowWithUpdateHandler:
+    def __init__(self) -> None:
+        self.update_results = []
+
+    @workflow.update
+    async def my_update(self, arg: int):
+        r = await workflow.execute_activity(
+            my_activity, arg, start_to_close_timeout=timedelta(seconds=10)
+        )
+        self.update_results.append(r)
+        return self.update_results
+
+    @workflow.run
+    async def run(self):
+        await workflow.wait_condition(lambda: len(self.update_results) == N_UPDATES)
+        return {"update_results": self.update_results}
+
+
+async def app(client: Client):
+    handle = await client.start_workflow(
+        WorkflowWithUpdateHandler.run,
+        id=WORKFLOW_ID,
+        task_queue=TASK_QUEUE,
+        id_reuse_policy=temporalio.common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
+    )
+
+    log(
+        f"sent start workflow request http://localhost:3000/namespaces/default/workflows/{WORKFLOW_ID}"
+    )
+
+    for i in range(N_UPDATES):
+        if True or input("execute update?") in ["y", ""]:
+            log("sending update...")
+            result = await handle.execute_update(
+                WorkflowWithUpdateHandler.my_update, arg=i
+            )
+            log(f"received update result: {result}")
+
+    if True or input("reset?") in ["y", ""]:
+        history = [e async for e in handle.fetch_history_events()]
+        reset_to = next_event(
+            history,
+            temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
+        )
+
+        log(f"sending reset to event {reset_to.event_id}...")
+        run_id = get_first_execution_run_id(history)
+        new_run_id = await reset_workflow(run_id, reset_to, client)
+        log(
+            f"did reset: http://localhost:3000/namespaces/default/workflows/{WORKFLOW_ID}/{new_run_id}"
+        )
+
+        new_handle = client.get_workflow_handle(WORKFLOW_ID, run_id=new_run_id)
+
+        history = [e async for e in new_handle.fetch_history_events()]
+
+        log("new history")
+        for e in history:
+            log(f"{e.event_id} {e.event_type}")
+
+        wf_result = await new_handle.result()
+        print(f"reset wf result: {wf_result}")
+        log(f"reset wf result: {wf_result}")
+    else:
+        wf_result = await handle.result()
+        print(f"wf result: {wf_result}")
+        log(f"wf result: {wf_result}")
+
+
+async def reset_workflow(
+    run_id: str,
+    event: temporalio.api.history.v1.HistoryEvent,
+    client: Client,
+) -> RunId:
+    resp = await client.workflow_service.reset_workflow_execution(
+        temporalio.api.workflowservice.v1.ResetWorkflowExecutionRequest(
+            namespace="default",
+            workflow_execution=temporalio.api.common.v1.WorkflowExecution(
+                workflow_id=WORKFLOW_ID,
+                run_id=run_id,
+            ),
+            reason="Reset to test update reapply",
+            request_id="1",
+            reset_reapply_type=temporalio.api.enums.v1.ResetReapplyType.RESET_REAPPLY_TYPE_UNSPECIFIED,  # TODO
+            workflow_task_finish_event_id=event.event_id,
+        )
+    )
+    assert resp.run_id
+    return resp.run_id
+
+
+def next_event(
+    history: List[temporalio.api.history.v1.HistoryEvent],
+    event_type: temporalio.api.enums.v1.EventType.ValueType,
+) -> temporalio.api.history.v1.HistoryEvent:
+    return next(e for e in history if e.event_type == event_type)
+
+
+def get_first_execution_run_id(
+    history: List[temporalio.api.history.v1.HistoryEvent],
+) -> str:
+    # TODO: correct way to obtain run_id
+    wf_started_event = next_event(
+        history, temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+    )
+    run_id = (
+        wf_started_event.workflow_execution_started_event_attributes.first_execution_run_id
+    )
+    assert run_id
+    return run_id
+
+
+async def main():
+    client = await Client.connect("localhost:7233")
+    async with Worker(
+        client,
+        task_queue=TASK_QUEUE,
+        workflows=[WorkflowWithUpdateHandler],
+        activities=[my_activity],
+        sticky_queue_schedule_to_start_timeout=timedelta(hours=1),
+        max_cached_workflows=0 if REPLAY else 100,
+    ):
+        await app(client)
+
+
+def only(it: Iterator):
+    t = next(it)
+    assert (t2 := next(it, it)) == it, f"iterator had multiple items: [{t}, {t2}]"
+    return t
+
+
+def is_listening(addr: str) -> bool:
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    h, p = addr.split(":")
+    try:
+        s.connect((h, int(p)))
+        return True
+    except socket.error:
+        return False
+    finally:
+        s.close()
+
+
+def server() -> str:
+    return only(
+        filter(
+            is_listening,
+            ["localhost:8080", "localhost:8081", "localhost:8233"],
+        )
+    )
+
+
+def log(s: str):
+    log_to_file(s, "client", "red")
+
+
+def log_to_file(msg: str, prefix: str, color: str):
+    with open("/tmp/log", "a") as f:
+        time = datetime.now().strftime("%H:%M:%S.%f")[:-3]
+        print(
+            f"\n\n======================\n[{color}]{time} : {prefix} : {msg}[/{color}]\n\n",
+            file=f,
+        )
+
+
+if __name__ == "__main__":
+    asyncio.run(main())