From b800c12623bc4f3125809ac73a2d7b7f1e51a49a Mon Sep 17 00:00:00 2001 From: Alec Mitchell Date: Thu, 20 Jul 2023 00:27:16 -0700 Subject: [PATCH 1/2] Update record events and game environment state updates to use async worker events. --- dlgr/griduniverse/experiment.py | 75 +++++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/dlgr/griduniverse/experiment.py b/dlgr/griduniverse/experiment.py index 6f1425e9..c558786d 100644 --- a/dlgr/griduniverse/experiment.py +++ b/dlgr/griduniverse/experiment.py @@ -22,6 +22,7 @@ from dallinger.compat import unicode from dallinger.config import get_config from dallinger.experiment import Experiment +from dallinger.experiment_server.worker_events import WorkerEvent from faker import Factory from sqlalchemy import create_engine, func from sqlalchemy.orm import scoped_session, sessionmaker @@ -136,6 +137,47 @@ def format_field(self, value, format_spec): formatter = PluralFormatter() +class GURecordEvent(WorkerEvent): + + def __call__(self): + session = self.session + details = self.details + node = self.node + if self.participant and not self.node: + nodes = self.participant.nodes() + if nodes: + node = nodes[0] + elif not node: + node = session.query(dallinger.nodes.Environment).one() + + try: + info = Event(origin=node, details=details) + except ValueError: + logger.info( + "Tried to record an event after node#{} failure: {}".format( + node.id, self.details + ) + ) + return + session.add(info) + session.commit() + + +class GUUpdateEnvironmentState(WorkerEvent): + + def __call__(self): + session = self.session + details = self.details + environment = self.node + + state = environment.update( + json.dumps(details), + details=details + ) + session.add(state) + session.commit() + + def softmax(vector, temperature=1): """The softmax activation function.""" vector = [math.pow(x, temperature) for x in vector] @@ -1392,26 +1434,20 @@ def parse_message(self, raw_message): def record_event(self, details, player_id=None): """Record an event in the Info table.""" - session = self.socket_session - if player_id == "spectator": + if player_id == 'spectator': return elif player_id: node_id = self.node_by_player_id[player_id] - node = session.query(dallinger.models.Node).get(node_id) else: node = self.environment + node_id = node.id - try: - info = Event(origin=node, details=details) - except ValueError: - logger.info( - "Tried to record an event after node#{} failure: {}".format( - node.id, details - ) - ) - return - session.add(info) - session.commit() + from dallinger.experiment_server.worker_events import worker_function, _get_queue + q = _get_queue("high") + q.enqueue( + worker_function, 'GURecordEvent', None, player_id, + node_id=node_id, details=details + ) def publish(self, msg): """Publish a message to all griduniverse clients""" @@ -1664,9 +1700,14 @@ def game_loop(self): include_walls=self.grid.walls_updated, include_items=self.grid.items_updated, ) - state = self.environment.update(json.dumps(state_data), details=state_data) - self.socket_session.add(state) - self.socket_session.commit() + + from dallinger.experiment_server.worker_events import worker_function, _get_queue + q = _get_queue("high") + q.enqueue( + worker_function, "GUUpdateEnvironmentState", None, None, + node_id=self.environment.id, details=state_data, + receive_timestamp=time.time() + ) count += 1 self.grid.walls_updated = False self.grid.items_updated = False From 1bd31bb7b64616dea972de6851e9a4a140752fe4 Mon Sep 17 00:00:00 2001 From: Alec Mitchell Date: Thu, 20 Jul 2023 13:05:56 -0700 Subject: [PATCH 2/2] Test fixes for async changes. Blacken. --- dlgr/griduniverse/experiment.py | 15 ++- test/conftest.py | 12 +++ test/test_griduniverse.py | 164 ++++++++++++++++++++++++-------- 3 files changed, 149 insertions(+), 42 deletions(-) diff --git a/dlgr/griduniverse/experiment.py b/dlgr/griduniverse/experiment.py index c558786d..c5c25968 100644 --- a/dlgr/griduniverse/experiment.py +++ b/dlgr/griduniverse/experiment.py @@ -147,7 +147,7 @@ def __call__(self): nodes = self.participant.nodes() if nodes: node = nodes[0] - elif not node: + if not node: node = session.query(dallinger.nodes.Environment).one() try: @@ -1442,7 +1442,10 @@ def record_event(self, details, player_id=None): node = self.environment node_id = node.id - from dallinger.experiment_server.worker_events import worker_function, _get_queue + from dallinger.experiment_server.worker_events import ( + _get_queue, + worker_function, + ) q = _get_queue("high") q.enqueue( worker_function, 'GURecordEvent', None, player_id, @@ -1678,6 +1681,10 @@ def send_state_thread(self): def game_loop(self): """Update the world state.""" + from dallinger.experiment_server.worker_events import ( + _get_queue, + worker_function, + ) gevent.sleep(0.1) if not self.config.get("replay", False): self.grid.build_labyrinth() @@ -1694,6 +1701,8 @@ def game_loop(self): previous_second_timestamp = self.grid.start_timestamp count = 0 + q = _get_queue("high") + while not self.grid.game_over: # Record grid state to database state_data = self.grid.serialize( @@ -1701,8 +1710,6 @@ def game_loop(self): include_items=self.grid.items_updated, ) - from dallinger.experiment_server.worker_events import worker_function, _get_queue - q = _get_queue("high") q.enqueue( worker_function, "GUUpdateEnvironmentState", None, None, node_id=self.environment.id, details=state_data, diff --git a/test/conftest.py b/test/conftest.py index 3ad69d76..588d4631 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -173,6 +173,18 @@ def pubsub(exp): exp.redis_conn = orig_conn +@pytest.fixture +def queue_getter(): + with mock.patch("dallinger.experiment_server.worker_events._get_queue") as mock_queue_getter: + mock_queue_getter.return_value = mock.Mock() + yield mock_queue_getter + + +@pytest.fixture +def queue(queue_getter): + yield queue_getter.return_value + + @pytest.fixture def fresh_gridworld(): from dlgr.griduniverse.experiment import Gridworld diff --git a/test/test_griduniverse.py b/test/test_griduniverse.py index a284db24..ca78138a 100755 --- a/test/test_griduniverse.py +++ b/test/test_griduniverse.py @@ -151,6 +151,9 @@ def loop_exp_3x(self, exp): exp.grid.start_timestamp = time.time() exp.socket_session = mock.Mock() exp.publish = mock.Mock() + # We serialize the environment node id to send to the job queue, so we + # need a serializable value here + exp.environment.id = 1 def count_down(counter): for c in counter: @@ -191,14 +194,30 @@ def test_loop_spawns_items(self, loop_exp_3x): [i["item_count"] for i in exp.item_config.values()] ) - def test_loop_serialized_and_saves(self, loop_exp_3x): + def test_loop_records_spawned_items(self, loop_exp_3x, queue): + exp = loop_exp_3x + exp.game_loop() + + total_items = sum( + [i["item_count"] for i in exp.item_config.values()] + ) + + # We record events for every spawned item and each of the three game loops + assert queue.enqueue.call_count == total_items + 3 + for call in queue.enqueue.mock_calls[:-3]: + assert call.args[1] == "GURecordEvent" + assert call.kwargs["details"]["type"] == "spawn item" + + def test_loop_calls_environment_state_save(self, loop_exp_3x, queue): # Grid serialized and added to DB session once per loop exp = loop_exp_3x exp.game_loop() - assert exp.socket_session.add.call_count == 3 - # Session commited once per loop and again at end - assert exp.socket_session.commit.call_count == 4 + # We queue up the Environment state save task on each loop run. + # So the last three tasks queued after spawning items should be + # state saves + for call in queue.enqueue.mock_calls[-3:]: + assert call.args[1] == "GUUpdateEnvironmentState" def test_loop_resets_state(self, loop_exp_3x): # Wall and item state unset, item count reset during loop @@ -286,19 +305,20 @@ def test_colors_distributed_almost_evenly_if_on_edge(self, exp, participants): @pytest.mark.usefixtures("env") class TestRecordPlayerActivity(object): - def test_records_player_events(self, exp, a): + + def test_records_player_events(self, exp, a, queue): participant = a.participant() exp.handle_connect({"player_id": participant.id}) exp.send( "griduniverse_ctrl:" '{{"type":"move","player_id":{},"move":"left"}}'.format(participant.id) ) - time.sleep(10) - data = exp.retrieve_data() - # Get the last recorded event - event_detail = json.loads(data.infos.df["details"].values[-1]) - assert event_detail["player_id"] == participant.id - assert event_detail["move"] == "left" + queue.enqueue.assert_called_once_with( + mock.ANY, "GURecordEvent", None, 1, + node_id=exp.node_by_player_id[participant.id], + details={"type": "move", "player_id": 1, "move": "left", + "server_time": mock.ANY} + ) def test_scores_and_payoffs_averaged(self, exp, a): participant = a.participant() @@ -313,46 +333,114 @@ def test_scores_and_payoffs_averaged(self, exp, a): assert results["average_score"] >= 0.0 assert results["average_payoff"] >= 0.0 - def test_record_event_with_participant(self, exp, a): + def test_record_event_with_participant(self, exp, a, queue): # Adds event to player node participant = a.participant() - exp.handle_connect({"player_id": participant.id}) - exp.socket_session.add = mock.Mock() - exp.socket_session.commit = mock.Mock() - exp.record_event({"data": ["some data"]}, player_id=participant.id) - exp.socket_session.add.assert_called_once() - exp.socket_session.commit.assert_called_once() - info = exp.socket_session.add.call_args[0][0] - assert info.details["data"] == ["some data"] - assert info.origin.id == exp.node_by_player_id[participant.id] - - def test_record_event_without_participant(self, exp): + exp.handle_connect({'player_id': participant.id}) + exp.record_event({'data': ['some data']}, + player_id=participant.id) + + queue.enqueue.assert_called_once_with( + mock.ANY, 'GURecordEvent', None, 1, node_id=exp.node_by_player_id[participant.id], + details={'data': ['some data']} + ) + + def test_record_event_without_participant(self, exp, queue): # Adds event to enviroment node node = exp.environment - exp.socket_session.add = mock.Mock() - exp.socket_session.commit = mock.Mock() - exp.record_event({"data": ["some data"]}) - exp.socket_session.add.assert_called_once() - exp.socket_session.commit.assert_called_once() - info = exp.socket_session.add.call_args[0][0] - assert info.details["data"] == ["some data"] - assert info.origin.id == node.id + exp.record_event({'data': ['some data']}) + queue.enqueue.assert_called_once_with( + mock.ANY, 'GURecordEvent', None, None, node_id=node.id, + details={'data': ['some data']} + ) + + +@pytest.mark.usefixtures('env') +class TestWorkerEvents(object): + + def test_record_event(self, exp, a): + from dlgr.griduniverse.experiment import GURecordEvent + node = exp.environment + event_func = GURecordEvent( + participant=None, experiment=exp, session=mock.Mock(), + node=node, details={'data': ['some data']} + ) + with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event: + event_func() + mock_event.assert_called_once_with(origin=node, details={'data': ['some data']}) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() + + def test_record_event_with_participant_and_no_node(self, exp, a): + from dlgr.griduniverse.experiment import GURecordEvent + participant = a.participant() + exp.handle_connect({'player_id': participant.id}) + node = participant.nodes()[0] + event_func = GURecordEvent( + participant=participant, experiment=exp, session=mock.Mock(), + node=None, details={'data': ['some data']} + ) + with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event: + event_func() + mock_event.assert_called_once_with(origin=node, details={'data': ['some data']}) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() + + def test_record_event_with_no_obtainable_node(self, exp, a): + from dallinger.nodes import Environment + + from dlgr.griduniverse.experiment import GURecordEvent + + # We create a participant, but do not attach a node + participant = a.participant() + # The event should fall back to looking up the environment node. + event_func = GURecordEvent( + participant=participant, experiment=exp, session=mock.Mock(), + node=None, details={'data': ['some data']} + ) + mock_results = event_func.session.query.return_value = mock.Mock() + mock_environment = mock_results.one.return_value = mock.Mock() + with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event: + event_func() + event_func.session.query.assert_called_once_with(Environment) + mock_results.one.assert_called_once() + mock_event.assert_called_once_with(origin=mock_environment, + details={'data': ['some data']}) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() def test_record_event_with_failed_node(self, exp, a): - # Does not save event, but logs failure + from dlgr.griduniverse.experiment import GURecordEvent node = exp.environment node.failed = True - exp.socket_session.add = mock.Mock() - exp.socket_session.commit = mock.Mock() - with mock.patch("dlgr.griduniverse.experiment.logger.info") as logger: - exp.record_event({"data": ["some data"]}) - assert exp.socket_session.add.call_count == 0 - assert exp.socket_session.commit.call_count == 0 + event_func = GURecordEvent( + participant=None, experiment=exp, session=mock.Mock(), + node=node, details={'data': ['some data']} + ) + with mock.patch('dlgr.griduniverse.experiment.logger.info') as logger: + event_func() + event_func.session.add.assert_not_called() + event_func.session.commit.assert_not_called() logger.assert_called_once() assert logger.call_args.startswith( "Tried to record an event after node#{} failure:".format(node.id) ) + def test_update_environment_state(self): + from dlgr.griduniverse.experiment import GUUpdateEnvironmentState + node = mock.Mock() + event_func = GUUpdateEnvironmentState( + session=mock.Mock(), + node=node, details={'data': ['some data']} + ) + event_func() + node.update.assert_called_once_with( + '{"data": ["some data"]}', + details={'data': ['some data']}, + ) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() + @pytest.mark.usefixtures("env") class TestChat(object):