diff --git a/ansible_runner/streaming.py b/ansible_runner/streaming.py index ff9622c05..a3a6a8219 100644 --- a/ansible_runner/streaming.py +++ b/ansible_runner/streaming.py @@ -333,8 +333,13 @@ def run(self): try: line = self._input.readline() data = json.loads(line) - except (json.decoder.JSONDecodeError, IOError): - self.status_callback({'status': 'error', 'job_explanation': 'Failed to JSON parse a line from worker stream.'}) + except (json.decoder.JSONDecodeError, IOError) as exc: + self.status_callback({ + 'status': 'error', + 'job_explanation': ( + f'Failed to JSON parse a line from worker stream. Error: {exc} Line with invalid JSON data: {line[:1000]}' + ) + }) break if 'status' in data: diff --git a/test/integration/test_transmit_worker_process.py b/test/integration/test_transmit_worker_process.py index f86e56513..85ad71e86 100644 --- a/test/integration/test_transmit_worker_process.py +++ b/test/integration/test_transmit_worker_process.py @@ -459,11 +459,15 @@ def test_garbage_private_dir_worker(tmp_path): _output=outgoing_buffer, private_data_dir=worker_dir, ) - sent = outgoing_buffer.getvalue() - assert b'"status": "error"' in sent + outgoing_buffer.seek(0) + sent = outgoing_buffer.readline() + data = json.loads(sent) + assert data['status'] == 'error' + assert data['job_explanation'] == 'Failed to extract private data directory on worker.' + assert data['result_traceback'] -def test_unparsable_private_dir_worker(tmp_path): +def test_unparsable_line_worker(tmp_path): worker_dir = tmp_path / 'for_worker' worker_dir.mkdir() incoming_buffer = io.BytesIO(b'') @@ -476,18 +480,27 @@ def test_unparsable_private_dir_worker(tmp_path): _output=outgoing_buffer, private_data_dir=worker_dir, ) - sent = outgoing_buffer.getvalue() - assert b'"status": "error"' in sent + outgoing_buffer.seek(0) + sent = outgoing_buffer.readline() + data = json.loads(sent) + assert data['status'] == 'error' + assert data['job_explanation'] == 'Failed to JSON parse a line from transmit stream.' -def test_unparsable_private_dir_processor(tmp_path): +def test_unparsable_really_big_line_processor(tmp_path): process_dir = tmp_path / 'for_process' process_dir.mkdir() - incoming_buffer = io.BytesIO(b'') + incoming_buffer = io.BytesIO(bytes(f'not-json-data with extra garbage:{"f"*10000}', encoding='utf-8')) + + def status_receiver(status_data, runner_config): + assert status_data['status'] == 'error' + assert 'Failed to JSON parse a line from worker stream.' in status_data['job_explanation'] + assert 'not-json-data with extra garbage:ffffffffff' in status_data['job_explanation'] + assert len(status_data['job_explanation']) < 2000 - processor = run( + run( streamer='process', _input=incoming_buffer, private_data_dir=process_dir, + status_handler=status_receiver ) - assert processor.status == 'error'