Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async DB writes #245

Open
wants to merge 2 commits into
base: one-hour-one-life
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions dlgr/griduniverse/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dallinger.compat import unicode
from dallinger.config import get_config
from dallinger.experiment import Experiment
from dallinger.experiment_server.worker_events import WorkerEvent
from faker import Factory
from sqlalchemy import create_engine, func
from sqlalchemy.orm import scoped_session, sessionmaker
Expand Down Expand Up @@ -136,6 +137,47 @@ def format_field(self, value, format_spec):
formatter = PluralFormatter()


class GURecordEvent(WorkerEvent):

def __call__(self):
session = self.session
details = self.details
node = self.node
if self.participant and not self.node:
nodes = self.participant.nodes()
if nodes:
node = nodes[0]
if not node:
node = session.query(dallinger.nodes.Environment).one()

try:
info = Event(origin=node, details=details)
except ValueError:
logger.info(
"Tried to record an event after node#{} failure: {}".format(
node.id, self.details
)
)
return
session.add(info)
session.commit()


class GUUpdateEnvironmentState(WorkerEvent):

def __call__(self):
session = self.session
details = self.details
environment = self.node

state = environment.update(
json.dumps(details),
details=details
)
session.add(state)
session.commit()


def softmax(vector, temperature=1):
"""The softmax activation function."""
vector = [math.pow(x, temperature) for x in vector]
Expand Down Expand Up @@ -1392,26 +1434,23 @@ def parse_message(self, raw_message):

def record_event(self, details, player_id=None):
"""Record an event in the Info table."""
session = self.socket_session
if player_id == "spectator":
if player_id == 'spectator':
return
elif player_id:
node_id = self.node_by_player_id[player_id]
node = session.query(dallinger.models.Node).get(node_id)
else:
node = self.environment
node_id = node.id

try:
info = Event(origin=node, details=details)
except ValueError:
logger.info(
"Tried to record an event after node#{} failure: {}".format(
node.id, details
)
)
return
session.add(info)
session.commit()
from dallinger.experiment_server.worker_events import (
_get_queue,
worker_function,
)
q = _get_queue("high")
q.enqueue(
worker_function, 'GURecordEvent', None, player_id,
node_id=node_id, details=details
)

def publish(self, msg):
"""Publish a message to all griduniverse clients"""
Expand Down Expand Up @@ -1642,6 +1681,10 @@ def send_state_thread(self):

def game_loop(self):
"""Update the world state."""
from dallinger.experiment_server.worker_events import (
_get_queue,
worker_function,
)
gevent.sleep(0.1)
if not self.config.get("replay", False):
self.grid.build_labyrinth()
Expand All @@ -1658,15 +1701,20 @@ def game_loop(self):
previous_second_timestamp = self.grid.start_timestamp
count = 0

q = _get_queue("high")

while not self.grid.game_over:
# Record grid state to database
state_data = self.grid.serialize(
include_walls=self.grid.walls_updated,
include_items=self.grid.items_updated,
)
state = self.environment.update(json.dumps(state_data), details=state_data)
self.socket_session.add(state)
self.socket_session.commit()

q.enqueue(
worker_function, "GUUpdateEnvironmentState", None, None,
node_id=self.environment.id, details=state_data,
receive_timestamp=time.time()
)
count += 1
self.grid.walls_updated = False
self.grid.items_updated = False
Expand Down
12 changes: 12 additions & 0 deletions test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ def pubsub(exp):
exp.redis_conn = orig_conn


@pytest.fixture
def queue_getter():
with mock.patch("dallinger.experiment_server.worker_events._get_queue") as mock_queue_getter:
mock_queue_getter.return_value = mock.Mock()
yield mock_queue_getter


@pytest.fixture
def queue(queue_getter):
yield queue_getter.return_value


@pytest.fixture
def fresh_gridworld():
from dlgr.griduniverse.experiment import Gridworld
Expand Down
164 changes: 126 additions & 38 deletions test/test_griduniverse.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ def loop_exp_3x(self, exp):
exp.grid.start_timestamp = time.time()
exp.socket_session = mock.Mock()
exp.publish = mock.Mock()
# We serialize the environment node id to send to the job queue, so we
# need a serializable value here
exp.environment.id = 1

def count_down(counter):
for c in counter:
Expand Down Expand Up @@ -191,14 +194,30 @@ def test_loop_spawns_items(self, loop_exp_3x):
[i["item_count"] for i in exp.item_config.values()]
)

def test_loop_serialized_and_saves(self, loop_exp_3x):
def test_loop_records_spawned_items(self, loop_exp_3x, queue):
exp = loop_exp_3x
exp.game_loop()

total_items = sum(
[i["item_count"] for i in exp.item_config.values()]
)

# We record events for every spawned item and each of the three game loops
assert queue.enqueue.call_count == total_items + 3
for call in queue.enqueue.mock_calls[:-3]:
assert call.args[1] == "GURecordEvent"
assert call.kwargs["details"]["type"] == "spawn item"

def test_loop_calls_environment_state_save(self, loop_exp_3x, queue):
# Grid serialized and added to DB session once per loop
exp = loop_exp_3x
exp.game_loop()

assert exp.socket_session.add.call_count == 3
# Session commited once per loop and again at end
assert exp.socket_session.commit.call_count == 4
# We queue up the Environment state save task on each loop run.
# So the last three tasks queued after spawning items should be
# state saves
for call in queue.enqueue.mock_calls[-3:]:
assert call.args[1] == "GUUpdateEnvironmentState"

def test_loop_resets_state(self, loop_exp_3x):
# Wall and item state unset, item count reset during loop
Expand Down Expand Up @@ -286,19 +305,20 @@ def test_colors_distributed_almost_evenly_if_on_edge(self, exp, participants):

@pytest.mark.usefixtures("env")
class TestRecordPlayerActivity(object):
def test_records_player_events(self, exp, a):

def test_records_player_events(self, exp, a, queue):
participant = a.participant()
exp.handle_connect({"player_id": participant.id})
exp.send(
"griduniverse_ctrl:"
'{{"type":"move","player_id":{},"move":"left"}}'.format(participant.id)
)
time.sleep(10)
data = exp.retrieve_data()
# Get the last recorded event
event_detail = json.loads(data.infos.df["details"].values[-1])
assert event_detail["player_id"] == participant.id
assert event_detail["move"] == "left"
queue.enqueue.assert_called_once_with(
mock.ANY, "GURecordEvent", None, 1,
node_id=exp.node_by_player_id[participant.id],
details={"type": "move", "player_id": 1, "move": "left",
"server_time": mock.ANY}
)

def test_scores_and_payoffs_averaged(self, exp, a):
participant = a.participant()
Expand All @@ -313,46 +333,114 @@ def test_scores_and_payoffs_averaged(self, exp, a):
assert results["average_score"] >= 0.0
assert results["average_payoff"] >= 0.0

def test_record_event_with_participant(self, exp, a):
def test_record_event_with_participant(self, exp, a, queue):
# Adds event to player node
participant = a.participant()
exp.handle_connect({"player_id": participant.id})
exp.socket_session.add = mock.Mock()
exp.socket_session.commit = mock.Mock()
exp.record_event({"data": ["some data"]}, player_id=participant.id)
exp.socket_session.add.assert_called_once()
exp.socket_session.commit.assert_called_once()
info = exp.socket_session.add.call_args[0][0]
assert info.details["data"] == ["some data"]
assert info.origin.id == exp.node_by_player_id[participant.id]

def test_record_event_without_participant(self, exp):
exp.handle_connect({'player_id': participant.id})
exp.record_event({'data': ['some data']},
player_id=participant.id)

queue.enqueue.assert_called_once_with(
mock.ANY, 'GURecordEvent', None, 1, node_id=exp.node_by_player_id[participant.id],
details={'data': ['some data']}
)

def test_record_event_without_participant(self, exp, queue):
# Adds event to enviroment node
node = exp.environment
exp.socket_session.add = mock.Mock()
exp.socket_session.commit = mock.Mock()
exp.record_event({"data": ["some data"]})
exp.socket_session.add.assert_called_once()
exp.socket_session.commit.assert_called_once()
info = exp.socket_session.add.call_args[0][0]
assert info.details["data"] == ["some data"]
assert info.origin.id == node.id
exp.record_event({'data': ['some data']})
queue.enqueue.assert_called_once_with(
mock.ANY, 'GURecordEvent', None, None, node_id=node.id,
details={'data': ['some data']}
)


@pytest.mark.usefixtures('env')
class TestWorkerEvents(object):

def test_record_event(self, exp, a):
from dlgr.griduniverse.experiment import GURecordEvent
node = exp.environment
event_func = GURecordEvent(
participant=None, experiment=exp, session=mock.Mock(),
node=node, details={'data': ['some data']}
)
with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event:
event_func()
mock_event.assert_called_once_with(origin=node, details={'data': ['some data']})
event_func.session.add.assert_called_once()
event_func.session.commit.assert_called_once()

def test_record_event_with_participant_and_no_node(self, exp, a):
from dlgr.griduniverse.experiment import GURecordEvent
participant = a.participant()
exp.handle_connect({'player_id': participant.id})
node = participant.nodes()[0]
event_func = GURecordEvent(
participant=participant, experiment=exp, session=mock.Mock(),
node=None, details={'data': ['some data']}
)
with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event:
event_func()
mock_event.assert_called_once_with(origin=node, details={'data': ['some data']})
event_func.session.add.assert_called_once()
event_func.session.commit.assert_called_once()

def test_record_event_with_no_obtainable_node(self, exp, a):
from dallinger.nodes import Environment

from dlgr.griduniverse.experiment import GURecordEvent

# We create a participant, but do not attach a node
participant = a.participant()
# The event should fall back to looking up the environment node.
event_func = GURecordEvent(
participant=participant, experiment=exp, session=mock.Mock(),
node=None, details={'data': ['some data']}
)
mock_results = event_func.session.query.return_value = mock.Mock()
mock_environment = mock_results.one.return_value = mock.Mock()
with mock.patch('dlgr.griduniverse.experiment.Event') as mock_event:
event_func()
event_func.session.query.assert_called_once_with(Environment)
mock_results.one.assert_called_once()
mock_event.assert_called_once_with(origin=mock_environment,
details={'data': ['some data']})
event_func.session.add.assert_called_once()
event_func.session.commit.assert_called_once()

def test_record_event_with_failed_node(self, exp, a):
# Does not save event, but logs failure
from dlgr.griduniverse.experiment import GURecordEvent
node = exp.environment
node.failed = True
exp.socket_session.add = mock.Mock()
exp.socket_session.commit = mock.Mock()
with mock.patch("dlgr.griduniverse.experiment.logger.info") as logger:
exp.record_event({"data": ["some data"]})
assert exp.socket_session.add.call_count == 0
assert exp.socket_session.commit.call_count == 0
event_func = GURecordEvent(
participant=None, experiment=exp, session=mock.Mock(),
node=node, details={'data': ['some data']}
)
with mock.patch('dlgr.griduniverse.experiment.logger.info') as logger:
event_func()
event_func.session.add.assert_not_called()
event_func.session.commit.assert_not_called()
logger.assert_called_once()
assert logger.call_args.startswith(
"Tried to record an event after node#{} failure:".format(node.id)
)

def test_update_environment_state(self):
from dlgr.griduniverse.experiment import GUUpdateEnvironmentState
node = mock.Mock()
event_func = GUUpdateEnvironmentState(
session=mock.Mock(),
node=node, details={'data': ['some data']}
)
event_func()
node.update.assert_called_once_with(
'{"data": ["some data"]}',
details={'data': ['some data']},
)
event_func.session.add.assert_called_once()
event_func.session.commit.assert_called_once()


@pytest.mark.usefixtures("env")
class TestChat(object):
Expand Down