Skip to content

Commit

Permalink
Merge pull request #4218 from esl/update-mam
Browse files Browse the repository at this point in the history
Update version of XEP-0313: Message Archive Management pt. 1
  • Loading branch information
arcusfelis authored Feb 6, 2024
2 parents 1ada930 + df7d89a commit d2f609a
Show file tree
Hide file tree
Showing 22 changed files with 835 additions and 92 deletions.
458 changes: 452 additions & 6 deletions big_tests/tests/mam_SUITE.erl

Large diffs are not rendered by default.

141 changes: 103 additions & 38 deletions big_tests/tests/mam_helper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
maybe_wait_for_archive/1,
stanza_archive_request/2,
stanza_text_search_archive_request/3,
stanza_include_groupchat_request/3,
stanza_date_range_archive_request_not_empty/3,
wait_archive_respond/1,
wait_for_complete_archive_response/3,
Expand All @@ -80,6 +81,8 @@
stanza_limit_archive_request/1,
rsm_send/3,
stanza_page_archive_request/3,
stanza_flip_page_archive_request/3,
stanza_metadata_request/0,
wait_empty_rset/2,
wait_message_range/2,
wait_message_range/3,
Expand All @@ -96,6 +99,8 @@
retract_ns/0,
retract_esl_ns/0,
retract_tombstone_ns/0,
groupchat_field_ns/0,
groupchat_available_ns/0,
make_alice_and_bob_friends/2,
run_prefs_case/6,
prefs_cases2/0,
Expand Down Expand Up @@ -258,6 +263,8 @@ mam_ns_binary_v06() -> <<"urn:xmpp:mam:2">>.
%% Namespace constants used by the test helpers. Each function takes no
%% arguments and returns a fixed binary.

%% Message retraction namespace.
retract_ns() -> <<"urn:xmpp:message-retract:0">>.
%% ESL-specific namespace for retraction addressed by stanza id
%% (as the URN itself says).
retract_esl_ns() -> <<"urn:esl:message-retract-by-stanza-id:0">>.
%% Tombstone variant of the message retraction namespace.
retract_tombstone_ns() -> <<"urn:xmpp:message-retract:0#tombstone">>.
%% MAM v2 "groupchat-field" namespace.
%% NOTE(review): presumably the feature advertised when the
%% include-groupchat form field is supported -- confirm against the server.
groupchat_field_ns() -> <<"urn:xmpp:mam:2#groupchat-field">>.
%% MAM v2 "groupchat-available" namespace.
%% NOTE(review): presumably advertised when groupchat messages are stored
%% in the user archive -- confirm against the server.
groupchat_available_ns() -> <<"urn:xmpp:mam:2#groupchat-available">>.

%% Returns the given list with every element that is exactly the atom
%% `undefined' removed; the order of the remaining elements is kept.
skip_undefined(Xs) ->
    IsDefined = fun(Item) -> Item =/= undefined end,
    [Item || Item <- Xs, IsDefined(Item)].
Expand All @@ -277,56 +284,99 @@ maybe_with_elem(BWithJID) ->
name = <<"with">>,
children = [#xmlcdata{content = BWithJID}]}.

%% Builds an IQ stanza of type "get" whose only child is an empty
%% <metadata/> element carrying the MAM v0.6 namespace as its xmlns.
stanza_metadata_request() ->
    Metadata = #xmlel{name = <<"metadata">>,
                      attrs = [{<<"xmlns">>, mam_ns_binary_v06()}]},
    escalus_stanza:iq(<<"get">>, [Metadata]).

%% An optional 'queryid' attribute allows the client to match results to
%% a certain query.
stanza_archive_request(P, QueryId) ->
stanza_lookup_messages_iq(P, QueryId,
undefined, undefined,
undefined, undefined, undefined).
Params = #{query_id => QueryId},
stanza_lookup_messages_iq(P, Params).

stanza_date_range_archive_request(P) ->
stanza_lookup_messages_iq(P, undefined,
"2010-06-07T00:00:00Z", "2010-07-07T13:23:54Z",
undefined, undefined, undefined).
Params = #{
start => "2010-06-07T00:00:00Z",
stop => "2010-07-07T13:23:54Z"
},
stanza_lookup_messages_iq(P, Params).

stanza_date_range_archive_request_not_empty(P, Start, Stop) ->
stanza_lookup_messages_iq(P, undefined,
Start, Stop,
undefined, undefined, undefined).
Params = #{
start => Start,
stop => Stop
},
stanza_lookup_messages_iq(P, Params).

stanza_limit_archive_request(P) ->
stanza_lookup_messages_iq(P, undefined, "2010-08-07T00:00:00Z",
undefined, undefined, #rsm_in{max=10}, undefined).
Params = #{
start => "2010-08-07T00:00:00Z",
rsm => #rsm_in{max=10}
},
stanza_lookup_messages_iq(P, Params).

stanza_page_archive_request(P, QueryId, RSM) ->
stanza_lookup_messages_iq(P, QueryId, undefined, undefined, undefined, RSM, undefined).
Params = #{
query_id => QueryId,
rsm => RSM
},
stanza_lookup_messages_iq(P, Params).

%% Builds an archive lookup IQ for the given query id and RSM settings,
%% with the `flip_page' option switched on.
stanza_flip_page_archive_request(P, QueryId, RSM) ->
    stanza_lookup_messages_iq(P, #{query_id => QueryId,
                                   rsm => RSM,
                                   flip_page => true}).

stanza_filtered_by_jid_request(P, BWithJID) ->
stanza_lookup_messages_iq(P, undefined, undefined,
undefined, BWithJID, undefined, undefined).
Params = #{with_jid => BWithJID},
stanza_lookup_messages_iq(P, Params).

stanza_text_search_archive_request(P, QueryId, TextSearch) ->
stanza_lookup_messages_iq(P, QueryId,
undefined, undefined,
undefined, undefined, TextSearch).
Params = #{
query_id => QueryId,
text_search => TextSearch
},
stanza_lookup_messages_iq(P, Params).

%% Builds an archive lookup IQ for the given query id, passing
%% IncludeGroupChat through as the `include_group_chat' lookup parameter.
stanza_include_groupchat_request(P, QueryId, IncludeGroupChat) ->
    stanza_lookup_messages_iq(P, #{query_id => QueryId,
                                   include_group_chat => IncludeGroupChat}).

stanza_lookup_messages_iq(P, Params) ->
QueryId = maps:get(query_id, Params, undefined),
BStart = maps:get(start, Params, undefined),
BEnd = maps:get(stop, Params, undefined),
BWithJID = maps:get(with_jid, Params, undefined),
RSM = maps:get(rsm, Params, undefined),
TextSearch = maps:get(text_search, Params, undefined),
FlipPage = maps:get(flip_page, Params, undefined),
IncludeGroupChat = maps:get(include_group_chat, Params, undefined),

stanza_lookup_messages_iq(P, QueryId, BStart, BEnd, BWithJID, RSM, TextSearch) ->
escalus_stanza:iq(<<"set">>, [#xmlel{
name = <<"query">>,
attrs = mam_ns_attr(P)
++ maybe_attr(<<"queryid">>, QueryId),
children = skip_undefined([
form_x(BStart, BEnd, BWithJID, RSM, TextSearch),
maybe_rsm_elem(RSM)])
form_x(BStart, BEnd, BWithJID, RSM, TextSearch, IncludeGroupChat),
maybe_rsm_elem(RSM),
maybe_flip_page(FlipPage)])
}]).

form_x(undefined, undefined, undefined, undefined, undefined) ->
form_x(undefined, undefined, undefined, undefined, undefined, undefined) ->
undefined;
form_x(BStart, BEnd, BWithJID, RSM, TextSearch) ->
form_x(BStart, BEnd, BWithJID, RSM, TextSearch, IncludeGroupChat) ->
Fields = skip_undefined([form_field(<<"start">>, BStart),
form_field(<<"end">>, BEnd),
form_field(<<"with">>, BWithJID),
form_field(<<"full-text-search">>, TextSearch)]
form_field(<<"full-text-search">>, TextSearch),
form_field(<<"include-groupchat">>, IncludeGroupChat)]
++ form_extra_fields(RSM)
++ form_border_fields(RSM)),
form_helper:form(#{fields => Fields}).
Expand All @@ -340,10 +390,10 @@ form_border_fields(undefined) ->
[];
form_border_fields(#rsm_in{
before_id=BeforeId, after_id=AfterId, from_id=FromId, to_id=ToId}) ->
[form_field(<<"before_id">>, BeforeId),
form_field(<<"after_id">>, AfterId),
form_field(<<"from_id">>, FromId),
form_field(<<"to_id">>, ToId)].
[form_field(<<"before-id">>, BeforeId),
form_field(<<"after-id">>, AfterId),
form_field(<<"from-id">>, FromId),
form_field(<<"to-id">>, ToId)].

form_field(_VarName, undefined) ->
undefined;
Expand Down Expand Up @@ -372,6 +422,11 @@ maybe_rsm_elem(#rsm_in{max=Max, direction=Direction, id=Id, index=Index}) ->
maybe_rsm_index(Index),
maybe_rsm_direction(Direction, Id)])}.

%% Returns the optional <flip-page/> child of a MAM query.
%%
%% `undefined' (option not given) and `false' (option explicitly disabled)
%% both yield `undefined', which the caller strips from the children list.
%% Any other value yields the <flip-page/> element.
%%
%% Previously `false' fell into the catch-all clause and produced
%% <flip-page/>, i.e. `flip_page => false' asked the server to flip.
maybe_flip_page(undefined) ->
    undefined;
maybe_flip_page(false) ->
    undefined;
maybe_flip_page(_) ->
    #xmlel{name = <<"flip-page">>}.

%% Children for an RSM id element: a single cdata node holding Id,
%% or no children at all when Id is `undefined'.
rsm_id_children(Id) when Id =/= undefined ->
    [#xmlcdata{content = Id}];
rsm_id_children(undefined) ->
    [].

Expand Down Expand Up @@ -817,15 +872,22 @@ delete_offline_messages(Server, Username) ->
HostType = domain_helper:host_type(),
rpc_apply(mod_offline_backend, remove_user, [HostType, Username, Server]).

wait_message_range(Client, FromN, ToN) ->
wait_message_range(Client, 15, FromN-1, FromN, ToN).
wait_message_range(Client, FromN, ToN) when FromN =< ToN ->
wait_message_range(Client, 15, FromN-1, FromN, ToN, 1);
wait_message_range(Client, FromN, ToN) when FromN > ToN ->
wait_message_range(Client, 15, FromN-1, FromN, ToN, -1).

wait_message_range(Client, TotalCount, Offset, FromN, ToN) ->
wait_message_range(Client, TotalCount, Offset, FromN, ToN) when FromN =< ToN ->
wait_message_range(Client, TotalCount, Offset, FromN, ToN, 1);
wait_message_range(Client, TotalCount, Offset, FromN, ToN) when FromN > ToN ->
wait_message_range(Client, TotalCount, Offset, FromN, ToN, -1).

wait_message_range(Client, TotalCount, Offset, FromN, ToN, Step) ->
wait_message_range(Client, #{total_count => TotalCount, offset => Offset,
from => FromN, to => ToN}).
from => FromN, to => ToN, step => Step}).

wait_message_range(Client, Params = #{total_count := TotalCount, offset := Offset,
from := FromN, to := ToN}) ->
from := FromN, to := ToN, step := Step}) ->
IsComplete = maps:get(is_complete, Params, undefined),
Result = wait_archive_respond(Client),
Messages = respond_messages(Result),
Expand All @@ -835,9 +897,12 @@ wait_message_range(Client, Params = #{total_count := TotalCount, offset := Offse
ParsedIQ = parse_result_iq(Result),
try
?assert_equal(TotalCount, ParsedIQ#result_iq.count),
?assert_equal(Offset, ParsedIQ#result_iq.first_index),
case Step of
-1 -> ok;
_ -> ?assert_equal(Offset, ParsedIQ#result_iq.first_index)
end,
%% Compare body of the messages.
?assert_equal([generate_message_text(N) || N <- maybe_seq(FromN, ToN)],
?assert_equal([generate_message_text(N) || N <- maybe_seq(FromN, ToN, Step)],
[B || #forwarded_message{message_body=B} <- ParsedMessages]),
case IsComplete of
true -> ?assert_equal(<<"true">>, ParsedIQ#result_iq.complete);
Expand All @@ -854,8 +919,8 @@ wait_message_range(Client, Params = #{total_count := TotalCount, offset := Offse
erlang:raise(Class, Reason, StackTrace)
end.

maybe_seq(undefined, undefined) -> [];
maybe_seq(A, B) -> lists:seq(A, B).
maybe_seq(undefined, undefined, _) -> [];
maybe_seq(A, B, Step) -> lists:seq(A, B, Step).

wait_empty_rset(Alice, TotalCount) ->
Result = wait_archive_respond(Alice),
Expand Down Expand Up @@ -970,10 +1035,10 @@ put_msg({{MsgIdOwner, MsgIdRemote},
{_, Source, _}, Packet}) ->
Map1 = #{message_id => MsgIdOwner, archive_id => FromArcID,
local_jid => FromJID, remote_jid => ToJID, source_jid => Source,
origin_id => none, direction => outgoing, packet => Packet},
origin_id => none, direction => outgoing, packet => Packet, is_groupchat => false},
Map2 = #{message_id => MsgIdRemote, archive_id => ToArcID,
local_jid => ToJID, remote_jid => FromJID, source_jid => Source,
origin_id => none, direction => incoming, packet => Packet},
origin_id => none, direction => incoming, packet => Packet, is_groupchat => false},
archive_message(Map1),
archive_message(Map2).

Expand Down
6 changes: 3 additions & 3 deletions big_tests/tests/mam_proper_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ packet(To, Body) ->
%% Generates mod_mam:archive_message_params()
params() ->
?LET({MessId, ArcId, LocalJid, RemoteJid,
OriginId, Dir, Body, SenderId},
OriginId, Dir, Body, SenderId, IsGroupChat},
{non_neg_integer(), non_neg_integer(), jid(), jid(),
origin_id(), direction(), body(), non_neg_integer()},
origin_id(), direction(), body(), non_neg_integer(), boolean()},
#{message_id => MessId, archive_id => ArcId,
local_jid => LocalJid, remote_jid => RemoteJid,
source_jid => choose_source_jid(Dir, LocalJid, RemoteJid),
origin_id => OriginId, direction => Dir,
packet => packet(choose_dest_jid(Dir, LocalJid, RemoteJid), Body),
sender_id => SenderId}).
sender_id => SenderId, is_groupchat => IsGroupChat}).

%% Picks the JID a message originates from, given its direction:
%% the local JID for `outgoing' messages, the remote JID for `incoming' ones.
choose_source_jid(outgoing, Local, _Remote) ->
    Local;
choose_source_jid(incoming, _Local, Remote) ->
    Remote.
Expand Down
3 changes: 3 additions & 0 deletions doc/migrations/6.2.0_x.x.x.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## Database migration

There is a new column in the `mam_message` table in the database, which is used to support including or excluding groupchat results in a user archive. See the migrations for Postgres, MySQL and MSSQL in the [`priv/migrations`](https://github.com/esl/MongooseIM/tree/master/priv/migrations) directory. Please be aware that the filtering process will only be effective for new messages and will not apply to those messages that have already been stored in the database.
5 changes: 4 additions & 1 deletion doc/modules/mod_mam.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ To disable archive for one-to-one messages please remove PM section or any PM re
* **Default:** `false`
* **Example:** `modules.mod_mam.pm.archive_groupchats = true`

When enabled, MAM will store groupchat messages in recipients' individual archives. **USE WITH CAUTION!** May increase archive size significantly. Disabling this option for existing installation will neither remove such messages from MAM storage, nor will filter out them from search results.
When enabled, MAM will store groupchat messages in recipients' individual archives. **USE WITH CAUTION!** May increase archive size significantly. Disabling this option for an existing installation will neither remove such messages from MAM storage nor filter them out of search results. Clients can use the `include-groupchat` filter to filter out groupchat messages when querying the archive.

!!! Warning
The `include-groupchat` filter is not supported by the Cassandra backend.

#### `modules.mod_mam.pm.same_mam_id_for_peers`
* **Syntax:** boolean
Expand Down
16 changes: 8 additions & 8 deletions doc/open-extensions/mam.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
The new fields make it possible to improve the performance of counting queries for very big archives
by changing how the count and index functions work.

- `from_id` - returns and counts messages with ids `id >= from_id` only (`from_id` is included into the set).
- `to_id` - returns and counts messages with ids `id <= to_id` only (`to_id` is included into the set).
- `after_id` - returns and counts messages with ids `id > after_id` only (`after_id` is not included into the set).
- `before_id` - returns and counts messages with ids `id < before_id` only (`before_id` is not included into the set).
- `from-id` - returns and counts messages with ids `id >= from-id` only (`from-id` is included into the set).
- `to-id` - returns and counts messages with ids `id <= to-id` only (`to-id` is included into the set).
- `after-id` - returns and counts messages with ids `id > after-id` only (`after-id` is not included into the set).
- `before-id` - returns and counts messages with ids `id < before-id` only (`before-id` is not included into the set).
- `simple` - do not return count and offset fields in the result.

The fields could be combined together. If two filters are provided, both would
Expand All @@ -21,7 +21,7 @@ Example from `pagination_first_page_after_id4` testcase:
The client has downloaded his archive and got disconnected.
He knows that the last message he has on his device has id=BO7CH1JOF801.
He wants to receive new messages that were sent while he has been disconnected
using a page size 5.
using a page size 5.

In this mode, the client would get the oldest messages first.

Expand All @@ -32,7 +32,7 @@ Testcase: the client has messages 1-15 in his archive.
<iq type='set' id='req1'>
<query xmlns='urn:xmpp:mam:1' queryid='first_page_after_id4'>
<x xmlns='jabber:x:data'>
<field var='after_id'>
<field var='after-id'>
<value>BO7CH1JOF801</value> <!-- id of the Message #4 -->
</field>
</x>
Expand Down Expand Up @@ -85,7 +85,7 @@ more messages.
<iq type='set' id='req2'>
<query xmlns='urn:xmpp:mam:1' queryid='first_page_after_id9'>
<x xmlns='jabber:x:data'>
<field var='after_id'>
<field var='after-id'>
<value>BO7CH1K3TU01</value> <!-- id of the Message #9 -->
</field>
</x>
Expand All @@ -112,7 +112,7 @@ Example `pagination_last_page_after_id4`.
<iq type='set' id='req3'>
<query xmlns='urn:xmpp:mam:1' queryid='last_page_after_id4'>
<x xmlns='jabber:x:data'>
<field var='after_id'>
<field var='after-id'>
<value>BO7CUCVVS6O1</value>
</field>
</x>
Expand Down
2 changes: 2 additions & 0 deletions include/mongoose_ns.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
-define(NS_SERVERINFO, <<"http://jabber.org/network/serverinfo">>).
-define(NS_MAM_04, <<"urn:xmpp:mam:1">>). % MAM 0.4.1 or 0.5
-define(NS_MAM_06, <<"urn:xmpp:mam:2">>). % MAM 0.6
-define(NS_MAM_GC_FIELD, <<"urn:xmpp:mam:2#groupchat-field">>).
-define(NS_MAM_GC_AVAILABLE, <<"urn:xmpp:mam:2#groupchat-available">>).
-define(NS_HTTP_UPLOAD_030, <<"urn:xmpp:http:upload:0">>).
-define(NS_PUSH, <<"urn:xmpp:push:0">>). % Push Notifications v0.2.1
-define(NS_STANZAID, <<"urn:xmpp:sid:0">>).
Expand Down
3 changes: 3 additions & 0 deletions priv/elasticsearch/pm.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
"body": {
"type": "text",
"analyzer": "english"
},
"is_groupchat": {
"type": "boolean"
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions priv/migrations/mssql_6.2.0_x.x.x.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Store information whether the message is of type "groupchat" in the user's archive
ALTER TABLE mam_message
ADD is_groupchat smallint NOT NULL DEFAULT 0;
3 changes: 3 additions & 0 deletions priv/migrations/mysql_6.2.0_x.x.x.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Store information whether the message is of type "groupchat" in the user's archive
ALTER TABLE mam_message
ADD COLUMN is_groupchat boolean NOT NULL DEFAULT false;
3 changes: 3 additions & 0 deletions priv/migrations/pgsql_6.2.0_x.x.x.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Store information whether the message is of type "groupchat" in the user's archive
ALTER TABLE mam_message
ADD COLUMN is_groupchat boolean NOT NULL DEFAULT false;
5 changes: 3 additions & 2 deletions priv/mssql2012.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ CREATE TABLE [dbo].[mam_message](
[message] [varbinary](max) NOT NULL,
[search_body] [nvarchar](max) NOT NULL,
[origin_id] [nvarchar](250) NULL,
[is_groupchat] [smallint] NOT NULL,
CONSTRAINT [PK_mam_message_user_id] PRIMARY KEY CLUSTERED
(
[user_id] ASC,
Expand Down Expand Up @@ -214,8 +215,8 @@ CREATE TABLE [dbo].[privacy_list](
[created_at] [datetime] NOT NULL,
CONSTRAINT [PK_privacy_list] PRIMARY KEY CLUSTERED
(
[server] ASC,
[username] ASC,
[server] ASC,
[username] ASC,
[name] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
Expand Down
1 change: 1 addition & 0 deletions priv/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ CREATE TABLE mam_message(
message mediumblob NOT NULL,
search_body mediumtext,
origin_id varchar(250) CHARACTER SET binary,
is_groupchat boolean NOT NULL,
PRIMARY KEY (user_id, id),
INDEX i_mam_message_rem USING BTREE (user_id, remote_bare_jid, id)
) CHARACTER SET utf8mb4
Expand Down
Loading

0 comments on commit d2f609a

Please sign in to comment.