diff --git a/big_tests/rebar.config b/big_tests/rebar.config index 98226512c0..2a14913dfa 100644 --- a/big_tests/rebar.config +++ b/big_tests/rebar.config @@ -16,7 +16,8 @@ {proper, "1.4.0"}, {gun, "2.1.0"}, {fusco, "0.1.1"}, - {escalus, "4.2.12"}, + {escalus, {git, "git@github.com:esl/escalus.git", {branch, "ws-stream-management"}}}, +% {escalus, "4.2.12"}, {cowboy, "2.12.0"}, {csv, "3.0.3", {pkg, csve}}, {amqp_client, "3.12.14"}, diff --git a/big_tests/rebar.lock b/big_tests/rebar.lock index e76f53c632..025395421a 100644 --- a/big_tests/rebar.lock +++ b/big_tests/rebar.lock @@ -8,7 +8,10 @@ {pkg,<<"credentials_obfuscation">>,<<"3.4.0">>}, 1}, {<<"csv">>,{pkg,<<"csve">>,<<"3.0.3">>},0}, - {<<"escalus">>,{pkg,<<"escalus">>,<<"4.2.12">>},0}, + {<<"escalus">>, + {git,"git@github.com:esl/escalus.git", + {ref,"f99d6efe1028f645d8a17c9ab3fb8e4adab69a76"}}, + 0}, {<<"esip">>,{pkg,<<"esip">>,<<"1.0.52">>},0}, {<<"exml">>,{pkg,<<"hexml">>,<<"3.4.1">>},0}, {<<"fast_pbkdf2">>,{pkg,<<"fast_pbkdf2">>,<<"1.0.5">>},2}, @@ -41,7 +44,6 @@ {<<"cowlib">>, <<"DB8F7505D8332D98EF50A3EF34B34C1AFDDEC7506E4EE4DD4A3A266285D282CA">>}, {<<"credentials_obfuscation">>, <<"34E18B126B3AEFD6E8143776FBE1CECEEA6792307C99AC5EE8687911F048CFD7">>}, {<<"csv">>, <<"69E7D9B3FDC72016644368762C6A3E6CBFEB85BCCADBF1BD99AB6C827E360E04">>}, - {<<"escalus">>, <<"D0390EB227D4B19F5EA85CAE722F3BD9891DE054EBF1C154078D5203C07F7972">>}, {<<"esip">>, <<"A2840287C493A4280E6FBA57A257706843B025C315875E38B03FD07190E22DBA">>}, {<<"exml">>, <<"9581FE6512D9772C61BBE611CD4A8E5BB90B4D4481275325EC520F7A931A9393">>}, {<<"fast_pbkdf2">>, <<"6045138C4C209FC8222A0B18B2CB1D7BD7407EF4ADAD0F14C5E0F7F4726E3E41">>}, @@ -73,7 +75,6 @@ {<<"cowlib">>, <<"E1E1284DC3FC030A64B1AD0D8382AE7E99DA46C3246B815318A4B848873800A4">>}, {<<"credentials_obfuscation">>, <<"738ACE0ED5545D2710D3F7383906FC6F6B582D019036E5269C4DBD85DBCED566">>}, {<<"csv">>, <<"741D1A55AABADAA3E0FE13051050101A73E90C4570B9F9403A939D9546813521">>}, - {<<"escalus">>, <<"BE52DAAEDE0D920F100BA1AAA43220ECA070F41E1BB52472C192F2973C9B424D">>}, {<<"esip">>, <<"6F00165395900500AA262CE0297162D93931C78C1464D89FD0EDC6E3D6BC011F">>}, {<<"exml">>, <<"D8E7894E2544402B4986EEB2443C15B51B14F686266F091DBF2777D1D99A2FA2">>}, {<<"fast_pbkdf2">>, <<"BC3B5A3CAB47AD114FF8BB815FEDE62A6187ACD14D8B37412F2AF8236A089CEF">>}, diff --git a/big_tests/tests/sasl_external_SUITE.erl b/big_tests/tests/sasl_external_SUITE.erl index 8d0c1f1e32..b1d6c1d0de 100644 --- a/big_tests/tests/sasl_external_SUITE.erl +++ b/big_tests/tests/sasl_external_SUITE.erl @@ -381,11 +381,10 @@ generate_user(C, User, Transport) -> {versions, ['tlsv1.2']}, {certfile, maps:get(cert, UserCert)}, {keyfile, maps:get(key, UserCert)}]}], - Common ++ transport_specific_options(Transport) - ++ [{port, ct:get_config({hosts, mim, c2s_port})}]. + Common ++ transport_specific_options(Transport). transport_specific_options(escalus_tcp) -> - [{starttls, required}]; + [{starttls, required}, {port, ct:get_config({hosts, mim, c2s_port})}]; transport_specific_options(_) -> [{port, ct:get_config({hosts, mim, cowboy_secure_port})}, {ssl, true}]. diff --git a/big_tests/tests/sm_SUITE.erl b/big_tests/tests/sm_SUITE.erl index 31ce2768ca..e04376d113 100644 --- a/big_tests/tests/sm_SUITE.erl +++ b/big_tests/tests/sm_SUITE.erl @@ -51,17 +51,37 @@ suite() -> require_rpc_nodes([mim]) ++ escalus:suite(). all() -> - ct_helper:groups_to_all(groups()) ++ [ping_timeout]. + [{group, ws_tests}, + {group, tcp_tests}]. groups() -> [ - {parallel, [parallel], parallel_cases()}, + {ws_tests, [], ws_tests()}, + {tcp_tests, [], tcp_tests()}, + {parallel, [parallel], parallel_cases() ++ [aggressively_pipelined_resume]}, + {parallel_ws, [parallel], parallel_cases() ++ [aggressively_pipelined_resume_ws]}, {parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()}, {manual_ack_freq_2, [], manual_ack_freq_2_cases()}, {stale_h, [], stale_h_cases()}, {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()} ]. +ws_tests() -> + [{group, parallel_ws}, + {group, parallel_manual_ack_freq_1}, + {group, manual_ack_freq_2}, + {group, stale_h}, + {group, parallel_unacknowledged_message_hook}, + ping_timeout]. + +tcp_tests() -> + [{group, parallel}, + {group, parallel_manual_ack_freq_1}, + {group, manual_ack_freq_2}, + {group, stale_h}, + {group, parallel_unacknowledged_message_hook}, + ping_timeout]. + parallel_cases() -> [server_announces_sm, server_enables_sm_before_session, @@ -89,7 +109,6 @@ parallel_cases() -> resume_session_kills_old_C2S_gracefully, carboncopy_works, carboncopy_works_after_resume, - aggressively_pipelined_resume, replies_are_processed_by_resumed_session, subscription_requests_are_buffered_properly, messages_are_properly_flushed_during_resumption]. @@ -131,16 +150,23 @@ parallel_unacknowledged_message_hook_cases() -> %%-------------------------------------------------------------------- init_per_suite(Config) -> - NewConfig = dynamic_modules:save_modules(host_type(), Config), - NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true), - mongoose_helper:inject_module(?MODULE), - escalus:init_per_suite(NewConfigWithSM). + escalus:init_per_suite(Config). end_per_suite(Config) -> - escalus_fresh:clean(), - dynamic_modules:restore_modules(Config), escalus:end_per_suite(Config). +init_per_group(ws_tests, Config) -> + NewConfig = dynamic_modules:save_modules(host_type(), Config), + mongoose_helper:inject_module(?MODULE), + NewConfigWithSM = escalus_users:update_userspec(NewConfig, geralt, stream_management, true), + [{user, geralt} | NewConfigWithSM]; + +init_per_group(tcp_tests, Config) -> + NewConfig = dynamic_modules:save_modules(host_type(), Config), + mongoose_helper:inject_module(?MODULE), + NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true), + [{user, alice} | NewConfigWithSM]; + init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook; Group =:= manual_ack_freq_long_session_timeout; Group =:= parallel_manual_ack_freq_1; @@ -157,6 +183,9 @@ init_per_group(Group, Config) -> dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), Config. +end_per_group(Group, Config) when Group =:= ws_tests; Group =:= tcp_tests -> + escalus_fresh:clean(), + dynamic_modules:restore_modules(Config); end_per_group(_Group, _Config) -> ok. @@ -216,6 +245,8 @@ required_modules(Scope, Name) -> required_sm_opts(group, parallel) -> #{ack_freq => never}; +required_sm_opts(group, parallel_ws) -> + #{ack_freq => never}; required_sm_opts(group, parallel_manual_ack_freq_1) -> #{ack_freq => 1, resume_timeout => ?LONG_TIMEOUT}; @@ -271,153 +302,153 @@ mod_ping_opts() -> %%-------------------------------------------------------------------- server_announces_sm(Config) -> - AliceSpec = escalus_fresh:freshen_spec(Config, alice), - {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec, + UserSpec = escalus_fresh:freshen_spec(Config, ?config(user, Config)), + {ok, #client{props = Props}, Features} = escalus_connection:start(UserSpec, [start_stream]), true = escalus_session:can_use_stream_management(Props, Features). server_enables_sm_before_session(Config) -> - connect_fresh(Config, alice, sm_after_bind). + connect_fresh(Config, ?config(user, Config), sm_after_bind). server_enables_sm_after_session(Config) -> - connect_fresh(Config, alice, sm_after_session). + connect_fresh(Config, ?config(user, Config), sm_after_session). server_returns_failed_after_start(Config) -> - Alice = connect_fresh(Config, alice, before_auth), - server_returns_failed(Alice). + User = connect_fresh(Config, ?config(user, Config), before_auth), + server_returns_failed(User). server_returns_failed_after_auth(Config) -> - Alice = connect_fresh(Config, alice, auth), - server_returns_failed(Alice). + User = connect_fresh(Config, ?config(user, Config), auth), + server_returns_failed(User). server_enables_resumption(Config) -> - Alice = connect_fresh(Config, alice, sr_presence), - escalus_connection:stop(Alice). + User = connect_fresh(Config, ?config(user, Config), sr_presence), + escalus_connection:stop(User). -server_returns_failed(Alice) -> - escalus_connection:send(Alice, escalus_stanza:enable_sm()), +server_returns_failed(User) -> + escalus_connection:send(User, escalus_stanza:enable_sm()), escalus:assert(is_sm_failed, [<<"unexpected-request">>], - escalus_connection:get_stanza(Alice, enable_sm_failed)). + escalus_connection:get_stanza(User, enable_sm_failed)). client_enables_sm_twice_fails_with_correct_error_stanza(Config) -> - Alice = connect_fresh(Config, alice, sm_before_session), - escalus_connection:send(Alice, escalus_stanza:enable_sm()), + User = connect_fresh(Config, ?config(user, Config), sm_before_session), + escalus_connection:send(User, escalus_stanza:enable_sm()), escalus:assert(is_sm_failed, [<<"unexpected-request">>], - escalus_connection:get_stanza(Alice, enable_sm_failed)), + escalus_connection:get_stanza(User, enable_sm_failed)), escalus:assert(is_stream_end, - escalus_connection:get_stanza(Alice, enable_sm_failed)), - true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). + escalus_connection:get_stanza(User, enable_sm_failed)), + true = escalus_connection:wait_for_close(User, timer:seconds(5)). session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) -> %% GIVEN USER WITH STREAM RESUMPTION ENABLED - Alice = connect_fresh(Config, alice, sr_presence), - SMH = escalus_connection:get_sm_h(Alice), + User = connect_fresh(Config, ?config(user, Config), sr_presence), + SMH = escalus_connection:get_sm_h(User), %% WHEN USER RESUMES SESSION FROM NEW CLIENT - Alice2 = connect_resume(Alice, SMH), - process_initial_stanza(Alice2), + User2 = connect_resume(User, SMH), + process_initial_stanza(User2), %% THEN: Old session is gracefully closed with the correct error stanza escalus:assert(is_stream_error, [<<"conflict">>, <<>>], - escalus_connection:get_stanza(Alice, close_old_stream)), + escalus_connection:get_stanza(User, close_old_stream)), escalus:assert(is_stream_end, - escalus_connection:get_stanza(Alice, close_old_stream)), - true = escalus_connection:wait_for_close(Alice, timer:seconds(5)), - true = escalus_connection:is_connected(Alice2), - escalus_connection:stop(Alice2). + escalus_connection:get_stanza(User, close_old_stream)), + true = escalus_connection:wait_for_close(User, timer:seconds(5)), + true = escalus_connection:is_connected(User2), + escalus_connection:stop(User2). session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) -> %% GIVEN USER WITH STREAM RESUMPTION ENABLED - Alice = connect_fresh(Config, alice, sr_presence), + User = connect_fresh(Config, ?config(user, Config), sr_presence), %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT - Alice2 = sm_helper:kill_and_connect_resume(Alice), - process_initial_stanza(Alice2), + User2 = sm_helper:kill_and_connect_resume(User), + process_initial_stanza(User2), %% THEN new session does not have any message rerouted - false = escalus_client:has_stanzas(Alice2), - true = escalus_connection:is_connected(Alice2), - escalus_connection:stop(Alice2). + false = escalus_client:has_stanzas(User2), + true = escalus_connection:is_connected(User2), + escalus_connection:stop(User2). basic_ack(Config) -> - Alice = connect_fresh(Config, alice, sm_after_session), - escalus_connection:send(Alice, escalus_stanza:roster_get()), + User = connect_fresh(Config, ?config(user, Config), sm_after_session), + escalus_connection:send(User, escalus_stanza:roster_get()), escalus:assert(is_roster_result, - escalus_connection:get_stanza(Alice, roster_result)), - escalus_connection:send(Alice, escalus_stanza:sm_request()), + escalus_connection:get_stanza(User, roster_result)), + escalus_connection:send(User, escalus_stanza:sm_request()), escalus:assert(is_sm_ack, - escalus_connection:get_stanza(Alice, stream_mgmt_ack)). + escalus_connection:get_stanza(User, stream_mgmt_ack)). %% Test that "h" value is valid when: %% - SM is enabled *before* the session is established %% - is sent *before* the session is established h_ok_before_session(Config) -> - Alice = connect_fresh(Config, alice, sm_after_bind), - escalus_connection:send(Alice, escalus_stanza:sm_request()), + User = connect_fresh(Config, ?config(user, Config), sm_after_bind), + escalus_connection:send(User, escalus_stanza:sm_request()), escalus:assert(is_sm_ack, [0], - escalus_connection:get_stanza(Alice, stream_mgmt_ack)). + escalus_connection:get_stanza(User, stream_mgmt_ack)). %% Test that "h" value is valid when: %% - SM is enabled *before* the session is established %% - is sent *after* the session is established h_ok_after_session_enabled_before_session(Config) -> - Alice = connect_fresh(Config, alice, sm_before_session), - escalus_connection:send(Alice, escalus_stanza:sm_request()), + User = connect_fresh(Config, ?config(user, Config), sm_before_session), + escalus_connection:send(User, escalus_stanza:sm_request()), escalus:assert(is_sm_ack, [1], - escalus_connection:get_stanza(Alice, stream_mgmt_ack)). + escalus_connection:get_stanza(User, stream_mgmt_ack)). %% Test that "h" value is valid when: %% - SM is enabled *after* the session is established %% - is sent *after* the session is established h_ok_after_session_enabled_after_session(Config) -> - Alice = connect_fresh(Config, alice, sm_after_session), - escalus_connection:send(Alice, escalus_stanza:roster_get()), + User = connect_fresh(Config, ?config(user, Config), sm_after_session), + escalus_connection:send(User, escalus_stanza:roster_get()), escalus:assert(is_roster_result, - escalus_connection:get_stanza(Alice, roster_result)), - escalus_connection:send(Alice, escalus_stanza:sm_request()), + escalus_connection:get_stanza(User, roster_result)), + escalus_connection:send(User, escalus_stanza:sm_request()), escalus:assert(is_sm_ack, [1], - escalus_connection:get_stanza(Alice, stream_mgmt_ack)). + escalus_connection:get_stanza(User, stream_mgmt_ack)). %% Test that "h" value is valid after exchanging a few messages. h_ok_after_a_chat(ConfigIn) -> - Config = escalus_users:update_userspec(ConfigIn, alice, + Config = escalus_users:update_userspec(ConfigIn, ?config(user, ConfigIn), stream_management, true), - escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) -> - escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)), + escalus:fresh_story(Config, [{?config(user, Config), 1}, {bob,1}], fun(User, Bob) -> + escalus:send(User, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)), escalus:assert(is_chat_message, [<<"Hi, Bob!">>], escalus:wait_for_stanza(Bob)), - escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), - escalus:assert(is_chat_message, [<<"Hi, Alice!">>], - escalus:wait_for_stanza(Alice)), - escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)), + escalus:send(Bob, escalus_stanza:chat_to(User, <<"Hi, User!">>)), + escalus:assert(is_chat_message, [<<"Hi, User!">>], + escalus:wait_for_stanza(User)), + escalus:send(Bob, escalus_stanza:chat_to(User, <<"How's life?">>)), escalus:assert(is_chat_message, [<<"How's life?">>], - escalus:wait_for_stanza(Alice)), - escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)), + escalus:wait_for_stanza(User)), + escalus:send(User, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)), escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>], escalus:wait_for_stanza(Bob)), - escalus:send(Alice, escalus_stanza:sm_request()), - escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)), + escalus:send(User, escalus_stanza:sm_request()), + escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(User)), %% Ack, so that unacked messages don't go into offline store. - escalus:send(Alice, escalus_stanza:sm_ack(3)) + escalus:send(User, escalus_stanza:sm_ack(3)) end). h_non_given_closes_stream_gracefully(ConfigIn) -> AStanza = #xmlel{name = <<"a">>, attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]}, - Config = escalus_users:update_userspec(ConfigIn, alice, + Config = escalus_users:update_userspec(ConfigIn, ?config(user, ConfigIn), stream_management, true), - escalus:fresh_story(Config, [{alice,1}], fun(Alice) -> - C2SPid = mongoose_helper:get_session_pid(Alice), - escalus:send(Alice, AStanza), + escalus:fresh_story(Config, [{?config(user, Config), 1}], fun(User) -> + C2SPid = mongoose_helper:get_session_pid(User), + escalus:send(User, AStanza), escalus:assert(is_stream_error, [<<"policy-violation">>, <<>>], - escalus:wait_for_stanza(Alice)), + escalus:wait_for_stanza(User)), mongoose_helper:wait_for_pid_to_die(C2SPid), - escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), - true = escalus_connection:wait_for_close(Alice, timer:seconds(5)) + escalus:assert(is_stream_end, escalus_connection:get_stanza(User, stream_end)), + true = escalus_connection:wait_for_close(User, timer:seconds(5)) end). client_acks_more_than_sent(Config) -> - Alice = connect_fresh(Config, alice, sm_after_session), - escalus:send(Alice, escalus_stanza:sm_ack(5)), - StreamErrorStanza = escalus:wait_for_stanza(Alice), + User = connect_fresh(Config, ?config(user, Config), sm_after_session), + escalus:send(User, escalus_stanza:sm_ack(5)), + StreamErrorStanza = escalus:wait_for_stanza(User), %% Assert "undefined-condition" children escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza), %% Assert "handled-count-too-high" children with correct attributes @@ -428,21 +459,21 @@ client_acks_more_than_sent(Config) -> <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>), <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>), %% Assert graceful stream end - escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), - true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). + escalus:assert(is_stream_end, escalus_connection:get_stanza(User, stream_end)), + true = escalus_connection:wait_for_close(User, timer:seconds(5)). too_many_unacked_stanzas(Config) -> Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sm_presence, manual), - get_ack(Alice), - [escalus:send(Bob, escalus_stanza:chat_to(Alice, - <<(integer_to_binary(N))/binary, ": Hi, Alice!">>)) + User = connect_fresh(Config, ?config(user, Config), sm_presence, manual), + get_ack(User), + [escalus:send(Bob, escalus_stanza:chat_to(User, + <<(integer_to_binary(N))/binary, ": Hi, User!">>)) || N <- lists:seq(1,?SMALL_SM_BUFFER)], - escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests + escalus:wait_for_stanzas(User, ?SMALL_SM_BUFFER * 2), % messages and ack requests escalus:assert(is_stream_error, [<<"resource-constraint">>, <<"too many unacked stanzas">>], %% wait for deferred buffer check - escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)). + escalus:wait_for_stanza(User, ?CONSTRAINT_CHECK_TIMEOUT + 1000)). server_requests_ack(Config) -> server_requests_ack(Config, 1). @@ -452,126 +483,126 @@ server_requests_ack_freq_2(Config) -> server_requests_ack(Config, N) -> Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sm_presence, manual), + User = connect_fresh(Config, ?config(user, Config), sm_presence, manual), %% ack request after initial presence - maybe_assert_ack_request(1, N, Alice), - escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), - escalus:assert(is_chat_message, [<<"Hi, Alice!">>], - escalus:wait_for_stanza(Alice)), - maybe_assert_ack_request(2, N, Alice). + maybe_assert_ack_request(1, N, User), + escalus:send(Bob, escalus_stanza:chat_to(User, <<"Hi, User!">>)), + escalus:assert(is_chat_message, [<<"Hi, User!">>], + escalus:wait_for_stanza(User)), + maybe_assert_ack_request(2, N, User). -maybe_assert_ack_request(StanzasRec, AckRequests, Alice) -> +maybe_assert_ack_request(StanzasRec, AckRequests, User) -> ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]), case StanzasRec rem AckRequests of 0 -> - escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice)); + escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(User)); _ -> ok end, StanzasRec. server_requests_ack_after_session(Config) -> - Alice = connect_fresh(Config, alice, sm_before_session, manual), - escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)). + User = connect_fresh(Config, ?config(user, Config), sm_before_session, manual), + escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(User, stream_mgmt_req)). resend_more_offline_messages_than_buffer_size(Config) -> - %% connect bob and alice + %% connect bob and user Bob = connect_fresh(Config, bob, presence), - AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), % sent some messages - more than unacked buffer size MessagesToSend = ?SMALL_SM_BUFFER + 1, - AliceJid = common_helper:get_bjid(AliceSpec), - [escalus_connection:send(Bob, escalus_stanza:chat_to(AliceJid, integer_to_binary(I))) + UserJid = common_helper:get_bjid(UserSpec), + [escalus_connection:send(Bob, escalus_stanza:chat_to(UserJid, integer_to_binary(I))) || I <- lists:seq(1, MessagesToSend)], - mongoose_helper:wait_for_n_offline_messages(AliceJid, MessagesToSend), + mongoose_helper:wait_for_n_offline_messages(UserJid, MessagesToSend), - % connect alice who wants to receive all messages from offline storage - Alice = connect_spec(AliceSpec, sm_after_session, manual), - send_initial_presence(Alice), - escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests + % connect user who wants to receive all messages from offline storage + User = connect_spec(UserSpec, sm_after_session, manual), + send_initial_presence(User), + escalus:wait_for_stanzas(User, MessagesToSend * 2), %messages and ack requests - escalus_connection:get_stanza(Alice, presence), - get_ack(Alice), % ack request + escalus_connection:get_stanza(User, presence), + get_ack(User), % ack request % confirm messages + presence - escalus_connection:send(Alice, escalus_stanza:sm_ack(4)), + escalus_connection:send(User, escalus_stanza:sm_ack(4)), % wait for check constraint message on server side ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000), - false = escalus_client:has_stanzas(Alice), + false = escalus_client:has_stanzas(User), % should not receive anything especially any stream errors - escalus_connection:stop(Alice), + escalus_connection:stop(User), escalus_connection:stop(Bob). resend_unacked_on_reconnection(Config) -> Texts = three_texts(), Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sm_presence), - AliceSpec = sm_helper:client_to_spec0(Alice), - %% Bob sends some messages to Alice. - sm_helper:send_messages(Bob, Alice, Texts), - %% Alice receives the messages. - sm_helper:wait_for_messages(Alice, Texts), - %% Alice disconnects without acking the messages. - sm_helper:stop_client_and_wait_for_termination(Alice), + User = connect_fresh(Config, ?config(user, Config), sm_presence), + UserSpec = sm_helper:client_to_spec0(User), + %% Bob sends some messages to User. + sm_helper:send_messages(Bob, User, Texts), + %% User receives the messages. + sm_helper:wait_for_messages(User, Texts), + %% User disconnects without acking the messages. + sm_helper:stop_client_and_wait_for_termination(User), %% Messages go to the offline store. - %% Alice receives the messages from the offline store. - NewAlice = connect_spec(AliceSpec, session, manual), - send_initial_presence(NewAlice), - sm_helper:wait_for_messages(NewAlice, Texts), - %% Alice acks the delayed messages so they won't go again + %% User receives the messages from the offline store. + NewUser = connect_spec(UserSpec, session, manual), + send_initial_presence(NewUser), + sm_helper:wait_for_messages(NewUser, Texts), + %% User acks the delayed messages so they won't go again %% to the offline store. - escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)). + escalus_connection:send(NewUser, escalus_stanza:sm_ack(3)). %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order %% TODO Test without wait_for_n_offline_messages. It would require changes in SM %% and more strict tests, reproducing delays in SM and in mod_offline. preserve_order(Config) -> - %% connect bob and alice + %% connect bob and user Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sr_presence, manual), - AliceSpec = sm_helper:client_to_spec(Alice), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)), + User = connect_fresh(Config, ?config(user, Config), sr_presence, manual), + UserSpec = sm_helper:client_to_spec(User), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"1">>)), - %% kill alice connection - escalus_connection:kill(Alice), - C2SPid = mongoose_helper:get_session_pid(Alice), + %% kill user connection + escalus_connection:kill(User), + C2SPid = mongoose_helper:get_session_pid(User), sm_helper:wait_until_resume_session(C2SPid), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"2">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"3">>)), - NewAlice = connect_spec(AliceSpec, session, manual), - escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])), + NewUser = connect_spec(UserSpec, session, manual), + escalus_connection:send(NewUser, escalus_stanza:enable_sm([resume])), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"4">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"5">>)), %% Without this check we will get stuff out of order - mongoose_helper:wait_for_n_offline_messages(NewAlice, 5), + mongoose_helper:wait_for_n_offline_messages(NewUser, 5), - send_initial_presence(NewAlice), + send_initial_presence(NewUser), %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered - mongoose_helper:wait_for_n_offline_messages(NewAlice, 0), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)), + mongoose_helper:wait_for_n_offline_messages(NewUser, 0), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"6">>)), %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm) - receive_all_ordered(NewAlice, 6), + receive_all_ordered(NewUser, 6), % replace connection - NewAlice2 = connect_spec(AliceSpec, session, manual), + NewUser2 = connect_spec(UserSpec, session, manual), % allow messages to go to the offline storage - mongoose_helper:wait_for_n_offline_messages(NewAlice, 6), + mongoose_helper:wait_for_n_offline_messages(NewUser, 6), - send_initial_presence(NewAlice2), + send_initial_presence(NewUser2), % receves messages in correct order - receive_all_ordered(NewAlice2, 6), + receive_all_ordered(NewUser2, 6), escalus_connection:stop(Bob), - escalus_connection:stop(NewAlice2). + escalus_connection:stop(NewUser2). receive_all_ordered(Conn, Last) -> receive_all_ordered(Conn, 1, Last, []). @@ -594,94 +625,83 @@ receive_all_ordered(Conn, N, Last, Acc) -> end. resend_unacked_after_resume_timeout(Config) -> - %% connect bob and alice + %% connect bob and user Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sr_presence), - AliceSpec = sm_helper:client_to_spec(Alice), + User = connect_fresh(Config, ?config(user, Config), sr_presence), + UserSpec = sm_helper:client_to_spec(User), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), - %% kill alice connection - escalus_connection:kill(Alice), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + %% kill user connection + escalus_connection:kill(User), %% ensure there is no session - C2SPid = mongoose_helper:get_session_pid(Alice), + C2SPid = mongoose_helper:get_session_pid(User), sm_helper:wait_until_resume_session(C2SPid), - %% alice come back and receives unacked message - NewAlice = connect_spec(AliceSpec, session), - send_initial_presence(NewAlice), + %% user come back and receives unacked message + NewUser = connect_spec(UserSpec, session), + send_initial_presence(NewUser), escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], - escalus:wait_for_stanzas(NewAlice, 2)), + escalus:wait_for_stanzas(NewUser, 2)), escalus_connection:stop(Bob), - escalus_connection:stop(NewAlice). + escalus_connection:stop(NewUser). ping_timeout(Config) -> %% make sure there are no leftover stanzas in the history ?assertEqual([], get_stanzas_filtered_by_mod_ping()), - %% connect Alice - Alice = connect_fresh(Config, alice, session), - - %% manually configure stream resumption to avoid ping stanzas - escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])), - Pred = fun(Stanza) -> escalus_pred:is_sm_enabled([resume], Stanza) end, - SM = escalus_connection:receive_stanza(Alice, #{pred => Pred}), - SMH = escalus_connection:get_sm_h(Alice), - SMID = exml_query:attr(SM, <<"id">>), + %% connect User and wait for the session to close + User = connect_fresh(Config, ?config(user, Config), sr_presence), - escalus_client:wait_for_stanza(Alice), - ct:sleep(?PING_REQUEST_TIMEOUT + ?PING_INTERVAL + timer:seconds(1)), + escalus_client:wait_for_stanza(User), + % The additional time was 1 second. This sometimes resulted in connection termination, + % therefore the value was lowered to 0.5 seconds + ct:sleep(?PING_REQUEST_TIMEOUT + ?PING_INTERVAL + timer:seconds(0.5)), %% attempt to resume the session after the connection drop - sm_helper:kill_client_and_wait_for_termination(Alice), - NewAlice = connect_same(Alice, auth), - escalus_connection:send(NewAlice, escalus_stanza:resume(SMID, SMH)), + NewUser = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(User), %% after resume_timeout, we expect the session to be closed - escalus_connection:get_stanza(NewAlice, failed_resumption), + escalus_connection:get_stanza(NewUser, failed_resumption), %% bind a new session and expect unacknowledged messages to be resent - escalus_session:session(escalus_session:bind(NewAlice)), - send_initial_presence(NewAlice), - - %% check if the error stanza was handled by mod_ping - FilteredStanzas = get_stanzas_filtered_by_mod_ping(), - ?assertNotEqual(length(FilteredStanzas), 0), - Fun = fun(Stanza) -> - escalus:assert(is_iq_error, Stanza), - ?assertNotEqual(undefined, - exml_query:subelement_with_name_and_ns(Stanza, <<"ping">>, <<"urn:xmpp:ping">>)) - end, - lists:foreach(Fun, FilteredStanzas), - - %% kill the connection to avoid errors - escalus_connection:kill(NewAlice). + escalus_session:session(escalus_session:bind(NewUser)), + send_initial_presence(NewUser), + + %% Check if the error stanza was handled by mod_ping. + %% There is a slight chance of more than one error stanza, + %% so the processing happens in lists:foreach. + + check_stanzas_filtered_by_mod_ping(), + + %% stop the connection + escalus_connection:stop(NewUser). resume_expired_session_returns_correct_h(Config) -> - %% connect bob and alice + %% connect bob and user Bob = connect_fresh(Config, bob, sr_presence), - Alice = connect_fresh(Config, alice, sr_presence, manual), - get_ack(Alice), + User = connect_fresh(Config, ?config(user, Config), sr_presence, manual), + get_ack(User), - %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), - escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)), - %% alice comes back, but too late, so resumption doesn't work, + %% Bob sends a message to User, and User receives it but doesn't acknowledge + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(User)), + %% user comes back, but too late, so resumption doesn't work, %% but she receives the previous h = 1 anyway - %% NewAlice is also manual ack - NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice), - FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption), + %% NewUser is also manual ack + NewUser = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(User), + FailedResumption = escalus_connection:get_stanza(NewUser, failed_resumption), <<"1">> = exml_query:attr(FailedResumption, <<"h">>), %% And we can continue with bind and session - escalus_session:session(escalus_session:bind(NewAlice)), - send_initial_presence(NewAlice), - Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}), - escalus_connection:get_stanza(NewAlice, {msg, 2})], + escalus_session:session(escalus_session:bind(NewUser)), + send_initial_presence(NewUser), + Stanzas = [escalus_connection:get_stanza(NewUser, {msg, 1}), + escalus_connection:get_stanza(NewUser, {msg, 2})], escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas), escalus_connection:stop(Bob), - escalus_connection:stop(NewAlice). + escalus_connection:stop(NewUser). gc_repeat_after_never_means_no_cleaning(Config) -> [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config), @@ -704,120 +724,120 @@ resume_session_state_send_message_with_ack(Config) -> resume_session_state_send_message_generic(Config, ack). resume_session_state_send_message_generic(Config, AckInitialPresence) -> - %% connect bob and alice + %% connect bob and user Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sr_presence, manual), - maybe_ack_initial_presence(Alice, AckInitialPresence), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), - %% kill alice connection - C2SPid = mongoose_helper:get_session_pid(Alice), - escalus_connection:kill(Alice), + User = connect_fresh(Config, ?config(user, Config), sr_presence, manual), + maybe_ack_initial_presence(User, AckInitialPresence), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + %% kill user connection + C2SPid = mongoose_helper:get_session_pid(User), + escalus_connection:kill(User), sm_helper:wait_until_resume_session(C2SPid), - sm_helper:assert_alive_resources(Alice, 1), + sm_helper:assert_alive_resources(User, 1), %% send some messages and check if c2s can handle it - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), - %% suspend the process to ensure that Alice has enough time to reconnect, + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-2">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-3">>)), + %% suspend the process to ensure that User has enough time to reconnect, %% before resumption timeout occurs. ok = rpc(mim(), sys, suspend, [C2SPid]), - %% alice comes back and receives unacked message - NewAlice = connect_same(Alice, presence), + %% user comes back and receives unacked message + NewUser = connect_same(User, presence), %% now we can resume c2s process of the old connection %% and let it process session resumption timeout ok = rpc(mim(), sys, resume, [C2SPid]), - Stanzas = escalus:wait_for_stanzas(NewAlice, 3), + Stanzas = escalus:wait_for_stanzas(NewUser, 3), % what about order ? - % alice receive presence from herself and 3 unacked messages from bob + % user receive presence from herself and 3 unacked messages from bob escalus_new_assert:mix_match([is_chat(<<"msg-1">>), is_chat(<<"msg-2">>), is_chat(<<"msg-3">>)], Stanzas), escalus_connection:stop(Bob), - escalus_connection:stop(NewAlice). + escalus_connection:stop(NewUser). %%for instance it can be done by mod ping resume_session_state_stop_c2s(Config) -> Bob = connect_fresh(Config, bob, presence), - Alice = connect_fresh(Config, alice, sr_presence, manual), + User = connect_fresh(Config, ?config(user, Config), sr_presence, manual), - get_ack(Alice), - ack_initial_presence(Alice), + get_ack(User), + ack_initial_presence(User), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), - escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(User, msg)), %% get pid of c2s - C2SPid = mongoose_helper:get_session_pid(Alice), + C2SPid = mongoose_helper:get_session_pid(User), %% Wait c2s process to process our presence ack. %% Otherwise, we can receive two initial presences sometimes. sm_helper:wait_for_c2s_unacked_count(C2SPid, 1), - % kill alice connection - escalus_connection:kill(Alice), + % kill user connection + escalus_connection:kill(User), % session should be alive - sm_helper:assert_alive_resources(Alice, 1), + sm_helper:assert_alive_resources(User, 1), rpc(mim(), mongoose_c2s, stop, [C2SPid, normal]), sm_helper:wait_until_resume_session(C2SPid), - %% suspend the process to ensure that Alice has enough time to reconnect, + %% suspend the process to ensure that User has enough time to reconnect, %% before resumption timeout occurs. ok = rpc(mim(), sys, suspend, [C2SPid]), - %% alice comes back and receives unacked message - NewAlice = connect_same(Alice, presence, manual), + %% user comes back and receives unacked message + NewUser = connect_same(User, presence, manual), %% now we can resume c2s process of the old connection %% and let it process session resumption timeout ok = rpc(mim(), sys, resume, [C2SPid]), - escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)), + escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewUser, msg)), escalus_connection:stop(Bob), - escalus_connection:stop(NewAlice). + escalus_connection:stop(NewUser). %% This test only verifies the validity of helpers (get_session_pid, %% get_c2s_state_name) written for wait_for_resumption %% testcase. session_established(Config) -> - Alice = connect_fresh(Config, alice, presence), - C2SPid = mongoose_helper:get_session_pid(Alice), + User = connect_fresh(Config, ?config(user, Config), presence), + C2SPid = mongoose_helper:get_session_pid(User), session_established = mongoose_helper:get_c2s_state_name(C2SPid), - escalus_connection:stop(Alice). + escalus_connection:stop(User). %% Ensure that after a violent disconnection, %% the c2s waits for resumption (but don't resume yet). wait_for_resumption(Config) -> - AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), Bob = connect_fresh(Config, bob, session), Texts = three_texts(), - {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), + {C2SPid, _} = buffer_unacked_messages_and_die(Config, UserSpec, Bob, Texts), sm_helper:wait_until_resume_session(C2SPid). unacknowledged_message_hook_filter(Config) -> FilterText = <<"filter">>, Bob = connect_fresh(Config, bob, presence), - AliceSpec = escalus_fresh:create_fresh_user(Config, alice), - Resource = proplists:get_value(username, AliceSpec), + UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), + Resource = proplists:get_value(username, UserSpec), HookHandlerExtra = start_filter_hook_listener(FilterText, Resource), - Alice = connect_spec([{resource, Resource} | AliceSpec], sr_presence, manual), + User = connect_spec([{resource, Resource} | UserSpec], sr_presence, manual), %% Ack the presence stanza - get_ack(Alice), - ack_initial_presence(Alice), + get_ack(User), + ack_initial_presence(User), Messages = [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>], All = [<<"msg-1">>, FilterText, <<"msg-2">>, FilterText, <<"msg-3">>, <<"msg-4">>], - [ escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body)) || Body <- All ], - %% kill alice connection - C2SPid = mongoose_helper:get_session_pid(Alice), - escalus_connection:kill(Alice), + [ escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, Body)) || Body <- All ], + %% kill user connection + C2SPid = mongoose_helper:get_session_pid(User), + escalus_connection:kill(User), sm_helper:wait_until_resume_session(C2SPid), - sm_helper:assert_alive_resources(Alice, 1), + sm_helper:assert_alive_resources(User, 1), %% ensure second C2S is registered so all the messages are bounced properly - NewAlice = connect_spec([{resource, <<"2">>}| AliceSpec], sr_presence, manual), - send_initial_presence(NewAlice), - sm_helper:wait_for_resource_count(NewAlice, 2), + NewUser = connect_spec([{resource, <<"2">>}| UserSpec], sr_presence, manual), + send_initial_presence(NewUser), + sm_helper:wait_for_resource_count(NewUser, 2), ok = rpc(mim(), sys, terminate, [C2SPid, normal]), %% verify that the filtered message is never received - verify_no_receive_filtertext(NewAlice, FilterText, Messages), + verify_no_receive_filtertext(NewUser, FilterText, Messages), stop_hook_listener(HookHandlerExtra), escalus_connection:stop(Bob). @@ -842,87 +862,87 @@ verify_no_receive_filtertext(Client, FilterText, Messages) -> unacknowledged_message_hook_resume(Config) -> unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config). -unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) -> - NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), - send_initial_presence(NewAlice), - {Resource, NewAlice}. +unacknowledged_message_hook_resume(UserSpec, Resource, SMID, _C2SPid) -> + NewUser = connect_spec(UserSpec, {resume, SMID, 1}, manual), + send_initial_presence(NewUser), + {Resource, NewUser}. unacknowledged_message_hook_bounce(Config) -> unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config). -unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) -> +unacknowledged_message_hook_bounce(UserSpec, Resource, _SMID, C2SPid) -> NewResource = <<"new_", Resource/binary>>, - NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}), - NewAlice = connect_spec(NewSpec, sr_session, manual), - send_initial_presence(NewAlice), + NewSpec = lists:keystore(resource, 1, UserSpec, {resource, NewResource}), + NewUser = connect_spec(NewSpec, sr_session, manual), + send_initial_presence(NewUser), %% ensure second C2S is registered so all the messages are bounced properly - sm_helper:wait_for_resource_count(NewAlice, 2), + sm_helper:wait_for_resource_count(NewUser, 2), ok = rpc(mim(), sys, terminate, [C2SPid, normal]), - {NewResource, NewAlice}. + {NewResource, NewUser}. unacknowledged_message_hook_offline(Config) -> unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config). -unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) -> +unacknowledged_message_hook_offline(UserSpec, Resource, _SMID, C2SPid) -> C2SRef = erlang:monitor(process, C2SPid), sm_helper:wait_for_process_termination(C2SRef), %% reset the session, so old C2S process is stopped - NewAlice = connect_spec(AliceSpec, sr_session, manual), + NewUser = connect_spec(UserSpec, sr_session, manual), %% wait for old C2S termination before send presence. other way %% some of the latest unacknowledged messages can be bounced to %% the new C2S process instead of going to the mod_offline storage. %% looks like all the unacknowledged messages arrive to the new %% C2S, but the message sequence is broken (the bounced messages %% delivered before the messages from the mod_offline storage) - send_initial_presence(NewAlice), - {Resource, NewAlice}. + send_initial_presence(NewUser), + {Resource, NewUser}. unacknowledged_message_hook_common(RestartConnectionFN, Config) -> - %% connect bob and alice + %% connect bob and user Bob = connect_fresh(Config, bob, presence), - AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice), - Resource = proplists:get_value(username, AliceSpec0), - AliceSpec = [{resource, Resource} | AliceSpec0], + UserSpec0 = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), + Resource = proplists:get_value(username, UserSpec0), + UserSpec = [{resource, Resource} | UserSpec0], HookHandlerExtra = start_hook_listener(Resource), - Alice = connect_spec(AliceSpec, sr_presence, manual), + User = connect_spec(UserSpec, sr_presence, manual), %% Ack the presence stanza - get_ack(Alice), - ack_initial_presence(Alice), + get_ack(User), + ack_initial_presence(User), - SMID = sm_helper:client_to_smid(Alice), + SMID = sm_helper:client_to_smid(User), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), - %% kill alice connection - C2SPid = mongoose_helper:get_session_pid(Alice), - escalus_connection:kill(Alice), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-1">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-2">>)), + %% kill user connection + C2SPid = mongoose_helper:get_session_pid(User), + escalus_connection:kill(User), sm_helper:wait_until_resume_session(C2SPid), - sm_helper:assert_alive_resources(Alice, 1), + sm_helper:assert_alive_resources(User, 1), escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)), escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)), ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), %% send some messages and check if c2s can handle it - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), - escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-3">>)), + escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(User, <<"msg-4">>)), escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)), escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)), ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), - %% alice comes back and receives unacked message - {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid), + %% user comes back and receives unacked message + {NewResource, NewUser} = RestartConnectionFN(UserSpec, Resource, SMID, C2SPid), mongoose_helper:wait_until( fun() -> - Stanza = escalus_connection:get_stanza(NewAlice, msg), + Stanza = escalus_connection:get_stanza(NewUser, msg), escalus:assert(is_chat_message, [<<"msg-4">>], Stanza), ok end, ok), - NewC2SPid = mongoose_helper:get_session_pid(NewAlice), - escalus_connection:kill(NewAlice), + NewC2SPid = mongoose_helper:get_session_pid(NewUser), + escalus_connection:kill(NewUser), sm_helper:wait_until_resume_session(NewC2SPid), escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)), @@ -934,33 +954,33 @@ unacknowledged_message_hook_common(RestartConnectionFN, Config) -> escalus_connection:stop(Bob). resume_session(Config) -> - AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), Texts = three_texts(), escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> - {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), + {_, SMID} = buffer_unacked_messages_and_die(Config, UserSpec, Bob, Texts), %% Resume the session. - Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), - %% Alice receives the unacked messages from the previous + User = connect_spec(UserSpec, {resume, SMID, 1}, manual), + %% User receives the unacked messages from the previous %% interrupted session. - sm_helper:wait_for_messages(Alice, Texts), - %% Alice acks the received messages. - escalus_connection:send(Alice, escalus_stanza:sm_ack(5)), - escalus_connection:stop(Alice) + sm_helper:wait_for_messages(User, Texts), + %% User acks the received messages. + escalus_connection:send(User, escalus_stanza:sm_ack(5)), + escalus_connection:stop(User) end). resume_session_with_wrong_h_does_not_leak_sessions(Config) -> - AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), Messages = three_texts(), HostType = host_type(), escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> - {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages), + {_, SMID} = buffer_unacked_messages_and_die(Config, UserSpec, Bob, Messages), %% Resume the session. - Alice = connect_spec(AliceSpec, auth, manual), - Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30), + User = connect_spec(UserSpec, auth, manual), + Resumed = sm_helper:try_to_resume_stream(User, SMID, 30), escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed), - escalus_connection:wait_for_close(Alice, timer:seconds(5)), + escalus_connection:wait_for_close(User, timer:seconds(5)), Fun = fun() -> - [] = sm_helper:get_user_present_resources(Alice), + [] = sm_helper:get_user_present_resources(User), sm_helper:get_sid_by_stream_id(HostType, SMID) end, mongoose_helper:wait_until(Fun, {error, smid_not_found}, #{name => smid_is_cleaned}) @@ -970,14 +990,14 @@ resume_session_with_wrong_sid_returns_item_not_found(Config) -> session_resumption_expects_item_not_found(Config, <<"wrong-sid">>). resume_session_with_wrong_namespace_is_a_noop(Config) -> - Alice = connect_fresh(Config, alice, auth), + User = connect_fresh(Config, ?config(user, Config), auth), #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4), Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}), - escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}), - escalus_assert:has_no_stanzas(Alice), - [] = sm_helper:get_user_present_resources(Alice), - true = escalus_connection:is_connected(Alice), - escalus_connection:stop(Alice). + escalus_connection:send(User, Resume#xmlel{attrs = Attrs2}), + escalus_assert:has_no_stanzas(User), + [] = sm_helper:get_user_present_resources(User), + true = escalus_connection:is_connected(User), + escalus_connection:stop(User). resume_dead_session_results_in_item_not_found(Config) -> SMID = base64:encode(crypto:strong_rand_bytes(21)), @@ -987,112 +1007,139 @@ resume_dead_session_results_in_item_not_found(Config) -> session_resumption_expects_item_not_found(Config, SMID). session_resumption_expects_item_not_found(Config, SMID) -> - Alice = connect_fresh(Config, alice, auth), - Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2), + User = connect_fresh(Config, ?config(user, Config), auth), + Resumed = sm_helper:try_to_resume_stream(User, SMID, 2), escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed), - [] = sm_helper:get_user_present_resources(Alice), - true = escalus_connection:is_connected(Alice), - escalus_connection:stop(Alice). + [] = sm_helper:get_user_present_resources(User), + true = escalus_connection:is_connected(User), + escalus_connection:stop(User). resume_session_kills_old_C2S_gracefully(Config) -> - Alice = connect_fresh(Config, alice, sr_presence, manual), - C2SPid = mongoose_helper:get_session_pid(Alice), + User = connect_fresh(Config, ?config(user, Config), sr_presence, manual), + C2SPid = mongoose_helper:get_session_pid(User), - %% Monitor the C2S process and disconnect Alice. - MonitorRef = sm_helper:monitor_session(Alice), - escalus_client:kill_connection(Config, Alice), + %% Monitor the C2S process and disconnect User. + MonitorRef = sm_helper:monitor_session(User), + escalus_client:kill_connection(Config, User), %% Ensure the c2s process is waiting for resumption. sm_helper:wait_until_resume_session(C2SPid), %% Resume the session. - NewAlice = connect_resume(Alice, 1), + NewUser = connect_resume(User, 1), %% C2S process should die gracefully with Reason=normal. sm_helper:wait_for_process_termination(MonitorRef), - escalus_connection:stop(NewAlice). + escalus_connection:stop(NewUser). carboncopy_works(Config) -> - escalus:fresh_story(Config, [{alice, 2}, {bob, 1}], fun(Alice1, Alice, Bob) -> - mongoose_helper:enable_carbons([Alice1, Alice]), - escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)), - sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]), - carboncopy_helper:wait_for_carbon_chat_with_body(Alice, <<"msg-4">>, #{from => Bob, to => Alice1}) + escalus:fresh_story(Config, [{?config(user, Config), 2}, {bob, 1}], fun(User1, User, Bob) -> + mongoose_helper:enable_carbons([User1, User]), + escalus_connection:send(Bob, escalus_stanza:chat_to(User1, <<"msg-4">>)), + sm_helper:wait_for_messages(User1, [<<"msg-4">>]), + carboncopy_helper:wait_for_carbon_chat_with_body(User, <<"msg-4">>, #{from => Bob, to => User1}) end). carboncopy_works_after_resume(Config) -> Texts = three_texts(), - escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice1, Bob) -> - AliceSpec = [{resource, <<"res2">>} | sm_helper:client_to_spec(Alice1)], - F = fun(Alice2) -> - [escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(A)) || A <- [Alice1, Alice2]], - mongoose_helper:enable_carbons([Alice1, Alice2]) + escalus:fresh_story(Config, [{?config(user, Config), 1}, {bob, 1}], fun(User1, Bob) -> + UserSpec = [{resource, <<"res2">>} | sm_helper:client_to_spec(User1)], + F = fun(User2) -> + [escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(A)) || A <- [User1, User2]], + mongoose_helper:enable_carbons([User1, User2]) end, - {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F), + {_, SMID} = buffer_unacked_messages_and_die(Config, UserSpec, Bob, Texts, F), %% Resume the session. - Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), - wait_for_carbon_with_bodies(Alice1, Texts, #{from => Bob, to => Alice}), - %% Get a presence from Alice1 again - escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(Alice)), - %% Alice receives an IQ result from the carbon copy enable request - escalus:assert(is_iq_result, [], escalus:wait_for_stanza(Alice)), - %% Alice receives the unacked messages from the previous + User = connect_spec(UserSpec, {resume, SMID, 1}, manual), + wait_for_carbon_with_bodies(User1, Texts, #{from => Bob, to => User}), + %% Get a presence from User1 again + escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(User)), + %% User receives an IQ result from the carbon copy enable request + escalus:assert(is_iq_result, [], escalus:wait_for_stanza(User)), + %% User receives the unacked messages from the previous %% interrupted session. - sm_helper:wait_for_messages(Alice, Texts), - %% Alice acks the received messages. - escalus_connection:send(Alice, escalus_stanza:sm_ack(5)), + sm_helper:wait_for_messages(User, Texts), + %% User acks the received messages. + escalus_connection:send(User, escalus_stanza:sm_ack(5)), %% Direct send - escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)), - sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]), - carboncopy_helper:wait_for_carbon_chat_with_body(Alice, <<"msg-4">>, #{from => Bob, to => Alice1}), - escalus_connection:stop(Alice) + escalus_connection:send(Bob, escalus_stanza:chat_to(User1, <<"msg-4">>)), + sm_helper:wait_for_messages(User1, [<<"msg-4">>]), + carboncopy_helper:wait_for_carbon_chat_with_body(User, <<"msg-4">>, #{from => Bob, to => User1}), + escalus_connection:stop(User) end). wait_for_carbon_with_bodies(Client, Texts, Params) -> [carboncopy_helper:wait_for_carbon_chat_with_body(Client, Text, Params) || Text <- Texts]. -buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) -> +buffer_unacked_messages_and_die(Config, UserSpec, Bob, Texts) -> F = fun(_Client) -> ok end, - buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F). - -buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F) -> - Alice = connect_spec(AliceSpec, sr_presence, manual), - F(Alice), - C2SPid = mongoose_helper:get_session_pid(Alice), - %% Bobs sends some messages to Alice. - sm_helper:send_messages(Bob, Alice, Texts), - %% Alice receives them, but doesn't ack. - sm_helper:wait_for_messages(Alice, Texts), - %% Alice's connection is violently terminated. - escalus_client:kill_connection(Config, Alice), + buffer_unacked_messages_and_die(Config, UserSpec, Bob, Texts, F). + +buffer_unacked_messages_and_die(Config, UserSpec, Bob, Texts, F) -> + User = connect_spec(UserSpec, sr_presence, manual), + F(User), + C2SPid = mongoose_helper:get_session_pid(User), + %% Bobs sends some messages to User. + sm_helper:send_messages(Bob, User, Texts), + %% User receives them, but doesn't ack. + sm_helper:wait_for_messages(User, Texts), + %% User's connection is violently terminated. + escalus_client:kill_connection(Config, User), sm_helper:wait_until_resume_session(C2SPid), - SMID = sm_helper:client_to_smid(Alice), + SMID = sm_helper:client_to_smid(User), {C2SPid, SMID}. aggressively_pipelined_resume(Config) -> - AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]} - | escalus_fresh:create_fresh_user(Config, alice)], + UserSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]} + | escalus_fresh:create_fresh_user(Config, ?config(user, Config))], UnackedMessages = three_texts(), escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> - {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages), + {_, SMID} = buffer_unacked_messages_and_die(Config, UserSpec, Bob, UnackedMessages), %% Resume the session. - Alice = escalus_connection:connect(AliceSpec), + User = escalus_connection:connect(UserSpec), - Username = proplists:get_value(username, AliceSpec), - Password = proplists:get_value(password, AliceSpec), + Username = proplists:get_value(username, UserSpec), + Password = proplists:get_value(password, UserSpec), Payload = <<0:8,Username/binary,0:8,Password/binary>>, - Server = proplists:get_value(server, AliceSpec), + Server = proplists:get_value(server, UserSpec), Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>), Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]), AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>), Resume = escalus_stanza:resume(SMID, 2), - escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]), - Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)], + escalus_client:send(User, [Stream, Auth, AuthStream, Resume]), + Messages = [escalus_connection:get_stanza(User, {get_resumed, I}) || I <- lists:seq(1, 6)], + escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)), + + escalus_connection:stop(User) + end). + +aggressively_pipelined_resume_ws(Config) -> + UserSpec = [{manual_ack, true} | escalus_fresh:create_fresh_user(Config, ?config(user, Config))], + UnackedMessages = three_texts(), + escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> + {_, SMID} = buffer_unacked_messages_and_die(Config, UserSpec, Bob, UnackedMessages), + %% Resume the session. + User = escalus_connection:connect(UserSpec), + + Username = proplists:get_value(username, UserSpec), + Password = proplists:get_value(password, UserSpec), + Payload = <<0:8,Username/binary,0:8,Password/binary>>, + Server = proplists:get_value(server, UserSpec), + + Stream = escalus_stanza:ws_open(Server), + Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]), + Resume = escalus_stanza:resume(SMID, 2), + + escalus_client:send(User, [Stream]), + escalus_client:send(User, [Auth]), + escalus_client:send(User, [Stream]), + escalus_client:send(User, [Resume]), + Messages = [escalus_connection:get_stanza(User, {get_resumed, I}) || I <- lists:seq(1, 6)], escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)), - escalus_connection:stop(Alice) + escalus_connection:stop(User) end). %% This is a regression test for a case when a session processes a request, which will @@ -1103,101 +1150,106 @@ replies_are_processed_by_resumed_session(Config) -> %% GIVEN a session and registered special IQ handler (added in init_per_testcase), %% that waits for old session process to terminate (at this point new process %% has fully taken over) and then actually sends the reply. - Alice = connect_fresh(Config, alice, sr_presence), + User = connect_fresh(Config, ?config(user, Config), sr_presence), %% WHEN a client sends IQ request to the special handler... IQReq = escalus_stanza:iq_get(regression_ns(), []), - escalus:send(Alice, IQReq), + escalus:send(User, IQReq), + + % This test was flaky for ws_test, because there was not enaugh time + % between send and kill_and_connect_resume for the stanza to be properly + % processed. The sleep solves the problem for now. + timer:sleep(50), %% ... goes down and session is resumed. - Alice2 = sm_helper:kill_and_connect_resume(Alice), + User2 = sm_helper:kill_and_connect_resume(User), %% THEN the client receives the reply properly. - IQReply = escalus:wait_for_stanza(Alice2), + IQReply = escalus:wait_for_stanza(User2), escalus:assert(is_iq_result, [IQReq], IQReply), - escalus_connection:stop(Alice2). + escalus_connection:stop(User2). %% This is a regression test for a bug, which manifested in following scenario %% (due to improper presence sub requests buffering): -%% 1. Bob is online, Alice is offline -%% 2. Bob subscribes to Alice's presence; -%% 3. Alice becomes online -%% 4. Bob sends a message to Alice -%% 5. Alice doesn't SM-ack the request or message, terminates the connection -%% 6. Alice reconnects but with session *replace*, not resume +%% 1. Bob is online, User is offline +%% 2. Bob subscribes to User's presence; +%% 3. User becomes online +%% 4. Bob sends a message to User +%% 5. User doesn't SM-ack the request or message, terminates the connection +%% 6. User reconnects but with session *replace*, not resume %% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer -%% 8. Alice doesn't receive the buffered message +%% 8. User doesn't receive the buffered message subscription_requests_are_buffered_properly(Config) -> - AliceSpec = escalus_fresh:create_fresh_user(Config, alice), + UserSpec = escalus_fresh:create_fresh_user(Config, ?config(user, Config)), MsgBody = <<"buffered">>, escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> - % GIVEN Bob's pending subscription to Alice's presence - AliceJid = common_helper:get_bjid(AliceSpec), - escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)), + % GIVEN Bob's pending subscription to User's presence + UserJid = common_helper:get_bjid(UserSpec), + escalus:send(Bob, escalus_stanza:presence_direct(UserJid, <<"subscribe">>)), _RosterPushReq = escalus:wait_for_stanza(Bob), - % WHEN Alice becomes online... - Alice = connect_spec(AliceSpec, sr_session, manual), - send_initial_presence(Alice), + % WHEN User becomes online... + User = connect_spec(UserSpec, sr_session, manual), + send_initial_presence(User), %% subscribe could come before the initial presence escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], - escalus:wait_for_stanzas(Alice, 2)), + escalus:wait_for_stanzas(User, 2)), - % ...and Bob sends a message to Alice... - escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), - MsgStanza = escalus:wait_for_stanza(Alice), + % ...and Bob sends a message to User... + escalus:send(Bob, escalus_stanza:chat_to(User, MsgBody)), + MsgStanza = escalus:wait_for_stanza(User), escalus:assert(is_chat_message, [MsgBody], MsgStanza), - % ...and Alice terminates connection without acking anything... - escalus_client:kill_connection(Config, Alice), + % ...and User terminates connection without acking anything... + escalus_client:kill_connection(Config, User), % ...and reconnects with session replacement. - Alice2 = connect_spec(AliceSpec, session, manual), + User2 = connect_spec(UserSpec, session, manual), - % THEN Alice receives (without sending initial presence): + % THEN User receives (without sending initial presence): % * buffered Bob's message (like above) - % Alice DOESN'T receive: + % User DOESN'T receive: % * buffered subscription request because it is dropped by ejabberd_sm % because it's treated like repeated sub request to bare JID, so it's not % processed by any sub req handler (like mod_roster) - % * buffered available presence from Alice - because it is addressed to another SID - % and Alice2 is a brand new session - escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)), - sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>), - escalus_assert:has_no_stanzas(Alice2), + % * buffered available presence from User - because it is addressed to another SID + % and User2 is a brand new session + escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(User2)), + sm_helper:send_and_receive(Bob, User2, <<"flush1">>), + escalus_assert:has_no_stanzas(User2), %% Only once an initial presence is sent, a subscription request is sent - send_initial_presence(Alice2), + send_initial_presence(User2), escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], - escalus:wait_for_stanzas(Alice2, 2)), + escalus:wait_for_stanzas(User2, 2)), - sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>), - escalus_assert:has_no_stanzas(Alice2), + sm_helper:send_and_receive(Bob, User2, <<"flush2">>), + escalus_assert:has_no_stanzas(User2), - escalus_connection:stop(Alice2) + escalus_connection:stop(User2) end). %% This is a regression test for a bug, due to which messages sent to old session %% in a middle of state handover were not appended properly to SM buffer. %% Scenario to reproduce: -%% 1. Online Bob and Alice -%% 2. Alice kills the connection -%% 3. Alice's session is suspended -%% 4. Alice resumes session with new connection. At this moment new session is still not +%% 1. Online Bob and User +%% 2. User kills the connection +%% 3. User's session is suspended +%% 4. User resumes session with new connection. At this moment new session is still not %% present in session table. `resume` request is stuck in old proc mailbox. -%% 5. Bob sends a message to Alice. Only old proc is present in session table so now +%% 5. Bob sends a message to User. Only old proc is present in session table so now %% old session has two messages in mailbox: `resume` and XML from Bob %% 6. We resume old process and it begins session handover %% 7. Bob's message is appended to SM buffer in "flush" step %% 8. With bug fixed, the message is retransmitted properly messages_are_properly_flushed_during_resumption(Config) -> escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> - % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended - Alice = connect_fresh(Config, alice, sr_presence), - SMH = escalus_connection:get_sm_h(Alice), - escalus_client:kill_connection(Config, Alice), + % GIVEN (online Bob) and (User in resume state); User's session is suspended + User = connect_fresh(Config, ?config(user, Config), sr_presence), + SMH = escalus_connection:get_sm_h(User), + escalus_client:kill_connection(Config, User), %% The receiver process would stop now - C2SPid = mongoose_helper:get_session_pid(Alice), + C2SPid = mongoose_helper:get_session_pid(User), sm_helper:wait_until_resume_session(C2SPid), sm_helper:wait_for_queue_length(C2SPid, 0), @@ -1214,36 +1266,36 @@ messages_are_properly_flushed_during_resumption(Config) -> sm_helper:wait_for_queue_length(C2SPid, 1), % Bob sends a message... - escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), + escalus:send(Bob, escalus_stanza:chat_to(User, MsgBody)), - % ...we ensure that a message is enqueued in Alice's session... + % ...we ensure that a message is enqueued in User's session... % (2 messages = resume request + Bob's message) sm_helper:wait_for_queue_length(C2SPid, 2), % ...and old process is resumed. ok = rpc(mim(), sys, resume, [C2SPid]) end), - Alice2 = connect_resume(Alice, SMH), - % THEN Alice's new session receives Bob's message - RecvMsg = escalus:wait_for_stanza(Alice2), + User2 = connect_resume(User, SMH), + % THEN User's new session receives Bob's message + RecvMsg = escalus:wait_for_stanza(User2), escalus:assert(is_chat_message, [MsgBody], RecvMsg) end). no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) -> - Alice = connect_fresh(Config, alice, session, manual), + User = connect_fresh(Config, ?config(user, Config), session, manual), %% Should not crash anything! - escalus_connection:send(Alice, escalus_stanza:enable_sm()), - Response = escalus_connection:get_stanza(Alice, service_unavailable), + escalus_connection:send(User, escalus_stanza:enable_sm()), + Response = escalus_connection:get_stanza(User, service_unavailable), escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), - escalus_connection:stop(Alice). + escalus_connection:stop(User). no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) -> - Alice = connect_fresh(Config, alice, session, manual), + User = connect_fresh(Config, ?config(user, Config), session, manual), %% Should not crash anything! - escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])), - Response = escalus_connection:get_stanza(Alice, service_unavailable), + escalus_connection:send(User, escalus_stanza:enable_sm([resume])), + Response = escalus_connection:get_stanza(User, service_unavailable), escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), - escalus_connection:stop(Alice). + escalus_connection:stop(User). %%-------------------------------------------------------------------- %% Helpers @@ -1351,6 +1403,15 @@ get_stanzas_filtered_by_mod_ping() -> {stop, drop} = _Result } <- History ]. + +check_stanzas_filtered_by_mod_ping() -> + Stanzas = get_stanzas_filtered_by_mod_ping(), + lists:foreach(fun(Stanza) -> + escalus:assert(is_iq_error, Stanza), + ?assertNotEqual(undefined, + exml_query:subelement_with_name_and_ns(Stanza, <<"ping">>, <<"urn:xmpp:ping">>)) + end, Stanzas), + ?assert(length(Stanzas) > 0). %%-------------------------------------------------------------------- %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session" %%-------------------------------------------------------------------- @@ -1395,7 +1456,7 @@ wait_for_session(JID, Retries, SleepTime) -> ok end. -maybe_ack_initial_presence(Alice, ack) -> - ack_initial_presence(Alice); -maybe_ack_initial_presence(_Alice, no_ack) -> +maybe_ack_initial_presence(User, ack) -> + ack_initial_presence(User); +maybe_ack_initial_presence(_User, no_ack) -> ok. diff --git a/big_tests/tests/sm_helper.erl b/big_tests/tests/sm_helper.erl index 69c069c06b..29edd9bd35 100644 --- a/big_tests/tests/sm_helper.erl +++ b/big_tests/tests/sm_helper.erl @@ -277,7 +277,13 @@ send_initial_presence(User) -> process_initial_stanza(User) -> send_initial_presence(User), - escalus:assert(is_presence, escalus:wait_for_stanza(User)). + Stanza = escalus:wait_for_stanza(User), + case escalus_pred:is_iq(Stanza) of + true -> + Stanza2 = escalus:wait_for_stanza(User), + escalus:assert(is_presence, Stanza2); + false -> escalus:assert(is_presence, Stanza) + end. send_messages(Bob, Alice, Texts) -> [escalus:send(Bob, escalus_stanza:chat_to(Alice, Text)) diff --git a/src/c2s/mongoose_c2s.erl b/src/c2s/mongoose_c2s.erl index ee588418b3..1ba13d25da 100644 --- a/src/c2s/mongoose_c2s.erl +++ b/src/c2s/mongoose_c2s.erl @@ -637,10 +637,10 @@ handle_info(StateData, _C2SState, {TcpOrSSl, _Socket, _Packet} = SocketData) when TcpOrSSl =:= tcp; TcpOrSSl =:= ssl -> handle_socket_data(StateData, SocketData); handle_info(StateData, C2SState, {Closed, _Socket} = SocketData) - when Closed =:= tcp_closed; Closed =:= ssl_closed -> + when Closed =:= tcp_closed; Closed =:= ssl_closed; Closed =:= websockets_closed -> handle_socket_closed(StateData, C2SState, SocketData); handle_info(StateData, C2SState, {Error, _Socket} = SocketData) - when Error =:= tcp_error; Error =:= ssl_error -> + when Error =:= tcp_error; Error =:= ssl_error; Error =:= websockets_error -> handle_socket_error(StateData, C2SState, SocketData); handle_info(StateData, C2SState, Info) -> handle_foreign_event(StateData, C2SState, info, Info). diff --git a/src/mod_websockets.erl b/src/mod_websockets.erl index 4f3ed564b2..419659b9e3 100644 --- a/src/mod_websockets.erl +++ b/src/mod_websockets.erl @@ -92,7 +92,19 @@ init(Req, Opts = #{timeout := Timeout}) -> %% upgrade protocol {cowboy_websocket, Req1, AllModOpts, #{idle_timeout => Timeout}}. -terminate(_Reason, _Req, _State) -> +terminate(_Reason, _Req, #ws_state{fsm_pid = undefined} = State) -> + ok; +terminate(Reason, _Req, #ws_state{fsm_pid = FSM} = State) when Reason =:= normal; + Reason =:= stop; + Reason =:= timeout; + Reason =:= remote -> + FSM ! {websockets_closed, undefined}, + ok; +terminate({remote, _, _}, _Req, #ws_state{fsm_pid = FSM} = State) -> + FSM ! {websockets_closed, undefined}, + ok; +terminate(Reason, _Req, #ws_state{fsm_pid = FSM} = State) -> + FSM ! {websockets_error, undefined}, ok. %%--------------------------------------------------------------------