diff --git a/cdci_data_analysis/analysis/matrix_helper.py b/cdci_data_analysis/analysis/matrix_helper.py index 92ce22005..70992025a 100644 --- a/cdci_data_analysis/analysis/matrix_helper.py +++ b/cdci_data_analysis/analysis/matrix_helper.py @@ -104,8 +104,9 @@ def send_incident_report_message( env.filters['humanize_future'] = humanize_future matrix_server_url = config.matrix_server_url - matrix_sender_access_token = config.matrix_sender_access_token - receiver_room_id = tokenHelper.get_token_user_matrix_room_id(decoded_token) + + incident_report_receivers_room_ids = config.matrix_incident_report_receivers_room_ids + incident_report_sender_personal_access_token = config.matrix_incident_report_sender_personal_access_token matrix_message_data = { 'request': { @@ -126,16 +127,23 @@ def send_incident_report_message( # open("debug_email_lines_too_long.html", "w").write(email_body_html) # open("debug_email_lines_too_long.text", "w").write(email_text) # raise MatrixMessageNotSent(f"message not sent on matrix, lines too long!") + res_content = { + 'res_content_incident_reports': [] + } - res_data = send_message(url_server=matrix_server_url, - sender_access_token=matrix_sender_access_token, - room_id=receiver_room_id, - message_text=message_text, - message_body_html=message_body_html - ) + message_data = { + 'message_data_incident_reports': [] + } - message_data = res_data['message_data'] - res_content = res_data['res_content'] + for incident_report_receiver_room_id in incident_report_receivers_room_ids: + res_data_message_receiver = send_message(url_server=matrix_server_url, + sender_access_token=incident_report_sender_personal_access_token, + room_id=incident_report_receiver_room_id, + message_text=message_text, + message_body_html=message_body_html + ) + message_data['message_data_incident_reports'].append(res_data_message_receiver['message_data']) + res_content['res_content_incident_reports'].append(res_data_message_receiver['res_content']) store_incident_report_matrix_message(message_data, scratch_dir, sending_time=sending_time) @@ -188,6 +196,8 @@ def send_job_message( matrix_sender_access_token = config.matrix_sender_access_token receiver_room_id = tokenHelper.get_token_user_matrix_room_id(decoded_token) + bcc_receivers_room_ids = config.matrix_bcc_receivers_room_ids + matrix_message_data = { 'oda_site': { 'site_name': config.site_name, @@ -218,22 +228,47 @@ def send_job_message( template = env.get_template('matrix_message.html') message_body_html = template.render(**matrix_message_data) message_text = textify_matrix_message(message_body_html) - res_data = send_message(url_server=matrix_server_url, - sender_access_token=matrix_sender_access_token, - room_id=receiver_room_id, - message_text=message_text, - message_body_html=message_body_html - ) + res_content = { + 'res_content_bcc_users': [] + } + + message_data = { + 'message_data_bcc_users': [] + } + if receiver_room_id is not None and receiver_room_id != "": + res_data_message_token_user = send_message(url_server=matrix_server_url, + sender_access_token=matrix_sender_access_token, + room_id=receiver_room_id, + message_text=message_text, + message_body_html=message_body_html + ) + message_data_token_user = res_data_message_token_user['message_data'] + res_content_token_user = res_data_message_token_user['res_content'] + message_data['message_data_token_user'] = message_data_token_user + res_content['res_content_token_user'] = res_content_token_user + else: + matrix_helper_logger.warning('a matrix message could not be sent to the token user as no personal room id was ' + 'provided within the token') + + for bcc_receiver_room_id in bcc_receivers_room_ids: + if bcc_receiver_room_id is not None and bcc_receiver_room_id != "": + res_data_message_cc_user = send_message(url_server=matrix_server_url, + sender_access_token=matrix_sender_access_token, + room_id=bcc_receiver_room_id, + message_text=message_text, + message_body_html=message_body_html + ) + message_data_cc_user = res_data_message_cc_user['message_data'] + message_data['message_data_bcc_users'].append(message_data_cc_user) + res_content_cc_user = res_data_message_cc_user['res_content'] + res_content['res_content_bcc_users'].append(res_content_cc_user) - message_data = res_data['message_data'] - res_content = res_data['res_content'] store_status_matrix_message_info(message_data, status, scratch_dir, sending_time=sending_time, first_submitted_time=time_request) return res_content - def send_message( url_server=None, sender_access_token=None, @@ -241,20 +276,6 @@ def send_message( message_text=None, message_body_html=None, ): - - if url_server is None: - matrix_helper_logger.info('matrix url server not available') - raise MissingRequestParameter('matrix url server not available') - if sender_access_token is None: - matrix_helper_logger.info('matrix sender_access_token not available') - raise MissingRequestParameter('matrix sender_access_token not available') - if room_id is None: - matrix_helper_logger.info('matrix room_id not available') - raise MissingRequestParameter('matrix room_id not available') - if message_text is None or message_body_html is None: - matrix_helper_logger.info('matrix message not available') - raise MissingRequestParameter('matrix message not available') - matrix_helper_logger.info(f"Sending message to the room id: {room_id}") url = os.path.join(url_server, f'_matrix/client/r0/rooms/{room_id}/send/m.room.message') @@ -279,14 +300,14 @@ def send_message( except json.decoder.JSONDecodeError: error_msg = res.text matrix_helper_logger.warning(f"there seems to be some problem in sending a message via matrix:\n" - f"the requested url {url} lead to the error {error_msg}, " - "this might be due to an error in the url or the page requested no longer exists, " - "please check it and try to issue again the request") + f"the requested url {url} lead to the error {error_msg}, " + "this might be due to an error in the url or the page requested no longer exists, " + "please check it and try to issue again the request") matrix_error_message = error_msg sentry.capture_message(f'issue in sending a message via matrix, the requested url {url} lead to the error ' f'{matrix_error_message}') - raise MatrixMessageNotSent('issue when performing a request to the product gallery', + raise MatrixMessageNotSent('issue in sending a message via matrix', status_code=res.status_code, payload={'matrix_error_message': matrix_error_message}) @@ -300,12 +321,31 @@ def send_message( return res_data +def is_matrix_config_ok(config): + if config.matrix_server_url is None: + matrix_helper_logger.info('matrix url server not available') + return False + if config.matrix_sender_access_token is None: + matrix_helper_logger.info('matrix sender_access_token not available') + return False + return True + + def is_message_to_send_run_query(status, time_original_request, scratch_dir, job_id, config, decoded_token=None): log_additional_info_obj = {} sending_ok = False + config_ok = is_matrix_config_ok(config) time_check = time_.time() sentry_for_matrix_message_sending_check = config.sentry_for_matrix_message_sending_check + + if config.matrix_server_url is None: + matrix_helper_logger.info('matrix url server not available') + config_ok = False + if config.matrix_sender_access_token is None: + matrix_helper_logger.info('matrix sender_access_token not available') + config_ok = False + # get total request duration if decoded_token: # in case the job is just submitted and was not submitted before, at least since some time @@ -387,12 +427,24 @@ def is_message_to_send_run_query(status, time_original_request, scratch_dir, job else: matrix_helper_logger.info(f'a message on matrix will not be sent because a token was not provided') - return sending_ok + return sending_ok and config_ok + + +def is_matrix_config_present(config): + url_server = config.matrix_server_url + sender_access_token = config.matrix_sender_access_token + + if url_server is None or sender_access_token is None: + matrix_helper_logger.info('matrix url server not available') + return False + + return True def is_message_to_send_callback(status, time_original_request, scratch_dir, config, job_id, decoded_token=None): log_additional_info_obj = {} sending_ok = False + config_ok = is_matrix_config_ok(config) time_check = time_.time() sentry_for_matrix_message_sending_check = config.sentry_for_matrix_message_sending_check @@ -467,7 +519,7 @@ def is_message_to_send_callback(status, time_original_request, scratch_dir, conf additional_info_obj=log_additional_info_obj ) - return sending_ok + return sending_ok and config_ok def log_matrix_message_sending_info(status, time_request, scratch_dir, job_id, additional_info_obj=None): diff --git a/cdci_data_analysis/config_dir/conf_env.yml.example b/cdci_data_analysis/config_dir/conf_env.yml.example index 467c7c120..8664ab7ee 100644 --- a/cdci_data_analysis/config_dir/conf_env.yml.example +++ b/cdci_data_analysis/config_dir/conf_env.yml.example @@ -80,6 +80,16 @@ dispatcher: matrix_server_url: MATRIX_SERVER_URL # access token of the sender account, from which the messages will be sent from matrix_sender_access_token: MATRIX_SENDER_ACCESS_TOKEN + # list of additional room ids receivers towards which the message will be sent, besides the room id extracted from the token + matrix_bcc_receivers_room_ids: ['room_id'] + # incident report related options, for the messages sent via matrix + incident_report_matrix_options: + # list of room ids receivers towards which the incident report message will be sent + matrix_incident_report_receivers_room_ids: ['room_id'] + # personal access token of the sender of the incident report + matrix_incident_report_sender_personal_access_token: MATRIX_INCIDENT_REPORT_SENDER_ACCESS_TOKEN + # list of room ids to which the matrix message should be sent + matrix_cc_receivers_room_id: [] # enable/disable sending of messages via matrix in case of a submitted job status matrix_message_sending_job_submitted: True # amount of time (in seconds) elapsed from the sending of the last message on matrix diff --git a/cdci_data_analysis/configurer.py b/cdci_data_analysis/configurer.py index 992badc5e..4ec6e6f1c 100644 --- a/cdci_data_analysis/configurer.py +++ b/cdci_data_analysis/configurer.py @@ -247,6 +247,9 @@ def __init__(self, cfg_dict, origin=None): disp_dict['email_options'].get('incident_report_email_options', {}).get('incident_report_receivers_email_addresses', None), disp_dict.get('matrix_options', {}).get('matrix_server_url', None), disp_dict.get('matrix_options', {}).get('matrix_sender_access_token', None), + disp_dict.get('matrix_options', {}).get('matrix_bcc_receivers_room_ids', []), + disp_dict.get('matrix_options', {}).get('incident_report_matrix_options', {}).get('matrix_incident_report_receivers_room_ids', []), + disp_dict.get('matrix_options', {}).get('incident_report_matrix_options', {}).get('matrix_incident_report_sender_personal_access_token', None), disp_dict.get('matrix_options', {}).get('matrix_message_sending_job_submitted', True), disp_dict.get('matrix_options', {}).get('matrix_message_sending_job_submitted_default_interval', 1800), disp_dict.get('matrix_options', {}).get('sentry_for_matrix_message_sending_check', False), @@ -322,6 +325,9 @@ def set_conf_dispatcher(self, incident_report_receivers_email_addresses, matrix_server_url, matrix_sender_access_token, + matrix_bcc_receivers_room_ids, + matrix_incident_report_receivers_room_ids, + matrix_incident_report_sender_personal_access_token, matrix_message_sending_job_submitted, matrix_message_sending_job_submitted_default_interval, sentry_for_matrix_message_sending_check, @@ -369,6 +375,9 @@ def set_conf_dispatcher(self, self.incident_report_receivers_email_addresses = incident_report_receivers_email_addresses self.matrix_server_url = matrix_server_url self.matrix_sender_access_token = matrix_sender_access_token + self.matrix_bcc_receivers_room_ids = matrix_bcc_receivers_room_ids + self.matrix_incident_report_receivers_room_ids = matrix_incident_report_receivers_room_ids + self.matrix_incident_report_sender_personal_access_token = matrix_incident_report_sender_personal_access_token self.matrix_message_sending_job_submitted = matrix_message_sending_job_submitted self.matrix_message_sending_job_submitted_default_interval = matrix_message_sending_job_submitted_default_interval self.sentry_for_matrix_message_sending_check = sentry_for_matrix_message_sending_check diff --git a/cdci_data_analysis/flask_app/dispatcher_query.py b/cdci_data_analysis/flask_app/dispatcher_query.py index d498041e7..0a879f6f4 100644 --- a/cdci_data_analysis/flask_app/dispatcher_query.py +++ b/cdci_data_analysis/flask_app/dispatcher_query.py @@ -1045,22 +1045,27 @@ def run_call_back(self, status_kw_name='action') -> typing.Tuple[str, typing.Uni is_email_to_send = False is_message_to_send = False try: - step = 'checking if an email can be sent' + step = 'checking if a message can be sent via matrix' is_message_to_send = matrix_helper.is_message_to_send_callback(status, - time_original_request, - self.scratch_dir, - self.app.config['conf'], - self.job_id, - decoded_token=self.decoded_token) + time_original_request, + self.scratch_dir, + self.app.config['conf'], + self.job_id, + decoded_token=self.decoded_token) except matrix_helper.MultipleDoneMatrixMessage as e: job.write_dataserver_status(status_dictionary_value=status, full_dict=self.par_dic, matrix_message_status='attempted repeated sending of matrix message detected') logging.warning(f'attempted repeated sending of completion matrix message detected: {e}') sentry.capture_message(f'attempted repeated sending of completion matrix message detected: {e}') + except MissingRequestParameter as e: + job.write_dataserver_status(status_dictionary_value=status, + full_dict=self.par_dic, + call_back_status=f'parameter missing when checking if a message could be sent via matrix: {e.message}') + logging.warning(f'parameter missing when checking if a message could be sent via matrix: {e.message}') try: - step = 'checking if a message can be sent via matrix' + step = 'checking if an email can be sent' is_email_to_send = email_helper.is_email_to_send_callback(self.logger, status, time_original_request, @@ -1074,6 +1079,12 @@ def run_call_back(self, status_kw_name='action') -> typing.Tuple[str, typing.Uni email_status='attempted repeated sending of completion email detected') logging.warning(f'attempted repeated sending of completion email detected: {e}') sentry.capture_message(f'attempted repeated sending of completion email detected: {e}') + except MissingRequestParameter as e: + job.write_dataserver_status(status_dictionary_value=status, + full_dict=self.par_dic, + call_back_status=f'parameter missing when checking if an email could be sent: {e.message}') + logging.warning(f'parameter missing when checking if an email could be sent: {e.message}') + try: if is_email_to_send or is_message_to_send: step = 'extracting the original request dictionary' @@ -1151,12 +1162,6 @@ def run_call_back(self, status_kw_name='action') -> typing.Tuple[str, typing.Uni logging.warning(f'matrix message sending failed: {e}') sentry.capture_message(f'sending matrix message failed {e.message}') - except MissingRequestParameter as e: - job.write_dataserver_status(status_dictionary_value=status, - full_dict=self.par_dic, - call_back_status=f'parameter missing when sending a message via matrix: {e.message}') - logging.warning(f'parameter missing when sending a message via matrix: {e.message}') - try: # TODO for a future implementation # self.validate_job_id() @@ -1199,11 +1204,6 @@ def run_call_back(self, status_kw_name='action') -> typing.Tuple[str, typing.Uni logging.warning(f'email sending failed: {e}') sentry.capture_message(f'sending email failed {e}') - except MissingRequestParameter as e: - job.write_dataserver_status(status_dictionary_value=status, - full_dict=self.par_dic, - call_back_status=f'parameter missing when sending an email: {e.message}') - logging.warning(f'parameter missing when sending an email: {e.message}') # TODO for a future implementation # except RequestNotAuthorized as e: # job.write_dataserver_status(status_dictionary_value=status, @@ -1968,11 +1968,6 @@ def run_query(self, off_line=False, disp_conf=None): query_out.set_status_field('matrix_message_status_details', e.payload) logging.warning(f'matrix message sending failed: {e}') sentry.capture_message(f'sending matrix message failed {e.message}') - except MissingRequestParameter as e: - query_out.set_status_field('matrix_message_status', 'sending matrix message failed') - query_out.set_status_field('matrix_message_status_details', e.payload) - logging.warning(f'matrix message sending failed: {e}') - sentry.capture_message(f'sending matrix message failed {e.message}') if email_helper.is_email_to_send_run_query(self.logger, query_new_status, diff --git a/cdci_data_analysis/pytest_fixtures.py b/cdci_data_analysis/pytest_fixtures.py index 3f1a4c5fc..3f8119714 100644 --- a/cdci_data_analysis/pytest_fixtures.py +++ b/cdci_data_analysis/pytest_fixtures.py @@ -564,6 +564,10 @@ def dispatcher_test_conf_with_matrix_options_fn(dispatcher_test_conf_fn): f.write('\n matrix_options:' '\n matrix_server_url: "https://matrix-client.matrix.org/"' f'\n matrix_sender_access_token: "{os.getenv("MATRIX_SENDER_ACCESS_TOKEN", "matrix_sender_access_token")}"' + f'\n matrix_bcc_receivers_room_ids: ["{os.getenv("MATRIX_CC_RECEIVER_ROOM_ID", "")}"]' + '\n incident_report_matrix_options:' + f'\n matrix_incident_report_receivers_room_ids: ["{os.getenv("MATRIX_INCIDENT_REPORT_RECEIVER_ROOM_ID", "matrix_incident_report_receivers_room_ids")}"]' + f'\n matrix_incident_report_sender_personal_access_token: "{os.getenv("MATRIX_INCIDENT_REPORT_SENDER_PERSONAL_ACCESS_TOKEN", "matrix_incident_report_sender_personal_access_token")}"' '\n matrix_message_sending_job_submitted: True' '\n matrix_message_sending_job_submitted_default_interval: 5' '\n sentry_for_matrix_message_sending_check: False' @@ -573,6 +577,11 @@ def dispatcher_test_conf_with_matrix_options_fn(dispatcher_test_conf_fn): yield fn +@pytest.fixture +def dispatcher_no_bcc_matrix_room_ids(monkeypatch): + monkeypatch.delenv('MATRIX_CC_RECEIVER_ROOM_ID', raising=False) + + @pytest.fixture def dispatcher_test_conf_with_gallery_no_resolver_fn(dispatcher_test_conf_fn): fn = "test-dispatcher-conf-with-gallery.yaml" diff --git a/tests/conftest.py b/tests/conftest.py index be7962151..e8a35efa5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ dispatcher_long_living_fixture, gunicorn_dispatcher_long_living_fixture, dispatcher_long_living_fixture_with_matrix_options, + dispatcher_no_bcc_matrix_room_ids, gunicorn_dispatcher_long_living_fixture_with_matrix_options, dispatcher_test_conf, dispatcher_test_conf_with_gallery, diff --git a/tests/test_matrix_messages.py b/tests/test_matrix_messages.py index 4226e2cd4..7fcf81b61 100644 --- a/tests/test_matrix_messages.py +++ b/tests/test_matrix_messages.py @@ -102,7 +102,6 @@ def validate_incident_matrix_message_content( message_record, room_id:str, event_id:str, - user_id:str, dispatcher_job_state: DispatcherJobState, incident_time_str: str = None, incident_report_str: str = None, @@ -121,7 +120,6 @@ def validate_incident_matrix_message_content( ) assert message_record['room_id'] == room_id - assert message_record['user_id'] == user_id assert message_record['type'] == 'm.room.message' assert message_record['event_id'] == event_id @@ -192,8 +190,9 @@ def adapt_html(html_content, patterns=None, **matrix_message_args,): @pytest.mark.test_matrix @pytest.mark.parametrize("default_values", [True, False]) @pytest.mark.parametrize("time_original_request_none", [False]) -@pytest.mark.parametrize("request_cred", ['public', 'private', 'private-no-matrix-message']) +@pytest.mark.parametrize("request_cred", ['public', 'private', 'private-no-matrix-message', 'private-no-room-id']) def test_matrix_message_run_analysis_callback(gunicorn_dispatcher_long_living_fixture_with_matrix_options, + dispatcher_test_conf_with_matrix_options, dispatcher_local_matrix_message_server, default_values, request_cred, time_original_request_none): DataServerQuery.set_status('submitted') @@ -233,6 +232,8 @@ def test_matrix_message_run_analysis_callback(gunicorn_dispatcher_long_living_fi token_payload['mxdone'] = False token_payload['mxfail'] = False expect_matrix_message = False + elif request_cred == 'private-no-room-id': + token_payload.pop('mxroomid', None) encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256') @@ -284,20 +285,49 @@ def test_matrix_message_run_analysis_callback(gunicorn_dispatcher_long_living_fi # matrix message not supposed to be sent for public request assert 'matrix_message_status' not in jdata else: + assert 'matrix_message_status' in jdata['exit_status'] assert jdata['exit_status']['matrix_message_status'] == 'matrix message sent' assert 'matrix_message_status_details' in jdata['exit_status'] - matrix_message_event_id_obj = json.loads(jdata['exit_status']['matrix_message_status_details']) - assert 'event_id' in matrix_message_event_id_obj['res_content'] + matrix_message_status_details_obj = json.loads(jdata['exit_status']['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + matrix_user_message_event_id = \ + matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id), + 'submitted', + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + variation_suffixes=["dummy"], + time_request_str=time_request_str, + request_params=dict_param, + products_url=products_url, + dispatcher_live_fixture=None, + require_reference_matrix_message=True + ) - matrix_message_event_id_obj = matrix_message_event_id_obj['res_content']['event_id'] + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 1 + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_bcc_users'][0] + + matrix_bcc_message_event_id_obj = matrix_message_status_details_obj['res_content']['res_content_bcc_users'][0]['event_id'] validate_matrix_message_content( - dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj), + dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=dispatcher_test_conf_with_matrix_options['matrix_options']['matrix_bcc_receivers_room_ids'][0], + event_id=matrix_bcc_message_event_id_obj), 'submitted', - room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj, + room_id=dispatcher_test_conf_with_matrix_options['matrix_options']['matrix_bcc_receivers_room_ids'][0], + event_id=matrix_bcc_message_event_id_obj, user_id=token_payload['user_id'], dispatcher_job_state=dispatcher_job_state, variation_suffixes=["dummy"], @@ -428,19 +458,47 @@ def test_matrix_message_run_analysis_callback(gunicorn_dispatcher_long_living_fi assert 'matrix_message_status' in jdata assert jdata['matrix_message_status'] == 'matrix message sent' assert 'matrix_message_status_details' in jdata - matrix_message_event_id_obj = json.loads(jdata['matrix_message_status_details']) - assert 'event_id' in matrix_message_event_id_obj['res_content'] + matrix_message_status_details_obj = json.loads(jdata['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 1 + + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + matrix_user_message_event_id = \ + matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id), + 'done', + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + time_request_str=time_request_str, + dispatcher_live_fixture=server, + require_reference_matrix_message=True + ) + + matrix_bcc_message_event_id_obj = matrix_message_status_details_obj['res_content']['res_content_bcc_users'][0][ + 'event_id'] - matrix_message_event_id_obj = matrix_message_event_id_obj['res_content']['event_id'] # check the matrix message in the matrix message folders, and that the first one was produced dispatcher_job_state.assert_matrix_message(state="done") validate_matrix_message_content( - dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj), + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=dispatcher_test_conf_with_matrix_options['matrix_options']['matrix_bcc_receivers_room_ids'][0], + event_id=matrix_bcc_message_event_id_obj), 'done', - room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj, + room_id=dispatcher_test_conf_with_matrix_options['matrix_options']['matrix_bcc_receivers_room_ids'][0], + event_id=matrix_bcc_message_event_id_obj, user_id=token_payload['user_id'], dispatcher_job_state=dispatcher_job_state, time_request_str=time_request_str, @@ -473,29 +531,37 @@ def test_matrix_message_run_analysis_callback(gunicorn_dispatcher_long_living_fi assert 'matrix_message_status' in jdata assert jdata['matrix_message_status'] == 'matrix message sent' assert 'matrix_message_status_details' in jdata - matrix_message_event_id_obj = json.loads(jdata['matrix_message_status_details']) - assert 'event_id' in matrix_message_event_id_obj['res_content'] - - matrix_message_event_id_obj = matrix_message_event_id_obj['res_content']['event_id'] - - # check the matrix message in the matrix message folders, and that the first one was produced - if default_values or time_original_request_none: - dispatcher_job_state.assert_matrix_message('failed', comment="expected one matrix message in total, failed") + matrix_message_status_details_obj = json.loads(jdata['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 1 + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] else: - dispatcher_job_state.assert_matrix_message('failed', comment="expected two matrix message in total, second failed") - - validate_matrix_message_content( - dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj), - 'failed', - room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj, - user_id=token_payload['user_id'], - dispatcher_job_state=dispatcher_job_state, - time_request_str=time_request_str, - dispatcher_live_fixture=server, - require_reference_matrix_message=True - ) + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + + matrix_message_event_id_obj = matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + # check the matrix message in the matrix message folders, and that the first one was produced + if default_values or time_original_request_none: + dispatcher_job_state.assert_matrix_message('failed', comment="expected one matrix message in total, failed") + else: + dispatcher_job_state.assert_matrix_message('failed', comment="expected two matrix message in total, second failed") + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], + event_id=matrix_message_event_id_obj), + 'failed', + room_id=token_payload['mxroomid'], + event_id=matrix_message_event_id_obj, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + time_request_str=time_request_str, + dispatcher_live_fixture=server, + require_reference_matrix_message=True + ) @pytest.mark.test_matrix @@ -542,8 +608,14 @@ def test_matrix_message_submitted_same_job(dispatcher_live_fixture_with_matrix_o assert 'matrix_message_status' in jdata['exit_status'] assert jdata['exit_status']['matrix_message_status'] == 'matrix message sent' assert 'matrix_message_status_details' in jdata['exit_status'] - matrix_message_event_id_obj = json.loads(jdata['exit_status']['matrix_message_status_details']) - assert 'event_id' in matrix_message_event_id_obj['res_content'] + matrix_message_status_details_obj = json.loads(jdata['exit_status']['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 1 + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_bcc_users'][0] time_request = jdata['time_request'] time_request_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(time_request))) @@ -953,15 +1025,15 @@ def test_status_details_matrix_message_done(gunicorn_dispatcher_long_living_fixt job_id=dispatcher_job_state.job_id, token=encoded_token) - matrix_message_event_id_obj = matrix_message_event_id_obj['res_content']['event_id'] + matrix_message_event_id = matrix_message_event_id_obj['res_content']['res_content_token_user']['event_id'] # check the email in the log files validate_matrix_message_content( dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj), + event_id=matrix_message_event_id), 'done', room_id=token_payload['mxroomid'], - event_id=matrix_message_event_id_obj, + event_id=matrix_message_event_id, user_id=token_payload['user_id'], dispatcher_job_state=dispatcher_job_state, variation_suffixes=["failing"], @@ -1044,6 +1116,7 @@ def test_matrix_message_and_email(gunicorn_dispatcher_long_living_fixture_with_m @pytest.mark.test_matrix @pytest.mark.parametrize("request_cred", ['public', 'valid_token', 'invalid_token']) def test_incident_report(dispatcher_live_fixture_with_matrix_options, + dispatcher_test_conf_with_matrix_options, dispatcher_local_matrix_message_server, dispatcher_test_conf, request_cred): @@ -1112,17 +1185,360 @@ def test_incident_report(dispatcher_live_fixture_with_matrix_options, assert jdata_incident_report['martix_message_report_status'] == 'incident report message successfully sent via matrix' assert 'martix_message_report_status_details' in jdata_incident_report assert 'res_content' in jdata_incident_report['martix_message_report_status_details'] - assert 'event_id' in jdata_incident_report['martix_message_report_status_details']['res_content'] - matrix_message_event_id = jdata_incident_report['martix_message_report_status_details']['res_content']['event_id'] + assert 'res_content_incident_reports' in jdata_incident_report['martix_message_report_status_details']['res_content'] + assert len(jdata_incident_report['martix_message_report_status_details']['res_content']['res_content_incident_reports']) == 1 + assert 'event_id' in jdata_incident_report['martix_message_report_status_details']['res_content']['res_content_incident_reports'][0] + matrix_message_event_id = jdata_incident_report['martix_message_report_status_details']['res_content']['res_content_incident_reports'][0]['event_id'] validate_incident_matrix_message_content( - dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=decoded_token['mxroomid'], + dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=dispatcher_test_conf_with_matrix_options['matrix_options']['incident_report_matrix_options']['matrix_incident_report_receivers_room_ids'][0], event_id=matrix_message_event_id), event_id=matrix_message_event_id, - room_id=decoded_token['mxroomid'], - user_id=decoded_token['user_id'], + room_id=dispatcher_test_conf_with_matrix_options['matrix_options']['incident_report_matrix_options']['matrix_incident_report_receivers_room_ids'][0], dispatcher_job_state=dispatcher_job_state, incident_time_str=time_request_str, incident_report_str=incident_content, decoded_token=decoded_token - ) \ No newline at end of file + ) + + +@pytest.mark.test_matrix +@pytest.mark.parametrize("default_values", [True, False]) +@pytest.mark.parametrize("time_original_request_none", [False]) +@pytest.mark.parametrize("request_cred", ['public', 'private', 'private-no-matrix-message', 'private-no-room-id']) +def test_matrix_message_run_analysis_callback_no_room_ids(dispatcher_no_bcc_matrix_room_ids, + gunicorn_dispatcher_long_living_fixture_with_matrix_options, + dispatcher_test_conf_with_matrix_options, + dispatcher_local_matrix_message_server, + default_values, request_cred, time_original_request_none): + DataServerQuery.set_status('submitted') + + server = gunicorn_dispatcher_long_living_fixture_with_matrix_options + + DispatcherJobState.remove_scratch_folders() + + token_none = (request_cred == 'public') + + expect_matrix_message = True + token_payload = {**default_token_payload, + "tmx": 0, + "tem": 0, + "mxstout": True, + "mxintsub": 5, + "mxsub": True, + "mssub": False, + "msdone": False, + "msfail": False, + "mxroomid": dispatcher_local_matrix_message_server.room_id + } + + if token_none: + encoded_token = None + else: + # let's generate a valid token with high threshold + + if default_values: + token_payload.pop('tmx') + token_payload.pop('mxstout') + token_payload.pop('mxsub') + token_payload.pop('mxintsub') + + if request_cred == 'private-no-matrix-message': + token_payload['mxsub'] = False + token_payload['mxdone'] = False + token_payload['mxfail'] = False + expect_matrix_message = False + elif request_cred == 'private-no-room-id': + token_payload.pop('mxroomid', None) + + encoded_token = jwt.encode(token_payload, secret_key, algorithm='HS256') + + dict_param = dict( + query_status="new", + query_type="Real", + instrument="empty-async", + product_type="dummy", + token=encoded_token + ) + + # this should return status submitted, so matrix message sent + c = requests.get(os.path.join(server, "run_analysis"), + dict_param + ) + assert c.status_code == 200 + jdata = c.json() + + session_id = jdata['session_id'] + job_id = jdata['job_monitor']['job_id'] + + logger.info("response from run_analysis: %s", json.dumps(jdata, indent=4)) + dispatcher_job_state = DispatcherJobState.from_run_analysis_response(c.json()) + + assert jdata['query_status'] == "submitted" + + completed_dict_param = {**dict_param, + 'use_scws': 'no', + 'src_name': '1E 1740.7-2942', + 'RA': 265.97845833, + 'DEC': -29.74516667, + 'T1': '2017-03-06T13:26:48.000', + 'T2': '2017-03-06T15:32:27.000', + 'T_format': 'isot' + } + + products_url = DispatcherJobState.get_expected_products_url(completed_dict_param, + token=encoded_token, + session_id=session_id, + job_id=job_id) + assert jdata['exit_status']['job_status'] == 'submitted' + # get the original time the request was made + assert 'time_request' in jdata + # set the time the request was initiated + time_request = jdata['time_request'] + time_request_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(float(time_request))) + + if token_none or not expect_matrix_message: + # matrix message not supposed to be sent for public request + assert 'matrix_message_status' not in jdata + else: + + assert 'matrix_message_status' in jdata['exit_status'] + assert jdata['exit_status']['matrix_message_status'] == 'matrix message sent' + assert 'matrix_message_status_details' in jdata['exit_status'] + matrix_message_status_details_obj = json.loads(jdata['exit_status']['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + matrix_user_message_event_id = \ + matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id), + 'submitted', + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + variation_suffixes=["dummy"], + time_request_str=time_request_str, + request_params=dict_param, + products_url=products_url, + dispatcher_live_fixture=None, + require_reference_matrix_message=True + ) + + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 0 + + # for the call_back(s) in case the time of the original request is not provided + if time_original_request_none: + time_request = None + time_request_str = 'None' + + for i in range(5): + # imitating what a backend would do + current_action = 'progress' if i > 2 else 'main_done' + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action=current_action, + node_id=f'node_{i}', + message='progressing', + token=encoded_token, + time_original_request=time_request + )) + assert dispatcher_job_state.load_job_state_record(f'node_{i}', "progressing")['full_report_dict'][ + 'action'] == current_action + + c = requests.get(os.path.join(server, "run_analysis"), + params=dict( + query_status="submitted", # whether query is new or not, this should work + query_type="Real", + instrument="empty-async", + product_type="dummy", + async_dispatcher=False, + session_id=dispatcher_job_state.session_id, + job_id=dispatcher_job_state.job_id, + token=encoded_token + )) + assert c.json()['query_status'] == 'progress' # always progress! + + # we should now find progress records + c = requests.get(os.path.join(server, "run_analysis"), + {**dict_param, + "query_status": "submitted", + "job_id": job_id, + "session_id": session_id, + } + ) + + assert c.status_code == 200 + jdata = c.json() + + assert len(jdata['job_monitor']['full_report_dict_list']) == 6 + assert [c['action'] for c in jdata['job_monitor']['full_report_dict_list']] == [ + 'main_done', 'main_done', 'main_done', 'progress', 'progress', 'progress'] + + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action='main_incorrect_status', + node_id=f'node_{i + 1}', + message='progressing', + token=encoded_token, + time_original_request=time_request + )) + assert c.status_code == 200 + + c = requests.get(os.path.join(server, "run_analysis"), + { + **dict_param, + "query_status": "submitted", + "job_id": job_id, + "session_id": session_id, + } + ) + assert c.status_code == 200 + assert c.json()['query_status'] == 'progress' + + # this does nothing special + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action='ready', + node_id='node_ready', + message='ready', + token=encoded_token, + time_original_request=time_request + )) + + DataServerQuery.set_status('done') + + # this triggers a message via matrix + c = requests.get(os.path.join(server, "call_back"), + params=dict( + job_id=dispatcher_job_state.job_id, + session_id=dispatcher_job_state.session_id, + instrument_name="empty-async", + action='done', + node_id='node_final', + message='done', + token=encoded_token, + time_original_request=time_request + )) + + assert c.status_code == 200 + + jdata = dispatcher_job_state.load_job_state_record('node_final', 'done') + + if token_none or not expect_matrix_message: + assert 'matrix_message_status' not in jdata + + elif time_original_request_none: + assert 'matrix_message_status' in jdata + + elif default_values: + assert 'matrix_message_status' not in jdata + + else: + assert 'matrix_message_status' in jdata + assert jdata['matrix_message_status'] == 'matrix message sent' + assert 'matrix_message_status_details' in jdata + matrix_message_status_details_obj = json.loads(jdata['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 0 + + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + matrix_user_message_event_id = \ + matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record( + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id), + 'done', + room_id=token_payload['mxroomid'], + event_id=matrix_user_message_event_id, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + time_request_str=time_request_str, + dispatcher_live_fixture=server, + require_reference_matrix_message=True + ) + + # check the matrix message in the matrix message folders, and that the first one was produced + dispatcher_job_state.assert_matrix_message(state="done") + + # this also triggers matrix message (simulate a failed request) + c = requests.get(os.path.join(server, "call_back"), + params={ + 'job_id': dispatcher_job_state.job_id, + 'session_id': dispatcher_job_state.session_id, + 'instrument_name': "empty-async", + 'action': 'failed', + 'node_id': 'node_failed', + 'message': 'failed', + 'token': encoded_token, + 'time_original_request': time_request + }) + + assert c.status_code == 200 + + jdata = dispatcher_job_state.load_job_state_record('node_failed', 'failed') + + if token_none or not expect_matrix_message: + # matrix message not supposed to be sent for public request + assert 'matrix_message_status' not in jdata + assert 'matrix_message_status_details' not in jdata + else: + assert 'matrix_message_status' in jdata + assert jdata['matrix_message_status'] == 'matrix message sent' + assert 'matrix_message_status_details' in jdata + matrix_message_status_details_obj = json.loads(jdata['matrix_message_status_details']) + assert 'res_content' in matrix_message_status_details_obj + assert 'res_content_bcc_users' in matrix_message_status_details_obj['res_content'] + assert isinstance(matrix_message_status_details_obj['res_content']['res_content_bcc_users'], list) + assert len(matrix_message_status_details_obj['res_content']['res_content_bcc_users']) == 0 + if request_cred == 'private-no-room-id': + assert 'res_content_token_user' not in matrix_message_status_details_obj['res_content'] + else: + assert 'res_content_token_user' in matrix_message_status_details_obj['res_content'] + assert 'event_id' in matrix_message_status_details_obj['res_content']['res_content_token_user'] + + matrix_message_event_id_obj = matrix_message_status_details_obj['res_content']['res_content_token_user']['event_id'] + + # check the matrix message in the matrix message folders, and that the first one was produced + if default_values or time_original_request_none: + dispatcher_job_state.assert_matrix_message('failed', comment="expected one matrix message in total, failed") + else: + dispatcher_job_state.assert_matrix_message('failed', comment="expected two matrix message in total, second failed") + + validate_matrix_message_content( + dispatcher_local_matrix_message_server.get_matrix_message_record(room_id=token_payload['mxroomid'], + event_id=matrix_message_event_id_obj), + 'failed', + room_id=token_payload['mxroomid'], + event_id=matrix_message_event_id_obj, + user_id=token_payload['user_id'], + dispatcher_job_state=dispatcher_job_state, + time_request_str=time_request_str, + dispatcher_live_fixture=server, + require_reference_matrix_message=True + )