diff --git a/dlgr/griduniverse/experiment.py b/dlgr/griduniverse/experiment.py index 34c03f20..bddd102f 100644 --- a/dlgr/griduniverse/experiment.py +++ b/dlgr/griduniverse/experiment.py @@ -153,7 +153,7 @@ def __call__(self): nodes = self.participant.nodes() if nodes: node = nodes[0] - elif not node: + if not node: node = session.query(dallinger.nodes.Environment).one() try: @@ -1684,6 +1684,7 @@ def send_state_thread(self): def game_loop(self): """Update the world state.""" + from dallinger.experiment_server.worker_events import worker_function, _get_queue gevent.sleep(0.1) if not self.config.get("replay", False): self.grid.build_labyrinth() @@ -1700,6 +1701,8 @@ def game_loop(self): previous_second_timestamp = self.grid.start_timestamp count = 0 + q = _get_queue("high") + while not self.grid.game_over: # Record grid state to database state_data = self.grid.serialize( @@ -1707,8 +1710,6 @@ def game_loop(self): include_items=self.grid.items_updated, ) - from dallinger.experiment_server.worker_events import worker_function, _get_queue - q = _get_queue("high") q.enqueue( worker_function, "GUUpdateEnvironmentState", None, None, node_id=self.environment.id, details=state_data, diff --git a/test/conftest.py b/test/conftest.py index b3328676..4b634492 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -174,6 +174,18 @@ def pubsub(exp): exp.redis_conn = orig_conn +@pytest.fixture +def queue_getter(): + with mock.patch("dallinger.experiment_server.worker_events._get_queue") as mock_queue_getter: + mock_queue_getter.return_value = mock.Mock() + yield mock_queue_getter + + +@pytest.fixture +def queue(queue_getter): + yield queue_getter.return_value + + @pytest.fixture def fresh_gridworld(): from dlgr.griduniverse.experiment import Gridworld diff --git a/test/test_griduniverse.py b/test/test_griduniverse.py index a096a76d..d3420f95 100755 --- a/test/test_griduniverse.py +++ b/test/test_griduniverse.py @@ -150,6 +150,9 @@ def loop_exp_3x(self, exp): exp.grid.start_timestamp = time.time() exp.socket_session = mock.Mock() exp.publish = mock.Mock() + # We serialize the environment node id to send to the job queue, so we + # need a serializable value here + exp.environment.id = 1 def count_down(counter): for c in counter: @@ -190,14 +193,30 @@ def test_loop_spawns_items(self, loop_exp_3x): [i["item_count"] for i in exp.item_config.values()] ) - def test_loop_serialized_and_saves(self, loop_exp_3x): + def test_loop_records_spawned_items(self, loop_exp_3x, queue): + exp = loop_exp_3x + exp.game_loop() + + total_items = sum( + [i["item_count"] for i in exp.item_config.values()] + ) + + # We record events for every spawned item and each of the three game loops + assert queue.enqueue.call_count == total_items + 3 + for call in queue.enqueue.mock_calls[:-3]: + assert call.args[1] == "GURecordEvent" + assert call.kwargs["details"]["type"] == "spawn item" + + def test_loop_calls_environment_state_save(self, loop_exp_3x, queue): # Grid serialized and added to DB session once per loop exp = loop_exp_3x exp.game_loop() - assert exp.socket_session.add.call_count == 3 - # Session commited once per loop and again at end - assert exp.socket_session.commit.call_count == 4 + # We queue up the Environment state save task on each loop run. + # So the last three tasks queued after spawning items should be + # state saves + for call in queue.enqueue.mock_calls[-3:]: + assert call.args[1] == "GUUpdateEnvironmentState" def test_loop_resets_state(self, loop_exp_3x): # Wall and item state unset, item count reset during loop @@ -285,19 +304,20 @@ def test_colors_distributed_almost_evenly_if_on_edge(self, exp, participants): @pytest.mark.usefixtures("env") class TestRecordPlayerActivity(object): - def test_records_player_events(self, exp, a): + + def test_records_player_events(self, exp, a, queue): participant = a.participant() exp.handle_connect({"player_id": participant.id}) exp.send( "griduniverse_ctrl:" '{{"type":"move","player_id":{},"move":"left"}}'.format(participant.id) ) - time.sleep(10) - data = exp.retrieve_data() - # Get the last recorded event - event_detail = json.loads(data.infos.df["details"].values[-1]) - assert event_detail["player_id"] == participant.id - assert event_detail["move"] == "left" + queue.enqueue.assert_called_once_with( + mock.ANY, "GURecordEvent", None, 1, + node_id=exp.node_by_player_id[participant.id], + details={"type": "move", "player_id": 1, "move": "left", + "server_time": mock.ANY} + ) def test_scores_and_payoffs_averaged(self, exp, a): participant = a.participant() @@ -312,46 +332,112 @@ def test_scores_and_payoffs_averaged(self, exp, a): assert results["average_score"] >= 0.0 assert results["average_payoff"] >= 0.0 - def test_record_event_with_participant(self, exp, a): + def test_record_event_with_participant(self, exp, a, queue): # Adds event to player node participant = a.participant() - exp.handle_connect({"player_id": participant.id}) - exp.socket_session.add = mock.Mock() - exp.socket_session.commit = mock.Mock() - exp.record_event({"data": ["some data"]}, player_id=participant.id) - exp.socket_session.add.assert_called_once() - exp.socket_session.commit.assert_called_once() - info = exp.socket_session.add.call_args[0][0] - assert info.details["data"] == ["some data"] - assert info.origin.id == exp.node_by_player_id[participant.id] - - def test_record_event_without_participant(self, exp): + exp.handle_connect({'player_id': participant.id}) + exp.record_event({'data': ['some data']}, + player_id=participant.id) + + queue.enqueue.assert_called_once_with( + mock.ANY, 'GURecordEvent', None, 1, node_id=exp.node_by_player_id[participant.id], + details={'data': ['some data']} + ) + + def test_record_event_without_participant(self, exp, queue): # Adds event to enviroment node node = exp.environment - exp.socket_session.add = mock.Mock() - exp.socket_session.commit = mock.Mock() - exp.record_event({"data": ["some data"]}) - exp.socket_session.add.assert_called_once() - exp.socket_session.commit.assert_called_once() - info = exp.socket_session.add.call_args[0][0] - assert info.details["data"] == ["some data"] - assert info.origin.id == node.id + exp.record_event({'data': ['some data']}) + queue.enqueue.assert_called_once_with( + mock.ANY, 'GURecordEvent', None, None, node_id=node.id, + details={'data': ['some data']} + ) + + +@pytest.mark.usefixtures('env') +class TestWorkerEvents(object): + + def test_record_event(self, exp, a): + from dlgr.griduniverse.experiment import GURecordEvent + node = exp.environment + event_func = GURecordEvent( + participant=None, experiment=exp, session=mock.Mock(), + node=node, details={'data': ['some data']} + ) + with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event: + event_func() + mock_event.assert_called_once_with(origin=node, details={'data': ['some data']}) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() + + def test_record_event_with_participant_and_no_node(self, exp, a): + from dlgr.griduniverse.experiment import GURecordEvent + participant = a.participant() + exp.handle_connect({'player_id': participant.id}) + node = participant.nodes()[0] + event_func = GURecordEvent( + participant=participant, experiment=exp, session=mock.Mock(), + node=None, details={'data': ['some data']} + ) + with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event: + event_func() + mock_event.assert_called_once_with(origin=node, details={'data': ['some data']}) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() + + def test_record_event_with_no_obtainable_node(self, exp, a): + from dallinger.nodes import Environment + from dlgr.griduniverse.experiment import GURecordEvent + # We create a participant, but do not attach a node + participant = a.participant() + # The event should fall back to looking up the environment node. + event_func = GURecordEvent( + participant=participant, experiment=exp, session=mock.Mock(), + node=None, details={'data': ['some data']} + ) + mock_results = event_func.session.query.return_value = mock.Mock() + mock_environment = mock_results.one.return_value = mock.Mock() + with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event: + event_func() + event_func.session.query.assert_called_once_with(Environment) + mock_results.one.assert_called_once() + mock_event.assert_called_once_with(origin=mock_environment, + details={'data': ['some data']}) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() def test_record_event_with_failed_node(self, exp, a): - # Does not save event, but logs failure + from dlgr.griduniverse.experiment import GURecordEvent node = exp.environment node.failed = True - exp.socket_session.add = mock.Mock() - exp.socket_session.commit = mock.Mock() - with mock.patch("dlgr.griduniverse.experiment.logger.info") as logger: - exp.record_event({"data": ["some data"]}) - assert exp.socket_session.add.call_count == 0 - assert exp.socket_session.commit.call_count == 0 + event_func = GURecordEvent( + participant=None, experiment=exp, session=mock.Mock(), + node=node, details={'data': ['some data']} + ) + with mock.patch('dlgr.griduniverse.experiment.logger.info') as logger: + event_func() + event_func.session.add.assert_not_called() + event_func.session.commit.assert_not_called() logger.assert_called_once() assert logger.call_args.startswith( "Tried to record an event after node#{} failure:".format(node.id) ) + def test_update_environment_state(self): + from dlgr.griduniverse.experiment import GUUpdateEnvironmentState + node = mock.Mock() + event_func = GUUpdateEnvironmentState( + session=mock.Mock(), + node=node, details={'data': ['some data']} + ) + event_func() + node.update.assert_called_once_with( + '{"data": ["some data"]}', + details={'data': ['some data']}, + ) + event_func.session.add.assert_called_once() + event_func.session.commit.assert_called_once() + @pytest.mark.usefixtures("env") class TestChat(object):