Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update reapply #102

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions reapply/conflict_resolution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import sys
from datetime import timedelta

import temporalio.api.common.v1
import temporalio.api.enums.v1
import temporalio.api.history.v1
import temporalio.api.workflowservice.v1
import temporalio.common
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

try:
from rich import print
except ImportError:
pass

RunId = str

NAMESPACE = "ns1"
WORKFLOW_ID = "my-workflow-id"
TASK_QUEUE = "my-task-queue"
N_SIGNALS = 1
N_UPDATES = 0
REPLAY = False


@activity.defn
async def my_activity(arg: str) -> str:
return f"activity-result-{arg}"


@workflow.defn(name="my-workflow", sandboxed=False)
class MyWorkflow:
def __init__(self) -> None:
self.done = False
self.signal_results = []
self.update_results = []

@workflow.signal(name="my-signal")
async def my_signal(self, arg: str):
if arg == "done":
self.done = True
else:
self.signal_results.append(arg)

@workflow.update(name="my-update")
async def my_update(self, arg: str):
r = await workflow.execute_activity(
my_activity, arg, start_to_close_timeout=timedelta(seconds=10)
)
self.update_results.append(r)
return self.update_results

@workflow.run
async def run(self):
await workflow.wait_condition(lambda: self.done)
return {
"signal_results": self.signal_results,
"update_results": self.update_results,
}


async def execute_workflow(client: Client):
handle = await client.start_workflow(
MyWorkflow.run,
id=WORKFLOW_ID,
task_queue=TASK_QUEUE,
id_reuse_policy=temporalio.common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
)

print(
f"started workflow: http://localhost:XXXX/namespaces/default/workflows/{WORKFLOW_ID}"
)
wf_result = await handle.result()
print(f"wf result: {wf_result}")


async def main(address: str):
client = await Client.connect(address, namespace=NAMESPACE)
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[MyWorkflow],
activities=[my_activity],
sticky_queue_schedule_to_start_timeout=timedelta(hours=1),
max_cached_workflows=0 if REPLAY else 100,
):
await asyncio.sleep(0xFFFFFF)


if __name__ == "__main__":
[address] = sys.argv[1:]
asyncio.run(main(address))
95 changes: 95 additions & 0 deletions reapply/conflict_resolution.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
NS=ns1
A=localhost:7233
B=localhost:8233
WID=my-workflow-id

start-workflow() {
local addr=$1
temporal workflow -n $NS --address $addr start --task-queue my-task-queue -w $WID --type my-workflow
}

terminate-workflow() {
local addr=$1
temporal workflow -n $NS --address $addr terminate -w $WID
}

send-signal() {
local addr=$1
local input=$2
temporal workflow -n $NS --address $addr signal -w $WID --name my-signal --input "\"$input\""
}

run-worker() {
local addr=$1
../sdk-python/.venv/bin/python ../samples-python/reapply/conflict_resolution.py $addr
}

send-update() {
local addr=$1
local input=$2
temporal workflow -n $NS --address $addr update -w $WID --name my-update --input "\"$input\""
}

failover() {
local cluster=$1
tctl --ns $NS namespace update --active_cluster $cluster
}

enable-replication() {
dc-set-replication-max-id 0
}

dc-set-replication-max-id() {
local id=$1
sed -i '/history.ReplicationMaxEventId:/,/value:/ s/value: .*/value: '$id'/' config/dynamicconfig/development-cass.yaml
echo -n "Waiting 10s for dynamic config change..."
sleep 10
echo
}

list-events() {
local addr=$1
temporal workflow -n $NS --address $addr show --output json -w $WID |
jq -r '.events[] | "\(.eventId) \(.eventType) \(.workflowExecutionSignaledEventAttributes.input[0])"'
}

list-events-both-clusters() {
echo "cluster-a events:"
list-events $A
echo
echo "cluster-b events:"
list-events $B
}

if false; then
make start-dependencies-cdc
make install-schema-xdc

# Start two unconnected clusters (see config/development-cluster-*.yaml)
make start-xdc-cluster-a
make start-xdc-cluster-b

# Add cluster b as a remote of a
# Add cluster a as a remote of b
tctl --address $A admin cluster upsert-remote-cluster --frontend_address $B
tctl --address $B admin cluster upsert-remote-cluster --frontend_address $A
sleep 30
# Register a multi-region namespace
tctl --ns $NS namespace register --global_namespace true --active_cluster cluster-a --clusters cluster-a cluster-b
fi

if false; then

start-workflow $A
dc-set-replication-max-id 2
send-signal $A A1
list-events-both-clusters
failover cluster-b
dc-set-replication-max-id 3

# failover

# simulate conflict
# re-enable replication
send-signal $B 2
fi
202 changes: 202 additions & 0 deletions reapply/reset_reapply_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
import asyncio
import socket
from datetime import datetime, timedelta
from typing import Iterator, List
from uuid import uuid4

import temporalio.api.common.v1
import temporalio.api.enums.v1
import temporalio.api.history.v1
import temporalio.api.workflowservice.v1
import temporalio.common
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

try:
from rich import print
except ImportError:
pass

RunId = str

WORKFLOW_ID = uuid4().hex
TASK_QUEUE = __file__

N_UPDATES = 2
REPLAY = False


@activity.defn
async def my_activity(arg: int) -> str:
return f"activity-result-{arg}"


@workflow.defn(sandboxed=False)
class WorkflowWithUpdateHandler:
def __init__(self) -> None:
self.update_results = []

@workflow.update
async def my_update(self, arg: int):
r = await workflow.execute_activity(
my_activity, arg, start_to_close_timeout=timedelta(seconds=10)
)
self.update_results.append(r)
return self.update_results

@workflow.run
async def run(self):
await workflow.wait_condition(lambda: len(self.update_results) == N_UPDATES)
return {"update_results": self.update_results}


async def app(client: Client):
handle = await client.start_workflow(
WorkflowWithUpdateHandler.run,
id=WORKFLOW_ID,
task_queue=TASK_QUEUE,
id_reuse_policy=temporalio.common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
)

log(
f"sent start workflow request http://localhost:3000/namespaces/default/workflows/{WORKFLOW_ID}"
)

for i in range(N_UPDATES):
if True or input("execute update?") in ["y", ""]:
log("sending update...")
result = await handle.execute_update(
WorkflowWithUpdateHandler.my_update, arg=i
)
log(f"received update result: {result}")

if True or input("reset?") in ["y", ""]:
history = [e async for e in handle.fetch_history_events()]
reset_to = next_event(
history,
temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED,
)

log(f"sending reset to event {reset_to.event_id}...")
run_id = get_first_execution_run_id(history)
new_run_id = await reset_workflow(run_id, reset_to, client)
log(
f"did reset: http://localhost:3000/namespaces/default/workflows/{WORKFLOW_ID}/{new_run_id}"
)

new_handle = client.get_workflow_handle(WORKFLOW_ID, run_id=new_run_id)

history = [e async for e in new_handle.fetch_history_events()]

log("new history")
for e in history:
log(f"{e.event_id} {e.event_type}")

wf_result = await new_handle.result()
print(f"reset wf result: {wf_result}")
log(f"reset wf result: {wf_result}")
else:
wf_result = await handle.result()
print(f"wf result: {wf_result}")
log(f"wf result: {wf_result}")


async def reset_workflow(
run_id: str,
event: temporalio.api.history.v1.HistoryEvent,
client: Client,
) -> RunId:
resp = await client.workflow_service.reset_workflow_execution(
temporalio.api.workflowservice.v1.ResetWorkflowExecutionRequest(
namespace="default",
workflow_execution=temporalio.api.common.v1.WorkflowExecution(
workflow_id=WORKFLOW_ID,
run_id=run_id,
),
reason="Reset to test update reapply",
request_id="1",
reset_reapply_type=temporalio.api.enums.v1.ResetReapplyType.RESET_REAPPLY_TYPE_UNSPECIFIED, # TODO
workflow_task_finish_event_id=event.event_id,
)
)
assert resp.run_id
return resp.run_id


def next_event(
history: List[temporalio.api.history.v1.HistoryEvent],
event_type: temporalio.api.enums.v1.EventType.ValueType,
) -> temporalio.api.history.v1.HistoryEvent:
return next(e for e in history if e.event_type == event_type)


def get_first_execution_run_id(
history: List[temporalio.api.history.v1.HistoryEvent],
) -> str:
# TODO: correct way to obtain run_id
wf_started_event = next_event(
history, temporalio.api.enums.v1.EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
)
run_id = (
wf_started_event.workflow_execution_started_event_attributes.first_execution_run_id
)
assert run_id
return run_id


async def main():
client = await Client.connect("localhost:7233")
async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[WorkflowWithUpdateHandler],
activities=[my_activity],
sticky_queue_schedule_to_start_timeout=timedelta(hours=1),
max_cached_workflows=0 if REPLAY else 100,
):
await app(client)


def only(it: Iterator):
t = next(it)
assert (t2 := next(it, it)) == it, f"iterator had multiple items: [{t}, {t2}]"
return t


def is_listening(addr: str) -> bool:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
h, p = addr.split(":")
try:
s.connect((h, int(p)))
return True
except socket.error:
return False
finally:
s.close()


def server() -> str:
return only(
filter(
is_listening,
["localhost:8080", "localhost:8081", "localhost:8233"],
)
)


def log(s: str):
log_to_file(s, "client", "red")


def log_to_file(msg: str, prefix: str, color: str):
with open("/tmp/log", "a") as f:
time = datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(
f"\n\n======================\n[{color}]{time} : {prefix} : {msg}[/{color}]\n\n",
file=f,
)


if __name__ == "__main__":
asyncio.run(main())
Loading