Skip to content

Commit

Permalink
Issue #604/#644 test coverage for personal UDP mode
Browse files Browse the repository at this point in the history
and some related refactoring
  • Loading branch information
soxofaan committed Oct 11, 2024
1 parent f8db877 commit ff9c3f2
Showing 1 changed file with 130 additions and 72 deletions.
202 changes: 130 additions & 72 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import json
import re
import threading
Expand Down Expand Up @@ -38,8 +39,8 @@


@pytest.fixture
def con120(requests_mock) -> openeo.Connection:
requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0"))
def con(requests_mock) -> openeo.Connection:
requests_mock.get(OPENEO_BACKEND, json=build_capabilities(api_version="1.2.0", udp=True))
con = openeo.Connection(OPENEO_BACKEND)
return con

Expand Down Expand Up @@ -1010,72 +1011,69 @@ def test_create_job_db(tmp_path, filename, expected):

class TestUDPJobFactory:
@pytest.fixture
def dummy_backend(self, requests_mock, con120) -> DummyBackend:
dummy = DummyBackend(requests_mock=requests_mock, connection=con120)
def dummy_backend(self, requests_mock, con) -> DummyBackend:
dummy = DummyBackend(requests_mock=requests_mock, connection=con)
dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished")
return dummy

@pytest.fixture(autouse=True)
def remote_process_definitions(self, requests_mock):
requests_mock.get(
"https://remote.test/3plus5.json",
json={
"id": "3plus5",
"process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
PG_3PLUS5 = {
"id": "3plus5",
"process_graph": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
}
PG_INCREMENT = {
"id": "increment",
"parameters": [
{"name": "data", "description": "data", "schema": {"type": "number"}},
{
"name": "increment",
"description": "increment",
"schema": {"type": "number"},
"optional": True,
"default": 1,
},
)
requests_mock.get(
"https://remote.test/increment.json",
json={
"id": "increment",
"parameters": [
{"name": "data", "description": "data", "schema": {"type": "number"}},
{
"name": "increment",
"description": "increment",
"schema": {"type": "number"},
"optional": True,
"default": 1,
},
],
"process_graph": {
"process_id": "add",
"arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}},
"result": True,
],
"process_graph": {
"process_id": "add",
"arguments": {"x": {"from_parameter": "data"}, "y": {"from_parameter": "increment"}},
"result": True,
},
}
PG_OFFSET_POLYGON = {
"id": "offset_polygon",
"parameters": [
{"name": "data", "description": "data", "schema": {"type": "number"}},
{
"name": "polygons",
"description": "polygons",
"schema": {
"title": "GeoJSON",
"type": "object",
"subtype": "geojson",
},
},
)
requests_mock.get(
"https://remote.test/offset_poplygon.json",
json={
"id": "offset_poplygon",
"parameters": [
{"name": "data", "description": "data", "schema": {"type": "number"}},
{
"name": "polygons",
"description": "polygons",
"schema": {
"title": "GeoJSON",
"type": "object",
"subtype": "geojson",
},
},
{
"name": "offset",
"description": "Offset",
"schema": {"type": "number"},
"optional": True,
"default": 0,
},
],
{
"name": "offset",
"description": "Offset",
"schema": {"type": "number"},
"optional": True,
"default": 0,
},
)
],
}

def test_minimal(self, con120, dummy_backend):
@pytest.fixture(autouse=True)
def remote_process_definitions(self, requests_mock) -> dict:
mocks = {}
for pg in [self.PG_3PLUS5, self.PG_INCREMENT, self.PG_OFFSET_POLYGON]:
process_id = pg["id"]
mocks[process_id] = requests_mock.get(f"https://remote.test/{process_id}.json", json=pg)
return mocks

def test_minimal(self, con, dummy_backend, remote_process_definitions):
"""Bare minimum: just start a job, no parameters/arguments"""
job_factory = UDPJobFactory(process_id="3plus5", namespace="https://remote.test/3plus5.json")

job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con120)
job = job_factory.start_job(row=pd.Series({"foo": 123}), connection=con)
assert isinstance(job, BatchJob)
assert dummy_backend.batch_jobs == {
"job-000": {
Expand All @@ -1092,11 +1090,13 @@ def test_minimal(self, con120, dummy_backend):
}
}

def test_basic(self, con120, dummy_backend):
assert remote_process_definitions["3plus5"].call_count == 1

def test_basic(self, con, dummy_backend, remote_process_definitions):
"""Basic parameterized UDP job generation"""
job_factory = UDPJobFactory(process_id="increment", namespace="https://remote.test/increment.json")

job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con120)
job = job_factory.start_job(row=pd.Series({"data": 123}), connection=con)
assert isinstance(job, BatchJob)
assert dummy_backend.batch_jobs == {
"job-000": {
Expand All @@ -1112,6 +1112,7 @@ def test_basic(self, con120, dummy_backend):
"status": "created",
}
}
assert remote_process_definitions["increment"].call_count == 1

@pytest.mark.parametrize(
["parameter_defaults", "row", "expected_arguments"],
Expand All @@ -1122,15 +1123,15 @@ def test_basic(self, con120, dummy_backend):
({"increment": 5}, {"data": 123, "increment": 1000}, {"data": 123, "increment": 1000}),
],
)
def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults, row, expected_arguments):
def test_basic_parameterization(self, con, dummy_backend, parameter_defaults, row, expected_arguments):
"""Basic parameterized UDP job generation"""
job_factory = UDPJobFactory(
process_id="increment",
namespace="https://remote.test/increment.json",
parameter_defaults=parameter_defaults,
)

job = job_factory.start_job(row=pd.Series(row), connection=con120)
job = job_factory.start_job(row=pd.Series(row), connection=con)
assert isinstance(job, BatchJob)
assert dummy_backend.batch_jobs == {
"job-000": {
Expand All @@ -1154,7 +1155,9 @@ def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager:
job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1)
return job_manager

def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
def test_with_job_manager_remote_basic(
self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock, remote_process_definitions
):
job_starter = UDPJobFactory(
process_id="increment",
namespace="https://remote.test/increment.json",
Expand All @@ -1175,6 +1178,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
}
)
assert set(job_db.read().status) == {"finished"}
assert remote_process_definitions["increment"].call_count == 1

assert dummy_backend.batch_jobs == {
"job-000": {
Expand Down Expand Up @@ -1247,7 +1251,7 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
),
],
)
def test_udp_job_manager_parameter_handling(
def test_with_job_manager_remote_parameter_handling(
self,
tmp_path,
requests_mock,
Expand Down Expand Up @@ -1317,10 +1321,10 @@ def test_udp_job_manager_parameter_handling(
},
}

def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
def test_with_job_manager_remote_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
job_starter = UDPJobFactory(
process_id="offset_poplygon",
namespace="https://remote.test/offset_poplygon.json",
process_id="offset_polygon",
namespace="https://remote.test/offset_polygon.json",
parameter_defaults={"data": 123},
)

Expand Down Expand Up @@ -1361,9 +1365,9 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend,
"job-000": {
"job_id": "job-000",
"pg": {
"offsetpoplygon1": {
"process_id": "offset_poplygon",
"namespace": "https://remote.test/offset_poplygon.json",
"offsetpolygon1": {
"process_id": "offset_polygon",
"namespace": "https://remote.test/offset_polygon.json",
"arguments": {
"data": 123,
"polygons": {"type": "Point", "coordinates": [1.0, 2.0]},
Expand All @@ -1377,9 +1381,9 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend,
"job-001": {
"job_id": "job-001",
"pg": {
"offsetpoplygon1": {
"process_id": "offset_poplygon",
"namespace": "https://remote.test/offset_poplygon.json",
"offsetpolygon1": {
"process_id": "offset_polygon",
"namespace": "https://remote.test/offset_polygon.json",
"arguments": {
"data": 123,
"polygons": {"type": "Point", "coordinates": [3.0, 4.0]},
Expand All @@ -1391,3 +1395,57 @@ def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend,
"status": "finished",
},
}

def test_with_job_manager_udp_basic(
self, tmp_path, requests_mock, con, dummy_backend, job_manager, sleep_mock, remote_process_definitions
):
# make deep copy
udp = copy.deepcopy(self.PG_INCREMENT)
# Register personal UDP
increment_udp_mock = requests_mock.get(con.build_url("/process_graphs/increment"), json=udp)

job_starter = UDPJobFactory(
process_id="increment",
# No namespace to trigger personal UDP mode
namespace=None,
parameter_defaults={"increment": 5},
)
assert increment_udp_mock.call_count == 0

df = pd.DataFrame({"data": [3, 5]})
job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df)

stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)
assert stats == dirty_equals.IsPartialDict(
{
"start_job call": 2,
"job finished": 2,
}
)
assert increment_udp_mock.call_count == 2
assert set(job_db.read().status) == {"finished"}

assert dummy_backend.batch_jobs == {
"job-000": {
"job_id": "job-000",
"pg": {
"increment1": {
"process_id": "increment",
"arguments": {"data": 3, "increment": 5},
"result": True,
}
},
"status": "finished",
},
"job-001": {
"job_id": "job-001",
"pg": {
"increment1": {
"process_id": "increment",
"arguments": {"data": 5, "increment": 5},
"result": True,
}
},
"status": "finished",
},
}

0 comments on commit ff9c3f2

Please sign in to comment.