diff --git a/priv/msgs/nl.msg b/priv/msgs/nl.msg
index 9dfd896b..f35ff07a 100644
--- a/priv/msgs/nl.msg
+++ b/priv/msgs/nl.msg
@@ -386,3 +386,17 @@
 {"Your Jabber account was successfully created.","Uw Jabber-account is succesvol gecreeerd."}.
 {"Your Jabber account was successfully deleted.","Uw Jabber-account is succesvol verwijderd."}.
 {"Your messages to ~s are being blocked. To unblock them, visit ~s","Uw berichten aan ~s worden geblokkeerd. Om ze te deblokkeren, ga naar ~s"}.
+% mod_logdb
+{"Users Messages", "Gebruikersberichten"}.
+{"Date", "Datum"}.
+{"Count", "Aantal"}.
+{"Logged messages for ~s", "Gelogde berichten van ~s"}.
+{"Logged messages for ~s at ~s", "Gelogde berichten van ~s op ~s"}.
+{" at ", " op "}.
+{"No logged messages for ~s", "Geen gelogde berichten van ~s"}.
+{"No logged messages for ~s at ~s", "Geen gelogde berichten van ~s op ~s"}.
+{"Date, Time", "Datum en tijd"}.
+{"Direction: Jid", "Richting: Jabber ID"}.
+{"Subject", "Onderwerp"}.
+{"Body", "Berichtveld"}.
+{"Messages", "Berichten"}.
diff --git a/priv/msgs/pl.msg b/priv/msgs/pl.msg
index 6b303cc9..664f1c95 100644
--- a/priv/msgs/pl.msg
+++ b/priv/msgs/pl.msg
@@ -390,3 +390,29 @@
 {"Your Jabber account was successfully created.","Twoje konto zostało stworzone."}.
 {"Your Jabber account was successfully deleted.","Twoje konto zostało usunięte."}.
 {"Your messages to ~s are being blocked. To unblock them, visit ~s","Twoje wiadomości do ~s są blokowane. Aby je odblokować, odwiedź ~s"}.
+% mod_logdb
+{"Users Messages", "Wiadomości użytkownika"}.
+{"Date", "Data"}.
+{"Count", "Liczba"}.
+{"Logged messages for ~s", "Zapisane wiadomości dla ~s"}.
+{"Logged messages for ~s at ~s", "Zapisane wiadomości dla ~s o ~s"}.
+{" at ", " o "}.
+{"No logged messages for ~s", "Brak zapisanych wiadomości dla ~s"}.
+{"No logged messages for ~s at ~s", "Brak zapisanych wiadomości dla ~s o ~s"}.
+{"Date, Time", "Data, Godzina"}.
+{"Direction: Jid", "Kierunek: Jid"}.
+{"Subject", "Temat"}.
+{"Body", "Treść"}.
+{"Messages","Wiadomości"}. +{"Filter Selected", "Odfiltruj zaznaczone"}. +{"Do Not Log Messages", "Nie zapisuj wiadomości"}. +{"Log Messages", "Zapisuj wiadomości"}. +{"Messages logging engine", "System zapisywania historii rozmów"}. +{"Default", "Domyślne"}. +{"Set logging preferences", "Ustaw preferencje zapisywania"}. +{"Messages logging engine settings", "Ustawienia systemu logowania"}. +{"Set run-time settings", "Zapisz ustawienia systemu logowania"}. +{"Groupchat messages logging", "Zapisywanie rozmów z konferencji"}. +{"Jids/Domains to ignore", "JID/Domena która ma być ignorowana"}. +{"Purge messages older than (days)", "Usuń wiadomości starsze niż (w dniach)"}. +{"Poll users settings (seconds)", "Czas aktualizacji preferencji użytkowników (sekundy)"}. diff --git a/priv/msgs/ru.msg b/priv/msgs/ru.msg index 05849f51..b87bf9bb 100644 --- a/priv/msgs/ru.msg +++ b/priv/msgs/ru.msg @@ -386,3 +386,33 @@ {"Your Jabber account was successfully created.","Ваш Jabber-аккаунт был успешно создан."}. {"Your Jabber account was successfully deleted.","Ваш Jabber-аккаунт был успешно удален."}. {"Your messages to ~s are being blocked. To unblock them, visit ~s","Ваши сообщения к ~s блокируются. Для снятия блокировки перейдите по ссылке ~s"}. +% mod_logdb.erl +{"Users Messages", "Сообщения пользователей"}. +{"Date", "Дата"}. +{"Count", "Количество"}. +{"Logged messages for ~s", "Сохранённые cообщения для ~s"}. +{"Logged messages for ~s at ~s", "Сохранённые cообщения для ~s за ~s"}. +{" at ", " за "}. +{"No logged messages for ~s", "Отсутствуют сообщения для ~s"}. +{"No logged messages for ~s at ~s", "Отсутствуют сообщения для ~s за ~s"}. +{"Date, Time", "Дата, Время"}. +{"Direction: Jid", "Направление: Jid"}. +{"Subject", "Тема"}. +{"Body", "Текст"}. +{"Messages", "Сообщения"}. +{"Filter Selected", "Отфильтровать выделенные"}. +{"Do Not Log Messages", "Не сохранять сообщения"}. +{"Log Messages", "Сохранять сообщения"}. 
+{"Messages logging engine", "Система логирования сообщений"}. +{"Default", "По умолчанию"}. +{"Set logging preferences", "Задайте настройки логирования"}. +{"Messages logging engine users", "Пользователи системы логирования сообщений"}. +{"Messages logging engine settings", "Настройки системы логирования сообщений"}. +{"Set run-time settings", "Задайте текущие настройки"}. +{"Groupchat messages logging", "Логирование сообщений типа groupchat"}. +{"Jids/Domains to ignore", "Игнорировать следующие jids/домены"}. +{"Purge messages older than (days)", "Удалять сообщения старее чем (дни)"}. +{"Poll users settings (seconds)", "Обновлять настройки пользователей через (секунд)"}. +{"Drop", "Удалять"}. +{"Do not drop", "Не удалять"}. +{"Drop messages on user removal", "Удалять сообщения при удалении пользователя"}. diff --git a/priv/msgs/uk.msg b/priv/msgs/uk.msg index a1159b53..4bdab4c5 100644 --- a/priv/msgs/uk.msg +++ b/priv/msgs/uk.msg @@ -390,3 +390,33 @@ {"Your Jabber account was successfully created.","Ваш Jabber-акаунт було успішно створено."}. {"Your Jabber account was successfully deleted.","Ваш Jabber-акаунт було успішно видалено."}. {"Your messages to ~s are being blocked. To unblock them, visit ~s","Ваші повідомлення до ~s блокуються. Для розблокування відвідайте ~s"}. +% mod_logdb +{"Users Messages", "Повідомлення користувачів"}. +{"Date", "Дата"}. +{"Count", "Кількість"}. +{"Logged messages for ~s", "Збережені повідомлення для ~s"}. +{"Logged messages for ~s at ~s", "Збережені повідомлення для ~s за ~s"}. +{" at ", " за "}. +{"No logged messages for ~s", "Відсутні повідомлення для ~s"}. +{"No logged messages for ~s at ~s", "Відсутні повідомлення для ~s за ~s"}. +{"Date, Time", "Дата, Час"}. +{"Direction: Jid", "Напрямок: Jid"}. +{"Subject", "Тема"}. +{"Body", "Текст"}. +{"Messages", "Повідомлення"}. +{"Filter Selected", "Відфільтрувати виділені"}. +{"Do Not Log Messages", "Не зберігати повідомлення"}. +{"Log Messages", "Зберігати повідомлення"}. 
+{"Messages logging engine", "Система збереження повідомлень"}. +{"Default", "За замовчуванням"}. +{"Set logging preferences", "Вкажіть налагоджування збереження повідомлень"}. +{"Messages logging engine users", "Користувачі системи збереження повідомлень"}. +{"Messages logging engine settings", "Налагоджування системи збереження повідомлень"}. +{"Set run-time settings", "Вкажіть поточні налагоджування"}. +{"Groupchat messages logging", "Збереження повідомлень типу groupchat"}. +{"Jids/Domains to ignore", "Ігнорувати наступні jids/домени"}. +{"Purge messages older than (days)", "Видаляти повідомлення старіші ніж (дні)"}. +{"Poll users settings (seconds)", "Оновлювати налагоджування користувачів кожні (секунд)"}. +{"Drop", "Видаляти"}. +{"Do not drop", "Не видаляти"}. +{"Drop messages on user removal", "Видаляти повідомлення під час видалення користувача"}. diff --git a/rebar.config b/rebar.config index 477343c5..db58ab69 100644 --- a/rebar.config +++ b/rebar.config @@ -33,8 +33,8 @@ {eimp, ".*", {git, "https://github.com/processone/eimp", {tag, "1.0.9"}}}, {if_var_true, stun, {stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.0.26"}}}}, {if_var_true, sip, {esip, ".*", {git, "https://github.com/processone/esip", {tag, "1.0.27"}}}}, - {if_var_true, mysql, {p1_mysql, ".*", {git, "https://github.com/processone/p1_mysql", - {tag, "1.0.8"}}}}, + {if_var_true, mysql, {p1_mysql, ".*", {git, "https://github.com/paleg/p1_mysql", + {branch, "multi"}}}}, {if_var_true, pgsql, {p1_pgsql, ".*", {git, "https://github.com/processone/p1_pgsql", {tag, "1.1.6"}}}}, {if_var_true, sqlite, {sqlite3, ".*", {git, "https://github.com/processone/erlang-sqlite3", diff --git a/src/gen_logdb.erl b/src/gen_logdb.erl new file mode 100644 index 00000000..8bad1129 --- /dev/null +++ b/src/gen_logdb.erl @@ -0,0 +1,162 @@ +%%%---------------------------------------------------------------------- +%%% File : gen_logdb.erl +%%% Author : Oleg Palij (mailto:o.palij@gmail.com) +%%% Purpose : 
Describes generic behaviour for mod_logdb backends.
+%%% Url : https://paleg.github.io/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(gen_logdb).
+-author('o.palij@gmail.com').
+
+-export([behaviour_info/1]).
+
+% behaviour_info(callbacks) returns the {Function, Arity} pairs that every
+% mod_logdb backend module has to export; any other argument yields undefined.
+% NOTE(review): the type notes below describe VHost/User/Date as strings (lists).
+% mod_logdb itself calls binary_to_list/1 on VHost, User and Date in several
+% places, so the caller-side values look like binaries - confirm what each
+% backend really receives before relying on these notes.
+behaviour_info(callbacks) ->
+   [
+    % called from handle_info(start, _)
+    % it should connect to the database and return a reference to the started instance
+    % start(VHost, Opts) -> {ok, SPid} | {error, Reason}
+    %  (mod_logdb monitors SPid and treats {error, {already_started, _}} specially)
+    %  Options - list of options to connect to db
+    %    Types: Options = list() -> [] |
+    %                              [{user, "logdb"},
+    %                               {pass, "1234"},
+    %                               {db, "logdb"}] | ...
+    %          VHost = list() -> "jabber.example.org"
+    {start, 2},
+
+    % called from terminate/2, and from handle_info(start, _) when the backend
+    % reports that it is already started
+    % it should disconnect from the database and do cleanup
+    % stop(VHost)
+    %    Types: VHost = list() -> "jabber.example.org"
+    {stop, 1},
+
+    % called from handle_cast({addlog, _, _, _, _}, _)
+    % it should log messages to database
+    % log_message(VHost, Msg) -> ok | error
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          Msg = record() -> #msg
+    {log_message, 2},
+
+    % called from ejabberdctl rebuild_stats (via handle_cast({rebuild_stats}, _))
+    % it should rebuild stats table (if used) for vhost
+    % rebuild_stats(VHost)
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    {rebuild_stats, 1},
+
+    % it should rebuild stats table (if used) for vhost at Date
+    % rebuild_stats_at(VHost, Date)
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          Date = list() -> "2007-02-12"
+    {rebuild_stats_at, 2},
+
+    % called from user_messages_at_parse_query/5
+    % it should delete selected user messages at date
+    % delete_messages_by_user_at(VHost, Msgs, Date) -> ok | error
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          Msgs = list() -> [ #msg1, msg2, ... ]
+    %          Date = list() -> "2007-02-12"
+    {delete_messages_by_user_at, 3},
+
+    % called from user_messages_parse_query/4 | vhost_messages_at_parse_query/4
+    % it should delete all user messages at date
+    % delete_all_messages_by_user_at(User, VHost, Date) -> ok | error
+    %    Types:
+    %          User = list() -> "admin"
+    %          VHost = list() -> "jabber.example.org"
+    %          Date = list() -> "2007-02-12"
+    {delete_all_messages_by_user_at, 3},
+
+    % called from vhost_messages_parse_query/3
+    % it should delete messages for vhost at date and update stats
+    % delete_messages_at(VHost, Date) -> ok | error
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          Date = list() -> "2007-02-12"
+    {delete_messages_at, 2},
+
+    % called from ejabberd_web_admin:vhost_messages_stats/3
+    % it should return sorted list of count of messages by dates for vhost
+    % get_vhost_stats(VHost) -> {ok, [{Date1, Msgs_count1}, {Date2, Msgs_count2}, ... ]} |
+    %                           {error, Reason}
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          DateN = list() -> "2007-02-12"
+    %          Msgs_countN = number() -> 241
+    {get_vhost_stats, 1},
+
+    % called from ejabberd_web_admin:vhost_messages_stats_at/4
+    % it should return sorted list of count of messages by users at date for vhost
+    % get_vhost_stats_at(VHost, Date) -> {ok, [{User1, Msgs_count1}, {User2, Msgs_count2}, ....]} |
+    %                                    {error, Reason}
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          Date = list() -> "2007-02-12"
+    %          UserN = list() -> "admin"
+    %          Msgs_countN = number() -> 241
+    {get_vhost_stats_at, 2},
+
+    % called from ejabberd_web_admin:user_messages_stats/4
+    % it should return sorted list of count of messages by date for user at vhost
+    % get_user_stats(User, VHost) -> {ok, [{Date1, Msgs_count1}, {Date2, Msgs_count2}, ...]} |
+    %                                {error, Reason}
+    %    Types:
+    %          User = list() -> "admin"
+    %          VHost = list() -> "jabber.example.org"
+    %          DateN = list() -> "2007-02-12"
+    %          Msgs_countN = number() -> 241
+    {get_user_stats, 2},
+
+    % called from ejabberd_web_admin:user_messages_stats_at/5
+    % it should return all user messages at date
+    % get_user_messages_at(User, VHost, Date) -> {ok, Msgs} | {error, Reason}
+    %    Types:
+    %          User = list() -> "admin"
+    %          VHost = list() -> "jabber.example.org"
+    %          Date = list() -> "2007-02-12"
+    %          Msgs = list() -> [ #msg1, msg2, ... ]
+    {get_user_messages_at, 3},
+
+    % called from many places
+    % it should return list of dates for vhost
+    % get_dates(VHost) -> [Date1, Date2, ... ]
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    %          DateN = list() -> "2007-02-12"
+    {get_dates, 1},
+
+    % called from handle_info(start, _) and handle_info(poll_users_settings, _)
+    % it should return list with users settings for VHost in db
+    % get_users_settings(VHost) -> {ok, [#user_settings1, #user_settings2, ... ]} | {error, Reason}
+    %  (this is the shape mod_logdb matches on when it fills its settings table)
+    %    Types:
+    %          VHost = list() -> "jabber.example.org"
+    {get_users_settings, 1},
+
+    % called from many places
+    % it should return User settings at VHost from db
+    % get_user_settings(User, VHost) -> error | {ok, #user_settings}
+    %    Types:
+    %          User = list() -> "admin"
+    %          VHost = list() -> "jabber.example.org"
+    {get_user_settings, 2},
+
+    % called from web admin (via handle_call({set_user_settings, _, _}, _, _))
+    % it should set User settings at VHost
+    % set_user_settings(User, VHost, #user_settings) -> ok | error
+    %    Types:
+    %          User = list() -> "admin"
+    %          VHost = list() -> "jabber.example.org"
+    {set_user_settings, 3},
+
+    % called from remove_user (ejabberd hook, via handle_cast({remove_user, _}, _))
+    % it should remove user messages and settings at VHost
+    % drop_user(User, VHost) -> ok | error
+    %    Types:
+    %          User = list() -> "admin"
+    %          VHost = list() -> "jabber.example.org"
+    {drop_user, 2}
+   ];
+behaviour_info(_) ->
+   undefined.
diff --git a/src/mod_logdb.erl b/src/mod_logdb.erl new file mode 100644 index 00000000..bf0240d1 --- /dev/null +++ b/src/mod_logdb.erl @@ -0,0 +1,1951 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb.erl +%%% Author : Oleg Palij (mailto:o.palij@gmail.com) +%%% Purpose : Frontend for log user messages to db +%%% Url : https://paleg.github.io/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb). +-author('o.palij@gmail.com'). + +-behaviour(gen_server). +-behaviour(gen_mod). + +% supervisor +-export([start_link/2]). +% gen_mod +-export([start/2, stop/1, + mod_opt_type/1, + depends/2, reload/3]). +% gen_server +-export([code_change/3, + handle_call/3, handle_cast/2, handle_info/2, + init/1, terminate/2]). +% hooks +-export([send_packet/1, receive_packet/1, offline_message/1, remove_user/2]). +-export([get_local_identity/5, + get_local_features/5, + get_local_items/5, + adhoc_local_items/4, + adhoc_local_commands/4 + ]). +% ejabberdctl +-export([rebuild_stats/1, + copy_messages/1, copy_messages_ctl/3, copy_messages_int_tc/1]). +% +-export([get_vhost_stats/1, get_vhost_stats_at/2, + get_user_stats/2, get_user_messages_at/3, + get_dates/1, + sort_stats/1, + convert_timestamp/1, convert_timestamp_brief/1, + get_user_settings/2, set_user_settings/3, + user_messages_at_parse_query/4, user_messages_parse_query/3, + vhost_messages_parse_query/2, vhost_messages_at_parse_query/4, + list_to_bool/1, bool_to_list/1, + list_to_string/1, string_to_list/1, + get_module_settings/1, set_module_settings/2, + purge_old_records/2]). +% webadmin hooks +-export([webadmin_menu/3, + webadmin_user/4, + webadmin_page/3, + user_parse_query/5]). +% webadmin queries +-export([vhost_messages_stats/3, + vhost_messages_stats_at/4, + user_messages_stats/4, + user_messages_stats_at/5]). + +-include("mod_logdb.hrl"). +-include("xmpp.hrl"). +-include("mod_roster.hrl"). 
+-include("ejabberd_commands.hrl").
+-include("adhoc.hrl").
+-include("ejabberd_web_admin.hrl").
+-include("ejabberd_http.hrl").
+-include("logger.hrl").
+
+-define(PROCNAME, ejabberd_mod_logdb).
+% gen_server call timeout
+-define(CALL_TIMEOUT, 10000).
+
+-record(state, {vhost, dbmod, backendPid, monref, purgeRef, pollRef, dbopts, dbs, dolog_default, ignore_jids, groupchat, purge_older_days, poll_users_settings, drop_messages_on_user_removal}).
+
+% Name (atom) of the ETS table that holds the user settings of VHost.
+% VHost must be a binary; one atom is created per virtual host.
+ets_settings_table(VHost) -> list_to_atom("ets_logdb_settings_" ++ binary_to_list(VHost)).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod/gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% ejabberd starts module: register one permanent worker per VHost
+start(VHost, Opts) ->
+    ChildSpec =
+        {gen_mod:get_module_proc(VHost, ?PROCNAME),
+         {?MODULE, start_link, [VHost, Opts]},
+         permanent,
+         1000,
+         worker,
+         [?MODULE]},
+    % add child to ejabberd_gen_mod_sup
+    supervisor:start_child(ejabberd_gen_mod_sup, ChildSpec).
+
+% this module declares no dependencies on other modules
+depends(_Host, _Opts) ->
+    [].
+
+reload(_Host, _NewOpts, _OldOpts) ->
+    % TODO
+    ok.
+
+% supervisor starts gen_server
+% On success the new process is sent 'start', which makes handle_info/2
+% connect to the backend. Any other result of gen_server:start_link/4
+% (for example {error, {already_started, Pid}}) is returned unchanged, so the
+% supervisor sees the real reason instead of a badmatch raised here.
+start_link(VHost, Opts) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    case gen_server:start_link({local, Proc}, ?MODULE, [VHost, Opts], []) of
+        {ok, Pid} = Started ->
+            Pid ! start,
+            Started;
+        NotStarted ->
+            NotStarted
+    end.
+
+% gen_server init: reads the module options and picks the backend for VHost.
+% No database connection is made here; that happens when the 'start'
+% message is handled in handle_info/2.
+init([VHost, Opts]) ->
+    process_flag(trap_exit, true),
+    Identity = fun(A) -> A end,
+    ConfiguredDBs = gen_mod:get_opt(dbs, Opts, Identity, [{mnesia, []}]),
+    % mnesia is the fallback backend, so make sure it is always in the list
+    DBs = case lists:keymember(mnesia, 1, ConfiguredDBs) of
+              true -> ConfiguredDBs;
+              false -> ConfiguredDBs ++ [{mnesia,[]}]
+          end,
+    VHostBackends = gen_mod:get_opt(vhosts, Opts, Identity, [{VHost, mnesia}]),
+    % 10 is default because of using in clustered environment
+    PollInterval = gen_mod:get_opt(poll_users_settings, Opts, Identity, 10),
+
+    % resolve VHost -> backend name -> backend options, falling back to mnesia
+    {DBName, DBOpts} =
+        case lists:keyfind(VHost, 1, VHostBackends) of
+            {_, Wanted} ->
+                case lists:keyfind(Wanted, 1, DBs) of
+                    {_, WantedOpts} ->
+                        {Wanted, WantedOpts};
+                    false ->
+                        ?WARNING_MSG("There is no such logging backend '~s' defined for '~s', switching to mnesia", [Wanted, VHost]),
+                        {mnesia, []}
+                end;
+            false ->
+                ?WARNING_MSG("There is no logging backend defined for '~s', switching to mnesia", [VHost]),
+                {mnesia, []}
+        end,
+
+    ?MYDEBUG("Starting mod_logdb for '~s' with '~s' backend", [VHost, DBName]),
+
+    % backend module name is <this module>_<backend>, e.g. mod_logdb_mnesia
+    BackendModule = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(DBName)),
+
+    InitialState =
+        #state{vhost=VHost,
+               dbmod=BackendModule,
+               dbopts=DBOpts,
+               % dbs used for convert messages from one backend to other
+               dbs=DBs,
+               dolog_default=gen_mod:get_opt(dolog_default, Opts, Identity, true),
+               drop_messages_on_user_removal=gen_mod:get_opt(drop_messages_on_user_removal, Opts, Identity, true),
+               ignore_jids=gen_mod:get_opt(ignore_jids, Opts, Identity, []),
+               groupchat=gen_mod:get_opt(groupchat, Opts, Identity, none),
+               purge_older_days=gen_mod:get_opt(purge_older_days, Opts, Identity, never),
+               poll_users_settings=PollInterval},
+    {ok, InitialState}.
+
+% Removes every hook and ejabberd command that this module registers for
+% VHost. The settings ETS table is not deleted here.
+cleanup(#state{vhost=VHost} = _State) ->
+    ?MYDEBUG("Stopping ~s for ~p", [?MODULE, VHost]),
+
+    % {Hook, Function in this module, Sequence}; removed in this order
+    Hooks = [{remove_user, remove_user, 90},
+             {user_send_packet, send_packet, 90},
+             {user_receive_packet, receive_packet, 90},
+             {offline_message_hook, offline_message, 40},
+
+             {adhoc_local_commands, adhoc_local_commands, 50},
+             {adhoc_local_items, adhoc_local_items, 50},
+             {disco_local_identity, get_local_identity, 50},
+             {disco_local_features, get_local_features, 50},
+             {disco_local_items, get_local_items, 50},
+
+             {webadmin_menu_host, webadmin_menu, 70},
+             {webadmin_user, webadmin_user, 50},
+             {webadmin_page_host, webadmin_page, 50},
+             {webadmin_user_parse_query, user_parse_query, 50}],
+    lists:foreach(fun({Hook, Function, Seq}) ->
+                          ejabberd_hooks:delete(Hook, VHost, ?MODULE, Function, Seq)
+                  end,
+                  Hooks),
+
+    ?MYDEBUG("Removed hooks for ~p", [VHost]),
+
+    ejabberd_commands:unregister_commands(get_commands_spec()),
+    ?MYDEBUG("Unregistered commands for ~p", [VHost]).
+
+% gen_mod stop: terminate the per-vhost worker and remove its child spec.
+% Both supervisor calls must return ok, otherwise this crashes with badmatch.
+stop(VHost) ->
+    ChildId = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    ok = supervisor:terminate_child(ejabberd_gen_mod_sup, ChildId),
+    ok = supervisor:delete_child(ejabberd_gen_mod_sup, ChildId).
+
+% ejabberd commands registered while the module runs and unregistered in
+% cleanup/1.
+get_commands_spec() ->
+    [#ejabberd_commands{name = rebuild_stats, tags = [logdb],
+                        desc = "Rebuild mod_logdb stats for given host",
+                        module = ?MODULE, function = rebuild_stats,
+                        args = [{host, binary}],
+                        result = {res, rescode}},
+     #ejabberd_commands{name = copy_messages, tags = [logdb],
+                        desc = "Copy logdb messages from given backend to current backend for given host",
+                        module = ?MODULE, function = copy_messages_ctl,
+                        args = [{host, binary}, {backend, binary}, {date, binary}],
+                        result = {res, rescode}}].
+
+% Option validators: one clause per option read in init/1; the last clause
+% returns the list of known option names.
+mod_opt_type(dbs) ->
+    fun (A) when is_list(A) -> A end;
+mod_opt_type(vhosts) ->
+    fun (A) when is_list(A) -> A end;
+mod_opt_type(poll_users_settings) ->
+    fun (I) when is_integer(I) -> I end;
+mod_opt_type(groupchat) ->
+    fun (all) -> all;
+        (send) -> send;
+        (none) -> none
+    end;
+mod_opt_type(dolog_default) ->
+    fun (B) when is_boolean(B) -> B end;
+mod_opt_type(ignore_jids) ->
+    fun (A) when is_list(A) -> A end;
+mod_opt_type(purge_older_days) ->
+    % 'never' is the default used by init/1 and is understood by
+    % set_purge_timer/1, so it has to be accepted when written explicitly
+    fun (never) -> never;
+        (I) when is_integer(I) -> I
+    end;
+mod_opt_type(drop_messages_on_user_removal) ->
+    % read by init/1 as a boolean (default true) but was missing here
+    fun (B) when is_boolean(B) -> B end;
+mod_opt_type(_) ->
+    [dbs, vhosts, poll_users_settings, groupchat, dolog_default, ignore_jids, purge_older_days,
+     drop_messages_on_user_removal].
+
+% Synchronous requests. Most clauses forward to the backend module (dbmod)
+% of this vhost and reply with whatever the backend returns.
+handle_call({cleanup}, _From, State) ->
+    cleanup(State),
+    ?MYDEBUG("Cleanup finished!!!!!", []),
+    {reply, ok, State};
+handle_call({get_dates}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_dates(VHost),
+    {reply, Reply, State};
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% ejabberd_web_admin callbacks
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+handle_call({delete_messages_by_user_at, PMsgs, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:delete_messages_by_user_at(VHost, PMsgs, binary_to_list(Date)),
+    {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:delete_all_messages_by_user_at(binary_to_list(User), VHost, binary_to_list(Date)),
+    {reply, Reply, State};
+handle_call({delete_messages_at, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    % NOTE(review): unlike the neighbouring clauses, Date is passed on without
+    % binary_to_list/1 - confirm which type the caller sends here
+    Reply = DBMod:delete_messages_at(VHost, Date),
+    {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_vhost_stats(VHost),
+    {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_vhost_stats_at(VHost, binary_to_list(Date)),
+    {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_user_stats(binary_to_list(User), VHost),
+    {reply, Reply, State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Reply = DBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)),
+    {reply, Reply, State};
+% settings are served from the ETS table; users without exactly one entry
+% get the module-wide default
+handle_call({get_user_settings, User}, _From, #state{dbmod=_DBMod, vhost=VHost}=State) ->
+    Reply = case ets:match_object(ets_settings_table(VHost),
+                                  #user_settings{owner_name=User, _='_'}) of
+                [Set] -> Set;
+                _ -> #user_settings{owner_name=User,
+                                    dolog_default=State#state.dolog_default,
+                                    dolog_list=[],
+                                    donotlog_list=[]}
+            end,
+    {reply, Reply, State};
+% TODO: remove User ??
+handle_call({set_user_settings, User, GSet}, _From, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    Set = GSet#user_settings{owner_name=User},
+    Reply =
+        case ets:match_object(ets_settings_table(VHost),
+                              #user_settings{owner_name=User, _='_'}) of
+            % Set is already bound: this matches only when nothing changed
+            [Set] ->
+                ok;
+            _ ->
+                % write to the backend first, update the table only on success
+                case DBMod:set_user_settings(binary_to_list(User), VHost, Set) of
+                    error ->
+                        error;
+                    ok ->
+                        true = ets:insert(ets_settings_table(VHost), Set),
+                        ok
+                end
+        end,
+    {reply, Reply, State};
+% replies with the whole #state{} record
+handle_call({get_module_settings}, _From, State) ->
+    {reply, State, State};
+% applies run-time settings carried in a #state{} record and starts, stops
+% or restarts the purge/poll timers when the relevant values changed
+handle_call({set_module_settings, #state{purge_older_days=PurgeDays,
+                                          poll_users_settings=PollSec} = Settings},
+            _From,
+            #state{purgeRef=PurgeRefOld,
+                   pollRef=PollRefOld,
+                   purge_older_days=PurgeDaysOld,
+                   poll_users_settings=PollSecOld} = State) ->
+    PurgeRef = if
+                   PurgeDays == never, PurgeDaysOld /= never ->
+                       {ok, cancel} = timer:cancel(PurgeRefOld),
+                       disabled;
+                   is_integer(PurgeDays), PurgeDaysOld == never ->
+                       set_purge_timer(PurgeDays);
+                   true ->
+                       PurgeRefOld
+               end,
+
+    PollRef = if
+                  PollSec == PollSecOld ->
+                      PollRefOld;
+                  PollSec == 0, PollSecOld /= 0 ->
+                      {ok, cancel} = timer:cancel(PollRefOld),
+                      disabled;
+                  is_integer(PollSec), PollSecOld == 0 ->
+                      set_poll_timer(PollSec);
+                  is_integer(PollSec), PollSecOld /= 0 ->
+                      {ok, cancel} = timer:cancel(PollRefOld),
+                      set_poll_timer(PollSec)
+              end,
+
+    NewState = State#state{dolog_default=Settings#state.dolog_default,
+                           ignore_jids=Settings#state.ignore_jids,
+                           groupchat=Settings#state.groupchat,
+                           drop_messages_on_user_removal=Settings#state.drop_messages_on_user_removal,
+                           purge_older_days=PurgeDays,
+                           poll_users_settings=PollSec,
+                           purgeRef=PurgeRef,
+                           pollRef=PollRef},
+    {reply, ok, NewState};
+% Unknown request: answer at once so the caller is not left waiting for its
+% timeout. The state is not logged because dbopts/dbs hold the database
+% connection options (user, password).
+handle_call(Msg, _From, State) ->
+    ?INFO_MSG("Got unexpected call Msg: ~p for ~s", [Msg, State#state.vhost]),
+    {reply, {error, unknown_call}, State}.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% end ejabberd_web_admin callbacks
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+% ejabberd_hooks call
+% A packet is logged only if filter/3 accepts the Owner/Peer pair and
+% packet_parse/5 turns it into a message; packet_parse/5 may return or
+% throw 'ignore', both of which end up in the 'ignore' branch below.
+handle_cast({addlog, Direction, Owner, Peer, Packet}, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    case filter(Owner, Peer, State) of
+        true ->
+            case catch packet_parse(Owner, Peer, Packet, Direction, State) of
+                ignore ->
+                    ok;
+                {'EXIT', Reason} ->
+                    ?ERROR_MSG("Failed to parse: ~p", [Reason]);
+                Msg ->
+                    DBMod:log_message(VHost, Msg)
+            end;
+        false ->
+            ok
+    end,
+    {noreply, State};
+% remove_user hook: drop the user's data unless that is disabled by the
+% drop_messages_on_user_removal setting
+handle_cast({remove_user, User}, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    case State#state.drop_messages_on_user_removal of
+        true ->
+            DBMod:drop_user(binary_to_list(User), VHost),
+            ?INFO_MSG("Launched ~s@~s removal", [User, VHost]);
+        false ->
+            ?INFO_MSG("Message removing is disabled. Keeping messages for ~s@~s", [User, VHost])
+    end,
+    {noreply, State};
+% ejabberdctl rebuild_stats/1
+handle_cast({rebuild_stats}, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    DBMod:rebuild_stats(VHost),
+    {noreply, State};
+% copying runs in a separate process so this server is not blocked
+handle_cast({copy_messages, Backend}, State) ->
+    spawn(?MODULE, copy_messages, [[State, Backend, []]]),
+    {noreply, State};
+handle_cast({copy_messages, Backend, Date}, State) ->
+    spawn(?MODULE, copy_messages, [[State, Backend, [binary_to_list(Date)]]]),
+    {noreply, State};
+% Unknown cast. The state is not logged because dbopts/dbs hold the
+% database connection options (user, password).
+handle_cast(Msg, State) ->
+    ?INFO_MSG("Got unexpected cast Msg: ~p for ~s", [Msg, State#state.vhost]),
+    {noreply, State}.
+
+% return: disabled | timer reference
+% The timer sends scheduled_purging to the calling process every 24 hours;
+% the number of days only decides whether the timer exists at all.
+set_purge_timer(never) ->
+    disabled;
+set_purge_timer(Days) when is_integer(Days) ->
+    {ok, Ref1} = timer:send_interval(timer:hours(24), scheduled_purging),
+    Ref1.
+
+% return: disabled | timer reference
+% PollSec > 0 polls every PollSec seconds, 0 disables polling, anything
+% else falls back to a 10 second interval.
+set_poll_timer(PollSec) ->
+    if
+        PollSec > 0 ->
+            {ok, Ref2} = timer:send_interval(timer:seconds(PollSec), poll_users_settings),
+            Ref2;
+        % db polling disabled
+        PollSec == 0 ->
+            disabled;
+        true ->
+            {ok, Ref3} = timer:send_interval(timer:seconds(10), poll_users_settings),
+            Ref3
+    end.
+
+% Settings table entries are keyed by a binary owner_name (that is what
+% handle_call/3 looks up and inserts), so whatever the backend returns is
+% converted before it goes into the table.
+users_settings_with_binary_owner(Settings) ->
+    [Sett#user_settings{owner_name = iolist_to_binary(Sett#user_settings.owner_name)} || Sett <- Settings].
+
+% actual starting of logging
+% 'start' is sent by start_link/2 right after the process was created
+handle_info(start, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    case DBMod:start(VHost, State#state.dbopts) of
+        {error,{already_started,_}} ->
+            ?MYDEBUG("backend module already started - trying to stop it", []),
+            DBMod:stop(VHost),
+            {stop, already_started, State};
+        {error, Reason} ->
+            % wait before stopping so that restarts are not attempted back to back
+            timer:sleep(30000),
+            ?ERROR_MSG("Failed to start: ~p", [Reason]),
+            {stop, db_connection_failed, State};
+        {ok, SPid} ->
+            ?INFO_MSG("~p connection established", [DBMod]),
+
+            % a 'DOWN' message for the backend stops this server (see below)
+            MonRef = erlang:monitor(process, SPid),
+
+            ets:new(ets_settings_table(VHost), [named_table,public,set,{keypos, #user_settings.owner_name}]),
+            DoLog = case DBMod:get_users_settings(VHost) of
+                        {ok, Settings} -> users_settings_with_binary_owner(Settings);
+                        {error, _Reason} -> []
+                    end,
+            ets:insert(ets_settings_table(VHost), DoLog),
+
+            TrefPurge = set_purge_timer(State#state.purge_older_days),
+            TrefPoll = set_poll_timer(State#state.poll_users_settings),
+
+            ejabberd_hooks:add(remove_user, VHost, ?MODULE, remove_user, 90),
+            ejabberd_hooks:add(user_send_packet, VHost, ?MODULE, send_packet, 90),
+            ejabberd_hooks:add(user_receive_packet, VHost, ?MODULE, receive_packet, 90),
+            ejabberd_hooks:add(offline_message_hook, VHost, ?MODULE, offline_message, 40),
+
+            ejabberd_hooks:add(adhoc_local_commands, VHost, ?MODULE, adhoc_local_commands, 50),
+            ejabberd_hooks:add(disco_local_items, VHost, ?MODULE, get_local_items, 50),
+            ejabberd_hooks:add(disco_local_identity, VHost, ?MODULE, get_local_identity, 50),
+            ejabberd_hooks:add(disco_local_features, VHost, ?MODULE, get_local_features, 50),
+            ejabberd_hooks:add(adhoc_local_items, VHost, ?MODULE, adhoc_local_items, 50),
+
+            ejabberd_hooks:add(webadmin_menu_host, VHost, ?MODULE, webadmin_menu, 70),
+            ejabberd_hooks:add(webadmin_user, VHost, ?MODULE, webadmin_user, 50),
+            ejabberd_hooks:add(webadmin_page_host, VHost, ?MODULE, webadmin_page, 50),
+            ejabberd_hooks:add(webadmin_user_parse_query, VHost, ?MODULE, user_parse_query, 50),
+
+            ?MYDEBUG("Added hooks for ~p", [VHost]),
+
+            ejabberd_commands:register_commands(get_commands_spec()),
+            ?MYDEBUG("Registered commands for ~p", [VHost]),
+
+            NewState=State#state{monref = MonRef, backendPid=SPid, purgeRef=TrefPurge, pollRef=TrefPoll},
+            {noreply, NewState};
+        Rez ->
+            ?ERROR_MSG("Rez=~p", [Rez]),
+            timer:sleep(30000),
+            {stop, db_connection_failed, State}
+    end;
+% from timer:send_interval/2 (in start handle_info)
+handle_info(scheduled_purging, #state{vhost=VHost, purge_older_days=Days} = State) ->
+    ?MYDEBUG("Starting scheduled purging of old records for ~p", [VHost]),
+    spawn(?MODULE, purge_old_records, [VHost, integer_to_list(Days)]),
+    {noreply, State};
+% from timer:send_interval/2 (in start handle_info)
+% Reloads the settings table from the backend. Entries get the same
+% owner_name conversion as at start-up. If the backend reports an error the
+% table is left as it is and the error is logged.
+handle_info(poll_users_settings, #state{dbmod=DBMod, vhost=VHost}=State) ->
+    case DBMod:get_users_settings(VHost) of
+        {ok, Settings} ->
+            DoLog = users_settings_with_binary_owner(Settings),
+            ?MYDEBUG("DoLog=~p", [DoLog]),
+            true = ets:delete_all_objects(ets_settings_table(VHost)),
+            ets:insert(ets_settings_table(VHost), DoLog);
+        {error, Reason} ->
+            ?ERROR_MSG("Failed to poll users settings for ~s: ~p", [VHost, Reason])
+    end,
+    {noreply, State};
+% the monitored backend process went away
+handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) ->
+    {stop, db_connection_dropped, State};
+handle_info({fetch_result, _, _}, State) ->
+    ?MYDEBUG("Got timed out mysql fetch result", []),
+    {noreply, State};
+% Anything else. The state is not logged because dbopts/dbs hold the
+% database connection options (user, password).
+handle_info(Info, State) ->
+    ?INFO_MSG("Got unexpected Info: ~p for ~s", [Info, State#state.vhost]),
+    {noreply, State}.
+
+% gen_server terminate. Nothing is cleaned up after db_connection_failed;
+% in every other case cleanup/1 runs, and the backend is stopped first when
+% it is still alive and a monitor reference is set.
+terminate(db_connection_failed, _State) ->
+    ok;
+terminate(db_connection_dropped, State) ->
+    ?MYDEBUG("Got terminate with db_connection_dropped", []),
+    cleanup(State),
+    ok;
+terminate(Reason, #state{monref=undefined} = State) ->
+    ?MYDEBUG("Got terminate with undefined monref.~nReason: ~p", [Reason]),
+    cleanup(State),
+    ok;
+terminate(Reason, #state{dbmod=Backend, vhost=VHost, monref=BackendMonitor, backendPid=BackendPid} = State) ->
+    ?INFO_MSG("Reason: ~p", [Reason]),
+    case erlang:is_process_alive(BackendPid) of
+        false ->
+            ok;
+        true ->
+            % drop the monitor (and any queued 'DOWN') before stopping the backend
+            erlang:demonitor(BackendMonitor, [flush]),
+            Backend:stop(VHost)
+    end,
+    cleanup(State),
+    ok.
+
+% no state conversion between code versions
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% ejabberd_hooks callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% TODO: change to/from to list as sql stores it as list
+% user_send_packet hook: hands the packet to the logger of the sender's
+% vhost (direction 'to') and returns the hook accumulator unchanged
+send_packet({Packet, #{jid := Owner} = C2S}) ->
+    VHost = Owner#jid.lserver,
+    Recipient = xmpp:get_to(Packet),
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {addlog, to, Owner, Recipient, Packet}),
+    {Packet, C2S}.
+
+% user_receive_packet hook: same as send_packet/1 with direction 'from'
+receive_packet({Packet, #{jid := Owner} = C2S}) ->
+    VHost = Owner#jid.lserver,
+    Sender = xmpp:get_from(Packet),
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {addlog, from, Owner, Sender, Packet}),
+    {Packet, C2S}.
+
+% offline_message_hook: the message is logged for its addressee with
+% direction 'from'; the hook accumulator is returned unchanged
+offline_message({_Action, #message{from = Sender, to = Owner} = Message} = HookAcc) ->
+    VHost = Owner#jid.lserver,
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {addlog, from, Owner, Sender, Message}),
+    HookAcc.
+
+% remove_user hook: forwards the prepared user name to the logger of the
+% prepared server name
+remove_user(User, Server) ->
+    PreppedUser = jid:nodeprep(User),
+    PreppedServer = jid:nameprep(Server),
+    gen_server:cast(gen_mod:get_module_proc(PreppedServer, ?PROCNAME),
+                    {remove_user, PreppedUser}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% ejabberdctl
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Cast a {rebuild_stats} request to the logger process of VHost.
+rebuild_stats(VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {rebuild_stats}),
+    ok.
+
+% Cast a copy request to the logger process of VHost. The date <<"all">>
+% produces the two-element request, any other date the three-element one.
+copy_messages_ctl(VHost, Backend, Date) ->
+    Request = case Date of
+                  <<"all">> -> {copy_messages, Backend};
+                  _ -> {copy_messages, Backend, Date}
+              end,
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), Request),
+    ok.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% misc operations
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+% handle_cast({addlog, E}, _)
+% raw packet -> #msg
+% Returns 'ignore' for error messages, for messages whose meta carries
+% sm_copy or from_offline, and for anything that is not a #message{}.
+% Throws 'ignore' for messages with empty subject and body and for
+% groupchat messages excluded by the groupchat setting in State.
+packet_parse(_Owner, _Peer, #message{type = error}, _Direction, _State) ->
+    ignore;
+packet_parse(_Owner, _Peer, #message{meta = #{sm_copy := true}}, _Direction, _State) ->
+    ignore;
+packet_parse(_Owner, _Peer, #message{meta = #{from_offline := true}}, _Direction, _State) ->
+    ignore;
+packet_parse(Owner, Peer, #message{body = Body, subject = Subject, type = Type}, Direction, State) ->
+    SubjectText = xmpp:get_text(Subject),
+    BodyText = xmpp:get_text(Body),
+    case {SubjectText, BodyText} of
+        {<<>>, <<>>} -> throw(ignore);
+        _ -> ok
+    end,
+
+    % groupchat == send keeps only outgoing groupchat messages,
+    % groupchat == none drops all of them.
+    case Type of
+        groupchat when State#state.groupchat == none ->
+            throw(ignore);
+        groupchat when State#state.groupchat == send, Direction == from ->
+            throw(ignore);
+        _ ->
+            ok
+    end,
+
+    Timestamp = get_timestamp(),
+    OwnerName = stringprep:tolower(Owner#jid.user),
+    PeerName = stringprep:tolower(Peer#jid.user),
+    PeerServer = stringprep:tolower(Peer#jid.server),
+    #msg{timestamp     = Timestamp,
+         owner_name    = OwnerName,
+         peer_name     = PeerName,
+         peer_server   = PeerServer,
+         peer_resource = Peer#jid.resource,
+         direction     = Direction,
+         type          = misc:atom_to_binary(Type),
+         subject       = SubjectText,
+         body          = BodyText};
+packet_parse(_, _, _, _, _) ->
+    ignore.
+
+% called from handle_cast({addlog, _}, _) -> true (log messages) | false (do not log messages)
+% A message is logged only when neither the owner, the peer, nor their
+% "@server" forms are in ignore_jids and the owner's settings (or the
+% module default) allow logging for this peer.
+filter(Owner, Peer, State) ->
+    OwnerServ = << "@", (Owner#jid.lserver)/binary >>,
+    OwnerBin = << (Owner#jid.luser)/binary, OwnerServ/binary >>,
+    PeerServ = << "@", (Peer#jid.lserver)/binary >>,
+    PeerBin = << (Peer#jid.luser)/binary, PeerServ/binary >>,
+
+    Stored = ets:match_object(ets_settings_table(State#state.vhost),
+                              #user_settings{owner_name=Owner#jid.luser, _='_'}),
+    LogTo = case Stored of
+                [#user_settings{dolog_default=Default,
+                                dolog_list=DLL,
+                                donotlog_list=DNLL}] ->
+                    % The explicit lists win over the per-user default,
+                    % and the "log" list wins over the "do not log" list.
+                    case {lists:member(PeerBin, DLL), lists:member(PeerBin, DNLL)} of
+                        {true, _} -> true;
+                        {false, true} -> false;
+                        {false, false} ->
+                            case Default of
+                                true -> true;
+                                false -> false;
+                                _ -> State#state.dolog_default
+                            end
+                    end;
+                _ ->
+                    State#state.dolog_default
+            end,
+    Ignored = State#state.ignore_jids,
+    Checks = [not lists:member(Jid, Ignored)
+              || Jid <- [OwnerBin, PeerBin, OwnerServ, PeerServ]] ++ [LogTo],
+    lists:all(fun(Flag) -> Flag end, Checks).
+
+% Delete the logged messages of every date returned by get_dates/1 that is
+% more than Days days in the past.
+% Days is the number of days as a string: it is printed with ~s and parsed
+% with list_to_integer/1.
+purge_old_records(VHost, Days) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+
+    Dates = ?MODULE:get_dates(VHost),
+    DateNow = calendar:datetime_to_gregorian_seconds({date(), {0,0,1}}),
+    DateDiff = list_to_integer(Days)*24*60*60,
+    ?MYDEBUG("Purging tables older than ~s days", [Days]),
+    lists:foreach(fun(Date) ->
+                    [Year, Month, Day] = ejabberd_regexp:split(iolist_to_binary(Date), <<"[^0-9]+">>),
+                    DateInSec = calendar:datetime_to_gregorian_seconds({{binary_to_integer(Year), binary_to_integer(Month), binary_to_integer(Day)}, {0,0,1}}),
+                    if
+                      (DateNow - DateInSec) > DateDiff ->
+                          % Same timeout as the other {delete_messages_at, _}
+                          % call in this module, instead of the 5 s default.
+                          gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT);
+                      true ->
+                          ?MYDEBUG("Skipping messages at ~p", [Date])
+                    end
+              end, Dates).
+
+% called from get_vhost_stats/2, get_user_stats/3
+% Sort {Date, Count} pairs by date, newest first.
+sort_stats(Stats) ->
+    % Stats = [{"2003-4-15",1}, {"2006-8-18",1}, ... ]
+    CFun = fun({TableName, Count}) ->
+                 [Year, Month, Day] = ejabberd_regexp:split(iolist_to_binary(TableName), <<"[^0-9]+">>),
+                 { calendar:datetime_to_gregorian_seconds({{binary_to_integer(Year), binary_to_integer(Month), binary_to_integer(Day)}, {0,0,1}}), Count }
+           end,
+    % convert to [{63364377601,1}, {63360662401,1}, ... ]
+    CStats = lists:map(CFun, Stats),
+    % sort by date
+    SortedStats = lists:reverse(lists:keysort(1, CStats)),
+    % convert to [{"2007-12-9",1}, {"2007-10-27",1}, ... ] sorted list
+    [{mod_logdb:convert_timestamp_brief(TableSec), Count} || {TableSec, Count} <- SortedStats].
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% Date/Time operations
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% return float seconds elapsed from "zero hour" as list
+% The value is printed with five decimals. erlang:timestamp/0 replaces the
+% deprecated now/0 and returns the same {MegaSec, Sec, MicroSec} shape.
+get_timestamp() ->
+    {MegaSec, Sec, MicroSec} = erlang:timestamp(),
+    % io_lib:format/2 returns a possibly nested character list; flatten it
+    % rather than relying on its exact nesting.
+    lists:flatten(io_lib:format("~.5f", [MegaSec*1000000 + Sec + MicroSec/1000000])).
+
+% convert float seconds elapsed from "zero hour" to local time "%Y-%m-%d %H:%M:%S" string
+% Accepts the seconds either as a float or as a string. Appending ".0"
+% before parsing lets a string without a fractional part parse as well.
+% Fields are not zero padded. A string that does not start with a number
+% raises badarg.
+convert_timestamp(Seconds) when is_list(Seconds) ->
+    case string:to_float(Seconds++".0") of
+         {F,_} when is_float(F) -> convert_timestamp(F);
+         _ -> erlang:error(badarg, [Seconds])
+    end;
+convert_timestamp(Seconds) when is_float(Seconds) ->
+    % 719528 days lie between year 0 and 1970-01-01 in the Gregorian
+    % calendar used by the calendar module.
+    GregSec = trunc(Seconds + 719528*86400),
+    UnivDT = calendar:gregorian_seconds_to_datetime(GregSec),
+    {{Year, Month, Day},{Hour, Minute, Sec}} = calendar:universal_time_to_local_time(UnivDT),
+    integer_to_list(Year) ++ "-" ++ integer_to_list(Month) ++ "-" ++ integer_to_list(Day) ++ " " ++ integer_to_list(Hour) ++ ":" ++ integer_to_list(Minute) ++ ":" ++ integer_to_list(Sec).
+
+% convert float seconds elapsed from "zero hour" to local time "%Y-%m-%d" string
+% Accepts a string, a float, or an integer. An integer is taken as
+% Gregorian seconds and is not shifted to local time.
+% The string clause parses like convert_timestamp/1, so a timestamp
+% without a fractional part is accepted instead of crashing in
+% list_to_float/1.
+convert_timestamp_brief(Seconds) when is_list(Seconds) ->
+    case string:to_float(Seconds++".0") of
+         {F,_} when is_float(F) -> convert_timestamp_brief(F);
+         _ -> erlang:error(badarg, [Seconds])
+    end;
+convert_timestamp_brief(Seconds) when is_float(Seconds) ->
+    GregSec = trunc(Seconds + 719528*86400),
+    UnivDT = calendar:gregorian_seconds_to_datetime(GregSec),
+    {{Year, Month, Day},{_Hour, _Minute, _Sec}} = calendar:universal_time_to_local_time(UnivDT),
+    integer_to_list(Year) ++ "-" ++ integer_to_list(Month) ++ "-" ++ integer_to_list(Day);
+convert_timestamp_brief(Seconds) when is_integer(Seconds) ->
+    {{Year, Month, Day},{_Hour, _Minute, _Sec}} = calendar:gregorian_seconds_to_datetime(Seconds),
+    integer_to_list(Year) ++ "-" ++ integer_to_list(Month) ++ "-" ++ integer_to_list(Day).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% DB operations (get)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Send {get_vhost_stats} to the logger process of VHost and return its reply.
+get_vhost_stats(VHost) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT).
+
+% Send {get_vhost_stats_at, Date} to the logger process of VHost and return its reply.
+get_vhost_stats_at(VHost, Date) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+
+% The functions below forward one request each to the logger process
+% registered for VHost and return its reply.
+
+% Request {get_user_stats, User}.
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_stats, User}, ?CALL_TIMEOUT).
+
+% Request {get_user_messages_at, User, Date}.
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+
+% Request {get_dates}.
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_dates}, ?CALL_TIMEOUT).
+
+% Request {get_user_settings, User}.
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_settings, User}, ?CALL_TIMEOUT).
+
+% Request {set_user_settings, User, Set}; uses the default call timeout.
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {set_user_settings, User, Set}).
+
+% Request {get_module_settings}; uses the default call timeout.
+get_module_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_module_settings}).
+
+% Request {set_module_settings, Settings}; uses the default call timeout.
+set_module_settings(VHost, Settings) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {set_module_settings, Settings}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% Web admin callbacks (delete)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% When Query contains a <<"delete">> key, delete the messages of Msgs
+% that are ticked in the form. A message is ticked when Query holds
+% {<<"selected">>, ID}, with ID the base64 of its term-encoded timestamp.
+% Returns 'nothing' when no delete was requested.
+user_messages_at_parse_query(VHost, Date, Msgs, Query) ->
+    case lists:keymember(<<"delete">>, 1, Query) of
+        true ->
+            Ticked = [Msg || Msg <- Msgs,
+                             lists:member({<<"selected">>,
+                                           misc:encode_base64(term_to_binary(Msg#msg.timestamp))},
+                                          Query)],
+            gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                            {delete_messages_by_user_at, Ticked, Date},
+                            ?CALL_TIMEOUT);
+        false ->
+            nothing
+    end.
+
+% When Query contains a <<"delete">> key, delete all messages of User for
+% every ticked date. A date is ticked when Query holds
+% {<<"selected">>, ID}, with ID the base64 of User followed by the date.
+% Returns 'error' if any delete replied 'error', otherwise 'nothing'.
+user_messages_parse_query(User, VHost, Query) ->
+    case lists:keymember(<<"delete">>, 1, Query) of
+        false ->
+            nothing;
+        true ->
+            Ticked = [Date || Date <- get_dates(VHost),
+                              lists:member({<<"selected">>,
+                                            misc:encode_base64(<< User/binary, (iolist_to_binary(Date))/binary >>)},
+                                           Query)],
+            Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+            % One synchronous delete per ticked date, in list order.
+            Replies = [gen_server:call(Proc,
+                                       {delete_all_messages_by_user_at, User, iolist_to_binary(Date)},
+                                       ?CALL_TIMEOUT)
+                       || Date <- Ticked],
+            case lists:member(error, Replies) of
+                true -> error;
+                false -> nothing
+            end
+    end.
+
+% When Query contains a <<"delete">> key, delete the messages of every
+% ticked date of VHost. A date is ticked when Query holds
+% {<<"selected">>, ID}, with ID the base64 of VHost followed by the date.
+% Returns 'error' if any delete replied 'error', otherwise 'nothing'.
+vhost_messages_parse_query(VHost, Query) ->
+    case lists:keymember(<<"delete">>, 1, Query) of
+        false ->
+            nothing;
+        true ->
+            Ticked = [Date || Date <- get_dates(VHost),
+                              lists:member({<<"selected">>,
+                                            misc:encode_base64(<< VHost/binary, (iolist_to_binary(Date))/binary >>)},
+                                           Query)],
+            Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+            Replies = [gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT)
+                       || Date <- Ticked],
+            case lists:member(error, Replies) of
+                true -> error;
+                false -> nothing
+            end
+    end.
+
+% When Query contains a <<"delete">> key, delete at Date all messages of
+% every ticked user in Stats (a list of {User, Count} pairs). A user is
+% ticked when Query holds {<<"selected">>, ID}, with ID the base64 of the
+% user name followed by VHost.
+% Returns 'error' if any delete replied 'error', 'ok' if none did, and
+% 'nothing' when no delete was requested.
+vhost_messages_at_parse_query(VHost, Date, Stats, Query) ->
+    case lists:keysearch(<<"delete">>, 1, Query) of
+         {value, _} ->
+             PStats = lists:filter(
+                            fun({User, _Count}) ->
+                                 ID = misc:encode_base64( << (iolist_to_binary(User))/binary, VHost/binary >> ),
+                                 lists:member({<<"selected">>, ID}, Query)
+                            end, Stats),
+             Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+             Rez = lists:foldl(fun({User, _Count}, Acc) ->
+                                   lists:append(Acc, [gen_server:call(Proc,
+                                                                      {delete_all_messages_by_user_at,
+                                                                       iolist_to_binary(User), iolist_to_binary(Date)},
+                                                                      ?CALL_TIMEOUT)])
+                               end, [], PStats),
+             case lists:member(error, Rez) of
+                  true ->
+                    error;
+                  false ->
+                    ok
+             end;
+         false ->
+             nothing
+    end.
+
+% Copy messages from the backend named From into the current backend
+% (State#state.dbmod). DatesIn lists the dates to copy; an empty list
+% means every date the source backend reports.
+% Throws 'error' when From is not configured in State#state.dbs.
+copy_messages([#state{vhost=VHost}=State, From, DatesIn]) ->
+    {FromDBName, FromDBOpts} =
+         case lists:keysearch(misc:binary_to_atom(From), 1, State#state.dbs) of
+              {value, {FN, FO}} ->
+                 {FN, FO};
+              false ->
+                 ?ERROR_MSG("Failed to find record for ~p in dbs", [From]),
+                 throw(error)
+         end,
+
+    % The backend module name is this module's name plus "_" plus the
+    % backend name.
+    FromDBMod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(FromDBName)),
+
+    {ok, _FromPid} = FromDBMod:start(VHost, FromDBOpts),
+
+    Dates = case DatesIn of
+                 [] -> FromDBMod:get_dates(VHost);
+                 _ -> DatesIn
+            end,
+
+    DatesLength = length(Dates),
+
+    % The outer catch absorbs the throw(error) raised on the first failed
+    % date, so the fold stops there while the log line and
+    % FromDBMod:stop/1 below still run.
+    catch lists:foldl(fun(Date, Acc) ->
+                        case catch copy_messages_int([FromDBMod, State#state.dbmod, VHost, Date]) of
+                            ok ->
+                              ?INFO_MSG("Copied messages at ~p (~p/~p)", [Date, Acc, DatesLength]);
+                            Value ->
+                              ?ERROR_MSG("Failed to copy messages at ~p (~p/~p): ~p", [Date, Acc, DatesLength, Value]),
+                              throw(error)
+                        end,
+                        Acc + 1
+                      end, 1, Dates),
+    ?INFO_MSG("copy_messages from ~p finished", [From]),
+    FromDBMod:stop(VHost).
+
+% Copy the messages of one Date, timing the work with timer:tc/3.
+% The named ETS table mod_logdb_temp is scratch space for
+% copy_messages_int_tc/1. It is removed in an 'after' section so that a
+% crash during the copy cannot leave the table behind and make the next
+% ets:new/2 fail with badarg.
+% Returns whatever copy_messages_int_tc/1 returns; its exceptions pass
+% through.
+copy_messages_int([FromDBMod, ToDBMod, VHost, Date]) ->
+    ets:new(mod_logdb_temp, [named_table, set, public]),
+    try
+        {Time, Value} = timer:tc(?MODULE, copy_messages_int_tc, [[FromDBMod, ToDBMod, VHost, Date]]),
+        ?INFO_MSG("copy_messages at ~p elapsed ~p sec", [Date, Time/1000000]),
+        Value
+    after
+        ets:delete(mod_logdb_temp)
+    end.
+
+% Copy the messages logged at Date from FromDBMod to ToDBMod.
+% Both sides first get their statistics for Date rebuilt. Nothing is
+% copied when the source has no messages or when both sides report equal
+% per-user statistics. Expects the ETS table mod_logdb_temp to exist.
+% Returns ok; any failed match crashes the call.
+copy_messages_int_tc([FromDBMod, ToDBMod, VHost, Date]) ->
+    ?INFO_MSG("Going to copy messages from ~p for ~p at ~p", [FromDBMod, VHost, Date]),
+
+    ok = FromDBMod:rebuild_stats_at(VHost, Date),
+    catch mod_logdb:rebuild_stats_at(VHost, Date),
+    {ok, FromStats} = FromDBMod:get_vhost_stats_at(VHost, Date),
+    ToStats = case mod_logdb:get_vhost_stats_at(VHost, iolist_to_binary(Date)) of
+                   {ok, Stats} -> Stats;
+                   {error, _} -> []
+              end,
+
+    FromStatsS = lists:keysort(1, FromStats),
+    ToStatsS = lists:keysort(1, ToStats),
+
+    StatsLength = length(FromStats),
+
+    CopyFun = if
+                  % destination table is empty
+                  ToStats == [] ->
+                      fun({User, _Count}, Acc) ->
+                          {ok, Msgs} = FromDBMod:get_user_messages_at(User, VHost, Date),
+                          MAcc =
+                            lists:foldl(fun(Msg, MFAcc) ->
+                                            MsgBinary = Msg#msg{owner_name=iolist_to_binary(User),
+                                                                peer_name=iolist_to_binary(Msg#msg.peer_name),
+                                                                peer_server=iolist_to_binary(Msg#msg.peer_server),
+                                                                peer_resource=iolist_to_binary(Msg#msg.peer_resource),
+                                                                type=iolist_to_binary(Msg#msg.type),
+                                                                subject=iolist_to_binary(Msg#msg.subject),
+                                                                body=iolist_to_binary(Msg#msg.body)},
+                                            ok = ToDBMod:log_message(VHost, MsgBinary),
+                                            MFAcc + 1
+                                        end, 0, Msgs),
+                          NewAcc = Acc + 1,
+                          ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]),
+                          %timer:sleep(100),
+                          NewAcc
+                      end;
+                  % destination table is not empty
+                  true ->
+                      fun({User, _Count}, Acc) ->
+                          % Remember the timestamps already present in the
+                          % destination so those messages are not copied twice.
+                          {ok, ToMsgs} = ToDBMod:get_user_messages_at(User, VHost, Date),
+                          lists:foreach(fun(#msg{timestamp=Tst}) when length(Tst) == 16 ->
+                                            ets:insert(mod_logdb_temp, {Tst});
+                                           % mysql, pgsql removes final zeros after decimal point
+                                           (#msg{timestamp=Tst}) when length(Tst) < 16 ->
+                                            {F, _} = string:to_float(Tst++".0"),
+                                            [T] = io_lib:format("~.5f", [F]),
+                                            ets:insert(mod_logdb_temp, {T})
+                                        end, ToMsgs),
+                          {ok, Msgs} = FromDBMod:get_user_messages_at(User, VHost, Date),
+                          MAcc =
+                            lists:foldl(fun(#msg{timestamp=ToTimestamp} = Msg, MFAcc) ->
+                                            case ets:member(mod_logdb_temp, ToTimestamp) of
+                                                 false ->
+                                                    MsgBinary = Msg#msg{owner_name=iolist_to_binary(User),
+                                                                        peer_name=iolist_to_binary(Msg#msg.peer_name),
+                                                                        peer_server=iolist_to_binary(Msg#msg.peer_server),
+                                                                        peer_resource=iolist_to_binary(Msg#msg.peer_resource),
+                                                                        type=iolist_to_binary(Msg#msg.type),
+                                                                        subject=iolist_to_binary(Msg#msg.subject),
+                                                                        body=iolist_to_binary(Msg#msg.body)},
+                                                    ok = ToDBMod:log_message(VHost, MsgBinary),
+                                                    ets:insert(mod_logdb_temp, {ToTimestamp}),
+                                                    MFAcc + 1;
+                                                 true ->
+                                                    MFAcc
+                                            end
+                                        end, 0, Msgs),
+                          NewAcc = Acc + 1,
+                          % The scratch table is per user: empty it before the next one.
+                          ets:delete_all_objects(mod_logdb_temp),
+                          ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]),
+                          %timer:sleep(100),
+                          NewAcc
+                      end
+              end,
+
+    if
+        FromStats == [] ->
+            ?INFO_MSG("No messages were found at ~p", [Date]);
+        FromStatsS == ToStatsS ->
+            ?INFO_MSG("Stats are equal at ~p", [Date]);
+        FromStatsS /= ToStatsS ->
+            lists:foldl(CopyFun, 0, FromStats),
+            ok = ToDBMod:rebuild_stats_at(VHost, Date)
+            %timer:sleep(1000)
+    end,
+
+    ok.
+
+% Parse a textual boolean given as a string or a binary.
+% Returns true for "t", "true", "y", "yes", "1"; false for "f", "false",
+% "n", "no", "0"; and the atom 'error' for anything else. The comparison
+% is case sensitive.
+list_to_bool(Num) when is_binary(Num) ->
+    list_to_bool(binary_to_list(Num));
+list_to_bool(Num) when is_list(Num) ->
+    case lists:member(Num, ["t", "true", "y", "yes", "1"]) of
+         true ->
+           true;
+         false ->
+           case lists:member(Num, ["f", "false", "n", "no", "0"]) of
+                true ->
+                  false;
+                false ->
+                  error
+           end
+    end.
+
+% Render a boolean as the string "TRUE" or "FALSE".
+bool_to_list(true) ->
+    "TRUE";
+bool_to_list(false) ->
+    "FALSE".
+
+% Join a list of strings and/or binaries into one string with "\n"
+% between the elements. An empty list gives "".
+list_to_string([]) ->
+    "";
+list_to_string(List) when is_list(List) ->
+    Str = lists:flatmap(fun(Elm) when is_binary(Elm) ->
+                              binary_to_list(Elm) ++ "\n";
+                           (Elm) when is_list(Elm) ->
+                              Elm ++ "\n"
+                        end, List),
+    % drop the trailing "\n"
+    lists:sublist(Str, length(Str)-1).
+
+% Split a newline-separated string (or iodata) into a list of binaries.
+% 'null' and the empty list both give [].
+string_to_list(null) ->
+    [];
+string_to_list([]) ->
+    [];
+string_to_list(String) ->
+    ejabberd_regexp:split(iolist_to_binary(String), <<"\n">>).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% ad-hoc (copy/pasted from mod_configure.erl)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Expands in place: uses the variables LServer, To and Lang of the
+% enclosing function. Only {result, _} and {error, _} are accepted from
+% get_local_items/4.
+-define(ITEMS_RESULT(Allow, LNode, Fallback),
+        case Allow of
+            deny -> Fallback;
+            allow ->
+                case get_local_items(LServer, LNode,
+                                     jid:encode(To), Lang) of
+                    {result, Res} -> {result, Res};
+                    {error, Error} -> {error, Error}
+                end
+        end).
+
+% Disco items hook.
+% For the empty node the module's top-level item is appended to the
+% accumulated items when From matches the mod_logdb or mod_logdb_admin
+% ACL rule. For the module's own nodes the items are returned to admins
+% and a "forbidden" error to everyone else. Acc is returned unchanged
+% when mod_adhoc is not loaded or the node is not one of ours.
+get_local_items(Acc, From, #jid{lserver = LServer} = To,
+                <<"">>, Lang) ->
+    case gen_mod:is_loaded(LServer, mod_adhoc) of
+        false -> Acc;
+        _ ->
+            Items = case Acc of
+                         {result, Its} -> Its;
+                         empty -> []
+                    end,
+            AllowUser = acl:match_rule(LServer, mod_logdb, From),
+            AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From),
+            if
+              AllowUser == allow; AllowAdmin == allow ->
+                case get_local_items(LServer, [],
+                                     jid:encode(To), Lang) of
+                     {result, Res} ->
+                        {result, Items ++ Res};
+                     {error, _Error} ->
+                        {result, Items}
+                end;
+              true ->
+                {result, Items}
+            end
+    end;
+get_local_items(Acc, From, #jid{lserver = LServer} = To,
+                Node, Lang) ->
+    case gen_mod:is_loaded(LServer, mod_adhoc) of
+        false -> Acc;
+        _ ->
+            LNode = tokenize(Node),
+            AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From),
+            Err = xmpp:err_forbidden(<<"Denied by ACL">>, Lang),
+            case LNode of
+                 [<<"mod_logdb">>] ->
+                      ?ITEMS_RESULT(AllowAdmin, LNode, {error, Err});
+                 [<<"mod_logdb_users">>] ->
+                      ?ITEMS_RESULT(AllowAdmin, LNode, {error, Err});
+                 [<<"mod_logdb_users">>, <<$@, _/binary>>] ->
+                      ?ITEMS_RESULT(AllowAdmin, LNode, {error, Err});
+                 [<<"mod_logdb_users">>, _User] ->
+                      ?ITEMS_RESULT(AllowAdmin, LNode, {error, Err});
+                 [<<"mod_logdb_settings">>] ->
+                      ?ITEMS_RESULT(AllowAdmin, LNode, {error, Err});
+                 _ ->
+                      Acc
+            end
+    end.
+
+-define(T(Lang, Text), translate:translate(Lang, Text)).
+
+% Uses the variables Server and Lang of the enclosing function.
+-define(NODE(Name, Node),
+        #disco_item{jid = jid:make(Server),
+                    node = Node,
+                    name = ?T(Lang, Name)}).
+
+-define(NS_ADMINX(Sub),
+        <<(?NS_ADMIN)/binary, "#", Sub/binary>>).
+
+% Split a node name on "/" and "#".
+tokenize(Node) -> str:tokens(Node, <<"/#">>).
+
+% Items below one tokenized node. Returns {result, Items} or
+% {error, StanzaError}.
+get_local_items(_Host, [], Server, Lang) ->
+    {result,
+     [?NODE(<<"Messages logging engine">>, <<"mod_logdb">>)]
+    };
+get_local_items(_Host, [<<"mod_logdb">>], Server, Lang) ->
+    {result,
+     [?NODE(<<"Messages logging engine users">>, <<"mod_logdb_users">>),
+      ?NODE(<<"Messages logging engine settings">>, <<"mod_logdb_settings">>)]
+    };
+get_local_items(Host, [<<"mod_logdb_users">>], Server, _Lang) ->
+    {result, get_all_vh_users(Host, Server)};
+% "@N1-N2": the users at positions N1..N2 of the sorted user list.
+get_local_items(Host, [<<"mod_logdb_users">>, <<$@, Diap/binary>>], Server, Lang) ->
+    Users = ejabberd_auth:get_vh_registered_users(Host),
+    SUsers = lists:sort([{S, U} || {U, S} <- Users]),
+    try
+        [S1, S2] = ejabberd_regexp:split(Diap, <<"-">>),
+        N1 = binary_to_integer(S1),
+        N2 = binary_to_integer(S2),
+        Sub = lists:sublist(SUsers, N1, N2 - N1 + 1),
+        {result, lists:map(fun({S, U}) ->
+                               ?NODE(<< U/binary, "@", S/binary >>,
+                                     << (iolist_to_binary("mod_logdb_users/"))/binary, U/binary, "@", S/binary >>)
+                           end, Sub)}
+    catch _:_ ->
+        % A malformed range is reported as an error tuple: every caller
+        % matches only {result, _} and {error, _}.
+        {error, xmpp:err_not_acceptable()}
+    end;
+get_local_items(_Host, [<<"mod_logdb_users">>, _User], _Server, _Lang) ->
+    {result, []};
+get_local_items(_Host, [<<"mod_logdb_settings">>], _Server, _Lang) ->
+    {result, []};
+get_local_items(_Host, Item, _Server, _Lang) ->
+    ?MYDEBUG("asked for items in ~p", [Item]),
+    {error, xmpp:err_item_not_found()}.
+
+-define(INFO_RESULT(Allow, Feats, Lang),
+        case Allow of
+            deny -> {error, xmpp:err_forbidden(<<"Denied by ACL">>, Lang)};
+            allow -> {result, Feats}
+        end).
+
+% Disco features hook.
+% Returns Acc when mod_adhoc is not loaded or the node is not one of this
+% module's nodes. Otherwise returns {result, Features} to callers allowed
+% by ACL and a "forbidden" error to the rest. Nodes that can be executed
+% as ad-hoc commands advertise ?NS_COMMANDS; the user list and its
+% "@N1-N2" range nodes advertise nothing.
+get_local_features(Acc, From,
+                   #jid{lserver = LServer} = _To, Node, Lang) ->
+    case gen_mod:is_loaded(LServer, mod_adhoc) of
+        false ->
+            Acc;
+        _ ->
+            LNode = tokenize(Node),
+            AllowUser = acl:match_rule(LServer, mod_logdb, From),
+            AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From),
+            case LNode of
+                 [<<"mod_logdb">>] when AllowUser == allow; AllowAdmin == allow ->
+                    ?INFO_RESULT(allow, [?NS_COMMANDS], Lang);
+                 [<<"mod_logdb">>] ->
+                    ?INFO_RESULT(deny, [?NS_COMMANDS], Lang);
+                 [<<"mod_logdb_users">>] ->
+                    ?INFO_RESULT(AllowAdmin, [], Lang);
+                 % tokenize/1 yields binaries, so the range node has to be
+                 % matched with a binary pattern; a list pattern never
+                 % matches and the node would be treated as a user node.
+                 [<<"mod_logdb_users">>, <<$@, _/binary>>] ->
+                    ?INFO_RESULT(AllowAdmin, [], Lang);
+                 [<<"mod_logdb_users">>, _User] ->
+                    ?INFO_RESULT(AllowAdmin, [?NS_COMMANDS], Lang);
+                 [<<"mod_logdb_settings">>] ->
+                    ?INFO_RESULT(AllowAdmin, [?NS_COMMANDS], Lang);
+                 _ ->
+                    Acc
+            end
+    end.
+
+-define(INFO_IDENTITY(Category, Type, Name, Lang),
+        [#identity{category = Category, type = Type, name = ?T(Lang, Name)}]).
+
+-define(INFO_COMMAND(Name, Lang),
+        ?INFO_IDENTITY(<<"automation">>, <<"command-node">>,
+                       Name, Lang)).
+
+% Disco identity hook: this module's nodes are reported as
+% automation/command-node identities; any other node returns Acc.
+get_local_identity(Acc, _From, _To, Node, Lang) ->
+    LNode = tokenize(Node),
+    case LNode of
+         [<<"mod_logdb">>] ->
+            ?INFO_COMMAND(<<"Messages logging engine">>, Lang);
+         [<<"mod_logdb_users">>] ->
+            ?INFO_COMMAND(<<"Messages logging engine users">>, Lang);
+         [<<"mod_logdb_users">>, User] ->
+            ?INFO_COMMAND(User, Lang);
+         [<<"mod_logdb_settings">>] ->
+            ?INFO_COMMAND(<<"Messages logging engine settings">>, Lang);
+         _ ->
+            Acc
+    end.
+
+% Ad-hoc items hook: appends to the accumulated items every node of this
+% module for which get_local_features/5 answers exactly
+% {result, [?NS_COMMANDS]} for the requester From.
+adhoc_local_items(Acc, From,
+                  #jid{lserver = LServer, server = Server} = To, Lang) ->
+    % TODO: case acl:match_rule(LServer, ???, From) of
+    Items = case Acc of
+                {result, Its} -> Its;
+                empty -> []
+            end,
+    Nodes = recursively_get_local_items(LServer,
+                                        <<"">>, Server, Lang),
+    Nodes1 = lists:filter(
+               fun(#disco_item{node = Nd}) ->
+                       F = get_local_features([], From, To, Nd, Lang),
+                       case F of
+                           {result, [?NS_COMMANDS]} -> true;
+                           _ -> false
+                       end
+               end, Nodes),
+    {result, Items ++ Nodes1}.
+
+% Collect the items below Node together with all their descendants as a
+% flat list. The user list node is not descended into, and items that
+% belong to another server or have an empty node are dropped.
+recursively_get_local_items(_LServer,
+                            <<"mod_logdb_users">>, _Server, _Lang) ->
+    [];
+recursively_get_local_items(LServer,
+                            Node, Server, Lang) ->
+    LNode = tokenize(Node),
+    Items = case get_local_items(LServer, LNode,
+                                 Server, Lang) of
+                {result, Res} -> Res;
+                {error, _Error} -> []
+            end,
+    Nodes = lists:flatten(
+      lists:map(
+        fun(#disco_item{jid = #jid{server = S}, node = Nd} = Item) ->
+                if (S /= Server) or (Nd == <<"">>) ->
+                        [];
+                   true ->
+                        [Item, recursively_get_local_items(
+                                 LServer, Nd, Server, Lang)]
+                end
+        end, Items)),
+    Nodes.
+
+% Uses the variable Lang of the enclosing function.
+-define(COMMANDS_RESULT(Allow, From, To, Request),
+        case Allow of
+            deny ->
+                {error, xmpp:err_forbidden(<<"Denied by ACL">>, Lang)};
+            allow ->
+                adhoc_local_commands(From, To, Request)
+        end).
+
+% Ad-hoc commands hook: runs the command when the node is one of this
+% module's command nodes and From passes the ACL check written in the
+% clause guard; in every other case Acc is returned unchanged.
+adhoc_local_commands(Acc, From, #jid{lserver = LServer} = To,
+                     #adhoc_command{node = Node, lang = Lang} = Request) ->
+    LNode = tokenize(Node),
+    AllowUser = acl:match_rule(LServer, mod_logdb, From),
+    AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From),
+    case LNode of
+         [<<"mod_logdb">>] when AllowUser == allow; AllowAdmin == allow ->
+             ?COMMANDS_RESULT(allow, From, To, Request);
+         [<<"mod_logdb_users">>, <<$@, _/binary>>] when AllowAdmin == allow ->
+             Acc;
+         [<<"mod_logdb_users">>, _User] when AllowAdmin == allow ->
+             ?COMMANDS_RESULT(allow, From, To, Request);
+         [<<"mod_logdb_settings">>] when AllowAdmin == allow ->
+             ?COMMANDS_RESULT(allow, From, To, Request);
+         _ ->
+             Acc
+    end.
+
+% Execute one ad-hoc command request:
+%   cancel                   -> a 'canceled' #adhoc_command{} for the session
+%   execute/complete, no form -> the form for the node, status 'executing'
+%   execute/complete, form    -> apply the submitted form, status 'completed'
+%   anything else            -> bad-request error
+adhoc_local_commands(From, #jid{lserver = LServer} = _To,
+                     #adhoc_command{lang = Lang,
+                                    node = Node,
+                                    sid = SessionID,
+                                    action = Action,
+                                    xdata = XData} = Request) ->
+    LNode = tokenize(Node),
+    %% If the "action" attribute is not present, it is
+    %% understood as "execute".  If there was no <actions/>
+    %% element in the first response (which there isn't in our
+    %% case), "execute" and "complete" are equivalent.
+    ActionIsExecute = Action == execute orelse Action == complete,
+    if  Action == cancel ->
+            %% User cancels request
+            #adhoc_command{status = canceled, lang = Lang,
+                           node = Node, sid = SessionID};
+        XData == undefined, ActionIsExecute ->
+            %% User requests form
+            case get_form(LServer, LNode, Lang) of
+                {result, Form} ->
+                    xmpp_util:make_adhoc_response(
+                      Request,
+                      #adhoc_command{status = executing,
+                                     xdata = Form});
+                {error, Error} ->
+                    {error, Error}
+            end;
+        XData /= undefined, ActionIsExecute ->
+            %% User returns form.
+            % A crash inside set_form/5 is reported as bad-request.
+            case catch set_form(From, LServer, LNode, Lang, XData) of
+                {result, Res} ->
+                    xmpp_util:make_adhoc_response(
+                      Request,
+                      #adhoc_command{xdata = Res, status = completed});
+                {'EXIT', _} -> {error, xmpp:err_bad_request()};
+                {error, Error} -> {error, Error}
+            end;
+        true ->
+            {error, xmpp:err_bad_request(<<"Unexpected action">>, Lang)}
+    end.
+
+% Data form field with a single value.
+-define(TVFIELD(Type, Var, Val),
+        #xdata_field{type = Type, var = Var, values = [Val]}).
+
+% Hidden FORM_TYPE field carrying ?NS_ADMIN.
+-define(HFIELD(),
+        ?TVFIELD(hidden, <<"FORM_TYPE">>, (?NS_ADMIN))).
+
+% Build the data form with the logging preferences of LUser@LServer:
+% the default choice plus the "log" and "do not log" JID lists, all
+% prefilled from get_user_settings/2. Returns {result, #xdata{}}.
+get_user_form(LUser, LServer, Lang) ->
+    ?MYDEBUG("get_user_form ~p ~p", [LUser, LServer]),
+    Settings = get_user_settings(LUser, LServer),
+    #user_settings{dolog_default=DLD,
+                   dolog_list=DLL,
+                   donotlog_list=DNLL} = Settings,
+    LogLabel = ?T(Lang, <<"Log Messages">>),
+    NoLogLabel = ?T(Lang, <<"Do Not Log Messages">>),
+    DefaultField =
+        #xdata_field{
+           type = 'list-single',
+           label = ?T(Lang, <<"Default">>),
+           var = <<"dolog_default">>,
+           values = [misc:atom_to_binary(DLD)],
+           options = [#xdata_option{label = LogLabel, value = <<"true">>},
+                      #xdata_option{label = NoLogLabel, value = <<"false">>}]},
+    LogListField =
+        #xdata_field{
+           type = 'text-multi',
+           label = LogLabel,
+           var = <<"dolog_list">>,
+           values = DLL},
+    NoLogListField =
+        #xdata_field{
+           type = 'text-multi',
+           label = NoLogLabel,
+           var = <<"donotlog_list">>,
+           values = DNLL},
+    Title = ?T(Lang, <<"Messages logging engine settings">>),
+    Intro = ?T(Lang, <<"Set logging preferences">>),
+    Form = #xdata{
+              title = Title,
+              type = form,
+              instructions = [<< Intro/binary, ": ", LUser/binary, "@", LServer/binary >>],
+              fields = [?HFIELD(), DefaultField, LogListField, NoLogListField]},
+    {result, Form}.
+
+% Build the data form with the run-time settings of the module on Host,
+% prefilled from get_module_settings/1. Returns {result, #xdata{}}.
+get_settings_form(Host, Lang) ->
+    ?MYDEBUG("get_settings_form ~p ~p", [Host, Lang]),
+    #state{dbmod=_DBMod,
+           dbs=_DBs,
+           dolog_default=DLD,
+           ignore_jids=IgnoreJids,
+           groupchat=GroupChat,
+           purge_older_days=PurgeDaysT,
+           drop_messages_on_user_removal=MRemoval,
+           poll_users_settings=PollTime} = mod_logdb:get_module_settings(Host),
+
+    % purge_older_days is either the atom 'never' or a number of days.
+    PurgeDays =
+       case PurgeDaysT of
+            never -> <<"never">>;
+            Num when is_integer(Num) -> integer_to_binary(Num);
+            _ -> <<"unknown">>
+       end,
+    Fs = [
+          #xdata_field{
+             type = 'list-single',
+             label = ?T(Lang, <<"Default">>),
+             var = <<"dolog_default">>,
+             values = [misc:atom_to_binary(DLD)],
+             options = [#xdata_option{label = ?T(Lang, <<"Log Messages">>),
+                                      value = <<"true">>},
+                        #xdata_option{label = ?T(Lang, <<"Do Not Log Messages">>),
+                                      value = <<"false">>}]},
+          #xdata_field{
+             type = 'list-single',
+             label = ?T(Lang, <<"Drop messages on user removal">>),
+             var = <<"drop_messages_on_user_removal">>,
+             values = [misc:atom_to_binary(MRemoval)],
+             options = [#xdata_option{label = ?T(Lang, <<"Drop">>),
+                                      value = <<"true">>},
+                        #xdata_option{label = ?T(Lang, <<"Do not drop">>),
+                                      value = <<"false">>}]},
+          #xdata_field{
+             type = 'list-single',
+             label = ?T(Lang, <<"Groupchat messages logging">>),
+             var = <<"groupchat">>,
+             values = [misc:atom_to_binary(GroupChat)],
+             options = [#xdata_option{label = ?T(Lang, <<"all">>),
+                                      value = <<"all">>},
+                        #xdata_option{label = ?T(Lang, <<"none">>),
+                                      value = <<"none">>},
+                        #xdata_option{label = ?T(Lang, <<"send">>),
+                                      value = <<"send">>}]},
+          #xdata_field{
+             type = 'text-multi',
+             label = ?T(Lang, <<"Jids/Domains to ignore">>),
+             var = <<"ignore_list">>,
+             values = IgnoreJids},
+          #xdata_field{
+             type = 'text-single',
+             label = ?T(Lang, <<"Purge messages older than (days)">>),
+             var = <<"purge_older_days">>,
+             values = [iolist_to_binary(PurgeDays)]},
+          #xdata_field{
+             type = 'text-single',
+             label = ?T(Lang, <<"Poll users settings (seconds)">>),
+             var = <<"poll_users_settings">>,
+             values = [integer_to_binary(PollTime)]}
+         ],
+    {result, #xdata{
+                title = ?T(Lang, <<"Messages logging engine settings (run-time)">>),
+                instructions = [?T(Lang, <<"Set run-time settings">>)],
+                type = form,
+                fields = [?HFIELD()|
+                          Fs]}}.
+
+% Form for a tokenized command node: the per-user form for
+% mod_logdb_users/<jid>, the module form for mod_logdb_settings, and a
+% service-unavailable error for anything else.
+get_form(_Host, [<<"mod_logdb_users">>, User], Lang) ->
+    #jid{luser=LUser, lserver=LServer} = jid:decode(User),
+    get_user_form(LUser, LServer, Lang);
+get_form(Host, [<<"mod_logdb_settings">>], Lang) ->
+    get_settings_form(Host, Lang);
+get_form(_Host, Command, _Lang) ->
+    ?MYDEBUG("asked for form ~p", [Command]),
+    {error, xmpp:err_service_unavailable()}.
+
+% Validate a list of JIDs submitted in a user form. Returns ok for an
+% empty list and for a list holding only an empty binary. Throws 'error'
+% when an entry has no "@" or cannot be decoded as a JID.
+check_log_list([]) ->
+    ok;
+check_log_list([<<>>]) ->
+    ok;
+check_log_list([Head | Tail]) ->
+    case binary:match(Head, <<$@>>) of
+         nomatch -> throw(error);
+         {_, _} -> ok
+    end,
+    % this check for Head to be valid jid
+    case catch jid:decode(Head) of
+         {'EXIT', _Reason} -> throw(error);
+         _ -> check_log_list(Tail)
+    end.
+
+% Validate the ignore list of the module form. Empty entries are skipped.
+% Every other entry must contain "@" and be a decodable JID; an entry of
+% the form "@domain" is checked with a dummy user part in front.
+% Throws 'error' on the first invalid entry.
+check_ignore_list([]) ->
+    ok;
+check_ignore_list([<<>>]) ->
+    ok;
+check_ignore_list([<<>> | Tail]) ->
+    check_ignore_list(Tail);
+check_ignore_list([Head | Tail]) ->
+    case binary:match(Head, <<$@>>) of
+         {_, _} -> ok;
+         nomatch -> throw(error)
+    end,
+    Jid2Test = case Head of
+                    << $@, _Rest/binary >> -> << "a", Head/binary >>;
+                    Jid -> Jid
+               end,
+    % this check for Head to be valid jid
+    case catch jid:decode(Jid2Test) of
+         {'EXIT', _Reason} -> throw(error);
+         _ -> check_ignore_list(Tail)
+    end.
+
+% First value of Field in the submitted form; crashes in hd/1 when the
+% field has no values.
+get_value(Field, XData) -> hd(get_values(Field, XData)).
+
+% All values of Field in the submitted form.
+get_values(Field, XData) ->
+    xmpp_util:get_xdata_values(Field, XData).
+
+% Turn a submitted user form into a #user_settings{} record.
+% Throws bad_request when dolog_default is neither <<"true">> nor
+% <<"false">> or when one of the JID lists fails check_log_list/1.
+parse_users_settings(XData) ->
+    DLD = case get_value(<<"dolog_default">>, XData) of
+               ValueDLD when ValueDLD == <<"true">>;
+                             ValueDLD == <<"false">> ->
+                  list_to_bool(ValueDLD);
+               _ -> throw(bad_request)
+          end,
+
+    ListDLL = get_values(<<"dolog_list">>, XData),
+    DLL = case catch check_log_list(ListDLL) of
+                  ok -> ListDLL;
+                  error -> throw(bad_request)
+             end,
+
+    ListDNLL = get_values(<<"donotlog_list">>, XData),
+    DNLL = case catch check_log_list(ListDNLL) of
+                  ok -> ListDNLL;
+                  error -> throw(bad_request)
+             end,
+
+    #user_settings{dolog_default=DLD,
+                   dolog_list=DLL,
+                   donotlog_list=DNLL}.
+
+% Turn a submitted module settings form into a partially filled #state{}
+% record. Throws bad_request as soon as one field holds a value outside
+% its allowed set.
+parse_module_settings(XData) ->
+    DLD = case get_value(<<"dolog_default">>, XData) of
+               ValueDLD when ValueDLD == <<"true">>;
+                             ValueDLD == <<"false">> ->
+                   list_to_bool(ValueDLD);
+               _ -> throw(bad_request)
+          end,
+    MRemoval = case get_value(<<"drop_messages_on_user_removal">>, XData) of
+                    ValueMRemoval when ValueMRemoval == <<"true">>;
+                                       ValueMRemoval == <<"false">> ->
+                        list_to_bool(ValueMRemoval);
+                    _ -> throw(bad_request)
+               end,
+    % Converting to an atom is safe here: the guard admits only three
+    % fixed values.
+    GroupChat = case get_value(<<"groupchat">>, XData) of
+                     ValueGroupChat when ValueGroupChat == <<"none">>;
+                                         ValueGroupChat == <<"all">>;
+                                         ValueGroupChat == <<"send">> ->
+                         misc:binary_to_atom(ValueGroupChat);
+                     _ -> throw(bad_request)
+                end,
+    ListIgnore = get_values(<<"ignore_list">>, XData),
+    Ignore = case catch check_ignore_list(ListIgnore) of
+                  ok -> ListIgnore;
+                  error -> throw(bad_request)
+             end,
+    Purge = case get_value(<<"purge_older_days">>, XData) of
+                 <<"never">> -> never;
+                 ValuePurge ->
+                    case catch binary_to_integer(ValuePurge) of
+                         IntValuePurge when is_integer(IntValuePurge) -> IntValuePurge;
+                         _ -> throw(bad_request)
+                    end
+            end,
+    Poll = case catch binary_to_integer(get_value(<<"poll_users_settings">>, XData)) of
+                IntValuePoll when is_integer(IntValuePoll) -> IntValuePoll;
+                _ -> throw(bad_request)
+           end,
+    #state{dolog_default=DLD,
+           groupchat=GroupChat,
+           ignore_jids=Ignore,
+           purge_older_days=Purge,
+           drop_messages_on_user_removal=MRemoval,
+           poll_users_settings=Poll}.
+
+% Apply a submitted form to the node it was requested for.
+% Returns {result, undefined} on success and {error, StanzaError}
+% otherwise: bad-request when the form does not parse,
+% internal-server-error when storing fails, service-unavailable for an
+% unknown node.
+% The error text is a binary, like every other text this module passes to
+% the xmpp:err_* functions.
+set_form(_From, _Host, [<<"mod_logdb_users">>, User], Lang, XData) ->
+    #jid{luser=LUser, lserver=LServer} = jid:decode(User),
+    Txt = <<"Parse user settings failed">>,
+    case catch parse_users_settings(XData) of
+         bad_request ->
+            ?ERROR_MSG("Failed to set user form: bad_request", []),
+            {error, xmpp:err_bad_request(Txt, Lang)};
+         {'EXIT', Reason} ->
+            ?ERROR_MSG("Failed to set user form ~p", [Reason]),
+            {error, xmpp:err_bad_request(Txt, Lang)};
+         UserSettings ->
+            case mod_logdb:set_user_settings(LUser, LServer, UserSettings) of
+                 ok ->
+                    {result, undefined};
+                 error ->
+                    {error, xmpp:err_internal_server_error()}
+            end
+    end;
+set_form(_From, Host, [<<"mod_logdb_settings">>], Lang, XData) ->
+    Txt = <<"Parse module settings failed">>,
+    case catch parse_module_settings(XData) of
+         bad_request ->
+            ?ERROR_MSG("Failed to set settings form: bad_request", []),
+            {error, xmpp:err_bad_request(Txt, Lang)};
+         {'EXIT', Reason} ->
+            ?ERROR_MSG("Failed to set settings form ~p", [Reason]),
+            {error, xmpp:err_bad_request(Txt, Lang)};
+         Settings ->
+            case mod_logdb:set_module_settings(Host, Settings) of
+                 ok ->
+                    {result, undefined};
+                 error ->
+                    {error, xmpp:err_internal_server_error()}
+            end
+    end;
+set_form(From, _Host, Node, _Lang, XData) ->
+    User = jid:encode(jid:remove_resource(From)),
+    ?MYDEBUG("set form for ~p at ~p XData=~p", [User, Node, XData]),
+    {error, xmpp:err_service_unavailable()}.
+
+%% Build the service-discovery items for the registered users of Host.
+%% Up to 100 users: one item per user. More than that: the sorted user list
+%% is cut into ranges and one item per range is returned, named after the
+%% first and last user of the range.
+%% Returns [] when the auth backend call fails.
+get_all_vh_users(Host, Server) ->
+    case catch ejabberd_auth:get_vh_registered_users(Host) of
+        {'EXIT', _Reason} ->
+            [];
+        Users ->
+            %% Sort by server first, then by user name.
+            SUsers = lists:sort([{S, U} || {U, S} <- Users]),
+            case length(SUsers) of
+                N when N =< 100 ->
+                    lists:map(fun({S, U}) ->
+                                  #disco_item{jid = jid:make(Server),
+                                              node = <<"mod_logdb_users/", U/binary, $@, S/binary>>,
+                                              name = << U/binary, "@", S/binary >>}
+                              end, SUsers);
+                N ->
+                    NParts = trunc(math:sqrt(N * 6.17999999999999993783e-1)) + 1,
+                    M = trunc(N / NParts) + 1,
+                    lists:map(fun(K) ->
+                                  L = K + M - 1,
+                                  Node = <<"@",
+                                           (integer_to_binary(K))/binary,
+                                           "-",
+                                           (integer_to_binary(L))/binary
+                                         >>,
+                                  {FS, FU} = lists:nth(K, SUsers),
+                                  {LS, LU} =
+                                      if L < N -> lists:nth(L, SUsers);
+                                         true -> lists:last(SUsers)
+                                      end,
+                                  %% "first@server -- last@server" label of the range.
+                                  %% The space after `<<' is deliberate: it keeps
+                                  %% tag-stripping tools from eating the binary.
+                                  Name =
+                                      << FU/binary, "@", FS/binary,
+                                         " -- ",
+                                         LU/binary, "@", LS/binary >>,
+                                  %% NOTE(review): this branch builds the jid from Host
+                                  %% while the =< 100 branch uses Server - confirm
+                                  %% whether that difference is intended.
+                                  #disco_item{jid = jid:make(Host),
+                                              node = <<"mod_logdb_users/", Node/binary>>,
+                                              name = Name}
+                              end, lists:seq(1, N, M))
+            end
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% webadmin hooks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Prepend the "Users Messages" entry to the webadmin menu accumulator.
+webadmin_menu(Acc, _Host, Lang) ->
+    [{<<"messages">>, ?T(<<"Users Messages">>)} | Acc].
+
+%% Append a "Messages" link to the webadmin user page, plus a button that
+%% toggles the user's default logging preference.
+webadmin_user(Acc, User, Server, Lang) ->
+    Sett = get_user_settings(User, Server),
+    Log =
+        case Sett#user_settings.dolog_default of
+            false ->
+                ?INPUTT(<<"submit">>, <<"dolog">>, <<"Log Messages">>);
+            true ->
+                ?INPUTT(<<"submit">>, <<"donotlog">>, <<"Do Not Log Messages">>);
+            _ -> []
+        end,
+    Acc ++ [?XE(<<"h3">>, [?ACT(<<"messages/">>, <<"Messages">>), ?C(<<" ">>), Log])].
+
+%% webadmin_page hook: serve the message-log pages.
+%%   messages                 - per-date statistics of the vhost
+%%   messages/Date            - per-user statistics of the vhost at Date
+%%   user/U/messages          - per-date statistics of user U
+%%   user/U/messages/Date     - logged messages of user U at Date
+%% Any other path leaves the accumulator untouched.
+webadmin_page(_, Host,
+              #request{path = [<<"messages">>],
+                       q = Query,
+                       lang = Lang}) ->
+    Res = vhost_messages_stats(Host, Query, Lang),
+    {stop, Res};
+webadmin_page(_, Host,
+              #request{path = [<<"messages">>, Date],
+                       q = Query,
+                       lang = Lang}) ->
+    Res = vhost_messages_stats_at(Host, Query, Lang, Date),
+    {stop, Res};
+webadmin_page(_, Host,
+              #request{path = [<<"user">>, U, <<"messages">>],
+                       q = Query,
+                       lang = Lang}) ->
+    Res = user_messages_stats(U, Host, Query, Lang),
+    {stop, Res};
+webadmin_page(_, Host,
+              #request{path = [<<"user">>, U, <<"messages">>, Date],
+                       q = Query,
+                       lang = Lang}) ->
+    Res = mod_logdb:user_messages_stats_at(U, Host, Query, Lang, Date),
+    {stop, Res};
+webadmin_page(Acc, _Host, _R) -> Acc.
+
+%% webadmin user-page form hook: the "dolog"/"donotlog" buttons switch the
+%% user's default logging preference. The outcome of saving is reported back
+%% as `ok' or `error' instead of always claiming success.
+user_parse_query(_, <<"dolog">>, User, Server, _Query) ->
+    {stop, webadmin_set_dolog_default(User, Server, true)};
+user_parse_query(_, <<"donotlog">>, User, Server, _Query) ->
+    {stop, webadmin_set_dolog_default(User, Server, false)};
+user_parse_query(Acc, _Action, _User, _Server, _Query) ->
+    Acc.
+
+%% Store DoLog as the user's default logging preference.
+%% Returns `ok' on success; any other result is logged and mapped to `error'.
+webadmin_set_dolog_default(User, Server, DoLog) ->
+    Sett = get_user_settings(User, Server),
+    case set_user_settings(User, Server, Sett#user_settings{dolog_default=DoLog}) of
+        ok ->
+            ok;
+        Other ->
+            ?ERROR_MSG("Failed to set dolog_default=~p for ~s@~s: ~p",
+                       [DoLog, User, Server, Other]),
+            error
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% webadmin funcs
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Render the webadmin page listing, per date, how many messages are logged
+% for the vhost Server. The submitted Query is processed first, then the
+% statistics are fetched (and the fetch is timed and logged).
+% Returns a list of HTML elements.
+vhost_messages_stats(Server, Query, Lang) ->
+    % A crash while handling the submitted form is reported as "Bad format".
+    Res = case catch vhost_messages_parse_query(Server, Query) of
+              {'EXIT', Reason} ->
+                  ?ERROR_MSG("~p", [Reason]),
+                  error;
+              VResult -> VResult
+          end,
+    % timer:tc reports microseconds; the log line converts to seconds.
+    {Time, Value} = timer:tc(mod_logdb, get_vhost_stats, [Server]),
+    ?INFO_MSG("get_vhost_stats(~p) elapsed ~p sec", [Server, Time/1000000]),
+    %case get_vhost_stats(Server) of
+    case Value of
+        {'EXIT', CReason} ->
+            ?ERROR_MSG("Failed to get_vhost_stats: ~p", [CReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))];
+        {error, GReason} ->
+            ?ERROR_MSG("Failed to get_vhost_stats: ~p", [GReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))];
+        {ok, []} ->
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s">>), [Server])))];
+        {ok, Dates} ->
+            % One table row per date: checkbox, link to the date page, count.
+            Fun = fun({Date, Count}) ->
+                      DateBin = iolist_to_binary(Date),
+                      % Checkbox value: base64 of Server followed by the date.
+                      ID = misc:encode_base64( << Server/binary, DateBin/binary >> ),
+                      ?XE(<<"tr">>,
+                          [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+                                [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
+                           ?XE(<<"td">>, [?AC(DateBin, DateBin)]),
+                           ?XC(<<"td">>, integer_to_binary(Count))
+                          ])
+                  end,
+
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s">>), [Server])))] ++
+             % Feedback line for the form submission handled above.
+             case Res of
+                 ok -> [?CT(<<"Submitted">>), ?P];
+                 error -> [?CT(<<"Bad format">>), ?P];
+                 nothing -> []
+             end ++
+             [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}],
+                   [?XE(<<"table">>,
+                        [?XE(<<"thead">>,
+                             [?XE(<<"tr">>,
+                                  [?X(<<"td">>),
+                                   ?XCT(<<"td">>, <<"Date">>),
+                                   ?XCT(<<"td">>, <<"Count">>)
+                                  ])]),
+                         ?XE(<<"tbody">>,
+                             lists:map(Fun, Dates)
+                            )]),
+                    ?BR,
+                    ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>)
+                   ])]
+    end.
+
+% Render the webadmin page listing, per user, how many messages are logged
+% for the vhost Server at Date. Statistics are fetched first (timed and
+% logged); the submitted Query is only processed when there is something
+% to show. Returns a list of HTML elements.
+vhost_messages_stats_at(Server, Query, Lang, Date) ->
+    % timer:tc reports microseconds; the log line converts to seconds.
+    {Time, Value} = timer:tc(mod_logdb, get_vhost_stats_at, [Server, Date]),
+    ?INFO_MSG("get_vhost_stats_at(~p,~p) elapsed ~p sec", [Server, Date, Time/1000000]),
+    %case get_vhost_stats_at(Server, Date) of
+    case Value of
+        {'EXIT', CReason} ->
+            ?ERROR_MSG("Failed to get_vhost_stats_at: ~p", [CReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))];
+        {error, GReason} ->
+            ?ERROR_MSG("Failed to get_vhost_stats_at: ~p", [GReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))];
+        {ok, []} ->
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s at ~s">>), [Server, Date])))];
+        {ok, Stats} ->
+            % A crash while handling the submitted form is reported as "Bad format".
+            Res = case catch vhost_messages_at_parse_query(Server, Date, Stats, Query) of
+                      {'EXIT', Reason} ->
+                          ?ERROR_MSG("~p", [Reason]),
+                          error;
+                      VResult -> VResult
+                  end,
+            % One table row per user: checkbox, link to the user's messages
+            % at Date, count.
+            Fun = fun({User, Count}) ->
+                      UserBin = iolist_to_binary(User),
+                      % Checkbox value: base64 of the user name followed by Server.
+                      ID = misc:encode_base64( << UserBin/binary, Server/binary >> ),
+                      ?XE(<<"tr">>,
+                          [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+                                [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
+                           ?XE(<<"td">>, [?AC(<< <<"../user/">>/binary, UserBin/binary, <<"/messages/">>/binary, Date/binary >>, UserBin)]),
+                           ?XC(<<"td">>, integer_to_binary(Count))
+                          ])
+                  end,
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s at ~s">>), [Server, Date])))] ++
+             % Feedback line for the form submission handled above.
+             case Res of
+                 ok -> [?CT(<<"Submitted">>), ?P];
+                 error -> [?CT(<<"Bad format">>), ?P];
+                 nothing -> []
+             end ++
+             [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}],
+                   [?XE(<<"table">>,
+                        [?XE(<<"thead">>,
+                             [?XE(<<"tr">>,
+                                  [?X(<<"td">>),
+                                   ?XCT(<<"td">>, <<"User">>),
+                                   ?XCT(<<"td">>, <<"Count">>)
+                                  ])]),
+                         ?XE(<<"tbody">>,
+                             lists:map(Fun, Stats)
+                            )]),
+                    ?BR,
+                    ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>)
+                   ])]
+    end.
+
+%% Render the webadmin page listing, per date, how many messages are logged
+%% for User@Server. The submitted Query is processed first, then the
+%% statistics are fetched (timed and logged).
+%% Returns a list of HTML elements.
+user_messages_stats(User, Server, Query, Lang) ->
+    %% User and Server are binaries here, so the empty resource is a binary too.
+    Jid = jid:encode({User, Server, <<"">>}),
+
+    %% A crash while handling the submitted form is reported as "Bad format".
+    Res = case catch user_messages_parse_query(User, Server, Query) of
+              {'EXIT', Reason} ->
+                  ?ERROR_MSG("~p", [Reason]),
+                  error;
+              VResult -> VResult
+          end,
+
+    %% timer:tc reports microseconds; the log line converts to seconds.
+    {Time, Value} = timer:tc(mod_logdb, get_user_stats, [User, Server]),
+    ?INFO_MSG("get_user_stats(~p,~p) elapsed ~p sec", [User, Server, Time/1000000]),
+
+    case Value of
+        {'EXIT', CReason} ->
+            ?ERROR_MSG("Failed to get_user_stats: ~p", [CReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching days">>))];
+        {error, GReason} ->
+            ?ERROR_MSG("Failed to get_user_stats: ~p", [GReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching days">>))];
+        {ok, []} ->
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s">>), [Jid])))];
+        {ok, Dates} ->
+            %% One table row per date: checkbox, link to the date page, count.
+            Fun = fun({Date, Count}) ->
+                      DateBin = iolist_to_binary(Date),
+                      %% Checkbox value: base64 of the user name followed by the date.
+                      ID = misc:encode_base64( << User/binary, DateBin/binary >> ),
+                      ?XE(<<"tr">>,
+                          [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+                                [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
+                           ?XE(<<"td">>, [?AC(DateBin, DateBin)]),
+                           ?XC(<<"td">>, integer_to_binary(Count))
+                          ])
+                  end,
+            %% The translation key is a binary, like every other ?T call here.
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s">>), [Jid])))] ++
+             case Res of
+                 ok -> [?CT(<<"Submitted">>), ?P];
+                 error -> [?CT(<<"Bad format">>), ?P];
+                 nothing -> []
+             end ++
+             [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}],
+                   [?XE(<<"table">>,
+                        [?XE(<<"thead">>,
+                             [?XE(<<"tr">>,
+                                  [?X(<<"td">>),
+                                   ?XCT(<<"td">>, <<"Date">>),
+                                   ?XCT(<<"td">>, <<"Count">>)
+                                  ])]),
+                         ?XE(<<"tbody">>,
+                             lists:map(Fun, Dates)
+                            )]),
+                    ?BR,
+                    ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>)
+                   ])]
+    end.
+
+%% Look User up in a list of {User, Nick} pairs.
+%% Returns the nick, or `nothing' when the user is absent or the nick is [].
+search_user_nick(User, List) ->
+    case lists:keysearch(User, 1, List) of
+        {value,{User, []}} ->
+            nothing;
+        {value,{User, Nick}} ->
+            Nick;
+        false ->
+            nothing
+    end.
+
+%% Render the webadmin page showing the messages logged for User@Server at
+%% Date: a peer filter table followed by the (optionally filtered) messages.
+%% Messages are fetched first (timed and logged); the submitted Query is only
+%% processed when there is something to show.
+%% Returns a list of HTML elements.
+user_messages_stats_at(User, Server, Query, Lang, Date) ->
+    %% User and Server are binaries here, so the empty resource is a binary too.
+    Jid = jid:encode({User, Server, <<"">>}),
+
+    {Time, Value} = timer:tc(mod_logdb, get_user_messages_at, [User, Server, Date]),
+    ?INFO_MSG("get_user_messages_at(~p,~p,~p) elapsed ~p sec", [User, Server, Date, Time/1000000]),
+    case Value of
+        {'EXIT', CReason} ->
+            ?ERROR_MSG("Failed to get_user_messages_at: ~p", [CReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching messages">>))];
+        {error, GReason} ->
+            ?ERROR_MSG("Failed to get_user_messages_at: ~p", [GReason]),
+            [?XC(<<"h1">>, ?T(<<"Error occupied while fetching messages">>))];
+        {ok, []} ->
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s at ~s">>), [Jid, Date])))];
+        {ok, User_messages} ->
+            %% A crash while handling the submitted form is reported as "Bad format".
+            Res = case catch user_messages_at_parse_query(Server,
+                                                           Date,
+                                                           User_messages,
+                                                           Query) of
+                      {'EXIT', Reason} ->
+                          ?ERROR_MSG("~p", [Reason]),
+                          error;
+                      VResult -> VResult
+                  end,
+
+            %% {EncodedJid, RosterName} pairs used to show nicknames.
+            UR = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]),
+            UserRoster =
+                lists:map(fun(Item) ->
+                              {jid:encode(Item#roster.jid), Item#roster.name}
+                          end, UR),
+
+            %% Distinct "name@server" peers appearing in the messages.
+            UniqUsers = lists:foldl(fun(#msg{peer_name=PName, peer_server=PServer}, List) ->
+                                        ToAdd = PName++"@"++PServer,
+                                        case lists:member(ToAdd, List) of
+                                            true -> List;
+                                            false -> lists:append([ToAdd], List)
+                                        end
+                                    end, [], User_messages),
+
+            % Users to filter (sublist of UniqUsers)
+            CheckedUsers = case lists:keysearch(<<"filter">>, 1, Query) of
+                               {value, _} ->
+                                   lists:filter(fun(UFUser) ->
+                                                    ID = misc:encode_base64(term_to_binary(UFUser)),
+                                                    lists:member({<<"selected">>, ID}, Query)
+                                                end, UniqUsers);
+                               false -> []
+                           end,
+
+            % UniqUsers in html (noone selected -> everyone selected)
+            Users = lists:map(fun(UHUser) ->
+                                  ID = misc:encode_base64(term_to_binary(UHUser)),
+                                  Input = case lists:member(UHUser, CheckedUsers) of
+                                              true -> [?INPUTC(<<"checkbox">>, <<"selected">>, ID)];
+                                              false when CheckedUsers == [] -> [?INPUTC(<<"checkbox">>, <<"selected">>, ID)];
+                                              false -> [?INPUT(<<"checkbox">>, <<"selected">>, ID)]
+                                          end,
+                                  %% NOTE(review): UHUser is built with ++ (a string) while
+                                  %% the roster keys come from jid:encode/1 - if those are
+                                  %% binaries this lookup never matches; confirm key types.
+                                  Nick =
+                                      case search_user_nick(UHUser, UserRoster) of
+                                          nothing -> <<"">>;
+                                          N -> iolist_to_binary( " ("++ N ++")" )
+                                      end,
+                                  ?XE(<<"tr">>,
+                                      [?XE(<<"td">>, Input),
+                                       ?XC(<<"td">>, iolist_to_binary(UHUser++Nick))])
+                              end, lists:sort(UniqUsers)),
+            % Messages to show (based on Users)
+            User_messages_filtered = case CheckedUsers of
+                                         [] -> User_messages;
+                                         _ -> lists:filter(fun(#msg{peer_name=PName, peer_server=PServer}) ->
+                                                               lists:member(PName++"@"++PServer, CheckedUsers)
+                                                           end, User_messages)
+                                     end,
+
+            %% One table row per message: checkbox, time, direction and peer, text.
+            Msgs_Fun = fun(#msg{timestamp=Timestamp,
+                                subject=Subject,
+                                direction=Direction,
+                                peer_name=PName, peer_server=PServer, peer_resource=PRes,
+                                type=Type,
+                                body=Body}) ->
+                           Text = case Subject of
+                                      "" -> iolist_to_binary(Body);
+                                      _ -> iolist_to_binary([binary_to_list(?T(<<"Subject">>)) ++ ": " ++ Subject ++ "\n" ++ Body])
+                                  end,
+                           Resource = case PRes of
+                                          [] -> [];
+                                          undefined -> [];
+                                          R -> "/" ++ R
+                                      end,
+                           %% Roster nick when known; otherwise the bare peer, with
+                           %% the resource appended for incoming groupchat messages.
+                           UserNick =
+                               case search_user_nick(PName++"@"++PServer, UserRoster) of
+                                   nothing when PServer == Server ->
+                                       PName;
+                                   nothing when Type == "groupchat", Direction == from ->
+                                       PName++"@"++PServer++Resource;
+                                   nothing ->
+                                       PName++"@"++PServer;
+                                   N -> N
+                               end,
+                           ID = misc:encode_base64(term_to_binary(Timestamp)),
+                           ?XE(<<"tr">>,
+                               [?XE(<<"td">>, [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
+                                ?XC(<<"td">>, iolist_to_binary(convert_timestamp(Timestamp))),
+                                ?XC(<<"td">>, iolist_to_binary(atom_to_list(Direction)++": "++UserNick)),
+                                ?XE(<<"td">>, [?XC(<<"pre">>, Text)])])
+                       end,
+            % Filtered user messages in html
+            Msgs = lists:map(Msgs_Fun, lists:sort(User_messages_filtered)),
+
+            [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s at ~s">>), [Jid, Date])))] ++
+             case Res of
+                 ok -> [?CT(<<"Submitted">>), ?P];
+                 error -> [?CT(<<"Bad format">>), ?P];
+                 nothing -> []
+             end ++
+             [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}],
+                   [?XE(<<"table">>,
+                        [?XE(<<"thead">>,
+                             [?X(<<"td">>),
+                              ?XCT(<<"td">>, <<"User">>)
+                             ]
+                            ),
+                         ?XE(<<"tbody">>,
+                             Users
+                            )]),
+                    ?INPUTT(<<"submit">>, <<"filter">>, <<"Filter Selected">>)
+                   ] ++
+                   [?XE(<<"table">>,
+                        [?XE(<<"thead">>,
+                             [?XE(<<"tr">>,
+                                  [?X(<<"td">>),
+                                   ?XCT(<<"td">>, <<"Date, Time">>),
+                                   ?XCT(<<"td">>, <<"Direction: Jid">>),
+                                   ?XCT(<<"td">>, <<"Body">>)
+                                  ])]),
+                         ?XE(<<"tbody">>,
+                             Msgs
+                            )]),
+                    ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>),
+                    ?BR
+                   ]
+                  )]
+    end.
diff --git a/src/mod_logdb.hrl b/src/mod_logdb.hrl
new file mode 100644
index 00000000..49791f4e
--- /dev/null
+++ b/src/mod_logdb.hrl
@@ -0,0 +1,33 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb.hrl
+%%% Author : Oleg Palij (mailto:o.palij@gmail.com)
+%%% Purpose :
+%%% Url : https://paleg.github.io/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-define(logdb_debug, true).
+
+-ifdef(logdb_debug).
+-define(MYDEBUG(Format, Args), io:format("D(~p:~p:~p) : "++Format++"~n",
+                                       [calendar:local_time(),?MODULE,?LINE]++Args)).
+-else.
+-define(MYDEBUG(_F,_A),[]).
+-endif.
+
+-record(msg, {timestamp,
+              owner_name,
+              peer_name, peer_server, peer_resource,
+              direction,
+              type, subject,
+              body}).
+
+-record(user_settings, {owner_name,
+                        dolog_default,
+                        dolog_list=[],
+                        donotlog_list=[]}).
+
+-define(INPUTC(Type, Name, Value),
+        ?XA(<<"input">>, [{<<"type">>, Type},
+                          {<<"name">>, Name},
+                          {<<"value">>, Value},
+                          {<<"checked">>, <<"true">>}])).
diff --git a/src/mod_logdb_mnesia.erl b/src/mod_logdb_mnesia.erl
new file mode 100644
index 00000000..a08d5262
--- /dev/null
+++ b/src/mod_logdb_mnesia.erl
@@ -0,0 +1,553 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_logdb_mnesia.erl
+%%% Author : Oleg Palij (mailto:o.palij@gmail.com)
+%%% Purpose : mnesia backend for mod_logdb
+%%% Url : https://paleg.github.io/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_mnesia).
+-author('o.palij@gmail.com').
+
+-include("mod_logdb.hrl").
+-include("logger.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+         rebuild_stats/1,
+         rebuild_stats_at/2,
+         delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+         get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+         get_dates/1,
+         get_users_settings/1, get_user_settings/2, set_user_settings/3,
+         drop_user/2]).
+
+-define(PROCNAME, mod_logdb_mnesia).
+-define(CALL_TIMEOUT, 10000).
+
+-record(state, {vhost}).
+
+-record(stats, {user, at, count}).
+
+%% Every table of this backend is named "logdb_<kind>[_<date>]_<vhost>".
+prefix() ->
+    "logdb_".
+
+%% Table-name tail for VHost (a binary).
+suffix(VHost) ->
+    [$_ | binary_to_list(VHost)].
+
+%% Atom naming the per-vhost statistics table.
+stats_table(VHost) ->
+    list_to_atom(lists:append([prefix(), "stats", suffix(VHost)])).
+
+%% Atom naming the per-vhost message table of one Date (a string).
+table_name(VHost, Date) ->
+    list_to_atom(lists:append([prefix(), "messages_", Date, suffix(VHost)])).
+
+%% Atom naming the per-vhost user settings table.
+settings_table(VHost) ->
+    list_to_atom(lists:append([prefix(), "settings", suffix(VHost)])).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Start the backend gen_server for VHost, registered locally under the
+%% per-vhost process name.
+start(VHost, Opts) ->
+    gen_server:start({local, gen_mod:get_module_proc(VHost, ?PROCNAME)},
+                     ?MODULE, [VHost, Opts], []).
+
+%% Ask the backend server of VHost to stop.
+stop(VHost) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:call(Proc, {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Create the stats and settings tables for VHost; refuse to start unless
+%% mnesia reports that it is running.
+init([VHost, _Opts]) ->
+    case mnesia:system_info(is_running) of
+        yes ->
+            ok = create_stats_table(VHost),
+            ok = create_settings_table(VHost),
+            {ok, #state{vhost=VHost}};
+        no ->
+            ?ERROR_MSG("Mnesia not running", []),
+            {stop, db_connection_failed};
+        Status ->
+            ?ERROR_MSG("Mnesia status: ~p", [Status]),
+            {stop, db_connection_failed}
+    end.
+
+%% Synchronous requests; one clause per gen_logdb operation.
+handle_call({log_message, Msg}, _From, #state{vhost=VHost}=State) ->
+    {reply, log_message_int(VHost, Msg), State};
+handle_call({rebuild_stats}, _From, #state{vhost=VHost}=State) ->
+    {atomic, ok} = delete_nonexistent_stats(VHost),
+    Reply =
+        lists:foreach(fun(Date) ->
+                          rebuild_stats_at_int(VHost, Date)
+                      end, get_dates_int(VHost)),
+    {reply, Reply, State};
+handle_call({rebuild_stats_at, Date}, _From, #state{vhost=VHost}=State) ->
+    Reply = rebuild_stats_at_int(VHost, Date),
+    {reply, Reply, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{vhost=VHost}=State) ->
+    Table = table_name(VHost, Date),
+    Fun = fun() ->
+              %% Take both table locks once, not once per message.
+              mnesia:write_lock_table(stats_table(VHost)),
+              mnesia:write_lock_table(Table),
+              lists:foreach(
+                fun(Msg) ->
+                    mnesia:delete_object(Table, Msg, write)
+                end, Msgs)
+          end,
+    DRez = case mnesia:transaction(Fun) of
+               {aborted, Reason} ->
+                   ?ERROR_MSG("Failed to delete_messages_by_user_at at ~p for ~p: ~p", [Date, VHost, Reason]),
+                   error;
+               _ ->
+                   ok
+           end,
+    %% Stats are rebuilt even when the delete failed; an error from either
+    %% step makes the reply `error'.
+    Reply =
+        case rebuild_stats_at_int(VHost, Date) of
+            error ->
+                error;
+            ok ->
+                DRez
+        end,
+    {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{vhost=VHost}=State) ->
+    {reply, delete_all_messages_by_user_at_int(User, VHost, Date), State};
+handle_call({delete_messages_at, Date}, _From, #state{vhost=VHost}=State) ->
+    Reply =
+        case mnesia:delete_table(table_name(VHost, Date)) of
+            {atomic, ok} ->
+                delete_stats_by_vhost_at_int(VHost, Date);
+            {aborted, Reason} ->
+                %% The format string now has a placeholder for Reason.
+                ?ERROR_MSG("Failed to delete_messages_at for ~p at ~p: ~p", [VHost, Date, Reason]),
+                error
+        end,
+    {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{vhost=VHost}=State) ->
+    %% Sum the per-user counters into one {Date, Count} pair per date.
+    Fun = fun(#stats{at=Date, count=Count}, Stats) ->
+              case lists:keysearch(Date, 1, Stats) of
+                  false ->
+                      lists:append(Stats, [{Date, Count}]);
+                  {value, {_, TempCount}} ->
+                      lists:keyreplace(Date, 1, Stats, {Date, TempCount+Count})
+              end
+          end,
+    Reply =
+        case mnesia:transaction(fun() ->
+                                    mnesia:foldl(Fun, [], stats_table(VHost))
+                                end) of
+            {atomic, Result} -> {ok, mod_logdb:sort_stats(Result)};
+            {aborted, Reason} -> {error, Reason}
+        end,
+    {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{vhost=VHost}=State) ->
+    Fun = fun() ->
+              Pat = #stats{user='$1', at=Date, count='$2'},
+              mnesia:select(stats_table(VHost), [{Pat, [], [['$1', '$2']]}])
+          end,
+    Reply =
+        case mnesia:transaction(Fun) of
+            {atomic, Result} ->
+                %% Highest count first.
+                {ok, lists:reverse(lists:keysort(2, [{User, Count} || [User, Count] <- Result]))};
+            {aborted, Reason} ->
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{vhost=VHost}=State) ->
+    {reply, get_user_stats_int(User, VHost), State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{vhost=VHost}=State) ->
+    Reply =
+        case mnesia:transaction(fun() ->
+                                    Pat = #msg{owner_name=User, _='_'},
+                                    mnesia:select(table_name(VHost, Date),
+                                                  [{Pat, [], ['$_']}])
+                                end) of
+            {atomic, Result} -> {ok, Result};
+            {aborted, Reason} ->
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+handle_call({get_dates}, _From, #state{vhost=VHost}=State) ->
+    {reply, get_dates_int(VHost), State};
+handle_call({get_users_settings}, _From, #state{vhost=VHost}=State) ->
+    Reply = mnesia:dirty_match_object(settings_table(VHost), #user_settings{_='_'}),
+    {reply, {ok, Reply}, State};
+handle_call({get_user_settings, User}, _From, #state{vhost=VHost}=State) ->
+    %% [] means "no settings stored for this user".
+    Reply =
+        case mnesia:dirty_match_object(settings_table(VHost), #user_settings{owner_name=User, _='_'}) of
+            [] -> [];
+            [Setting] ->
+                Setting
+        end,
+    {reply, Reply, State};
+handle_call({set_user_settings, _User, Set}, _From, #state{vhost=VHost}=State) ->
+    ?MYDEBUG("~p~n~p", [settings_table(VHost), Set]),
+    Reply = mnesia:dirty_write(settings_table(VHost), Set),
+    ?MYDEBUG("~p", [Reply]),
+    {reply, Reply, State};
+handle_call({drop_user, User}, _From, #state{vhost=VHost}=State) ->
+    {ok, Dates} = get_user_stats_int(User, VHost),
+    MDResult = lists:map(fun({Date, _}) ->
+                             delete_all_messages_by_user_at_int(User, VHost, Date)
+                         end, Dates),
+    SDResult = delete_user_settings_int(User, VHost),
+    %% `ok' only when every message deletion and the settings deletion succeeded.
+    Reply =
+        case lists:all(fun(Result) when Result == ok ->
+                           true;
+                          (Result) when Result == error ->
+                           false
+                       end, lists:append(MDResult, [SDResult])) of
+            true ->
+                ok;
+            false ->
+                error
+        end,
+    {reply, Reply, State};
+handle_call({stop}, _From, State) ->
+    {stop, normal, ok, State};
+handle_call(Msg, _From, State) ->
+    %% Unknown requests get no reply, so the caller runs into its call timeout.
+    ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+    {noreply, State}.
+
+%% No casts are expected; log and ignore.
+handle_cast(Msg, State) ->
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+    {noreply, State}.
+
+%% No plain messages are expected; log and ignore.
+handle_info(Info, State) ->
+    ?INFO_MSG("Got Info:~p, State:~p", [Info, State]),
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Log one #msg{} through the backend server of VHost.
+log_message(VHost, Msg) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:call(Proc, {log_message, Msg}, ?CALL_TIMEOUT).
+%% The wrappers below forward one request each to the backend server
+%% registered for VHost and wait up to ?CALL_TIMEOUT for the reply.
+
+%% Rebuild the statistics of every date known for VHost.
+rebuild_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {rebuild_stats}, ?CALL_TIMEOUT).
+%% Rebuild the statistics of a single Date.
+rebuild_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+%% Delete the given message records stored at Date.
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+%% Delete every message owned by User at Date.
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+%% Delete all messages of VHost at Date.
+delete_messages_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_at, Date}, ?CALL_TIMEOUT).
+%% Per-date message counts of VHost.
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats}, ?CALL_TIMEOUT).
+%% Per-user message counts of VHost at Date.
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+%% Per-date message counts of User.
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_stats, User}, ?CALL_TIMEOUT).
+%% Messages owned by User at Date.
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+%% Dates for which VHost has a message table.
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_dates}, ?CALL_TIMEOUT).
+%% Stored settings of User ([] when none).
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_settings, User}, ?CALL_TIMEOUT).
+%% Stored settings of every user of VHost.
+get_users_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_users_settings}, ?CALL_TIMEOUT).
+%% Store the settings record Set for User.
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+%% Remove all logged messages and the settings of User.
+drop_user(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {drop_user, User}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Write one message into the table of its day and bump the owner's counter.
+%% The incoming record carries binaries; they are stored as strings.
+%% A missing day table is created on demand and the write is retried.
+%% Returns `ok' or `error'.
+log_message_int(VHost, #msg{timestamp=Timestamp}=MsgBin) ->
+    Date = mod_logdb:convert_timestamp_brief(Timestamp),
+
+    %% timestamp and direction are carried over unchanged.
+    Msg = MsgBin#msg{owner_name = binary_to_list(MsgBin#msg.owner_name),
+                     peer_name = binary_to_list(MsgBin#msg.peer_name),
+                     peer_server = binary_to_list(MsgBin#msg.peer_server),
+                     peer_resource = binary_to_list(MsgBin#msg.peer_resource),
+                     type = binary_to_list(MsgBin#msg.type),
+                     subject = binary_to_list(MsgBin#msg.subject),
+                     body = binary_to_list(MsgBin#msg.body)},
+
+    DayTable = table_name(VHost, Date),
+    Write = fun() ->
+                mnesia:write_lock_table(DayTable),
+                mnesia:write(DayTable, Msg, write)
+            end,
+    % log message, increment stats for both users
+    case mnesia:transaction(Write) of
+        % if table does not exists - create it and try to log message again
+        {aborted,{no_exists, _Table}} ->
+            case create_msg_table(VHost, Date) of
+                {aborted, CreateError} ->
+                    ?ERROR_MSG("Failed to log message: ~p", [CreateError]),
+                    error;
+                {atomic, ok} ->
+                    ?MYDEBUG("Created msg table for ~s at ~s", [VHost, Date]),
+                    log_message_int(VHost, MsgBin)
+            end;
+        {aborted, WriteError} ->
+            ?ERROR_MSG("Failed to log message: ~p", [WriteError]),
+            error;
+        {atomic, _} ->
+            ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost],
+                                                    [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]),
+            increment_user_stats(Msg#msg.owner_name, VHost, Date)
+    end.
+
+%% Add one to the message counter of Owner at Date, creating the counter
+%% when it does not exist yet. Returns `ok' or `error'.
+increment_user_stats(Owner, VHost, Date) ->
+    Fun = fun() ->
+              Pat = #stats{user=Owner, at=Date, count='$1'},
+              mnesia:write_lock_table(stats_table(VHost)),
+              case mnesia:select(stats_table(VHost), [{Pat, [], ['$_']}]) of
+                  [] ->
+                      mnesia:write(stats_table(VHost),
+                                   #stats{user=Owner,
+                                          at=Date,
+                                          count=1},
+                                   write);
+                  [Stats] ->
+                      %% The table is a bag: drop the old counter object before
+                      %% writing the incremented one.
+                      mnesia:delete_object(stats_table(VHost),
+                                           #stats{user=Owner,
+                                                  at=Date,
+                                                  count=Stats#stats.count},
+                                           write),
+                      New = Stats#stats{count = Stats#stats.count+1},
+                      if
+                          New#stats.count > 0 -> mnesia:write(stats_table(VHost),
+                                                              New,
+                                                              write);
+                          true -> ok
+                      end
+              end
+          end,
+    case mnesia:transaction(Fun) of
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to update stats for ~s@~s: ~p", [Owner, VHost, Reason]),
+            error;
+        {atomic, _} ->
+            ?MYDEBUG("Updated stats for ~s@~s", [Owner, VHost]),
+            ok
+    end.
+
+%% List the dates (as strings) for which a mnesia table of VHost exists.
+%% A table qualifies when its name ends with VHost and contains a
+%% digits-digits-digits date.
+get_dates_int(VHost) ->
+    Tables = mnesia:system_info(tables),
+    Found =
+        lists:foldl(fun(ATable, Dates) ->
+                        %% Match against the atom's text, not its external term encoding.
+                        Table = atom_to_binary(ATable, utf8),
+                        %% NOTE(review): VHost is used as a regexp, so "." in the
+                        %% host name matches any character.
+                        case ejabberd_regexp:run( Table, << VHost/binary, <<"$">>/binary >> ) of
+                            match ->
+                                case re:run(Table, "[0-9]+-[0-9]+-[0-9]+") of
+                                    {match, [{S, E}]} ->
+                                        [binary_to_list(binary:part(Table, S, E)) | Dates];
+                                    nomatch ->
+                                        Dates
+                                end;
+                            nomatch ->
+                                Dates
+                        end
+                    end, [], Tables),
+    %% Dates were prepended; restore the order of Tables.
+    lists:reverse(Found).
+
+%% Recompute the per-user counters of VHost at Date from the message table.
+%% When the message table turns out to be empty it is dropped.
+%% Returns `ok' or `error'.
+rebuild_stats_at_int(VHost, Date) ->
+    Table = table_name(VHost, Date),
+    STable = stats_table(VHost),
+    %% Count messages per owner.
+    CFun = fun(Msg, Stats) ->
+               Owner = Msg#msg.owner_name,
+               case lists:keysearch(Owner, 1, Stats) of
+                   {value, {_, Count}} ->
+                       lists:keyreplace(Owner, 1, Stats, {Owner, Count + 1});
+                   false ->
+                       lists:append(Stats, [{Owner, 1}])
+               end
+           end,
+    %% Delete the existing counters of this date.
+    DFun = fun(#stats{at=SDate} = Stat, _Acc)
+                when SDate == Date ->
+                 mnesia:delete_object(stats_table(VHost), Stat, write);
+              (_Stat, _Acc) -> ok
+           end,
+    % TODO: Maybe unregister hooks ?
+    case mnesia:transaction(fun() ->
+                                mnesia:write_lock_table(Table),
+                                mnesia:write_lock_table(STable),
+                                % Delete all stats for VHost at Date
+                                mnesia:foldl(DFun, [], STable),
+                                % Calc stats for VHost at Date
+                                case mnesia:foldl(CFun, [], Table) of
+                                    [] -> empty;
+                                    AStats ->
+                                        % Write new calc'ed stats
+                                        lists:foreach(fun({Owner, Count}) ->
+                                                          WStat = #stats{user=Owner, at=Date, count=Count},
+                                                          mnesia:write(stats_table(VHost), WStat, write)
+                                                      end, AStats),
+                                        ok
+                                end
+                            end) of
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to rebuild_stats_at for ~p at ~p: ~p", [VHost, Date, Reason]),
+            error;
+        {atomic, ok} ->
+            ok;
+        {atomic, empty} ->
+            {atomic,ok} = mnesia:delete_table(Table),
+            ?MYDEBUG("Dropped table at ~p", [Date]),
+            ok
+    end.
+
+%% Remove counters whose date no longer has a message table.
+%% Returns the mnesia transaction result ({atomic, ok} on success).
+delete_nonexistent_stats(VHost) ->
+    Dates = get_dates_int(VHost),
+    STable = stats_table(VHost),
+    mnesia:transaction(fun() ->
+                           mnesia:foldl(fun(#stats{at=Date} = Stat, _Acc) ->
+                                            case lists:member(Date, Dates) of
+                                                %% delete_object/1 would address a table named
+                                                %% after the record (`stats'); the counters live
+                                                %% in the per-vhost table.
+                                                false -> mnesia:delete_object(STable, Stat, write);
+                                                true -> ok
+                                            end
+                                        end, ok, STable)
+                       end).
+
+%% Delete every counter of VHost at Date. When the transaction aborts the
+%% counters of that date are rebuilt instead.
+delete_stats_by_vhost_at_int(VHost, Date) ->
+    StatsDelete = fun(#stats{at=SDate} = Stat, _Acc)
+                       when SDate == Date ->
+                        mnesia:delete_object(stats_table(VHost), Stat, write),
+                        ok;
+                     (_Msg, _Acc) -> ok
+                  end,
+    case mnesia:transaction(fun() ->
+                                mnesia:write_lock_table(stats_table(VHost)),
+                                mnesia:foldl(StatsDelete, ok, stats_table(VHost))
+                            end) of
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to update stats at ~p for ~p: ~p", [Date, VHost, Reason]),
+            rebuild_stats_at_int(VHost, Date);
+        _ ->
+            ?INFO_MSG("Updated stats at ~p for ~p", [Date, VHost]),
+            ok
+    end.
+
+%% Per-date message counts of User, passed through mod_logdb:sort_stats/1.
+%% Returns {ok, [{Date, Count}]} or {error, Reason}.
+get_user_stats_int(User, VHost) ->
+    case mnesia:transaction(fun() ->
+                                Pat = #stats{user=User, at='$1', count='$2'},
+                                mnesia:select(stats_table(VHost), [{Pat, [], [['$1', '$2']]}])
+                            end) of
+        {atomic, Result} ->
+            {ok, mod_logdb:sort_stats([{Date, Count} || [Date, Count] <- Result])};
+        {aborted, Reason} ->
+            {error, Reason}
+    end.
+
+%% Delete every message owned by User from the table of Date, then rebuild
+%% the counters of that date. Returns `error' when either step failed.
+delete_all_messages_by_user_at_int(User, VHost, Date) ->
+    DayTable = table_name(VHost, Date),
+    DropOwned = fun(#msg{owner_name=Owner} = Msg, _Acc) ->
+                    case Owner == User of
+                        true ->
+                            mnesia:delete_object(DayTable, Msg, write),
+                            ok;
+                        false ->
+                            ok
+                    end
+                end,
+    Deleted = case mnesia:transaction(fun() ->
+                                          mnesia:foldl(DropOwned, ok, DayTable)
+                                      end) of
+                  {aborted, Reason} ->
+                      ?ERROR_MSG("Failed to delete_all_messages_by_user_at for ~p@~p at ~p: ~p", [User, VHost, Date, Reason]),
+                      error;
+                  _ ->
+                      ok
+              end,
+    %% The rebuild runs regardless of the delete outcome.
+    case rebuild_stats_at_int(VHost, Date) of
+        ok -> Deleted;
+        error -> error
+    end.
+
+%% Delete the stored settings of User, if there are any.
+delete_user_settings_int(User, VHost) ->
+    Settings = settings_table(VHost),
+    Pattern = #user_settings{owner_name=User, _='_'},
+    case mnesia:dirty_match_object(Settings, Pattern) of
+        [] ->
+            ok;
+        [Stored] ->
+            mnesia:dirty_delete_object(Settings, Stored)
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Create the per-vhost statistics table (a disc-only bag of #stats{}).
+%% A freshly created table is filled from the existing message tables.
+%% Returns `ok' (also when the table already exists) or `error'.
+create_stats_table(VHost) ->
+    TableOpts = [{disc_only_copies, [node()]},
+                 {type, bag},
+                 {attributes, record_info(fields, stats)},
+                 {record_name, stats}
+                ],
+    case mnesia:create_table(stats_table(VHost), TableOpts) of
+        {atomic, ok} ->
+            ?MYDEBUG("Created stats table for ~p", [VHost]),
+            [rebuild_stats_at_int(VHost, Date) || Date <- get_dates_int(VHost)],
+            ok;
+        {aborted, {already_exists, _}} ->
+            ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+            ok;
+        {aborted, Reason} ->
+            ?ERROR_MSG("Failed to create stats table: ~p", [Reason]),
+            error
+    end.
+ +create_settings_table(VHost) -> + SName = settings_table(VHost), + case mnesia:create_table(SName, + [{disc_copies, [node()]}, + {type, set}, + {attributes, record_info(fields, user_settings)}, + {record_name, user_settings} + ]) of + {atomic, ok} -> + ?MYDEBUG("Created settings table for ~p", [VHost]), + ok; + {aborted, {already_exists, _}} -> + ?MYDEBUG("Settings table for ~p already exists", [VHost]), + ok; + {aborted, Reason} -> + ?ERROR_MSG("Failed to create settings table: ~p", [Reason]), + error + end. + +create_msg_table(VHost, Date) -> + mnesia:create_table( + table_name(VHost, Date), + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, msg)}, + {record_name, msg}]). diff --git a/src/mod_logdb_mysql.erl b/src/mod_logdb_mysql.erl new file mode 100644 index 00000000..21d65e65 --- /dev/null +++ b/src/mod_logdb_mysql.erl @@ -0,0 +1,1050 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb_mysql.erl +%%% Author : Oleg Palij (mailto:o.palij@gmail.com) +%%% Purpose : MySQL backend for mod_logdb +%%% Url : https://paleg.github.io/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb_mysql). +-author('o.palij@gmail.com'). + +-include("mod_logdb.hrl"). +-include("logger.hrl"). + +-behaviour(gen_logdb). +-behaviour(gen_server). + +% gen_server +-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]). +% gen_mod +-export([start/2, stop/1]). +% gen_logdb +-export([log_message/2, + rebuild_stats/1, + rebuild_stats_at/2, + delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2, + get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3, + get_dates/1, + get_users_settings/1, get_user_settings/2, set_user_settings/3, + drop_user/2]). + +% gen_server call timeout +-define(CALL_TIMEOUT, 30000). +-define(MYSQL_TIMEOUT, 60000). 
+-define(INDEX_SIZE, integer_to_list(170)).
+-define(PROCNAME, mod_logdb_mysql).
+
+-import(mod_logdb, [list_to_bool/1, bool_to_list/1,
+                    list_to_string/1, string_to_list/1,
+                    convert_timestamp_brief/1]).
+
+-record(state, {dbref, vhost, server, port, db, user, password}).
+
+% Turn the vhost binary into a string usable inside a table name:
+% every "." becomes "_", all other characters pass through untouched.
+escape_vhost(VHost) ->
+    [case Char of
+         $. -> $_;
+         _ -> Char
+     end || Char <- binary_to_list(VHost)].
+
+% Opening backtick plus the common "logdb_" table prefix.
+prefix() ->
+    "`logdb_".
+
+% "_<escaped vhost>" plus the closing backtick.
+suffix(VHost) ->
+    lists:append(["_", escape_vhost(VHost), "`"]).
+
+% Backtick-quoted name of the per-vhost table of the given kind.
+vhost_table(Kind, VHost) ->
+    lists:append([prefix(), Kind, suffix(VHost)]).
+
+messages_table(VHost, Date) ->
+    vhost_table("messages_" ++ Date, VHost).
+
+stats_table(VHost) ->
+    vhost_table("stats", VHost).
+
+temp_table(VHost) ->
+    vhost_table("temp", VHost).
+
+settings_table(VHost) ->
+    vhost_table("settings", VHost).
+
+users_table(VHost) ->
+    vhost_table("users", VHost).
+servers_table(VHost) ->
+    vhost_table("servers", VHost).
+resources_table(VHost) ->
+    vhost_table("resources", VHost).
+
+% Name (atom) of the ETS cache of the given kind for VHost.
+ets_cache_name(Kind, VHost) ->
+    list_to_atom("logdb_" ++ Kind ++ "_" ++ binary_to_list(VHost)).
+
+ets_users_table(VHost) -> ets_cache_name("users", VHost).
+ets_servers_table(VHost) -> ets_cache_name("servers", VHost).
+ets_resources_table(VHost) -> ets_cache_name("resources", VHost).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Start the backend server for VHost under its locally registered name.
+start(VHost, Opts) ->
+    gen_server:start({local, gen_mod:get_module_proc(VHost, ?PROCNAME)},
+                     ?MODULE, [VHost, Opts], []).
+
+% Ask the backend server of VHost to stop and wait for its answer.
+stop(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Start-up: read the connection options (server, port, db, user, password),
+% open the MySQL connection and create the per-vhost tables.
+% Returns {ok, State}, or {stop, db_connection_failed} when the connection
+% cannot be opened. A table creation that does not return ok crashes init
+% through the `ok = ...` matches below.
+init([VHost, Opts]) ->
+    crypto:start(),
+
+    Server = gen_mod:get_opt(server, Opts, fun(A) -> A end, <<"localhost">>),
+    Port = gen_mod:get_opt(port, Opts, fun(A) -> A end, 3306),
+    DB = gen_mod:get_opt(db, Opts, fun(A) -> A end, <<"logdb">>),
+    User = gen_mod:get_opt(user, Opts, fun(A) -> A end, <<"root">>),
+    Password = gen_mod:get_opt(password, Opts, fun(A) -> A end, <<"">>),
+
+    St = #state{vhost=VHost,
+                server=Server, port=Port, db=DB,
+                user=User, password=Password},
+
+    case open_mysql_connection(St) of
+        {ok, DBRef} ->
+            State = St#state{dbref=DBRef},
+            ok = create_stats_table(State),
+            ok = create_settings_table(State),
+            ok = create_users_table(State),
+            % send ourselves clear_ets_tables every 12 hours
+            % (handled in handle_info/2)
+            timer:send_interval(timer:hours(12), clear_ets_tables),
+            ok = create_servers_table(State),
+            ok = create_resources_table(State),
+            % a 'DOWN' message for the connection process stops this server
+            % (see handle_info/2)
+            erlang:monitor(process, DBRef),
+            {ok, State};
+        {error, Reason} ->
+            ?ERROR_MSG("MySQL connection failed: ~p~n", [Reason]),
+            {stop, db_connection_failed}
+    end.
+
+% Open a connection with p1_mysql_conn:start/6 using the settings kept in
+% the state record (binaries are converted to strings first) and return
+% whatever the driver returns.
+% LogFun is the log callback given to the driver: debug-level messages are
+% dropped, error-level ones go to ?ERROR_MSG, everything else to ?MYDEBUG.
+open_mysql_connection(#state{server=Server, port=Port, db=DB,
+                             user=DBUser, password=Password} = _State) ->
+   LogFun = fun(debug, _Format, _Argument) ->
+                 %?MYDEBUG(Format, Argument);
+                 ok;
+               (error, Format, Argument) ->
+                 ?ERROR_MSG(Format, Argument);
+               (Level, Format, Argument) ->
+                 ?MYDEBUG("MySQL (~p)~n", [Level]),
+                 ?MYDEBUG(Format, Argument)
+            end,
+   ?INFO_MSG("Opening mysql connection ~s@~s:~p/~s", [DBUser, Server, Port, DB]),
+   p1_mysql_conn:start(binary_to_list(Server), Port,
+                       binary_to_list(DBUser), binary_to_list(Password),
+                       binary_to_list(DB), LogFun).
+
+% Stop the connection process; `catch` keeps a failing stop from crashing
+% the caller.
+close_mysql_connection(DBRef) ->
+   ?MYDEBUG("Closing ~p mysql connection", [DBRef]),
+   catch p1_mysql_conn:stop(DBRef).
+
+% Synchronous requests of the gen_logdb API. Every clause builds its SQL as
+% an iolist, runs it on the connection kept in the state and replies with
+% the converted result.
+%
+% log_message: insert one message into the table of its day and bump the
+% owner's statistics. When the day table does not exist yet (#42S02) it is
+% created and the insert is retried once. Replies ok | error.
+handle_call({log_message, Msg}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Date = convert_timestamp_brief(Msg#msg.timestamp),
+
+    Table = messages_table(VHost, Date),
+    Owner_id = get_user_id(DBRef, VHost, binary_to_list(Msg#msg.owner_name)),
+    Peer_name_id = get_user_id(DBRef, VHost, binary_to_list(Msg#msg.peer_name)),
+    Peer_server_id = get_server_id(DBRef, VHost, binary_to_list(Msg#msg.peer_server)),
+    Peer_resource_id = get_resource_id(DBRef, VHost, binary_to_list(Msg#msg.peer_resource)),
+
+    Query = ["INSERT INTO ",Table," ",
+               "(owner_id,",
+                "peer_name_id,",
+                "peer_server_id,",
+                "peer_resource_id,",
+                "direction,",
+                "type,",
+                "subject,",
+                "body,",
+                "timestamp) ",
+               "VALUES ",
+               "('", Owner_id, "',",
+                "'", Peer_name_id, "',",
+                "'", Peer_server_id, "',",
+                "'", Peer_resource_id, "',",
+                "'", atom_to_list(Msg#msg.direction), "',",
+                "'", binary_to_list(Msg#msg.type), "',",
+                "'", binary_to_list( ejabberd_sql:escape(Msg#msg.subject) ), "',",
+                "'", binary_to_list( ejabberd_sql:escape(Msg#msg.body) ), "',",
+                "'", Msg#msg.timestamp, "');"],
+
+    Reply =
+       case sql_query_internal_silent(DBRef, Query) of
+            {updated, _} ->
+               ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost],
+                                                       [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]),
+               increment_user_stats(DBRef, Msg#msg.owner_name, Owner_id, VHost, Peer_name_id, Peer_server_id, Date);
+            {error, Reason} ->
+               case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#42S02">>) of
+                    % Table doesn't exist
+                    match ->
+                      case create_msg_table(DBRef, VHost, Date) of
+                           error ->
+                             error;
+                           ok ->
+                             % a failing retry is reported as error instead of
+                             % crashing the whole backend on a badmatch
+                             case sql_query_internal(DBRef, Query) of
+                                  {updated, _} ->
+                                    increment_user_stats(DBRef, binary_to_list(Msg#msg.owner_name), Owner_id, VHost, Peer_name_id, Peer_server_id, Date);
+                                  {error, _} ->
+                                    error
+                             end
+                      end;
+                    _ ->
+                       ?ERROR_MSG("Failed to log message: ~p", [Reason]),
+                       error
+               end
+       end,
+    {reply, Reply, State};
+% rebuild_stats_at: recalculate the statistics of one date.
+handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Reply = rebuild_stats_at_int(DBRef, VHost, Date),
+    {reply, Reply, State};
+% delete_messages_by_user_at: delete the given messages (matched by their
+% timestamp) from the table of Date, then rebuild that day's statistics.
+% An empty message list is answered with error.
+handle_call({delete_messages_by_user_at, [], _Date}, _From, State) ->
+    {reply, error, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) ->
+                             ["\"",Timestamp,"\"",","]
+                         end, Msgs),
+
+    % replace last "," with ");"
+    Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+
+    Query = ["DELETE FROM ",messages_table(VHost, Date)," ",
+                             "WHERE timestamp IN (", Temp1],
+
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {updated, Aff} ->
+              ?MYDEBUG("Aff=~p", [Aff]),
+              rebuild_stats_at_int(DBRef, VHost, Date);
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+% delete_all_messages_by_user_at: drop all messages and the statistics of
+% User at Date. Replies ok | error; a failing step no longer takes the
+% backend down through an `ok = ...` badmatch.
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Reply =
+      case delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) of
+           ok ->
+              delete_stats_by_user_at_int(DBRef, User, VHost, Date);
+           error ->
+              error
+      end,
+    {reply, Reply, State};
+% delete_messages_at: drop the whole message table of Date and the
+% statistics rows of that date.
+handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Reply =
+      case sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Date),";"]) of
+           {updated, _} ->
+              Query = ["DELETE FROM ",stats_table(VHost)," "
+                          "WHERE at=\"",Date,"\";"],
+              case sql_query_internal(DBRef, Query) of
+                   {updated, _} ->
+                      ok;
+                   {error, _} ->
+                      error
+              end;
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+% get_vhost_stats: {ok, [{Date, Count}]} summed over all users, newest
+% date first.
+handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT at, sum(count) ",
+                "FROM ",SName," ",
+                "GROUP BY at ",
+                "ORDER BY DATE(at) DESC;"
+            ],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]};
+           {error, Reason} ->
+              % TODO: Duplicate error message ?
+              {error, Reason}
+      end,
+    {reply, Reply, State};
+% get_vhost_stats_at: {ok, [{User, Count}]} for one date, highest count
+% first.
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT username, sum(count) AS allcount ",
+                "FROM ",SName," ",
+                "JOIN ",users_table(VHost)," ON owner_id=user_id "
+                "WHERE at=\"",Date,"\" "
+                "GROUP BY username ",
+                "ORDER BY allcount DESC;"
+            ],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              {ok, lists:reverse(
+                     lists:keysort(2,
+                                   [ {User, list_to_integer(Count)} || [User, Count] <- Result]))};
+           {error, Reason} ->
+              % TODO:
+              {error, Reason}
+      end,
+    {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    {reply, get_user_stats_int(DBRef, User, VHost), State};
+% get_user_messages_at: {ok, [#msg{}]} with all messages of User at Date,
+% oldest first; peer ids are resolved back to names through the joins.
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    TName = messages_table(VHost, Date),
+    UName = users_table(VHost),
+    SName = servers_table(VHost),
+    RName = resources_table(VHost),
+    Query = ["SELECT users.username,",
+                    "servers.server,",
+                    "resources.resource,",
+                    "messages.direction,"
+                    "messages.type,"
+                    "messages.subject,"
+                    "messages.body,"
+                    "messages.timestamp "
+                "FROM ",TName," AS messages "
+                "JOIN ",UName," AS users ON peer_name_id=user_id ",
+                "JOIN ",SName," AS servers ON peer_server_id=server_id ",
+                "JOIN ",RName," AS resources ON peer_resource_id=resource_id ",
+                "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\" ",
+                "ORDER BY timestamp ASC;"],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              Fun = fun([Peer_name, Peer_server, Peer_resource,
+                         Direction,
+                         Type,
+                         Subject, Body,
+                         Timestamp]) ->
+                          #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource,
+                               direction=list_to_atom(Direction),
+                               type=Type,
+                               subject=Subject, body=Body,
+                               timestamp=Timestamp}
+                    end,
+              {ok, lists:map(Fun, Result)};
+           {error, Reason} ->
+              {error, Reason}
+      end,
+    {reply, Reply, State};
+% get_dates: bare list of dates that have statistics, newest first
+% (or {error, Reason}).
+handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT at ",
+                "FROM ",SName," ",
+                "GROUP BY at ",
+                "ORDER BY DATE(at) DESC;"
+            ],
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+             {data, Result} ->
+                [ Date || [Date] <- Result ];
+             {error, Reason} ->
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+% get_users_settings: {ok, [#user_settings{}]} for every user that has a
+% settings row.
+handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ",
+                "FROM ",settings_table(VHost)," ",
+             "JOIN ",users_table(VHost)," ON user_id=owner_id;"],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              {ok, lists:map(fun([Owner, DoLogDef, DoLogL, DoNotLogL]) ->
+                                 #user_settings{owner_name=Owner,
+                                                dolog_default=list_to_bool(DoLogDef),
+                                                dolog_list=string_to_list(DoLogL),
+                                                donotlog_list=string_to_list(DoNotLogL)
+                                               }
+                             end, Result)};
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+% get_user_settings: {ok, []} when User has no settings row, otherwise
+% {ok, #user_settings{}}.
+handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Query = ["SELECT dolog_default,dolog_list,donotlog_list FROM ",settings_table(VHost)," ",
+                 "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\";"],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, []} ->
+              {ok, []};
+           % the query returns three columns; the owner is the user we asked
+           % for (a four-element pattern here could never match a row)
+           {data, [[DoLogDef, DoLogL, DoNotLogL]]} ->
+              {ok, #user_settings{owner_name=User,
+                                  dolog_default=list_to_bool(DoLogDef),
+                                  dolog_list=string_to_list(DoLogL),
+                                  donotlog_list=string_to_list(DoNotLogL)}};
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+% set_user_settings: UPDATE the row of User; when no row was affected an
+% INSERT is tried, and a duplicate key error (#23000) from it is treated as
+% success.
+handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef,
+                                                     dolog_list=DoLogL,
+                                                     donotlog_list=DoNotLogL}},
+            _From, #state{dbref=DBRef, vhost=VHost} = State) ->
+    User_id = get_user_id(DBRef, VHost, User),
+
+    Query = ["UPDATE ",settings_table(VHost)," ",
+                "SET dolog_default=",bool_to_list(DoLogDef),", ",
+                    "dolog_list='",list_to_string(DoLogL),"', ",
+                    "donotlog_list='",list_to_string(DoNotLogL),"' ",
+                "WHERE owner_id=\"",User_id,"\";"],
+
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+             {updated, 0} ->
+                IQuery = ["INSERT INTO ",settings_table(VHost)," ",
+                            "(owner_id, dolog_default, dolog_list, donotlog_list) ",
+                            "VALUES ",
+                            "('",User_id,"', ",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"],
+                case sql_query_internal_silent(DBRef, IQuery) of
+                     {updated, _} ->
+                         ?MYDEBUG("New settings for ~s@~s", [User, VHost]),
+                         ok;
+                     {error, Reason} ->
+                         case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>) of
+                              % Already exists
+                              match ->
+                                  ok;
+                              _ ->
+                                  ?ERROR_MSG("Failed setup user ~p@~p: ~p", [User, VHost, Reason]),
+                                  error
+                         end
+                end;
+             {updated, 1} ->
+                ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]),
+                ok;
+             {error, _} ->
+                error
+        end,
+    {reply, Reply, State};
+% stop: drop all three ETS caches and terminate normally.
+handle_call({stop}, _From, #state{vhost=VHost}=State) ->
+    ets:delete(ets_users_table(VHost)),
+    ets:delete(ets_servers_table(VHost)),
+    ets:delete(ets_resources_table(VHost)),
+    ?MYDEBUG("Stoping mysql backend for ~p", [VHost]),
+    {stop, normal, ok, State};
+% Unknown request: logged and left unanswered, so the caller runs into its
+% own call timeout.
+handle_call(Msg, _From, State) ->
+    ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+    {noreply, State}.
+
+% Asynchronous requests.
+% rebuild_stats: hand the full statistics rebuild to rebuild_all_stats_int/1.
+handle_cast({rebuild_stats}, State) ->
+    rebuild_all_stats_int(State),
+    {noreply, State};
+% drop_user: remove all messages, statistics and settings of User in a
+% separate process that works on its own MySQL connection.
+handle_cast({drop_user, User}, #state{vhost=VHost} = State) ->
+    Fun = fun() ->
+            {ok, DBRef} = open_mysql_connection(State),
+            % try/after: the private connection is closed even when one of
+            % the steps below crashes, so it cannot leak
+            try
+                {ok, Dates} = get_user_stats_int(DBRef, User, VHost),
+                MDResult = lists:map(fun({Date, _}) ->
+                               delete_all_messages_by_user_at_int(DBRef, User, VHost, Date)
+                           end, Dates),
+                StDResult = delete_all_stats_by_user_int(DBRef, User, VHost),
+                SDResult = delete_user_settings_int(DBRef, User, VHost),
+                case lists:all(fun(Result) when Result == ok ->
+                                    true;
+                                  (Result) when Result == error ->
+                                    false
+                               end, lists:append([MDResult, [StDResult], [SDResult]])) of
+                     true ->
+                       ?INFO_MSG("Removed ~s@~s", [User, VHost]);
+                     false ->
+                       ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost])
+                end
+            after
+                close_mysql_connection(DBRef)
+            end
+          end,
+    spawn(Fun),
+    {noreply, State};
+handle_cast(Msg, State) ->
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+    {noreply, State}.
+
+% clear_ets_tables is sent periodically by the timer set up in init/1 and
+% empties the users and resources caches (the servers cache is left alone).
+handle_info(clear_ets_tables, State) ->
+    ets:delete_all_objects(ets_users_table(State#state.vhost)),
+    ets:delete_all_objects(ets_resources_table(State#state.vhost)),
+    {noreply, State};
+% any 'DOWN' message (init/1 monitors the connection process) stops the server
+handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) ->
+    {stop, connection_dropped, State};
+handle_info(Info, State) ->
+    ?INFO_MSG("Got Info:~p, State:~p", [Info, State]),
+    {noreply, State}.
+
+% Close the MySQL connection when the server goes down.
+terminate(_Reason, #state{dbref=DBRef}=_State) ->
+    close_mysql_connection(DBRef),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Synchronous: waits up to ?CALL_TIMEOUT for the backend's reply.
+log_message(VHost, Msg) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:call(Proc, {log_message, Msg}, ?CALL_TIMEOUT).
+% Asynchronous: returns as soon as the request is queued.
+rebuild_stats(VHost) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:cast(Proc, {rebuild_stats}).
+% The remaining synchronous gen_logdb callbacks only differ in the request
+% tuple they send to the backend server of VHost; see call_backend/2.
+rebuild_stats_at(VHost, Date) ->
+    call_backend(VHost, {rebuild_stats_at, Date}).
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    call_backend(VHost, {delete_messages_by_user_at, Msgs, Date}).
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    call_backend(VHost, {delete_all_messages_by_user_at, User, Date}).
+delete_messages_at(VHost, Date) ->
+    call_backend(VHost, {delete_messages_at, Date}).
+get_vhost_stats(VHost) ->
+    call_backend(VHost, {get_vhost_stats}).
+get_vhost_stats_at(VHost, Date) ->
+    call_backend(VHost, {get_vhost_stats_at, Date}).
+get_user_stats(User, VHost) ->
+    call_backend(VHost, {get_user_stats, User}).
+get_user_messages_at(User, VHost, Date) ->
+    call_backend(VHost, {get_user_messages_at, User, Date}).
+get_dates(VHost) ->
+    call_backend(VHost, {get_dates}).
+get_users_settings(VHost) ->
+    call_backend(VHost, {get_users_settings}).
+get_user_settings(User, VHost) ->
+    call_backend(VHost, {get_user_settings, User}).
+set_user_settings(User, VHost, Set) ->
+    call_backend(VHost, {set_user_settings, User, Set}).
+
+% Send Request to the backend server registered for VHost and wait up to
+% ?CALL_TIMEOUT for its reply.
+call_backend(VHost, Request) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    Request, ?CALL_TIMEOUT).
+% Asynchronous: ask the backend of VHost to remove everything stored for User.
+drop_user(User, VHost) ->
+    Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:cast(Proc, {drop_user, User}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Add one to the message counter of (owner, peer name, peer server, date).
+% The UPDATE is tried first; when it touches no row the counter row is
+% inserted with count 1. User_name is only used for debug output.
+% Returns ok | error.
+increment_user_stats(DBRef, User_name, User_id, VHost, PNameID, PServerID, Date) ->
+    SName = stats_table(VHost),
+    UQuery = ["UPDATE ",SName," ",
+                  "SET count=count+1 ",
+                  "WHERE owner_id=\"",User_id,"\" AND peer_name_id=\"",PNameID,"\" AND peer_server_id=\"",PServerID,"\" AND at=\"",Date,"\";"],
+
+    case sql_query_internal(DBRef, UQuery) of
+         {updated, 0} ->
+               IQuery = ["INSERT INTO ",SName," ",
+                             "(owner_id, peer_name_id, peer_server_id, at, count) ",
+                             "VALUES ",
+                             "('",User_id,"', '",PNameID,"', '",PServerID,"', '",Date,"', '1');"],
+               case sql_query_internal(DBRef, IQuery) of
+                    {updated, _} ->
+                         ?MYDEBUG("New stats for ~s@~s at ~s", [User_name, VHost, Date]),
+                         ok;
+                    {error, _} ->
+                         error
+               end;
+         {updated, _} ->
+               ?MYDEBUG("Updated stats for ~s@~s at ~s", [User_name, VHost, Date]),
+               ok;
+         {error, _} ->
+               error
+    end.
+
+% List the dates for which a message table of VHost exists, in the order
+% SHOW TABLES reports them; [] when the query fails.
+%
+% The pattern mirrors messages_table/2 exactly
+% (logdb_messages_<date>_<escaped vhost>) and is anchored on both ends.
+% A loose "^logdb_.*<vhost>" match would also pick up the tables of any
+% other vhost whose escaped name merely contains this one (for example
+% "im.example.com" when asking for "example.com").
+get_dates_int(DBRef, VHost) ->
+    case sql_query_internal(DBRef, ["SHOW TABLES"]) of
+         {data, Tables} ->
+            % tl/1 drops the leading backtick of prefix()
+            Reg = "^" ++ tl(prefix()) ++ "messages_([0-9]+-[0-9]+-[0-9]+)_"
+                      ++ escape_vhost(VHost) ++ "$",
+            % prepend + reverse instead of appending to the accumulator
+            Found = lists:foldl(fun([Table], Dates) ->
+                                    case re:run(Table, Reg, [{capture, all_but_first, list}]) of
+                                         {match, [Date]} ->
+                                             [Date | Dates];
+                                         nomatch ->
+                                             Dates
+                                    end
+                                end, [], Tables),
+            lists:reverse(Found);
+         {error, _} ->
+            []
+    end.
+
+% Rebuild the statistics of every date in a separate process that uses its
+% own MySQL connection. Dates whose rebuild fails are collected and logged.
+% Returns the pid of the spawned process.
+rebuild_all_stats_int(#state{vhost=VHost}=State) ->
+    Fun = fun() ->
+             {ok, DBRef} = open_mysql_connection(State),
+             % try/after: the private connection is closed even when a step
+             % crashes (e.g. the `ok = ...` match below)
+             try
+                 ok = delete_nonexistent_stats(DBRef, VHost),
+                 case lists:filter(fun(Date) ->
+                                     case catch rebuild_stats_at_int(DBRef, VHost, Date) of
+                                          ok -> false;
+                                          error -> true;
+                                          {'EXIT', _} -> true
+                                     end
+                                   end, get_dates_int(DBRef, VHost)) of
+                      [] -> ok;
+                      FTables ->
+                         ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]),
+                         error
+                 end
+             after
+                 close_mysql_connection(DBRef)
+             end
+          end,
+    spawn(Fun).
+
+% Recalculate the statistics of VHost at Date from the message table of
+% that day, going through a temporary table.
+% When the message table turns out to be empty it is dropped together with
+% its statistics rows.
+% Returns ok | error. The tables are unlocked and the temporary table is
+% dropped in every case, and the outcome of the rebuild is handed back to
+% the caller instead of being replaced by a constant ok.
+rebuild_stats_at_int(DBRef, VHost, Date) ->
+    TempTable = temp_table(VHost),
+    Fun = fun() ->
+           Table = messages_table(VHost, Date),
+           STable = stats_table(VHost),
+
+           DQuery = [ "DELETE FROM ",STable," ",
+                          "WHERE at='",Date,"';"],
+
+           ok = create_temp_table(DBRef, TempTable),
+           {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," WRITE, ",TempTable," WRITE;"]),
+           SQuery = ["INSERT INTO ",TempTable," ",
+                     "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                        "SELECT owner_id,peer_name_id,peer_server_id,\"",Date,"\",count(*) ",
+                           "FROM ",Table," GROUP BY owner_id,peer_name_id,peer_server_id;"],
+           case sql_query_internal(DBRef, SQuery) of
+                  {updated, 0} ->
+                      % nothing aggregated: check whether the table is really empty
+                      Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]),
+                      case Count of
+                        {data, [["0"]]} ->
+                           {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table,";"]),
+                           {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE;"]),
+                           {updated, _} = sql_query_internal(DBRef, DQuery),
+                           ok;
+                        _ ->
+                           ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]),
+                           error
+                      end;
+                  {updated, _} ->
+                      % replace the old rows of this date with the fresh ones
+                      {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]),
+                      {updated, _} = sql_query_internal(DBRef, DQuery),
+                      SQuery1 = ["INSERT INTO ",STable," ",
+                                  "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                                     "SELECT owner_id,peer_name_id,peer_server_id,at,count ",
+                                        "FROM ",TempTable,";"],
+                      case sql_query_internal(DBRef, SQuery1) of
+                           {updated, _} -> ok;
+                           {error, _} -> error
+                      end;
+                  {error, _} -> error
+           end
+       end,
+
+    Result =
+        case catch apply(Fun, []) of
+             ok ->
+               ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]),
+               ok;
+             error ->
+               error;
+             {'EXIT', Reason} ->
+               ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]),
+               error
+        end,
+    % clean-up happens whatever the outcome was
+    sql_query_internal(DBRef, ["UNLOCK TABLES;"]),
+    sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]),
+    Result.
+
+
+% Delete statistics rows whose date has no message table any more.
+% Nothing is deleted when get_dates_int/2 yields no dates at all.
+% Returns ok | error.
+delete_nonexistent_stats(DBRef, VHost) ->
+    Dates = get_dates_int(DBRef, VHost),
+    STable = stats_table(VHost),
+
+    Temp = lists:flatmap(fun(Date) ->
+                             ["\"",Date,"\"",","]
+                         end, Dates),
+
+    case Temp of
+         [] ->
+           ok;
+         _ ->
+           % replace last "," with ");"
+           Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+           Query = ["DELETE FROM ",STable," ",
+                       "WHERE at NOT IN (", Temp1],
+           case sql_query_internal(DBRef, Query) of
+                {updated, _} ->
+                   ok;
+                {error, _} ->
+                   error
+           end
+    end.
+
+% {ok, [{Date, Count}]} with the per-date message totals of User, newest
+% date first, or {error, Reason}.
+get_user_stats_int(DBRef, User, VHost) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT at, sum(count) as allcount ",
+                "FROM ",SName," ",
+                "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\" ",
+                "GROUP BY at "
+                "ORDER BY DATE(at) DESC;"
+            ],
+    case sql_query_internal(DBRef, Query) of
+         {data, Result} ->
+            {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result]};
+         {error, Result} ->
+            {error, Result}
+    end.
+
+% Flat list of query fragments for the clause
+%   WHERE owner_id=(SELECT user_id FROM <users table> WHERE username="<User>")
+% shared by the delete helpers below.
+owner_id_clause(User, VHost) ->
+    ["WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\")"].
+
+% Delete every message of User from the message table of Date.
+delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) ->
+    Statement = ["DELETE FROM ",messages_table(VHost, Date)," "]
+                    ++ owner_id_clause(User, VHost) ++ [";"],
+    case sql_query_internal(DBRef, Statement) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Delete all statistics rows of User, whatever the date.
+delete_all_stats_by_user_int(DBRef, User, VHost) ->
+    Statement = ["DELETE FROM ",stats_table(VHost)," "]
+                    ++ owner_id_clause(User, VHost) ++ [";"],
+    case sql_query_internal(DBRef, Statement) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]),
+            ok
+    end.
+
+% Delete the statistics rows of User for one date only.
+delete_stats_by_user_at_int(DBRef, User, VHost, Date) ->
+    Statement = ["DELETE FROM ",stats_table(VHost)," "]
+                    ++ owner_id_clause(User, VHost)
+                    ++ [" ", "AND at=\"",Date,"\";"],
+    case sql_query_internal(DBRef, Statement) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Delete the settings row of User; a failure is logged with its reason.
+delete_user_settings_int(DBRef, User, VHost) ->
+    Statement = ["DELETE FROM ",settings_table(VHost)," "]
+                    ++ owner_id_clause(User, VHost) ++ [";"],
+    case sql_query_internal(DBRef, Statement) of
+        {error, Reason} ->
+            ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Reason]),
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Create the scratch table used by rebuild_stats_at_int/3. It has the same
+% columns as the stats table but no indexes. Returns ok | error.
+create_temp_table(DBRef, Name) ->
+    Query = ["CREATE TABLE ",Name," (",
+                "owner_id MEDIUMINT UNSIGNED, ",
+                "peer_name_id MEDIUMINT UNSIGNED, ",
+                "peer_server_id MEDIUMINT UNSIGNED, ",
+                "at VARCHAR(11), ",
+                "count INT(11) ",
+             ") ENGINE=MyISAM CHARACTER SET utf8;"
+            ],
+    case sql_query_internal(DBRef, Query) of
+         {updated, _} -> ok;
+         {error, _Reason} -> error
+    end.
+
+% Create the per-vhost stats table.
+%  - freshly created: a full statistics rebuild is started and ok returned;
+%  - already exists (#42S01): the table is accepted when it has exactly two
+%    columns matching 'peer_%_id'; otherwise it is dropped and error is
+%    returned;
+%  - any other failure: logged, error returned.
+create_stats_table(#state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["CREATE TABLE ",SName," (",
+                "owner_id MEDIUMINT UNSIGNED, ",
+                "peer_name_id MEDIUMINT UNSIGNED, ",
+                "peer_server_id MEDIUMINT UNSIGNED, ",
+                "at varchar(20), ",
+                "count int(11), ",
+                "INDEX(owner_id, peer_name_id, peer_server_id), ",
+                "INDEX(at)"
+             ") ENGINE=InnoDB CHARACTER SET utf8;"
+            ],
+    % the silent variant is used because "already exists" is an expected answer
+    case sql_query_internal_silent(DBRef, Query) of
+         {updated, _} ->
+            ?INFO_MSG("Created stats table for ~p", [VHost]),
+            rebuild_all_stats_int(State),
+            ok;
+         {error, Reason} ->
+            case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#42S01">>) of
+                 match ->
+                   ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+                   CheckQuery = ["SHOW COLUMNS FROM ",SName," LIKE 'peer_%_id';"],
+                   case sql_query_internal(DBRef, CheckQuery) of
+                        {data, Elems} when length(Elems) == 2 ->
+                          ?MYDEBUG("Stats table structure is ok", []),
+                          ok;
+                        _ ->
+                          % NOTE(review): the message announces "drop it and
+                          % recreate", but this branch only drops the table and
+                          % returns error - nothing is recreated here. Confirm
+                          % whether a later restart is meant to do that.
+                          ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []),
+                          case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of
+                               {updated, _} ->
+                                  ?INFO_MSG("Successfully dropped ~p", [SName]);
+                               _ ->
+                                  ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName])
+                          end,
+                          error
+                   end;
+                 _ ->
+                   ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Reason]),
+                   error
+            end
+    end.
+
+% Make sure the per-vhost settings table exists. Returns ok | error.
+create_settings_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Ddl = ["CREATE TABLE IF NOT EXISTS ",settings_table(VHost)," (",
+              "owner_id MEDIUMINT UNSIGNED PRIMARY KEY, ",
+              "dolog_default TINYINT(1) NOT NULL DEFAULT 1, ",
+              "dolog_list TEXT, ",
+              "donotlog_list TEXT ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Ddl) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created settings table for ~p", [VHost]),
+            ok
+    end.
+
+% Make sure the per-vhost users table exists and create the (initially
+% empty) ETS cache of user ids. Returns ok | error.
+create_users_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Ddl = ["CREATE TABLE IF NOT EXISTS ",users_table(VHost)," (",
+              "username TEXT NOT NULL, ",
+              "user_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+              "UNIQUE INDEX(username(",?INDEX_SIZE,")) ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Ddl) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created users table for ~p", [VHost]),
+            ets:new(ets_users_table(VHost), [named_table, set, public]),
+            %update_users_from_db(DBRef, VHost),
+            ok
+    end.
+
+% Make sure the per-vhost servers table exists, create its ETS cache and
+% fill the cache from the database. Returns ok | error.
+create_servers_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Ddl = ["CREATE TABLE IF NOT EXISTS ",servers_table(VHost)," (",
+              "server TEXT NOT NULL, ",
+              "server_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+              "UNIQUE INDEX(server(",?INDEX_SIZE,")) ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Ddl) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created servers table for ~p", [VHost]),
+            ets:new(ets_servers_table(VHost), [named_table, set, public]),
+            update_servers_from_db(DBRef, VHost),
+            ok
+    end.
+
+% Make sure the per-vhost resources table exists and create the (initially
+% empty) ETS cache of resource ids. Returns ok | error.
+create_resources_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Ddl = ["CREATE TABLE IF NOT EXISTS ",resources_table(VHost)," (",
+              "resource TEXT NOT NULL, ",
+              "resource_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+              "UNIQUE INDEX(resource(",?INDEX_SIZE,")) ",
+           ") ENGINE=InnoDB CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Ddl) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created resources table for ~p", [VHost]),
+            ets:new(ets_resources_table(VHost), [named_table, set, public]),
+            ok
+    end.
+
+% Create the message table of VHost for Date. Returns ok | error.
+create_msg_table(DBRef, VHost, Date) ->
+    Ddl = ["CREATE TABLE ",messages_table(VHost, Date)," (",
+              "owner_id MEDIUMINT UNSIGNED, ",
+              "peer_name_id MEDIUMINT UNSIGNED, ",
+              "peer_server_id MEDIUMINT UNSIGNED, ",
+              "peer_resource_id MEDIUMINT(8) UNSIGNED, ",
+              "direction ENUM('to', 'from'), ",
+              "type ENUM('chat','error','groupchat','headline','normal') NOT NULL, ",
+              "subject TEXT, ",
+              "body TEXT, ",
+              "timestamp DOUBLE, ",
+              "INDEX search_i (owner_id, peer_name_id, peer_server_id, peer_resource_id), ",
+              "FULLTEXT (body) "
+           ") ENGINE=MyISAM CHARACTER SET utf8;"],
+    case sql_query_internal(DBRef, Ddl) of
+        {error, _} ->
+            error;
+        {updated, _Affected} ->
+            ?MYDEBUG("Created msg table for ~p at ~p", [VHost, Date]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internal ets cache (users, servers, resources)
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Replace the content of the servers cache with the {Server, Id} pairs
+% currently stored in the database. Crashes when the query fails.
+update_servers_from_db(DBRef, VHost) ->
+    ?INFO_MSG("Reading servers from db for ~p", [VHost]),
+    {data, Rows} = sql_query_internal(DBRef,
+                       ["SELECT server, server_id FROM ",servers_table(VHost),";"]),
+    Cache = ets_servers_table(VHost),
+    Pairs = [ {Name, Id} || [Name, Id] <- Rows ],
+    true = ets:delete_all_objects(Cache),
+    true = ets:insert(Cache, Pairs).
+
+% The three helpers below are disabled (commented out) and kept only for
+% reference.
+%update_users_from_db(DBRef, VHost) ->
+%   ?INFO_MSG("Reading users from db for ~p", [VHost]),
+%   SQuery = ["SELECT username, user_id FROM ",users_table(VHost),";"],
+%   {data, Result} = sql_query_internal(DBRef, SQuery),
+%   true = ets:delete_all_objects(ets_users_table(VHost)),
+%   true = ets:insert(ets_users_table(VHost), [ {Username, User_id} || [Username, User_id] <- Result]).
+
+%get_user_name(DBRef, VHost, User_id) ->
+%  case ets:match(ets_users_table(VHost), {'$1', User_id}) of
+%       [[User]] -> User;
+%       % this can be in clustered environment
+%       [] ->
+%         %update_users_from_db(DBRef, VHost),
+%         SQuery = ["SELECT username FROM ",users_table(VHost)," ",
+%                      "WHERE user_id=\"",User_id,"\";"],
+%         {data, [[Name]]} = sql_query_internal(DBRef, SQuery),
+%         % cache {user, id} pair
+%         ets:insert(ets_users_table(VHost), {Name, User_id}),
+%         Name
+%  end.
+
+%get_server_name(DBRef, VHost, Server_id) ->
+%  case ets:match(ets_servers_table(VHost), {'$1', Server_id}) of
+%       [[Server]] -> Server;
+       % this can be in clustered environment
+%       [] ->
+%         update_servers_from_db(DBRef, VHost),
+%         [[Server1]] = ets:match(ets_servers_table(VHost), {'$1', Server_id}),
+%         Server1
+%  end.
+
+% Look up the numeric id of User in the users table of VHost.
+% Returns {ok, []} when there is no such user, otherwise {ok, Id}; a found
+% id is also stored in the ETS users cache.
+% NOTE(review): there is no clause for {error, _}, so a failing query ends
+% in a case_clause crash - confirm that this is intended.
+get_user_id_from_db(DBRef, VHost, User) ->
+    SQuery = ["SELECT user_id FROM ",users_table(VHost)," ",
+                 "WHERE username=\"",User,"\";"],
+    case sql_query_internal(DBRef, SQuery) of
+         % no such user in db
+         {data, []} ->
+            {ok, []};
+         {data, [[DBId]]} ->
+            % cache {user, id} pair
+            ets:insert(ets_users_table(VHost), {User, DBId}),
+            {ok, DBId}
+    end.
+% Return the numeric id of User, creating the user row when needed.
+% Lookup order: ETS cache, then the database, then INSERT followed by a
+% second database lookup. When the INSERT fails it must be a duplicate key
+% error (#23000, asserted by the `match = ...` line); the id is then read
+% back from the database.
+get_user_id(DBRef, VHost, User) ->
+    % Look at ets
+    case ets:match(ets_users_table(VHost), {User, '$1'}) of
+         [] ->
+           % Look at db
+           case get_user_id_from_db(DBRef, VHost, User) of
+                % no such user in db
+                {ok, []} ->
+                  IQuery = ["INSERT INTO ",users_table(VHost)," ",
+                               "SET username=\"",User,"\";"],
+                  case sql_query_internal_silent(DBRef, IQuery) of
+                       {updated, _} ->
+                           {ok, NewId} = get_user_id_from_db(DBRef, VHost, User),
+                           NewId;
+                       {error, Reason} ->
+                           % this can be in clustered environment
+                           match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>),
+                           ?ERROR_MSG("Duplicate key name for ~p", [User]),
+                           {ok, ClID} = get_user_id_from_db(DBRef, VHost, User),
+                           ClID
+                  end;
+                {ok, DBId} ->
+                  DBId
+           end;
+         [[EtsId]] -> EtsId
+    end.
+
+% Return the numeric id of Server, creating the server row when needed.
+% Unlike get_user_id/3 a cache miss goes straight to INSERT: on success the
+% new id is selected and cached; on a duplicate key error (#23000, asserted)
+% the whole servers cache is reloaded from the database and the id is taken
+% from there.
+get_server_id(DBRef, VHost, Server) ->
+    case ets:match(ets_servers_table(VHost), {Server, '$1'}) of
+         [] ->
+          IQuery = ["INSERT INTO ",servers_table(VHost)," ",
+                       "SET server=\"",Server,"\";"],
+          case sql_query_internal_silent(DBRef, IQuery) of
+               {updated, _} ->
+                    SQuery = ["SELECT server_id FROM ",servers_table(VHost)," ",
+                                 "WHERE server=\"",Server,"\";"],
+                    {data, [[Id]]} = sql_query_internal(DBRef, SQuery),
+                    ets:insert(ets_servers_table(VHost), {Server, Id}),
+                    Id;
+               {error, Reason} ->
+                    % this can be in clustered environment
+                    match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>),
+                    ?ERROR_MSG("Duplicate key name for ~p", [Server]),
+                    update_servers_from_db(DBRef, VHost),
+                    [[Id1]] = ets:match(ets_servers_table(VHost), {Server, '$1'}),
+                    Id1
+          end;
+         [[Id]] -> Id
+    end.
+
+% Look up the numeric id of Resource in the resources table of VHost; the
+% resource is SQL-escaped before it is put into the query.
+% Returns {ok, []} when there is no such resource, otherwise {ok, Id}; a
+% found id is also stored in the ETS resources cache.
+get_resource_id_from_db(DBRef, VHost, Resource) ->
+    SQuery = ["SELECT resource_id FROM ",resources_table(VHost)," ",
+                 "WHERE resource=\"",binary_to_list(ejabberd_sql:escape(iolist_to_binary(Resource))),"\";"],
+    case sql_query_internal(DBRef, SQuery) of
+         % no such resource in db
+         {data, []} ->
+            {ok, []};
+         {data, [[DBId]]} ->
+            % cache {resource, id} pair
+            ets:insert(ets_resources_table(VHost), {Resource, DBId}),
+            {ok, DBId}
+    end.
+% Return the numeric id of Resource, creating the row when needed.
+% Lookup order: ETS cache, then the database, then INSERT followed by a
+% second database lookup. When the INSERT fails it must be a duplicate key
+% error (#23000, asserted); the id is then read back from the database.
+get_resource_id(DBRef, VHost, Resource) ->
+    % Look at ets
+    case ets:match(ets_resources_table(VHost), {Resource, '$1'}) of
+         [] ->
+           % Look at db
+           case get_resource_id_from_db(DBRef, VHost, Resource) of
+                % no such resource in db
+                {ok, []} ->
+                  IQuery = ["INSERT INTO ",resources_table(VHost)," ",
+                               "SET resource=\"",binary_to_list(ejabberd_sql:escape(iolist_to_binary(Resource))),"\";"],
+                  case sql_query_internal_silent(DBRef, IQuery) of
+                       {updated, _} ->
+                           {ok, NewId} = get_resource_id_from_db(DBRef, VHost, Resource),
+                           NewId;
+                       {error, Reason} ->
+                           % this can be in clustered environment
+                           match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>),
+                           ?ERROR_MSG("Duplicate key name for ~s", [Resource]),
+                           {ok, ClID} = get_resource_id_from_db(DBRef, VHost, Resource),
+                           ClID
+                  end;
+                {ok, DBId} ->
+                  DBId
+           end;
+         [[EtsId]] -> EtsId
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% SQL internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Run Query and log the reason together with the query text when it fails.
+% Returns what sql_query_internal_silent/2 returns.
+%
+% Query is an iolist assembled from literals and caller-supplied values.
+% It is flattened with lists:flatten/1 for logging: lists:append/1 raises
+% badarg as soon as a fragment other than the last one is a binary, which
+% would replace the SQL error being reported by a crash. For a query made
+% of plain strings both calls give the same text.
+sql_query_internal(DBRef, Query) ->
+    case sql_query_internal_silent(DBRef, Query) of
+         {error, Reason} ->
+            ?ERROR_MSG("~p while ~p", [Reason, lists:flatten(Query)]),
+            {error, Reason};
+         Rez -> Rez
+    end.
+
+% Run Query on the connection without logging failures (used where an
+% error is an expected answer). Returns {updated, AffectedRows} |
+% {data, Rows} | {error, Reason}.
+sql_query_internal_silent(DBRef, Query) ->
+    ?MYDEBUG("DOING: \"~s\"", [lists:flatten(Query)]),
+    get_result(p1_mysql_conn:fetch(DBRef, Query, self(), ?MYSQL_TIMEOUT)).
+
+% Convert a raw p1_mysql_conn:fetch/4 reply into one of
+% {updated, AffectedRows} | {data, Rows} | {error, Reason}.
+get_result({updated, MySQLRes}) ->
+    {updated, p1_mysql:get_result_affected_rows(MySQLRes)};
+get_result({data, MySQLRes}) ->
+    {data, p1_mysql:get_result_rows(MySQLRes)};
+% a timeout arrives as a plain string, not as a result record
+get_result({error, "query timed out"}) ->
+    {error, "query timed out"};
+get_result({error, MySQLRes}) ->
+    Reason = p1_mysql:get_result_reason(MySQLRes),
+    {error, Reason}.
diff --git a/src/mod_logdb_mysql5.erl b/src/mod_logdb_mysql5.erl
new file mode 100644
index 00000000..c05ab958
--- /dev/null
+++ b/src/mod_logdb_mysql5.erl
@@ -0,0 +1,979 @@
+%%%----------------------------------------------------------------------
+%%% File    : mod_logdb_mysql5.erl
+%%% Author  : Oleg Palij (mailto:o.palij@gmail.com)
+%%% Purpose : MySQL 5 backend for mod_logdb
+%%% Url     : https://paleg.github.io/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_mysql5).
+-author('o.palij@gmail.com').
+
+-include("mod_logdb.hrl").
+-include("logger.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+         rebuild_stats/1,
+         rebuild_stats_at/2,
+         delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+         get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+         get_dates/1,
+         get_users_settings/1, get_user_settings/2, set_user_settings/3,
+         drop_user/2]).
+
+% gen_server call timeout
+-define(CALL_TIMEOUT, 30000).
+% timeout passed to p1_mysql_conn:fetch/4
+-define(MYSQL_TIMEOUT, 60000).
+% prefix length used for the UNIQUE indexes on TEXT columns
+-define(INDEX_SIZE, integer_to_list(170)).
+-define(PROCNAME, mod_logdb_mysql5).
+
+-import(mod_logdb, [list_to_bool/1, bool_to_list/1,
+                    list_to_string/1, string_to_list/1,
+                    convert_timestamp_brief/1]).
+
+% dbref is the MySQL connection; the remaining fields are the connection
+% options read in init/1
+-record(state, {dbref, vhost, server, port, db, user, password}).
+
+% replace "." with "_"
+% (46 is $. and 95 is $_; the binary vhost is returned as a list)
+escape_vhost(VHost) -> lists:map(fun(46) -> 95;
+                                    (A) -> A
+                                 end, binary_to_list(VHost)).
+% Leading part of every object name: opening back-quote plus "logdb_".
+prefix() ->
+    "`logdb_".
+
+% Trailing part of every object name: "_", the escaped vhost and the
+% closing back-quote.
+suffix(VHost) ->
+    lists:append(["_", escape_vhost(VHost), "`"]).
+
+% Per-day messages table.
+messages_table(VHost, Date) ->
+    lists:append([prefix(), "messages_", Date, suffix(VHost)]).
+
+% TODO: this needs to be redone to unify view name in stored procedure and in delete_messages_at/2
+view_table(VHost, Date) ->
+    Quoted = messages_table(VHost, Date),
+    % strip the surrounding back-quotes before prepending "v_"
+    Bare = lists:sublist(Quoted, 2, length(Quoted) - 2),
+    "`v_" ++ Bare ++ "`".
+
+stats_table(VHost) ->
+    lists:append([prefix(), "stats", suffix(VHost)]).
+
+temp_table(VHost) ->
+    lists:append([prefix(), "temp", suffix(VHost)]).
+
+settings_table(VHost) ->
+    lists:append([prefix(), "settings", suffix(VHost)]).
+
+users_table(VHost) ->
+    lists:append([prefix(), "users", suffix(VHost)]).
+servers_table(VHost) ->
+    lists:append([prefix(), "servers", suffix(VHost)]).
+resources_table(VHost) ->
+    lists:append([prefix(), "resources", suffix(VHost)]).
+
+% Name of the per-vhost stored procedure.
+logmessage_name(VHost) ->
+    lists:append([prefix(), "logmessage", suffix(VHost)]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Start the backend gen_server, registered locally under the per-vhost name.
+start(VHost, Opts) ->
+    Name = gen_mod:get_module_proc(VHost, ?PROCNAME),
+    gen_server:start({local, Name}, ?MODULE, [VHost, Opts], []).
+
+% Ask the backend gen_server of VHost to stop.
+stop(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Read the connection options, open the MySQL connection and create the
+% stored procedure and tables. Stops with db_connection_failed when the
+% connection cannot be opened.
+init([VHost, Opts]) ->
+    crypto:start(),
+
+    AsIs = fun(A) -> A end,
+    Server = gen_mod:get_opt(server, Opts, AsIs, <<"localhost">>),
+    Port = gen_mod:get_opt(port, Opts, AsIs, 3306),
+    DB = gen_mod:get_opt(db, Opts, AsIs, <<"logdb">>),
+    User = gen_mod:get_opt(user, Opts, AsIs, <<"root">>),
+    Password = gen_mod:get_opt(password, Opts, AsIs, <<"">>),
+
+    Initial = #state{vhost=VHost,
+                     server=Server, port=Port, db=DB,
+                     user=User, password=Password},
+
+    case open_mysql_connection(Initial) of
+        {error, Reason} ->
+            ?ERROR_MSG("MySQL connection failed: ~p~n", [Reason]),
+            {stop, db_connection_failed};
+        {ok, DBRef} ->
+            Connected = Initial#state{dbref=DBRef},
+            ok = create_internals(Connected),
+            ok = create_stats_table(Connected),
+            ok = create_settings_table(Connected),
+            ok = create_users_table(Connected),
+            ok = create_servers_table(Connected),
+            ok = create_resources_table(Connected),
+            % the resulting 'DOWN' message is handled in handle_info/2
+            erlang:monitor(process, DBRef),
+            {ok, Connected}
+    end.
+
+% Start a p1_mysql_conn connection from the options stored in the state.
+open_mysql_connection(#state{server=Server, port=Port, db=DB,
+                             user=DBUser, password=Password}) ->
+    Logger = fun(debug, _Format, _Argument) ->
+                     %?MYDEBUG(Format, Argument);
+                     ok;
+                (error, Format, Argument) ->
+                     ?ERROR_MSG(Format, Argument);
+                (Level, Format, Argument) ->
+                     ?MYDEBUG("MySQL (~p)~n", [Level]),
+                     ?MYDEBUG(Format, Argument)
+             end,
+    ?INFO_MSG("Opening mysql connection ~s@~s:~p/~s", [DBUser, Server, Port, DB]),
+    p1_mysql_conn:start(binary_to_list(Server), Port,
+                        binary_to_list(DBUser), binary_to_list(Password),
+                        binary_to_list(DB), Logger).
+
+% Stop the connection; errors raised while stopping are swallowed.
+close_mysql_connection(DBRef) ->
+    ?MYDEBUG("Closing ~p mysql connection", [DBRef]),
+    catch p1_mysql_conn:stop(DBRef).
+
+% Synchronous requests. Every clause runs its SQL on the server's own
+% connection (dbref) and replies with the outcome.
+handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Reply = rebuild_stats_at_int(DBRef, VHost, Date),
+    {reply, Reply, State};
+% nothing selected for deletion
+handle_call({delete_messages_by_user_at, [], _Date}, _From, State) ->
+    {reply, error, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    % "ts1","ts2",...,"tsN",
+    Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) ->
+                             ["\"",Timestamp,"\"",","]
+                         end, Msgs),
+
+    % replace last "," with ");"
+    Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+
+    Query = ["DELETE FROM ",messages_table(VHost, Date)," ",
+                             "WHERE timestamp IN (", Temp1],
+
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {updated, Aff} ->
+              ?MYDEBUG("Aff=~p", [Aff]),
+              rebuild_stats_at_int(DBRef, VHost, Date);
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    ok = delete_all_messages_by_user_at_int(DBRef, User, VHost, Date),
+    ok = delete_stats_by_user_at_int(DBRef, User, VHost, Date),
+    {reply, ok, State};
+handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    % drop the day's table, its stats rows and its view; any failed step
+    % raises a badmatch that is turned into 'error' below
+    Fun = fun() ->
+              {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Date),";"]),
+              TQuery = ["DELETE FROM ",stats_table(VHost)," "
+                            "WHERE at=\"",Date,"\";"],
+              {updated, _} = sql_query_internal(DBRef, TQuery),
+              VQuery = ["DROP VIEW IF EXISTS ",view_table(VHost,Date),";"],
+              {updated, _} = sql_query_internal(DBRef, VQuery),
+              ok
+          end,
+    Reply =
+      case catch apply(Fun, []) of
+           ok ->
+              ok;
+           {'EXIT', _} ->
+              error
+      end,
+    {reply, Reply, State};
+handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT at, sum(count) ",
+                "FROM ",SName," ",
+                "GROUP BY at ",
+                "ORDER BY DATE(at) DESC;"
+            ],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]};
+           {error, Reason} ->
+              % TODO: Duplicate error message ?
+              {error, Reason}
+      end,
+    {reply, Reply, State};
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT username, sum(count) as allcount ",
+                "FROM ",SName," ",
+                "JOIN ",users_table(VHost)," ON owner_id=user_id "
+                "WHERE at=\"",Date,"\" ",
+                "GROUP BY username ",
+                "ORDER BY allcount DESC;"
+            ],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              {ok, [ {User, list_to_integer(Count)} || [User, Count] <- Result ]};
+           {error, Reason} ->
+              {error, Reason}
+      end,
+    {reply, Reply, State};
+handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    {reply, get_user_stats_int(DBRef, User, VHost), State};
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Query = ["SELECT peer_name,",
+                    "peer_server,",
+                    "peer_resource,",
+                    "direction,"
+                    "type,"
+                    "subject,"
+                    "body,"
+                    "timestamp "
+               "FROM ",view_table(VHost, Date)," "
+               "WHERE owner_name=\"",User,"\";"],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              Fun = fun([Peer_name, Peer_server, Peer_resource,
+                         Direction,
+                         Type,
+                         Subject, Body,
+                         Timestamp]) ->
+                          #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource,
+                               direction=list_to_atom(Direction),
+                               type=Type,
+                               subject=Subject, body=Body,
+                               timestamp=Timestamp}
+                    end,
+              {ok, lists:map(Fun, Result)};
+           {error, Reason} ->
+              {error, Reason}
+      end,
+    {reply, Reply, State};
+handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    SName = stats_table(VHost),
+    Query = ["SELECT at ",
+                "FROM ",SName," ",
+                "GROUP BY at ",
+                "ORDER BY DATE(at) DESC;"
+            ],
+    % NOTE: on success the reply is the bare list of dates, not {ok, Dates}
+    Reply =
+       case sql_query_internal(DBRef, Query) of
+            {data, Result} ->
+               [ Date || [Date] <- Result ];
+            {error, Reason} ->
+               {error, Reason}
+       end,
+    {reply, Reply, State};
+handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ",
+                "FROM ",settings_table(VHost)," ",
+             "JOIN ",users_table(VHost)," ON user_id=owner_id;"],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, Result} ->
+              {ok, lists:map(fun([Owner, DoLogDef, DoLogL, DoNotLogL]) ->
+                                 #user_settings{owner_name=Owner,
+                                                dolog_default=list_to_bool(DoLogDef),
+                                                dolog_list=string_to_list(DoLogL),
+                                                donotlog_list=string_to_list(DoNotLogL)
+                                               }
+                             end, Result)};
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) ->
+    Query = ["SELECT dolog_default,dolog_list,donotlog_list FROM ",settings_table(VHost)," ",
+                 "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           {data, []} ->
+              {ok, []};
+           % The query selects exactly three columns, so the row must be
+           % matched as a three-element list; the owner is the user we were
+           % asked about.
+           {data, [[DoLogDef, DoLogL, DoNotLogL]]} ->
+              {ok, #user_settings{owner_name=User,
+                                  dolog_default=list_to_bool(DoLogDef),
+                                  dolog_list=string_to_list(DoLogL),
+                                  donotlog_list=string_to_list(DoNotLogL)}};
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef,
+                                                     dolog_list=DoLogL,
+                                                     donotlog_list=DoNotLogL}},
+            _From, #state{dbref=DBRef, vhost=VHost} = State) ->
+    User_id = get_user_id(DBRef, VHost, User),
+    Query = ["UPDATE ",settings_table(VHost)," ",
+                "SET dolog_default=",bool_to_list(DoLogDef),", ",
+                    "dolog_list='",list_to_string(DoLogL),"', ",
+                    "donotlog_list='",list_to_string(DoNotLogL),"' ",
+                "WHERE owner_id=",User_id,";"],
+
+    Reply =
+      case sql_query_internal(DBRef, Query) of
+           % no row was changed by the UPDATE: try to insert one
+           {updated, 0} ->
+              IQuery = ["INSERT INTO ",settings_table(VHost)," ",
+                            "(owner_id, dolog_default, dolog_list, donotlog_list) ",
+                            "VALUES ",
+                            "(",User_id,",",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"],
+              case sql_query_internal_silent(DBRef, IQuery) of
+                   {updated, _} ->
+                       ?MYDEBUG("New settings for ~s@~s", [User, VHost]),
+                       ok;
+                   {error, Reason} ->
+                       case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>) of
+                            % Already exists
+                            match ->
+                                ok;
+                             _ ->
+                                ?ERROR_MSG("Failed setup user ~p@~p: ~p", [User, VHost, Reason]),
+                                error
+                       end
+              end;
+           {updated, 1} ->
+              ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]),
+              ok;
+           {error, _} ->
+              error
+      end,
+    {reply, Reply, State};
+handle_call({stop}, _From, #state{vhost=VHost}=State) ->
+    ?MYDEBUG("Stoping mysql5 backend for ~p", [VHost]),
+    {stop, normal, ok, State};
+% NOTE(review): an unknown request gets no reply, so the caller only
+% returns when its own call timeout expires.
+handle_call(Msg, _From, State) ->
+    ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+    {noreply, State}.
+
+% Asynchronous requests.
+handle_cast({log_message, Msg}, #state{dbref=DBRef, vhost=VHost}=State) ->
+    % the stored procedure call runs in a spawned process so that the
+    % server is not blocked while the message is written
+    Fun = fun() ->
+            Date = convert_timestamp_brief(Msg#msg.timestamp),
+            TableName = messages_table(VHost, Date),
+
+            Query = [ "CALL ",logmessage_name(VHost)," "
+                         "('", TableName, "',",
+                         "'", Date, "',",
+                         "'", binary_to_list(Msg#msg.owner_name), "',",
+                         "'", binary_to_list(Msg#msg.peer_name), "',",
+                         "'", binary_to_list(Msg#msg.peer_server), "',",
+                         "'", binary_to_list( ejabberd_sql:escape(Msg#msg.peer_resource) ), "',",
+                         "'", atom_to_list(Msg#msg.direction), "',",
+                         "'", binary_to_list(Msg#msg.type), "',",
+                         "'", binary_to_list( ejabberd_sql:escape(Msg#msg.subject) ), "',",
+                         "'", binary_to_list( ejabberd_sql:escape(Msg#msg.body) ), "',",
+                         "'", Msg#msg.timestamp, "');"],
+
+            case sql_query_internal(DBRef, Query) of
+                 {updated, _} ->
+                    ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost],
+                                                            [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]),
+                    ok;
+                 {error, _Reason} ->
+                    error
+            end
+          end,
+    spawn(Fun),
+    {noreply, State};
+handle_cast({rebuild_stats}, State) ->
+    rebuild_all_stats_int(State),
+    {noreply, State};
+handle_cast({drop_user, User}, #state{vhost=VHost} = State) ->
+    % runs in a spawned process on a connection of its own
+    Fun = fun() ->
+            {ok, DBRef} = open_mysql_connection(State),
+            {ok, Dates} = get_user_stats_int(DBRef, User, VHost),
+            MDResult = lists:map(fun({Date, _}) ->
+                           delete_all_messages_by_user_at_int(DBRef, User, VHost, Date)
+                       end, Dates),
+            StDResult = delete_all_stats_by_user_int(DBRef, User, VHost),
+            SDResult = delete_user_settings_int(DBRef, User, VHost),
+            case lists:all(fun(Result) when Result == ok ->
+                                true;
+                              (Result) when Result == error ->
+                                false
+                           end, lists:append([MDResult, [StDResult], [SDResult]])) of
+                 true ->
+                   ?INFO_MSG("Removed ~s@~s", [User, VHost]);
+                 false ->
+                   ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost])
+            end,
+            close_mysql_connection(DBRef)
+          end,
+    spawn(Fun),
+    {noreply, State};
+handle_cast(Msg, State) ->
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+    {noreply, State}.
+
+% 'DOWN' comes from the monitor set on the connection process in init/1
+handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) ->
+    {stop, connection_dropped, State};
+handle_info(Info, State) ->
+    ?INFO_MSG("Got Info:~p, State:~p", [Info, State]),
+    {noreply, State}.
+
+terminate(_Reason, #state{dbref=DBRef}=_State) ->
+    close_mysql_connection(DBRef),
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+log_message(VHost, Msg) ->
+   Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+   gen_server:cast(Proc, {log_message, Msg}).
+rebuild_stats(VHost) ->
+   Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+   gen_server:cast(Proc, {rebuild_stats}).
+rebuild_stats_at(VHost, Date) ->
+   Proc = gen_mod:get_module_proc(VHost, ?PROCNAME),
+   gen_server:call(Proc, {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+% The wrappers below forward each request to the per-vhost backend process;
+% everything except drop_user/2 is a synchronous call bounded by ?CALL_TIMEOUT.
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+delete_messages_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_at, Date}, ?CALL_TIMEOUT).
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats}, ?CALL_TIMEOUT).
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_stats, User}, ?CALL_TIMEOUT).
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_dates}, ?CALL_TIMEOUT).
+get_users_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_users_settings}, ?CALL_TIMEOUT).
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_settings, User}, ?CALL_TIMEOUT).
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+% fire-and-forget: handled by handle_cast/2
+drop_user(User, VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME), {drop_user, User}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% List the dates for which a table belonging to VHost exists, in the order
+% SHOW TABLES reports them. Returns [] when the query fails.
+get_dates_int(DBRef, VHost) ->
+    case sql_query_internal(DBRef, ["SHOW TABLES"]) of
+         {data, Tables} ->
+            % prefix() without its leading back-quote, then anything, then the vhost
+            Reg = "^" ++ lists:sublist(prefix(),2,length(prefix())) ++ ".*" ++ escape_vhost(VHost),
+            % prepend while folding and reverse once at the end instead of
+            % appending to the accumulator on every match
+            Found = lists:foldl(fun([Table], Dates) ->
+                                    case re:run(Table, Reg) of
+                                         {match, _} ->
+                                            case re:run(Table, "[0-9]+-[0-9]+-[0-9]+") of
+                                                 % S is a zero-based offset, E the match length
+                                                 {match, [{S, E}]} ->
+                                                     [lists:sublist(Table, S+1, E) | Dates];
+                                                 nomatch ->
+                                                     Dates
+                                            end;
+                                         _ ->
+                                            Dates
+                                    end
+                                end, [], Tables),
+            lists:reverse(Found);
+         {error, _} ->
+            []
+    end.
+
+% Rebuild the stats of every date in a spawned process that uses a MySQL
+% connection of its own. Returns the pid of that process.
+rebuild_all_stats_int(#state{vhost=VHost}=State) ->
+    Fun = fun() ->
+             {ok, DBRef} = open_mysql_connection(State),
+             % 'after' guarantees the private connection is closed even when
+             % one of the steps below crashes
+             try
+                 ok = delete_nonexistent_stats(DBRef, VHost),
+                 case lists:filter(fun(Date) ->
+                                       case catch rebuild_stats_at_int(DBRef, VHost, Date) of
+                                            ok -> false;
+                                            error -> true;
+                                            {'EXIT', _} -> true
+                                       end
+                                   end, get_dates_int(DBRef, VHost)) of
+                      [] -> ok;
+                      FTables ->
+                         ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]),
+                         error
+                 end
+             after
+                 close_mysql_connection(DBRef)
+             end
+          end,
+    spawn(Fun).
+
+% Recompute the stats rows of Date from that day's messages table.
+% Returns ok on success and error on any failure; the table locks are
+% released and the temporary table is dropped in both cases.
+rebuild_stats_at_int(DBRef, VHost, Date) ->
+    TempTable = temp_table(VHost),
+    Fun = fun() ->
+           Table = messages_table(VHost, Date),
+           STable = stats_table(VHost),
+
+           DQuery = [ "DELETE FROM ",STable," ",
+                          "WHERE at='",Date,"';"],
+
+           ok = create_temp_table(DBRef, TempTable),
+           {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," WRITE, ",TempTable," WRITE;"]),
+           % aggregate the day's messages into the temporary table first
+           SQuery = ["INSERT INTO ",TempTable," ",
+                     "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                        "SELECT owner_id,peer_name_id,peer_server_id,\"",Date,"\",count(*) ",
+                           "FROM ",Table," WHERE ext is NULL GROUP BY owner_id,peer_name_id,peer_server_id;"],
+           case sql_query_internal(DBRef, SQuery) of
+                {updated, 0} ->
+                    Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]),
+                    case Count of
+                         % the messages table is empty: drop it together with
+                         % its view and its stats rows
+                         {data, [["0"]]} ->
+                            {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table,";"]),
+                            sql_query_internal(DBRef, ["UNLOCK TABLES;"]),
+                            {updated, _} = sql_query_internal(DBRef, ["DROP VIEW IF EXISTS ",view_table(VHost,Date),";"]),
+                            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]),
+                            {updated, _} = sql_query_internal(DBRef, DQuery),
+                            ok;
+                         _ ->
+                            ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]),
+                            error
+                    end;
+                {updated, _} ->
+                    % replace the old stats rows of Date with the fresh aggregate
+                    {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]),
+                    {updated, _} = sql_query_internal(DBRef, DQuery),
+                    SQuery1 = ["INSERT INTO ",STable," ",
+                                "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                                   "SELECT owner_id,peer_name_id,peer_server_id,at,count ",
+                                      "FROM ",TempTable,";"],
+                    case sql_query_internal(DBRef, SQuery1) of
+                         {updated, _} -> ok;
+                         {error, _} -> error
+                    end;
+                {error, _} -> error
+           end
+       end,
+
+    % keep the outcome so it can be returned once the cleanup below is done
+    Result =
+      case catch apply(Fun, []) of
+           ok ->
+               ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]),
+               ok;
+           error ->
+               error;
+           {'EXIT', Reason} ->
+               ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]),
+               error
+      end,
+    sql_query_internal(DBRef, ["UNLOCK TABLES;"]),
+    sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]),
+    Result.
+
+% Delete stats rows whose date is not among the dates returned by
+% get_dates_int/2. Does nothing when get_dates_int/2 returns no dates.
+delete_nonexistent_stats(DBRef, VHost) ->
+    Dates = get_dates_int(DBRef, VHost),
+    STable = stats_table(VHost),
+
+    Temp = lists:flatmap(fun(Date) ->
+                             ["\"",Date,"\"",","]
+                         end, Dates),
+    case Temp of
+         [] ->
+           ok;
+         _ ->
+           % replace last "," with ");"
+           Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+           Query = ["DELETE FROM ",STable," ",
+                       "WHERE at NOT IN (", Temp1],
+           case sql_query_internal(DBRef, Query) of
+                {updated, _} ->
+                   ok;
+                {error, _} ->
+                   error
+           end
+    end.
+
+% Per-date message counts of User, newest date first.
+% Returns {ok, [{Date, Count}]} or {error, Reason}.
+get_user_stats_int(DBRef, User, VHost) ->
+    SName = stats_table(VHost),
+    UName = users_table(VHost),
+    Query = ["SELECT stats.at, sum(stats.count) ",
+                "FROM ",UName," AS users ",
+                   "JOIN ",SName," AS stats ON owner_id=user_id "
+                "WHERE users.username=\"",User,"\" ",
+                "GROUP BY stats.at "
+                "ORDER BY DATE(stats.at) DESC;"
+            ],
+    case sql_query_internal(DBRef, Query) of
+         {data, Result} ->
+            {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]};
+         {error, Result} ->
+            {error, Result}
+    end.
+
+% Remove every message owned by User from the messages table of Date.
+delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) ->
+    Delete = ["DELETE FROM ",messages_table(VHost, Date)," ",
+              "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Delete) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Remove all stats rows owned by User.
+delete_all_stats_by_user_int(DBRef, User, VHost) ->
+    Delete = ["DELETE FROM ",stats_table(VHost)," ",
+              "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Delete) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]),
+            ok
+    end.
+
+% Remove the stats rows owned by User for a single Date.
+delete_stats_by_user_at_int(DBRef, User, VHost, Date) ->
+    Delete = ["DELETE FROM ",stats_table(VHost)," ",
+              "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\") ",
+              "AND at=\"",Date,"\";"],
+    case sql_query_internal(DBRef, Delete) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Remove the logging preferences stored for User.
+delete_user_settings_int(DBRef, User, VHost) ->
+    Delete = ["DELETE FROM ",settings_table(VHost)," ",
+              "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"],
+    case sql_query_internal(DBRef, Delete) of
+        {error, Why} ->
+            ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Why]),
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Create the scratch table used while stats are being rebuilt.
+create_temp_table(DBRef, Name) ->
+    Create = ["CREATE TABLE ",Name," (",
+                 "owner_id MEDIUMINT UNSIGNED, ",
+                 "peer_name_id MEDIUMINT UNSIGNED, ",
+                 "peer_server_id MEDIUMINT UNSIGNED, ",
+                 "at VARCHAR(11), ",
+                 "count INT(11) ",
+              ") ENGINE=MyISAM CHARACTER SET utf8;"
+             ],
+    case sql_query_internal(DBRef, Create) of
+        {error, _Reason} -> error;
+        {updated, _} -> ok
+    end.
+
+% Create the stats table. A freshly created table triggers a rebuild of all
+% stats; an already existing one is checked for its two peer_*_id columns
+% and dropped when the check fails (error is returned in that case).
+create_stats_table(#state{dbref=DBRef, vhost=VHost}=State) ->
+    Stats = stats_table(VHost),
+    Create = ["CREATE TABLE ",Stats," (",
+                 "owner_id MEDIUMINT UNSIGNED, ",
+                 "peer_name_id MEDIUMINT UNSIGNED, ",
+                 "peer_server_id MEDIUMINT UNSIGNED, ",
+                 "at VARCHAR(11), ",
+                 "count INT(11), ",
+                 "ext INTEGER DEFAULT NULL, "
+                 "INDEX ext_i (ext), "
+                 "INDEX(owner_id,peer_name_id,peer_server_id), ",
+                 "INDEX(at) ",
+              ") ENGINE=MyISAM CHARACTER SET utf8;"
+             ],
+    case sql_query_internal_silent(DBRef, Create) of
+        {updated, _} ->
+            ?MYDEBUG("Created stats table for ~p", [VHost]),
+            rebuild_all_stats_int(State),
+            ok;
+        {error, Why} ->
+            % #42S01 is the error reported when the table already exists
+            case ejabberd_regexp:run(iolist_to_binary(Why), <<"#42S01">>) of
+                match ->
+                    ?MYDEBUG("Stats table for ~p already exists", [VHost]),
+                    Check = ["SHOW COLUMNS FROM ",Stats," LIKE 'peer_%_id';"],
+                    case sql_query_internal(DBRef, Check) of
+                        % exactly two matching columns
+                        {data, [_, _]} ->
+                            ?MYDEBUG("Stats table structure is ok", []),
+                            ok;
+                        _ ->
+                            ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []),
+                            case sql_query_internal(DBRef, ["DROP TABLE ",Stats,";"]) of
+                                {updated, _} ->
+                                    ?INFO_MSG("Successfully dropped ~p", [Stats]);
+                                _ ->
+                                    ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [Stats])
+                            end,
+                            error
+                    end;
+                _ ->
+                    ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Why]),
+                    error
+            end
+    end.
+
+% Create the per-user logging preferences table if it does not exist yet.
+create_settings_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = settings_table(VHost),
+    Create = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+                 "owner_id MEDIUMINT UNSIGNED PRIMARY KEY, ",
+                 "dolog_default TINYINT(1) NOT NULL DEFAULT 1, ",
+                 "dolog_list TEXT, ",
+                 "donotlog_list TEXT ",
+              ") ENGINE=InnoDB CHARACTER SET utf8;"
+             ],
+    case sql_query_internal(DBRef, Create) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created settings table for ~p", [VHost]),
+            ok
+    end.
+
+% Create the username <-> user_id table if it does not exist yet.
+create_users_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = users_table(VHost),
+    Create = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+                 "username TEXT NOT NULL, ",
+                 "user_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+                 "UNIQUE INDEX(username(",?INDEX_SIZE,")) ",
+              ") ENGINE=InnoDB CHARACTER SET utf8;"
+             ],
+    case sql_query_internal(DBRef, Create) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created users table for ~p", [VHost]),
+            ok
+    end.
+
+% Create the server <-> server_id table if it does not exist yet.
+create_servers_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = servers_table(VHost),
+    Create = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+                 "server TEXT NOT NULL, ",
+                 "server_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+                 "UNIQUE INDEX(server(",?INDEX_SIZE,")) ",
+              ") ENGINE=InnoDB CHARACTER SET utf8;"
+             ],
+    case sql_query_internal(DBRef, Create) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created servers table for ~p", [VHost]),
+            ok
+    end.
+
+% Create the resource <-> resource_id table if it does not exist yet.
+create_resources_table(#state{dbref=DBRef, vhost=VHost}) ->
+    Table = resources_table(VHost),
+    Create = ["CREATE TABLE IF NOT EXISTS ",Table," (",
+                 "resource TEXT NOT NULL, ",
+                 "resource_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ",
+                 "UNIQUE INDEX(resource(",?INDEX_SIZE,")) ",
+              ") ENGINE=InnoDB CHARACTER SET utf8;"
+             ],
+    case sql_query_internal(DBRef, Create) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created resources table for ~p", [VHost]),
+            ok
+    end.
+
+% Drop and re-create the per-vhost logmessage stored procedure.
+create_internals(#state{dbref=DBRef, vhost=VHost}) ->
+    sql_query_internal(DBRef, ["DROP PROCEDURE IF EXISTS ",logmessage_name(VHost),";"]),
+    case sql_query_internal(DBRef, [get_logmessage(VHost)]) of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?MYDEBUG("Created logmessage for ~p", [VHost]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% SQL internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Run Query and log any {error, Reason} result before handing it back.
+sql_query_internal(DBRef, Query) ->
+    case sql_query_internal_silent(DBRef, Query) of
+        {error, Why} = Failure ->
+            ?ERROR_MSG("~p while ~p", [Why, lists:append(Query)]),
+            Failure;
+        Other ->
+            Other
+    end.
+
+% Run Query without logging failures; the caller inspects the result.
+sql_query_internal_silent(DBRef, Query) ->
+    ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]),
+    Raw = p1_mysql_conn:fetch(DBRef, Query, self(), ?MYSQL_TIMEOUT),
+    get_result(Raw).
+
+% Convert a raw fetch reply into {updated, AffectedRows} | {data, Rows} |
+% {error, Reason}.
+get_result({updated, Res}) ->
+    {updated, p1_mysql:get_result_affected_rows(Res)};
+get_result({data, Res}) ->
+    {data, p1_mysql:get_result_rows(Res)};
+get_result({error, "query timed out"}) ->
+    {error, "query timed out"};
+get_result({error, Res}) ->
+    {error, p1_mysql:get_result_reason(Res)}.
+
+% Return the user_id of User, inserting a users row when none exists yet.
+get_user_id(DBRef, VHost, User) ->
+    Select = ["SELECT user_id FROM ",users_table(VHost)," ",
+              "WHERE username=\"",User,"\";"],
+    case sql_query_internal(DBRef, Select) of
+        {data, [[KnownId]]} ->
+            KnownId;
+        {data, []} ->
+            Insert = ["INSERT INTO ",users_table(VHost)," ",
+                      "SET username=\"",User,"\";"],
+            case sql_query_internal_silent(DBRef, Insert) of
+                {error, Why} ->
+                    % this can be in clustered environment
+                    match = ejabberd_regexp:run(iolist_to_binary(Why), <<"#23000">>),
+                    ?ERROR_MSG("Duplicate key name for ~p", [User]),
+                    {data, [[RacedId]]} = sql_query_internal(DBRef, Select),
+                    RacedId;
+                {updated, _} ->
+                    {data, [[FreshId]]} = sql_query_internal(DBRef, Select),
+                    FreshId
+            end
+    end.
+
+% Build the CREATE PROCEDURE statement of the per-vhost logmessage stored
+% procedure. The procedure resolves (inserting when missing) the ids of the
+% owner, peer name, peer server and peer resource, creates the day's
+% messages table and its view when the table does not exist yet (SQLSTATE
+% 42S02 handler), inserts the message and finally bumps the matching stats
+% counter. The ~s placeholders are filled, in order, from the list at the
+% end of the io_lib:format/2 call.
+get_logmessage(VHost) ->
+    UName = users_table(VHost),
+    SName = servers_table(VHost),
+    RName = resources_table(VHost),
+    StName = stats_table(VHost),
+    io_lib:format("
+CREATE PROCEDURE ~s(tablename TEXT, atdate TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(10), msubject TEXT, mbody TEXT, mtimestamp DOUBLE)
+BEGIN
+   DECLARE ownerID MEDIUMINT UNSIGNED;
+   DECLARE peer_nameID MEDIUMINT UNSIGNED;
+   DECLARE peer_serverID MEDIUMINT UNSIGNED;
+   DECLARE peer_resourceID MEDIUMINT UNSIGNED;
+   DECLARE Vmtype VARCHAR(10);
+   DECLARE Vmtimestamp DOUBLE;
+   DECLARE Vmdirection VARCHAR(4);
+   DECLARE Vmbody TEXT;
+   DECLARE Vmsubject TEXT;
+   DECLARE iq TEXT;
+   DECLARE cq TEXT;
+   DECLARE viewname TEXT;
+   DECLARE notable INT;
+   DECLARE CONTINUE HANDLER FOR SQLSTATE '42S02' SET @notable = 1;
+
+   SET @notable = 0;
+   SET @ownerID = NULL;
+   SET @peer_nameID = NULL;
+   SET @peer_serverID = NULL;
+   SET @peer_resourceID = NULL;
+
+   SET @Vmtype = mtype;
+   SET @Vmtimestamp = mtimestamp;
+   SET @Vmdirection = mdirection;
+   SET @Vmbody = mbody;
+   SET @Vmsubject = msubject;
+
+   SELECT user_id INTO @ownerID FROM ~s WHERE username=owner;
+   IF @ownerID IS NULL THEN
+      INSERT INTO ~s SET username=owner;
+      SET @ownerID = LAST_INSERT_ID();
+   END IF;
+
+   SELECT user_id INTO @peer_nameID FROM ~s WHERE username=peer_name;
+   IF @peer_nameID IS NULL THEN
+      INSERT INTO ~s SET username=peer_name;
+      SET @peer_nameID = LAST_INSERT_ID();
+   END IF;
+
+   SELECT server_id INTO @peer_serverID FROM ~s WHERE server=peer_server;
+   IF @peer_serverID IS NULL THEN
+      INSERT INTO ~s SET server=peer_server;
+      SET @peer_serverID = LAST_INSERT_ID();
+   END IF;
+
+   SELECT resource_id INTO @peer_resourceID FROM ~s WHERE resource=peer_resource;
+   IF @peer_resourceID IS NULL THEN
+      INSERT INTO ~s SET resource=peer_resource;
+      SET @peer_resourceID = LAST_INSERT_ID();
+   END IF;
+
+   SET @iq = CONCAT(\"INSERT INTO \",tablename,\" (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (@ownerID,@peer_nameID,@peer_serverID,@peer_resourceID,@Vmdirection,@Vmtype,@Vmsubject,@Vmbody,@Vmtimestamp);\");
+   PREPARE insertmsg FROM @iq;
+
+   IF @notable = 1 THEN
+      SET @cq = CONCAT(\"CREATE TABLE \",tablename,\" (
+                          owner_id MEDIUMINT UNSIGNED NOT NULL,
+                          peer_name_id MEDIUMINT UNSIGNED NOT NULL,
+                          peer_server_id MEDIUMINT UNSIGNED NOT NULL,
+                          peer_resource_id MEDIUMINT(8) UNSIGNED NOT NULL,
+                          direction ENUM('to', 'from') NOT NULL,
+                          type ENUM('chat','error','groupchat','headline','normal') NOT NULL,
+                          subject TEXT,
+                          body TEXT,
+                          timestamp DOUBLE NOT NULL,
+                          ext INTEGER DEFAULT NULL,
+                          INDEX search_i (owner_id, peer_name_id, peer_server_id, peer_resource_id),
+                          INDEX ext_i (ext),
+                          FULLTEXT (body)
+                       ) ENGINE=MyISAM
+                         PACK_KEYS=1
+                         CHARACTER SET utf8;\");
+      PREPARE createtable FROM @cq;
+      EXECUTE createtable;
+      DEALLOCATE PREPARE createtable;
+
+      SET @viewname = CONCAT(\"`v_\", TRIM(BOTH '`' FROM tablename), \"`\");
+      SET @cq = CONCAT(\"CREATE OR REPLACE VIEW \",@viewname,\" AS
+                         SELECT owner.username AS owner_name,
+                                peer.username AS peer_name,
+                                servers.server AS peer_server,
+                                resources.resource AS peer_resource,
+                                messages.direction,
+                                messages.type,
+                                messages.subject,
+                                messages.body,
+                                messages.timestamp
+                         FROM
+                                ~s owner,
+                                ~s peer,
+                                ~s servers,
+                                ~s resources,
+                              \", tablename,\" messages
+                         WHERE
+                                owner.user_id=messages.owner_id and
+                                peer.user_id=messages.peer_name_id and
+                                servers.server_id=messages.peer_server_id and
+                                resources.resource_id=messages.peer_resource_id
+                         ORDER BY messages.timestamp;\");
+      PREPARE createview FROM @cq;
+      EXECUTE createview;
+      DEALLOCATE PREPARE createview;
+
+      SET @notable = 0;
+      PREPARE insertmsg FROM @iq;
+      EXECUTE insertmsg;
+   ELSEIF @notable = 0 THEN
+      EXECUTE insertmsg;
+   END IF;
+
+   DEALLOCATE PREPARE insertmsg;
+
+   IF @notable = 0 THEN
+      UPDATE ~s SET count=count+1 WHERE owner_id=@ownerID AND peer_name_id=@peer_nameID AND peer_server_id=@peer_serverID AND at=atdate;
+      IF ROW_COUNT() = 0 THEN
+         INSERT INTO ~s (owner_id, peer_name_id, peer_server_id, at, count) VALUES (@ownerID, @peer_nameID, @peer_serverID, atdate, 1);
+      END IF;
+   END IF;
+END;", [logmessage_name(VHost),UName,UName,UName,UName,SName,SName,RName,RName,UName,UName,SName,RName,StName,StName]).
diff --git a/src/mod_logdb_pgsql.erl b/src/mod_logdb_pgsql.erl
new file mode 100644
index 00000000..202c6ed4
--- /dev/null
+++ b/src/mod_logdb_pgsql.erl
@@ -0,0 +1,1104 @@
+% {ok, DBRef} = pgsql:connect([{host, "127.0.0.1"}, {database, "logdb"}, {user, "logdb"}, {password, "logdb"}, {port, 5432}, {as_binary, true}]).
+% Schema = "test".
+% pgsql:squery(DBRef, "CREATE TABLE test.\"logdb_stats_test\" (owner_id INTEGER, peer_name_id INTEGER, peer_server_id INTEGER, at VARCHAR(20), count integer);" ).
+%%%----------------------------------------------------------------------
+%%% File    : mod_logdb_pgsql.erl
+%%% Author  : Oleg Palij (mailto:o.palij@gmail.com)
+%%% Purpose : PostgreSQL backend for mod_logdb
+%%% Url     : https://paleg.github.io/mod_logdb/
+%%%----------------------------------------------------------------------
+
+-module(mod_logdb_pgsql).
+-author('o.palij@gmail.com').
+
+-include("mod_logdb.hrl").
+-include("logger.hrl").
+
+-behaviour(gen_logdb).
+-behaviour(gen_server).
+
+% gen_server
+-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]).
+% gen_mod
+-export([start/2, stop/1]).
+% gen_logdb
+-export([log_message/2,
+         rebuild_stats/1,
+         rebuild_stats_at/2,
+         delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2,
+         get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3,
+         get_dates/1,
+         get_users_settings/1, get_user_settings/2, set_user_settings/3,
+         drop_user/2]).
+
+-export([view_table/3]).
+
+% gen_server call timeout
+-define(CALL_TIMEOUT, 30000).
+-define(PGSQL_TIMEOUT, 60000).
+-define(PROCNAME, mod_logdb_pgsql).
+
+-import(mod_logdb, [list_to_bool/1, bool_to_list/1,
+                    list_to_string/1, string_to_list/1,
+                    convert_timestamp_brief/1]).
+
+-record(state, {dbref, vhost, server, port, db, user, password, schema}).
+
+% Turns the vhost binary into a string in which every "." is replaced by "_".
+escape_vhost(VHost) ->
+    [case Char of
+         $. -> $_;
+         _ -> Char
+     end || Char <- binary_to_list(VHost)].
+
+% Leading part of every quoted table name: Schema."logdb_
+prefix(Schema) ->
+    Schema ++ ".\"logdb_".
+
+% Trailing part of every quoted table name: _<escaped vhost>"
+suffix(VHost) ->
+    lists:append(["_", escape_vhost(VHost), "\""]).
+
+% Quoted, schema-qualified name of the messages table for one date.
+messages_table(VHost, Schema, Date) ->
+    lists:append([prefix(Schema), "messages_", Date, suffix(VHost)]).
+
+% Name of the view for one date: the messages table name with "v_" put in
+% front of the unqualified table name, inside the quotes.
+view_table(VHost, Schema, Date) ->
+    Qualified = messages_table(VHost, Schema, Date),
+    SchemaLen = length(Schema),
+    % skip Schema, the dot and the opening quote; leave out the closing quote
+    Bare = lists:sublist(Qualified, SchemaLen + 3, length(Qualified) - SchemaLen - 3),
+    Schema ++ ".\"v_" ++ Bare ++ "\"".
+
+stats_table(VHost, Schema) ->
+    lists:append([prefix(Schema), "stats", suffix(VHost)]).
+
+temp_table(VHost, Schema) ->
+    lists:append([prefix(Schema), "temp", suffix(VHost)]).
+
+settings_table(VHost, Schema) ->
+    lists:append([prefix(Schema), "settings", suffix(VHost)]).
+
+users_table(VHost, Schema) ->
+    lists:append([prefix(Schema), "users", suffix(VHost)]).
+servers_table(VHost, Schema) ->
+    lists:append([prefix(Schema), "servers", suffix(VHost)]).
+resources_table(VHost, Schema) ->
+    lists:append([prefix(Schema), "resources", suffix(VHost)]).
+
+% Quoted, schema-qualified name of the per-vhost logmessage function.
+logmessage_name(VHost, Schema) ->
+    lists:append([prefix(Schema), "logmessage", suffix(VHost)]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_mod callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Starts the backend gen_server for VHost under a locally registered name.
+start(VHost, Opts) ->
+    gen_server:start({local, gen_mod:get_module_proc(VHost, ?PROCNAME)},
+                     ?MODULE, [VHost, Opts], []).
+
+% Asks the backend gen_server for VHost to stop and waits for its answer.
+stop(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME), {stop}, ?CALL_TIMEOUT).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_server callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Reads the connection options, opens the PostgreSQL connection and makes
+% sure the logmessage function and the stats/settings/users/servers/
+% resources tables exist. The connection process is monitored so that
+% handle_info/2 is told when it goes away.
+%
+% Returns {ok, State}, or {stop, db_connection_failed} when the connection
+% cannot be opened. A failing create_* step still crashes init on the
+% "ok = ..." match, as before.
+init([VHost, Opts]) ->
+    Server = gen_mod:get_opt(server, Opts, fun(A) -> A end, <<"localhost">>),
+    DB = gen_mod:get_opt(db, Opts, fun(A) -> A end, <<"ejabberd_logdb">>),
+    User = gen_mod:get_opt(user, Opts, fun(A) -> A end, <<"root">>),
+    Port = gen_mod:get_opt(port, Opts, fun(A) -> A end, 5432),
+    Password = gen_mod:get_opt(password, Opts, fun(A) -> A end, <<"">>),
+    Schema = binary_to_list(gen_mod:get_opt(schema, Opts, fun(A) -> A end, <<"public">>)),
+
+    ?MYDEBUG("Starting pgsql backend for ~s", [VHost]),
+
+    St = #state{vhost=VHost,
+                server=Server, port=Port, db=DB,
+                user=User, password=Password,
+                schema=Schema},
+
+    case open_pgsql_connection(St) of
+        {ok, DBRef} ->
+            State = St#state{dbref=DBRef},
+            ok = create_internals(State),
+            ok = create_stats_table(State),
+            ok = create_settings_table(State),
+            ok = create_users_table(State),
+            ok = create_servers_table(State),
+            ok = create_resources_table(State),
+            erlang:monitor(process, DBRef),
+            {ok, State};
+        % Reachable now that open_pgsql_connection/1 returns errors instead
+        % of crashing on a badmatch. The former {'EXIT', _} clause was
+        % dropped: nothing here evaluates the call under a catch, so it
+        % could never match.
+        {error, Reason} ->
+            ?ERROR_MSG("PgSQL connection failed: ~p~n", [Reason]),
+            {stop, db_connection_failed}
+    end.
+
+% Opens a connection with the credentials in State and points its search
+% path at the configured schema.
+%
+% Returns {ok, DBRef} or {error, Reason}. Callers that cannot continue
+% without a connection keep asserting with {ok, DBRef} = ...
+% When SET SEARCH_PATH fails the fresh connection is closed again, so it is
+% not left behind without an owner.
+open_pgsql_connection(#state{server=Server, port=Port, db=DB, schema=Schema,
+                             user=User, password=Password} = _State) ->
+    ?INFO_MSG("Opening pgsql connection ~s@~s:~p/~s", [User, Server, Port, DB]),
+    case pgsql:connect(Server, DB, User, Password, Port) of
+        {ok, DBRef} ->
+            case sql_query_internal(DBRef, ["SET SEARCH_PATH TO ",Schema,";"]) of
+                {updated, _} ->
+                    {ok, DBRef};
+                Failure ->
+                    close_pgsql_connection(DBRef),
+                    {error, {set_search_path_failed, Failure}}
+            end;
+        {error, _} = Error ->
+            Error;
+        Other ->
+            {error, Other}
+    end.
+
+% Terminates the given pgsql connection.
+close_pgsql_connection(DBRef) ->
+    ?MYDEBUG("Closing ~p pgsql connection", [DBRef]),
+    pgsql:terminate(DBRef).
+
+% gen_server call dispatcher. Every gen_logdb operation arrives here as a
+% tagged tuple and is answered with {reply, Reply, State}.
+%
+% {log_message, Msg}: stores one message through the per-vhost logmessage
+% SQL function. The caller is always answered with ok; a driver error is
+% written to the log so that a lost message leaves a trace.
+handle_call({log_message, Msg}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Date = convert_timestamp_brief(Msg#msg.timestamp),
+    TableName = messages_table(VHost, Schema, Date),
+    ViewName = view_table(VHost, Schema, Date),
+
+    Query = [ "SELECT ", logmessage_name(VHost, Schema)," "
+                 "('", TableName, "',",
+                 "'", ViewName, "',",
+                 "'", Date, "',",
+                 "'", binary_to_list(Msg#msg.owner_name), "',",
+                 "'", binary_to_list(Msg#msg.peer_name), "',",
+                 "'", binary_to_list(Msg#msg.peer_server), "',",
+                 "'", binary_to_list( ejabberd_sql:escape(Msg#msg.peer_resource) ), "',",
+                 "'", atom_to_list(Msg#msg.direction), "',",
+                 "'", binary_to_list(Msg#msg.type), "',",
+                 "'", binary_to_list( ejabberd_sql:escape(Msg#msg.subject) ), "',",
+                 "'", binary_to_list( ejabberd_sql:escape(Msg#msg.body) ), "',",
+                 "'", Msg#msg.timestamp, "');"],
+
+    case sql_query_internal_silent(DBRef, Query) of
+        % TODO: change this
+        {data, [{"0"}]} ->
+            ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost],
+                                                     [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]),
+            ok;
+        {error, Reason} ->
+            % The query runs through the "silent" variant, so this is the
+            % only place where the failure can be reported.
+            ?ERROR_MSG("Failed to log message for ~s@~s: ~p", [Msg#msg.owner_name, VHost, Reason]),
+            error
+    end,
+    {reply, ok, State};
+% {rebuild_stats_at, Date}: recomputes the stats rows of one date.
+handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Reply = rebuild_stats_at_int(DBRef, VHost, Schema, Date),
+    {reply, Reply, State};
+% {delete_messages_by_user_at, Msgs, Date}: deletes the given messages,
+% identified by timestamp, then rebuilds the stats of that date. An empty
+% list is answered with error.
+handle_call({delete_messages_by_user_at, [], _Date}, _From, State) ->
+    {reply, error, State};
+handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) ->
+                             ["'",Timestamp,"'",","]
+                         end, Msgs),
+
+    % replace last "," with ");"
+    Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+
+    Query = ["DELETE FROM ",messages_table(VHost, Schema, Date)," ",
+                 "WHERE timestamp IN (", Temp1],
+
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {updated, _} ->
+                rebuild_stats_at_int(DBRef, VHost, Schema, Date);
+            {error, _} ->
+                error
+        end,
+    {reply, Reply, State};
+% {delete_all_messages_by_user_at, User, Date}: removes the user's messages
+% and stats for one date; crashes on the "ok = ..." match if either fails.
+handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    ok = delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date),
+    ok = delete_stats_by_user_at_int(DBRef, Schema, User, VHost, Date),
+    {reply, ok, State};
+% {delete_messages_at, Date}: drops the view and the messages table of one
+% date and removes the matching stats rows.
+handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    {updated, _} = sql_query_internal(DBRef, ["DROP VIEW ",view_table(VHost, Schema, Date),";"]),
+    Reply =
+        case sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Schema, Date)," CASCADE;"]) of
+            {updated, _} ->
+                Query = ["DELETE FROM ",stats_table(VHost, Schema)," "
+                            "WHERE at='",Date,"';"],
+                case sql_query_internal(DBRef, Query) of
+                    {updated, _} ->
+                        ok;
+                    {error, _} ->
+                        error
+                end;
+            {error, _} ->
+                error
+        end,
+    {reply, Reply, State};
+% {get_vhost_stats}: {ok, [{Date, Count}]} summed over all users.
+handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    SName = stats_table(VHost, Schema),
+    Query = ["SELECT at, sum(count) ",
+                "FROM ",SName," ",
+                "GROUP BY at ",
+                "ORDER BY DATE(at) DESC;"
+            ],
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {data, Recs} ->
+                {ok, [ {Date, list_to_integer(Count)} || {Date, Count} <- Recs]};
+            {error, Reason} ->
+                % TODO: Duplicate error message ?
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+% {get_vhost_stats_at, Date}: {ok, [{User, Count}]} for one date, sorted by
+% descending count.
+handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    SName = stats_table(VHost, Schema),
+    Query = ["SELECT username, sum(count) AS allcount ",
+                "FROM ",SName," ",
+                "JOIN ",users_table(VHost, Schema)," ON owner_id=user_id ",
+                "WHERE at='",Date,"' ",
+                "GROUP BY username ",
+                "ORDER BY allcount DESC;"
+            ],
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {data, Recs} ->
+                RFun = fun({User, Count}) ->
+                           {User, list_to_integer(Count)}
+                       end,
+                {ok, lists:reverse(lists:keysort(2, lists:map(RFun, Recs)))};
+            {error, Reason} ->
+                % TODO:
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+% {get_user_stats, User}: per-date message counts of one user.
+handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    {reply, get_user_stats_int(DBRef, Schema, User, VHost), State};
+% {get_user_messages_at, User, Date}: {ok, [#msg{}]} read from the view of
+% that date.
+handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Query = ["SELECT peer_name,",
+                    "peer_server,",
+                    "peer_resource,",
+                    "direction,"
+                    "type,"
+                    "subject,"
+                    "body,"
+                    "timestamp "
+                "FROM ",view_table(VHost, Schema, Date)," "
+                "WHERE owner_name='",User,"';"],
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {data, Recs} ->
+                Fun = fun({Peer_name, Peer_server, Peer_resource,
+                           Direction,
+                           Type,
+                           Subject, Body,
+                           Timestamp}) ->
+                          #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource,
+                               direction=list_to_atom(Direction),
+                               type=Type,
+                               subject=Subject, body=Body,
+                               timestamp=Timestamp}
+                      end,
+                {ok, lists:map(Fun, Recs)};
+            {error, Reason} ->
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+% {get_dates}: list of dates present in the stats table (a bare list on
+% success, {error, Reason} otherwise).
+handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    SName = stats_table(VHost, Schema),
+    Query = ["SELECT at ",
+                "FROM ",SName," ",
+                "GROUP BY at ",
+                "ORDER BY at DESC;"
+            ],
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {data, Result} ->
+                [ Date || {Date} <- Result ];
+            {error, Reason} ->
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+% {get_users_settings}: {ok, [#user_settings{}]} for every stored user.
+handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ",
+                "FROM ",settings_table(VHost, Schema)," ",
+                "JOIN ",users_table(VHost, Schema)," ON user_id=owner_id;"],
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {data, Recs} ->
+                {ok, [#user_settings{owner_name=Owner,
+                                     dolog_default=list_to_bool(DoLogDef),
+                                     dolog_list=string_to_list(DoLogL),
+                                     donotlog_list=string_to_list(DoNotLogL)
+                                    } || {Owner, DoLogDef, DoLogL, DoNotLogL} <- Recs]};
+            {error, Reason} ->
+                {error, Reason}
+        end,
+    {reply, Reply, State};
+% {get_user_settings, User}: {ok, []} when nothing is stored, otherwise
+% {ok, #user_settings{}}; error when the query fails.
+handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    Query = ["SELECT dolog_default,dolog_list,donotlog_list ",
+                "FROM ",settings_table(VHost, Schema)," ",
+                "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"');"],
+    Reply =
+        case sql_query_internal_silent(DBRef, Query) of
+            {data, []} ->
+                {ok, []};
+            {data, [{DoLogDef, DoLogL, DoNotLogL}]} ->
+                {ok, #user_settings{owner_name=User,
+                                    dolog_default=list_to_bool(DoLogDef),
+                                    dolog_list=string_to_list(DoLogL),
+                                    donotlog_list=string_to_list(DoNotLogL)}};
+            {error, Reason} ->
+                ?ERROR_MSG("Failed to get_user_settings for ~s@~s: ~p", [User, VHost, Reason]),
+                error
+        end,
+    {reply, Reply, State};
+% {set_user_settings, User, Settings}: updates the user's row and inserts
+% one when the UPDATE touched no rows.
+handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef,
+                                                     dolog_list=DoLogL,
+                                                     donotlog_list=DoNotLogL}},
+            _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    User_id = get_user_id(DBRef, VHost, Schema, User),
+    Query = ["UPDATE ",settings_table(VHost, Schema)," ",
+                "SET dolog_default=",bool_to_list(DoLogDef),", ",
+                    "dolog_list='",list_to_string(DoLogL),"', ",
+                    "donotlog_list='",list_to_string(DoNotLogL),"' ",
+                "WHERE owner_id=",User_id,";"],
+
+    Reply =
+        case sql_query_internal(DBRef, Query) of
+            {updated, 0} ->
+                IQuery = ["INSERT INTO ",settings_table(VHost, Schema)," ",
+                             "(owner_id, dolog_default, dolog_list, donotlog_list) ",
+                             "VALUES ",
+                             "(",User_id,", ",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"],
+                case sql_query_internal(DBRef, IQuery) of
+                    {updated, 1} ->
+                        ?MYDEBUG("New settings for ~s@~s", [User, VHost]),
+                        ok;
+                    {error, _} ->
+                        error
+                end;
+            {updated, 1} ->
+                ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]),
+                ok;
+            {error, _} ->
+                error
+        end,
+    {reply, Reply, State};
+% {stop}: replies ok and stops the server normally.
+handle_call({stop}, _From, State) ->
+    ?MYDEBUG("Stoping pgsql backend for ~p", [State#state.vhost]),
+    {stop, normal, ok, State};
+% Anything else is only logged. No reply is sent, so the caller waits until
+% its own gen_server:call timeout expires.
+handle_call(Msg, _From, State) ->
+    ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]),
+    {noreply, State}.
+
+
+% gen_server cast dispatcher.
+%
+% {rebuild_stats}: starts a rebuild of all stats in a separate process.
+handle_cast({rebuild_stats}, State) ->
+    rebuild_all_stats_int(State),
+    {noreply, State};
+% {drop_user, User}: removes the user's messages, stats and settings in a
+% separate process that uses its own connection. The connection is closed
+% in an "after" section so that it is released even when a step crashes.
+handle_cast({drop_user, User}, #state{vhost=VHost, schema=Schema}=State) ->
+    Fun = fun() ->
+              {ok, DBRef} = open_pgsql_connection(State),
+              try
+                  {ok, Dates} = get_user_stats_int(DBRef, Schema, User, VHost),
+                  MDResult = lists:map(fun({Date, _}) ->
+                                           delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date)
+                                       end, Dates),
+                  StDResult = delete_all_stats_by_user_int(DBRef, Schema, User, VHost),
+                  SDResult = delete_user_settings_int(DBRef, Schema, User, VHost),
+                  case lists:all(fun(Result) when Result == ok ->
+                                         true;
+                                    (Result) when Result == error ->
+                                         false
+                                 end, lists:append([MDResult, [StDResult], [SDResult]])) of
+                      true ->
+                          ?INFO_MSG("Removed ~s@~s", [User, VHost]);
+                      false ->
+                          ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost])
+                  end
+              after
+                  close_pgsql_connection(DBRef)
+              end
+          end,
+    spawn(Fun),
+    {noreply, State};
+handle_cast(Msg, State) ->
+    ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]),
+    {noreply, State}.
+
+% Any process 'DOWN' notification stops the server with reason
+% connection_dropped; every other message is only logged.
+handle_info({'DOWN', _Ref, process, _DownPid, _Why}, State) ->
+    {stop, connection_dropped, State};
+handle_info(Unexpected, State) ->
+    ?INFO_MSG("Got Info:~p, State:~p", [Unexpected, State]),
+    {noreply, State}.
+
+% Closes the pgsql connection kept in the state.
+terminate(_Why, #state{dbref=Connection}) ->
+    close_pgsql_connection(Connection),
+    ok.
+
+% No state conversion between code versions.
+code_change(_FromVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% gen_logdb callbacks
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Each callback resolves the per-vhost server name and forwards the request
+% to it: synchronously with ?CALL_TIMEOUT, except rebuild_stats/1, which is
+% a cast.
+log_message(VHost, Msg) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {log_message, Msg}, ?CALL_TIMEOUT).
+rebuild_stats(VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {rebuild_stats}).
+rebuild_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {rebuild_stats_at, Date}, ?CALL_TIMEOUT).
+delete_messages_by_user_at(VHost, Msgs, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT).
+delete_all_messages_by_user_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT).
+delete_messages_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {delete_messages_at, Date}, ?CALL_TIMEOUT).
+get_vhost_stats(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats}, ?CALL_TIMEOUT).
+get_vhost_stats_at(VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_vhost_stats_at, Date}, ?CALL_TIMEOUT).
+get_user_stats(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_stats, User}, ?CALL_TIMEOUT).
+% Remaining gen_logdb callbacks: each one forwards its request to the
+% per-vhost server, synchronously with ?CALL_TIMEOUT, except drop_user/2,
+% which is a cast.
+get_user_messages_at(User, VHost, Date) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_messages_at, User, Date}, ?CALL_TIMEOUT).
+get_dates(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_dates}, ?CALL_TIMEOUT).
+get_users_settings(VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_users_settings}, ?CALL_TIMEOUT).
+get_user_settings(User, VHost) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {get_user_settings, User}, ?CALL_TIMEOUT).
+set_user_settings(User, VHost, Set) ->
+    gen_server:call(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {set_user_settings, User, Set}, ?CALL_TIMEOUT).
+drop_user(User, VHost) ->
+    gen_server:cast(gen_mod:get_module_proc(VHost, ?PROCNAME),
+                    {drop_user, User}).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Lists the visible tables whose name contains the escaped vhost and
+% returns, in query order, the first digits-digits-digits run found in each
+% name. Tables without such a run are left out; a failed query gives [].
+get_dates_int(DBRef, VHost) ->
+    Query = ["SELECT n.nspname as \"Schema\",
+                c.relname as \"Name\",
+                CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' END as \"Type\",
+                r.rolname as \"Owner\"
+              FROM pg_catalog.pg_class c
+                   JOIN pg_catalog.pg_roles r ON r.oid = c.relowner
+                   LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
+              WHERE c.relkind IN ('r','')
+                    AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
+                    AND c.relname ~ '^(.*",escape_vhost(VHost),".*)$'
+                    AND pg_catalog.pg_table_is_visible(c.oid)
+              ORDER BY 1,2;"],
+    ExtractDate =
+        fun({_Schema, Table, _Type, _Owner}) ->
+            case re:run(Table,"[0-9]+-[0-9]+-[0-9]+") of
+                {match, [{Offset, Length}]} ->
+                    % re offsets are zero-based, lists:sublist/3 is one-based
+                    {true, lists:sublist(Table, Offset+1, Length)};
+                nomatch ->
+                    false
+            end
+        end,
+    case sql_query_internal(DBRef, Query) of
+        {data, Rows} ->
+            lists:filtermap(ExtractDate, Rows);
+        {error, _} ->
+            []
+    end.
+
+% Rebuilds the stats of every date found by get_dates_int/2 in a separate
+% process that uses its own connection. Dates whose rebuild fails are
+% reported in one error log entry. The connection is closed in an "after"
+% section so that a crash in any step does not leave it open.
+% Returns the pid of the spawned process.
+rebuild_all_stats_int(#state{vhost=VHost, schema=Schema}=State) ->
+    Fun = fun() ->
+              {ok, DBRef} = open_pgsql_connection(State),
+              try
+                  ok = delete_nonexistent_stats(DBRef, Schema, VHost),
+                  case lists:filter(fun(Date) ->
+                                        case catch rebuild_stats_at_int(DBRef, VHost, Schema, Date) of
+                                            ok -> false;
+                                            error -> true;
+                                            {'EXIT', _} -> true
+                                        end
+                                    end, get_dates_int(DBRef, VHost)) of
+                      [] -> ok;
+                      FTables ->
+                          ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]),
+                          error
+                  end
+              after
+                  close_pgsql_connection(DBRef)
+              end
+          end,
+    spawn(Fun).
+
+% Recomputes the stats rows of one date inside a transaction: the counts
+% are aggregated into a temporary table and then swapped into the stats
+% table. When the messages table turns out to be empty, its view, the table
+% itself and its stats rows are removed instead.
+%
+% Returns ok when the transaction committed and error when it was aborted.
+% The previous version always returned ok, so the error handling in its
+% callers could never trigger.
+rebuild_stats_at_int(DBRef, VHost, Schema, Date) ->
+    TempTable = temp_table(VHost, Schema),
+    Fun =
+        fun() ->
+            Table = messages_table(VHost, Schema, Date),
+            STable = stats_table(VHost, Schema),
+
+            DQuery = [ "DELETE FROM ",STable," ",
+                          "WHERE at='",Date,"';"],
+
+            ok = create_temp_table(DBRef, VHost, Schema),
+            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," IN ACCESS EXCLUSIVE MODE;"]),
+            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",TempTable," IN ACCESS EXCLUSIVE MODE;"]),
+            SQuery = ["INSERT INTO ",TempTable," ",
+                      "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                         "SELECT owner_id,peer_name_id,peer_server_id,'",Date,"'",",count(*) ",
+                            "FROM ",Table," GROUP BY owner_id,peer_name_id,peer_server_id;"],
+            case sql_query_internal(DBRef, SQuery) of
+                {updated, 0} ->
+                    Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]),
+                    case Count of
+                        {data, [{"0"}]} ->
+                            {updated, _} = sql_query_internal(DBRef, ["DROP VIEW ",view_table(VHost, Schema, Date),";"]),
+                            {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table," CASCADE;"]),
+                            {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," IN ACCESS EXCLUSIVE MODE;"]),
+                            {updated, _} = sql_query_internal(DBRef, DQuery),
+                            ok;
+                        _ ->
+                            ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]),
+                            error
+                    end;
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," IN ACCESS EXCLUSIVE MODE;"]),
+                    {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",TempTable," IN ACCESS EXCLUSIVE MODE;"]),
+                    {updated, _} = sql_query_internal(DBRef, DQuery),
+                    SQuery1 = ["INSERT INTO ",STable," ",
+                                "(owner_id,peer_name_id,peer_server_id,at,count) ",
+                                   "SELECT owner_id,peer_name_id,peer_server_id,at,count ",
+                                      "FROM ",TempTable,";"],
+                    case sql_query_internal(DBRef, SQuery1) of
+                        {updated, _} -> ok;
+                        {error, _} -> error
+                    end;
+                {error, _} -> error
+            end
+        end, % fun
+
+    Result =
+        case sql_transaction_internal(DBRef, Fun) of
+            {atomic, _} ->
+                ?INFO_MSG("Rebuilded stats for ~s at ~s", [VHost, Date]),
+                ok;
+            {aborted, Reason} ->
+                ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]),
+                error
+        end,
+    % The temporary table is dropped whatever the outcome was; the result of
+    % the DROP itself is not checked.
+    sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]),
+    Result.
+
+% Deletes the stats rows whose date has no messages table any more.
+% Nothing is deleted when get_dates_int/2 finds no dates at all.
+% Returns ok | error.
+delete_nonexistent_stats(DBRef, Schema, VHost) ->
+    Dates = get_dates_int(DBRef, VHost),
+    STable = stats_table(VHost, Schema),
+
+    Temp = lists:flatmap(fun(Date) ->
+                             ["'",Date,"'",","]
+                         end, Dates),
+
+    case Temp of
+        [] ->
+            ok;
+        _ ->
+            % replace last "," with ");"
+            Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]),
+            Query = ["DELETE FROM ",STable," ",
+                        "WHERE at NOT IN (", Temp1],
+            case sql_query_internal(DBRef, Query) of
+                {updated, _} ->
+                    ok;
+                {error, _} ->
+                    error
+            end
+    end.
+
+% Per-date message counts of one user.
+% Returns {ok, [{Date, Count}]} | {error, Reason}.
+get_user_stats_int(DBRef, Schema, User, VHost) ->
+    SName = stats_table(VHost, Schema),
+    UName = users_table(VHost, Schema),
+    Query = ["SELECT stats.at, sum(stats.count) ",
+                 "FROM ",UName," AS users ",
+                    "JOIN ",SName," AS stats ON owner_id=user_id "
+                 "WHERE users.username='",User,"' ",
+                 "GROUP BY stats.at "
+                 "ORDER BY DATE(at) DESC;"
+            ],
+    case sql_query_internal(DBRef, Query) of
+        {data, Recs} ->
+            {ok, [ {Date, list_to_integer(Count)} || {Date, Count} <- Recs ]};
+        {error, Result} ->
+            {error, Result}
+    end.
+
+% Deletes every message owned by User in the messages table of Date.
+% Returns ok | error.
+delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date) ->
+    OwnerIdSelect = ["(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"')"],
+    Statement = ["DELETE FROM ",messages_table(VHost, Schema, Date)," ",
+                 "WHERE owner_id=",OwnerIdSelect,";"],
+    Outcome = sql_query_internal(DBRef, Statement),
+    case Outcome of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Deletes all stats rows owned by User. Returns ok | error.
+delete_all_stats_by_user_int(DBRef, Schema, User, VHost) ->
+    OwnerIdSelect = ["(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"')"],
+    Statement = ["DELETE FROM ",stats_table(VHost, Schema)," ",
+                 "WHERE owner_id=",OwnerIdSelect,";"],
+    Outcome = sql_query_internal(DBRef, Statement),
+    case Outcome of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]),
+            ok
+    end.
+
+% Deletes the stats rows owned by User for one date. Returns ok | error.
+delete_stats_by_user_at_int(DBRef, Schema, User, VHost, Date) ->
+    OwnerIdSelect = ["(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"')"],
+    Statement = ["DELETE FROM ",stats_table(VHost, Schema)," ",
+                 "WHERE owner_id=",OwnerIdSelect," ",
+                 "AND at='",Date,"';"],
+    Outcome = sql_query_internal(DBRef, Statement),
+    case Outcome of
+        {error, _} ->
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]),
+            ok
+    end.
+
+% Deletes the settings row of User; a failure is logged with its reason.
+% Returns ok | error.
+delete_user_settings_int(DBRef, Schema, User, VHost) ->
+    OwnerIdSelect = ["(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"')"],
+    Statement = ["DELETE FROM ",settings_table(VHost, Schema)," ",
+                 "WHERE owner_id=",OwnerIdSelect,";"],
+    Outcome = sql_query_internal(DBRef, Statement),
+    case Outcome of
+        {error, Why} ->
+            ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Why]),
+            error;
+        {updated, _} ->
+            ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]),
+            ok
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% tables internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% Creates the scratch table used while rebuilding stats. Returns ok | error.
+create_temp_table(DBRef, VHost, Schema) ->
+    TName = temp_table(VHost, Schema),
+    Query = ["CREATE TABLE ",TName," (",
+                "owner_id INTEGER, ",
+                "peer_name_id INTEGER, ",
+                "peer_server_id INTEGER, ",
+                "at VARCHAR(20), ",
+                "count INTEGER ",
+             ");"
+            ],
+    case sql_query_internal(DBRef, Query) of
+        {updated, _} -> ok;
+        {error, _Reason} -> error
+    end.
+
+% Creates the stats table and its two indexes inside a transaction.
+%   * Newly created: a rebuild of all stats is started and ok is returned.
+%   * Already there (error code 42P07): the table is expected to have
+%     exactly two columns matching ^peer_.*_id$. If it has, ok is returned;
+%     otherwise the table is dropped and error is returned.
+%   * Transaction aborted: error. sql_transaction_internal/2 reports that
+%     as {aborted, _}; the former {error, _} pattern could never match, so
+%     a failure ended in a case_clause crash instead.
+create_stats_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    SName = stats_table(VHost, Schema),
+
+    Fun =
+        fun() ->
+            Query = ["CREATE TABLE ",SName," (",
+                        "owner_id INTEGER, ",
+                        "peer_name_id INTEGER, ",
+                        "peer_server_id INTEGER, ",
+                        "at VARCHAR(20), ",
+                        "count integer",
+                     ");"
+                    ],
+            case sql_query_internal_silent(DBRef, Query) of
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"s_search_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (owner_id, peer_name_id, peer_server_id);"]),
+                    {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"s_at_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (at);"]),
+                    created;
+                {error, Reason} ->
+                    case lists:keysearch(code, 1, Reason) of
+                        {value, {code, "42P07"}} ->
+                            exists;
+                        _ ->
+                            ?ERROR_MSG("Failed to create stats table for ~s: ~p", [VHost, Reason]),
+                            error
+                    end
+            end
+        end,
+    case sql_transaction_internal(DBRef, Fun) of
+        {atomic, created} ->
+            ?MYDEBUG("Created stats table for ~s", [VHost]),
+            rebuild_all_stats_int(State),
+            ok;
+        {atomic, exists} ->
+            ?MYDEBUG("Stats table for ~s already exists", [VHost]),
+            % take the table name without the schema and the surrounding quotes
+            {match, [{F, L}]} = re:run(SName, "\".*\""),
+            QTable = lists:sublist(SName, F+2, L-2),
+            OIDQuery = ["SELECT c.oid FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relname='",QTable,"' AND pg_catalog.pg_table_is_visible(c.oid);"],
+            {data,[{OID}]} = sql_query_internal(DBRef, OIDQuery),
+            CheckQuery = ["SELECT a.attname FROM pg_catalog.pg_attribute a WHERE a.attrelid = '",OID,"' AND a.attnum > 0 AND NOT a.attisdropped AND a.attname ~ '^peer_.*_id$';"],
+            case sql_query_internal(DBRef, CheckQuery) of
+                {data, Elems} when length(Elems) == 2 ->
+                    ?MYDEBUG("Stats table structure is ok", []),
+                    ok;
+                _ ->
+                    ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []),
+                    case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of
+                        {updated, _} ->
+                            ?INFO_MSG("Successfully dropped ~p", [SName]);
+                        _ ->
+                            ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName])
+                    end,
+                    error
+            end;
+        {aborted, _} -> error
+    end.
+
+% Creates the per-user settings table; an already existing table (error
+% code 42P07) counts as success. Returns ok | error.
+create_settings_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    SName = settings_table(VHost, Schema),
+    Query = ["CREATE TABLE ",SName," (",
+                "owner_id INTEGER PRIMARY KEY, ",
+                "dolog_default BOOLEAN, ",
+                "dolog_list TEXT DEFAULT '', ",
+                "donotlog_list TEXT DEFAULT ''",
+             ");"
+            ],
+    case sql_query_internal_silent(DBRef, Query) of
+        {updated, _} ->
+            ?MYDEBUG("Created settings table for ~s", [VHost]),
+            ok;
+        {error, Reason} ->
+            case lists:keysearch(code, 1, Reason) of
+                {value, {code, "42P07"}} ->
+                    ?MYDEBUG("Settings table for ~s already exists", [VHost]),
+                    ok;
+                _ ->
+                    ?ERROR_MSG("Failed to create settings table for ~s: ~p", [VHost, Reason]),
+                    error
+            end
+    end.
+
+% Creates the users table (username -> user_id) together with its username
+% index inside one transaction. An already existing table (error code
+% 42P07) counts as success. Returns ok | error.
+create_users_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    TableName = users_table(VHost, Schema),
+    CreateTable = ["CREATE TABLE ",TableName," (",
+                      "username TEXT UNIQUE, ",
+                      "user_id SERIAL PRIMARY KEY",
+                   ");"
+                  ],
+    CreateIndex = ["CREATE INDEX \"username_i_",Schema,"_",escape_vhost(VHost),"\" ON ",TableName," (username);"],
+    InTransaction =
+        fun() ->
+            case sql_query_internal_silent(DBRef, CreateTable) of
+                {error, Why} ->
+                    case lists:keyfind(code, 1, Why) of
+                        {code, "42P07"} ->
+                            exists;
+                        _ ->
+                            ?ERROR_MSG("Failed to create users table for ~s: ~p", [VHost, Why]),
+                            error
+                    end;
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, CreateIndex),
+                    created
+            end
+        end,
+    case sql_transaction_internal(DBRef, InTransaction) of
+        {aborted, _} ->
+            error;
+        {atomic, exists} ->
+            ?MYDEBUG("Users table for ~s already exists", [VHost]),
+            ok;
+        {atomic, created} ->
+            ?MYDEBUG("Created users table for ~s", [VHost]),
+            ok
+    end.
+
+% Creates the servers table (server -> server_id) together with its server
+% index inside one transaction. An already existing table (error code
+% 42P07) counts as success. Returns ok | error.
+create_servers_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    TableName = servers_table(VHost, Schema),
+    CreateTable = ["CREATE TABLE ",TableName," (",
+                      "server TEXT UNIQUE, ",
+                      "server_id SERIAL PRIMARY KEY",
+                   ");"
+                  ],
+    CreateIndex = ["CREATE INDEX \"server_i_",Schema,"_",escape_vhost(VHost),"\" ON ",TableName," (server);"],
+    InTransaction =
+        fun() ->
+            case sql_query_internal_silent(DBRef, CreateTable) of
+                {error, Why} ->
+                    case lists:keyfind(code, 1, Why) of
+                        {code, "42P07"} ->
+                            exists;
+                        _ ->
+                            ?ERROR_MSG("Failed to create servers table for ~s: ~p", [VHost, Why]),
+                            error
+                    end;
+                {updated, _} ->
+                    {updated, _} = sql_query_internal(DBRef, CreateIndex),
+                    created
+            end
+        end,
+    case sql_transaction_internal(DBRef, InTransaction) of
+        {aborted, _} ->
+            error;
+        {atomic, exists} ->
+            ?MYDEBUG("Servers table for ~s already exists", [VHost]),
+            ok;
+        {atomic, created} ->
+            ?MYDEBUG("Created servers table for ~s", [VHost]),
+            ok
+    end.
+
+% Creates the resources table (resource -> resource_id) together with its
+% resource index inside one transaction. An already existing table (error
+% code 42P07) counts as success. Returns ok | error.
+create_resources_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) ->
+    RName = resources_table(VHost, Schema),
+    Fun = fun() ->
+              Query = ["CREATE TABLE ",RName," (",
+                          "resource TEXT UNIQUE, ",
+                          "resource_id SERIAL PRIMARY KEY",
+                       ");"
+                      ],
+              case sql_query_internal_silent(DBRef, Query) of
+                  {updated, _} ->
+                      {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"resource_i_",Schema,"_",escape_vhost(VHost),"\" ON ",RName," (resource);"]),
+                      created;
+                  {error, Reason} ->
+                      case lists:keysearch(code, 1, Reason) of
+                          {value, {code, "42P07"}} ->
+                              exists;
+                          _ ->
+                              % this message used to name the users table
+                              ?ERROR_MSG("Failed to create resources table for ~s: ~p", [VHost, Reason]),
+                              error
+                      end
+              end
+          end,
+    case sql_transaction_internal(DBRef, Fun) of
+        {atomic, created} ->
+            ?MYDEBUG("Created resources table for ~s", [VHost]),
+            ok;
+        {atomic, exists} ->
+            ?MYDEBUG("Resources table for ~s already exists", [VHost]),
+            ok;
+        {aborted, _} -> error
+    end.
+
+% Installs the per-vhost logmessage SQL function. Returns ok | error.
+%
+% The DROP below names an eleven-argument signature, while get_logmessage/2
+% creates the function with twelve arguments (it adds vname).
+% NOTE(review): presumably this removes a function left over from an older
+% version of the module - confirm before changing the signature.
+% Error code 42704 from the CREATE is reported as a missing plpgsql
+% language.
+create_internals(#state{dbref=DBRef, vhost=VHost, schema=Schema}=State) ->
+    sql_query_internal(DBRef, ["DROP FUNCTION IF EXISTS ",logmessage_name(VHost,Schema)," (tbname TEXT, atdt TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(9), msubj TEXT, mbody TEXT, mtimestamp DOUBLE PRECISION);"]),
+    case sql_query_internal(DBRef, [get_logmessage(VHost, Schema)]) of
+        {updated, _} ->
+            ?MYDEBUG("Created logmessage for ~p", [VHost]),
+            ok;
+        {error, Reason} ->
+            case lists:keysearch(code, 1, Reason) of
+                {value, {code, "42704"}} ->
+                    ?ERROR_MSG("plpgsql language must be installed into database '~s'. Use CREATE LANGUAGE...", [State#state.db]),
+                    error;
+                _ ->
+                    error
+            end
+    end.
+
+%% Return the user_id stored for User in the vhost's users table, inserting
+%% a row first when the SELECT finds none.
+%%
+%% When the INSERT fails, the only accepted failure is SQLSTATE "23505"
+%% (PostgreSQL unique_violation): the row is then read back with the same
+%% SELECT. Any other failure crashes on the badmatch below.
+%%
+%% NOTE(review): User is spliced into the SQL text as is; presumably callers
+%% escape it beforehand — confirm.
+get_user_id(DBRef, VHost, Schema, User) ->
+    SQuery = ["SELECT user_id FROM ",users_table(VHost, Schema)," ",
+              "WHERE username='",User,"';"],
+    case sql_query_internal(DBRef, SQuery) of
+        {data, []} ->
+            IQuery = ["INSERT INTO ",users_table(VHost, Schema)," ",
+                      "VALUES ('",User,"');"],
+            % Silent variant: a duplicate-key failure is handled below
+            % instead of being logged by sql_query_internal/2.
+            case sql_query_internal_silent(DBRef, IQuery) of
+                {updated, _} ->
+                    {data, [{DBIdNew}]} = sql_query_internal(DBRef, SQuery),
+                    DBIdNew;
+                {error, Reason} ->
+                    % this can be in clustered environment
+                    {value, {code, "23505"}} = lists:keysearch(code, 1, Reason),
+                    ?ERROR_MSG("Duplicate key name for ~p", [User]),
+                    {data, [{ClID}]} = sql_query_internal(DBRef, SQuery),
+                    ClID
+            end;
+        {data, [{DBId}]} ->
+            DBId
+    end.
+
+%% Build the CREATE OR REPLACE FUNCTION statement for the vhost's logmessage
+%% PL/pgSQL function and return the io_lib:format/2 result.
+%%
+%% The generated function:
+%%   * looks up the ids of owner, peer_name, peer_server and peer_resource in
+%%     the users/servers/resources tables, inserting a row and taking
+%%     lastval() when none is found;
+%%   * inserts the message into the table named by tbname; when that raises
+%%     undefined_table it creates the table, its search index and the view
+%%     named by vname, then repeats the insert;
+%%   * increments the matching counter row in the stats table, inserting it
+%%     with count 1 when the UPDATE touched nothing;
+%%   * returns 0.
+%%
+%% The format string holds 17 ~s placeholders; the argument list at the end
+%% must stay in the same order as they appear in the text.
+get_logmessage(VHost,Schema) ->
+    UName = users_table(VHost,Schema),
+    SName = servers_table(VHost,Schema),
+    RName = resources_table(VHost,Schema),
+    StName = stats_table(VHost,Schema),
+    io_lib:format("CREATE OR REPLACE FUNCTION ~s (tbname TEXT, vname TEXT, atdt TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(9), msubj TEXT, mbody TEXT, mtimestamp DOUBLE PRECISION) RETURNS INTEGER AS $$
+DECLARE
+    ownerID INTEGER;
+    peer_nameID INTEGER;
+    peer_serverID INTEGER;
+    peer_resourceID INTEGER;
+    tablename ALIAS for $1;
+    viewname ALIAS for $2;
+    atdate ALIAS for $3;
+BEGIN
+    SELECT INTO ownerID user_id FROM ~s WHERE username = owner;
+    IF NOT FOUND THEN
+        INSERT INTO ~s (username) VALUES (owner);
+        ownerID := lastval();
+    END IF;
+
+    SELECT INTO peer_nameID user_id FROM ~s WHERE username = peer_name;
+    IF NOT FOUND THEN
+        INSERT INTO ~s (username) VALUES (peer_name);
+        peer_nameID := lastval();
+    END IF;
+
+    SELECT INTO peer_serverID server_id FROM ~s WHERE server = peer_server;
+    IF NOT FOUND THEN
+        INSERT INTO ~s (server) VALUES (peer_server);
+        peer_serverID := lastval();
+    END IF;
+
+    SELECT INTO peer_resourceID resource_id FROM ~s WHERE resource = peer_resource;
+    IF NOT FOUND THEN
+        INSERT INTO ~s (resource) VALUES (peer_resource);
+        peer_resourceID := lastval();
+    END IF;
+
+    BEGIN
+        EXECUTE 'INSERT INTO ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (' || ownerID || ',' || peer_nameID || ',' || peer_serverID || ',' || peer_resourceID || ',''' || mdirection || ''',''' || mtype || ''',' || quote_literal(msubj) || ',' || quote_literal(mbody) || ',' || mtimestamp || ')';
+    EXCEPTION WHEN undefined_table THEN
+        EXECUTE 'CREATE TABLE ' || tablename || ' (' ||
+                'owner_id INTEGER, ' ||
+                'peer_name_id INTEGER, ' ||
+                'peer_server_id INTEGER, ' ||
+                'peer_resource_id INTEGER, ' ||
+                'direction VARCHAR(4) CHECK (direction IN (''to'',''from'')), ' ||
+                'type VARCHAR(9) CHECK (type IN (''chat'',''error'',''groupchat'',''headline'',''normal'')), ' ||
+                'subject TEXT, ' ||
+                'body TEXT, ' ||
+                'timestamp DOUBLE PRECISION)';
+        EXECUTE 'CREATE INDEX \"search_i_' || '~s' || '_' || atdate || '_' || '~s' || '\"' || ' ON ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id)';
+
+        EXECUTE 'CREATE OR REPLACE VIEW ' || viewname || ' AS ' ||
+                'SELECT owner.username AS owner_name, ' ||
+                'peer.username AS peer_name, ' ||
+                'servers.server AS peer_server, ' ||
+                'resources.resource AS peer_resource, ' ||
+                'messages.direction, ' ||
+                'messages.type, ' ||
+                'messages.subject, ' ||
+                'messages.body, ' ||
+                'messages.timestamp ' ||
+                'FROM ' ||
+                '~s owner, ' ||
+                '~s peer, ' ||
+                '~s servers, ' ||
+                '~s resources, ' ||
+                tablename || ' messages ' ||
+                'WHERE ' ||
+                'owner.user_id=messages.owner_id and ' ||
+                'peer.user_id=messages.peer_name_id and ' ||
+                'servers.server_id=messages.peer_server_id and ' ||
+                'resources.resource_id=messages.peer_resource_id ' ||
+                'ORDER BY messages.timestamp';
+
+        EXECUTE 'INSERT INTO ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (' || ownerID || ',' || peer_nameID || ',' || peer_serverID || ',' || peer_resourceID || ',''' || mdirection || ''',''' || mtype || ''',' || quote_literal(msubj) || ',' || quote_literal(mbody) || ',' || mtimestamp || ')';
+    END;
+
+    UPDATE ~s SET count=count+1 where at=atdate and owner_id=ownerID and peer_name_id=peer_nameID and peer_server_id=peer_serverID;
+    IF NOT FOUND THEN
+        INSERT INTO ~s (owner_id, peer_name_id, peer_server_id, at, count) VALUES (ownerID, peer_nameID, peer_serverID, atdate, 1);
+    END IF;
+    RETURN 0;
+END;
+$$ LANGUAGE plpgsql;
+", [logmessage_name(VHost,Schema),UName,UName,UName,UName,SName,SName,RName,RName,Schema,escape_vhost(VHost),UName,UName,SName,RName,StName,StName]).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%
+% SQL internals
+%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+% like do_transaction/2 in mysql_conn.erl (changeset by Yariv Sadan)
+%
+% Run Fun between BEGIN and COMMIT on DBRef.
+%   - BEGIN fails: returns {aborted, {begin_error}} without calling Fun.
+%   - Fun returns error or {error, _}, or fails so that catch yields
+%     {'EXIT', _}: ROLLBACK is issued and the rollback_internal/2 result is
+%     returned.
+%   - Anything else: COMMIT is issued; on success the result is returned as
+%     {atomic, Res} (unchanged when it already is {atomic, _}); on a COMMIT
+%     error the transaction is rolled back with reason {commit_error}.
+% Because the old-style catch is used, a term thrown by Fun is handled like
+% an ordinary return value.
+sql_transaction_internal(DBRef, Fun) ->
+    case sql_query_internal(DBRef, ["BEGIN;"]) of
+        {updated, _} ->
+            case catch Fun() of
+                error = Err ->
+                    rollback_internal(DBRef, Err);
+                {error, _} = Err ->
+                    rollback_internal(DBRef, Err);
+                {'EXIT', _} = Err ->
+                    rollback_internal(DBRef, Err);
+                Res ->
+                    case sql_query_internal(DBRef, ["COMMIT;"]) of
+                        {error, _} -> rollback_internal(DBRef, {commit_error});
+                        {updated, _} ->
+                            case Res of
+                                {atomic, _} -> Res;
+                                _ -> {atomic, Res}
+                            end
+                    end
+            end;
+        {error, _} ->
+            {aborted, {begin_error}}
+    end.
+
+% like rollback/2 in mysql_conn.erl (changeset by Yariv Sadan)
+%
+% Issue ROLLBACK on DBRef and return
+% {aborted, {Reason, {rollback_result, Res}}}, where Res is whatever the
+% ROLLBACK statement itself returned.
+rollback_internal(DBRef, Reason) ->
+    Res = sql_query_internal(DBRef, ["ROLLBACK;"]),
+    {aborted, {Reason, {rollback_result, Res}}}.
+
+%% Execute Query via sql_query_internal_silent/2 and log failures.
+%% A three-element {error, undefined, Raw} result is logged and reduced to
+%% {error, undefined}; {error, Cause} is logged and returned unchanged; any
+%% other result is passed through untouched.
+sql_query_internal(DBRef, Query) ->
+    Outcome = sql_query_internal_silent(DBRef, Query),
+    case Outcome of
+        {error, undefined, Raw} ->
+            ?ERROR_MSG("Got undefined result: ~p while ~p", [Raw, lists:append(Query)]),
+            {error, undefined};
+        {error, Cause} ->
+            ?ERROR_MSG("Failed: ~p while ~p", [Cause, lists:append(Query)]),
+            Outcome;
+        _ ->
+            Outcome
+    end.
+
+%% Send Query to the server with pgsql:squery/2 and translate the reply with
+%% get_result/1. Nothing is logged here apart from the debug trace.
+sql_query_internal_silent(DBRef, Query) ->
+    ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]),
+    % TODO: use pquery?
+    Reply = pgsql:squery(DBRef, Query),
+    get_result(Reply).
+
+%% Translate a pgsql:squery/2 reply:
+%%   SELECT reply            -> {data, [RowTuple]} with every column as a list
+%%   INSERT/DELETE/UPDATE n  -> {updated, N}
+%%   other known commands    -> {updated, 1}
+%%   {error, Error} reply    -> {error, Error}
+%%   anything else           -> {error, undefined, Reply}
+get_result({ok, [{"SELECT " ++ _Rest, _Rows, Recs}]}) ->
+    ToRow = fun(Rec) -> list_to_tuple(lists:map(fun pg_value_to_list/1, Rec)) end,
+    {data, lists:map(ToRow, Recs)};
+get_result({ok, ["INSERT " ++ OidAndCount]}) ->
+    [_Oid, Count] = string:tokens(OidAndCount, " "),
+    {updated, list_to_integer(Count)};
+get_result({ok, ["DELETE " ++ Count]}) ->
+    {updated, list_to_integer(Count)};
+get_result({ok, ["UPDATE " ++ Count]}) ->
+    {updated, list_to_integer(Count)};
+get_result({ok, [{error, Error}]}) ->
+    {error, Error};
+get_result({ok, [Tag]} = Reply) ->
+    % Command tags that carry no row count.
+    Plain = ["CREATE TABLE", "DROP TABLE", "ALTER TABLE", "DROP VIEW",
+             "DROP FUNCTION", "CREATE INDEX", "CREATE FUNCTION",
+             "BEGIN", "LOCK TABLE", "ROLLBACK", "COMMIT", "SET"],
+    case lists:member(Tag, Plain) of
+        true -> {updated, 1};
+        false -> {error, undefined, Reply}
+    end;
+get_result(Reply) ->
+    {error, undefined, Reply}.
+
+%% Convert one column value of a SELECT row to a list (string). Values of an
+%% unexpected type are logged and returned unchanged.
+pg_value_to_list(Value) when is_binary(Value) ->
+    binary_to_list(Value);
+pg_value_to_list(Value) when is_list(Value) ->
+    Value;
+pg_value_to_list(Value) when is_integer(Value) ->
+    integer_to_list(Value);
+pg_value_to_list(Value) when is_float(Value) ->
+    float_to_list(Value);
+pg_value_to_list(Value) when is_boolean(Value) ->
+    atom_to_list(Value);
+pg_value_to_list(Value) ->
+    ?ERROR_MSG("Unknown element type ~p", [Value]),
+    Value.
+
diff --git a/src/mod_roster.erl b/src/mod_roster.erl index 38c3a78b..f02f2cd3 100644 --- a/src/mod_roster.erl +++ b/src/mod_roster.erl @@ -66,6 +66,8 @@ -define(ROSTER_ITEM_CACHE, roster_item_cache). -define(ROSTER_VERSION_CACHE, roster_version_cache). +-include("mod_logdb.hrl"). + -export_type([subscription/0]). -callback init(binary(), gen_mod:opts()) -> any(). @@ -911,6 +913,14 @@ user_roster(User, Server, Query, Lang) -> Query), Items = get_roster(LUser, LServer), SItems = lists:sort(Items), + + Settings = case gen_mod:is_loaded(Server, mod_logdb) of + true -> + mod_logdb:get_user_settings(User, Server); + false -> + [] + end, + FItems = case SItems of [] -> [?CT(<<"None">>)]; _ -> @@ -968,7 +978,33 @@ user_roster(User, Server, Query, Lang) -> [?INPUTT(<<"submit">>, <<"remove", (ejabberd_web_admin:term_to_id(R#roster.jid))/binary>>, - <<"Remove">>)])]) + <<"Remove">>)]), + case gen_mod:is_loaded(Server, mod_logdb) of + true -> + Peer = jid:encode(R#roster.jid), + A = lists:member(Peer, Settings#user_settings.dolog_list), + B = lists:member(Peer, Settings#user_settings.donotlog_list), + {Name, Value} = + if + A -> + {<<"donotlog">>, <<"Do Not Log Messages">>}; + B -> + {<<"dolog">>, <<"Log Messages">>}; + Settings#user_settings.dolog_default == true -> + {<<"donotlog">>, <<"Do Not Log Messages">>}; + Settings#user_settings.dolog_default == false -> + {<<"dolog">>, <<"Log Messages">>} + end, + + ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?INPUTT(<<"submit">>, + <>, + Value)]); + false -> + ?X([]) + end + ]) end, SItems)))])] end, @@ -1075,9 +1111,42 @@ user_roster_item_parse_query(User, Server, Items, sub_els = [#roster_query{ items = [RosterItem]}]}), throw(submitted); - false -> ok - end - end + false -> + case lists:keysearch( + <<"donotlog", (ejabberd_web_admin:term_to_id(JID))/binary>>, 1, Query) of + {value, _} -> + Peer = jid:encode(JID), + Settings =
mod_logdb:get_user_settings(User, Server), + DNLL = case lists:member(Peer, Settings#user_settings.donotlog_list) of + false -> lists:append(Settings#user_settings.donotlog_list, [Peer]); + true -> Settings#user_settings.donotlog_list + end, + DLL = lists:delete(jid:encode(JID), Settings#user_settings.dolog_list), + Sett = Settings#user_settings{donotlog_list=DNLL, dolog_list=DLL}, + % TODO: check returned value + ok = mod_logdb:set_user_settings(User, Server, Sett), + throw(nothing); + false -> + case lists:keysearch( + <<"dolog", (ejabberd_web_admin:term_to_id(JID))/binary>>, 1, Query) of + {value, _} -> + Peer = jid:encode(JID), + Settings = mod_logdb:get_user_settings(User, Server), + DLL = case lists:member(Peer, Settings#user_settings.dolog_list) of + false -> lists:append(Settings#user_settings.dolog_list, [Peer]); + true -> Settings#user_settings.dolog_list + end, + DNLL = lists:delete(jid:encode(JID), Settings#user_settings.donotlog_list), + Sett = Settings#user_settings{donotlog_list=DNLL, dolog_list=DLL}, + % TODO: check returned value + ok = mod_logdb:set_user_settings(User, Server, Sett), + throw(nothing); + false -> + ok + end % dolog + end % donotlog + end % remove + end % validate end, Items), nothing.