diff --git a/priv/msgs/nl.msg b/priv/msgs/nl.msg index c8bb951..f6f6dc5 100644 --- a/priv/msgs/nl.msg +++ b/priv/msgs/nl.msg @@ -375,3 +375,17 @@ {"Your Jabber account was successfully created.","Uw Jabber-account is succesvol gecreeerd."}. {"Your Jabber account was successfully deleted.","Uw Jabber-account is succesvol verwijderd."}. {"Your messages to ~s are being blocked. To unblock them, visit ~s","Uw berichten aan ~s worden geblokkeerd. Om ze te deblokkeren, ga naar ~s"}. +% mod_logdb +{"Users Messages", "Gebruikersberichten"}. +{"Date", "Datum"}. +{"Count", "Aantal"}. +{"Logged messages for ~s", "Gelogde berichten van ~s"}. +{"Logged messages for ~s at ~s", "Gelogde berichten van ~s op ~s"}. +{" at ", " op "}. +{"No logged messages for ~s", "Geen gelogde berichten van ~s"}. +{"No logged messages for ~s at ~s", "Geen gelogde berichten van ~s op ~s"}. +{"Date, Time", "Datum en tijd"}. +{"Direction: Jid", "Richting: Jabber ID"}. +{"Subject", "Onderwerp"}. +{"Body", "Berichtveld"}. +{"Messages", "Berichten"}. diff --git a/priv/msgs/pl.msg b/priv/msgs/pl.msg index 46e2f77..139dc5d 100644 --- a/priv/msgs/pl.msg +++ b/priv/msgs/pl.msg @@ -375,3 +375,29 @@ {"Your Jabber account was successfully created.","Twoje konto zostało stworzone."}. {"Your Jabber account was successfully deleted.","Twoje konto zostało usunięte."}. {"Your messages to ~s are being blocked. To unblock them, visit ~s","Twoje wiadomości do ~s są blokowane. Aby je odblokować, odwiedź ~s"}. +% mod_logdb +{"Users Messages", "Wiadomości użytkownika"}. +{"Date", "Data"}. +{"Count", "Liczba"}. +{"Logged messages for ~s", "Zapisane wiadomości dla ~s"}. +{"Logged messages for ~s at", "Zapisane wiadomości dla ~s o ~s"}. +{" at ", " o "}. +{"No logged messages for ~s", "Brak zapisanych wiadomości dla ~s"}. +{"No logged messages for ~s at ~s", "Brak zapisanych wiadomości dla ~s o ~s"}. +{"Date, Time", "Data, Godzina"}. +{"Direction: Jid", "Kierunek: Jid"}. +{"Subject", "Temat"}. +{"Body", "Treść"}. +{"Messages","Wiadomości"}. +{"Filter Selected", "Odfiltruj zaznaczone"}. +{"Do Not Log Messages", "Nie zapisuj wiadomości"}. +{"Log Messages", "Zapisuj wiadomości"}. +{"Messages logging engine", "System zapisywania historii rozmów"}. +{"Default", "Domyślne"}. +{"Set logging preferences", "Ustaw preferencje zapisywania"}. +{"Messages logging engine settings", "Ustawienia systemu logowania"}. +{"Set run-time settings", "Zapisz ustawienia systemu logowania"}. +{"Groupchat messages logging", "Zapisywanie rozmów z konferencji"}. +{"Jids/Domains to ignore", "JID/Domena która ma być ignorowana"}. +{"Purge messages older than (days)", "Usuń wiadomości starsze niż (w dniach)"}. +{"Poll users settings (seconds)", "Czas aktualizacji preferencji użytkowników (sekundy)"}. diff --git a/priv/msgs/ru.msg b/priv/msgs/ru.msg index e5ea42e..10575d0 100644 --- a/priv/msgs/ru.msg +++ b/priv/msgs/ru.msg @@ -375,3 +375,33 @@ {"Your Jabber account was successfully created.","Ваш Jabber-аккаунт был успешно создан."}. {"Your Jabber account was successfully deleted.","Ваш Jabber-аккаунт был успешно удален."}. {"Your messages to ~s are being blocked. To unblock them, visit ~s","Ваши сообщения к ~s блокируются. Для снятия блокировки перейдите по ссылке ~s"}. +% mod_logdb.erl +{"Users Messages", "Сообщения пользователей"}. +{"Date", "Дата"}. +{"Count", "Количество"}. +{"Logged messages for ~s", "Сохранённые cообщения для ~s"}. +{"Logged messages for ~s at ~s", "Сохранённые cообщения для ~s за ~s"}. +{" at ", " за "}. +{"No logged messages for ~s", "Отсутствуют сообщения для ~s"}. +{"No logged messages for ~s at ~s", "Отсутствуют сообщения для ~s за ~s"}. +{"Date, Time", "Дата, Время"}. +{"Direction: Jid", "Направление: Jid"}. +{"Subject", "Тема"}. +{"Body", "Текст"}. +{"Messages", "Сообщения"}. +{"Filter Selected", "Отфильтровать выделенные"}. +{"Do Not Log Messages", "Не сохранять сообщения"}. +{"Log Messages", "Сохранять сообщения"}. +{"Messages logging engine", "Система логирования сообщений"}. +{"Default", "По умолчанию"}. +{"Set logging preferences", "Задайте настройки логирования"}. +{"Messages logging engine users", "Пользователи системы логирования сообщений"}. +{"Messages logging engine settings", "Настройки системы логирования сообщений"}. +{"Set run-time settings", "Задайте текущие настройки"}. +{"Groupchat messages logging", "Логирование сообщений типа groupchat"}. +{"Jids/Domains to ignore", "Игнорировать следующие jids/домены"}. +{"Purge messages older than (days)", "Удалять сообщения старее чем (дни)"}. +{"Poll users settings (seconds)", "Обновлять настройки пользователей через (секунд)"}. +{"Drop", "Удалять"}. +{"Do not drop", "Не удалять"}. +{"Drop messages on user removal", "Удалять сообщения при удалении пользователя"}. diff --git a/priv/msgs/uk.msg b/priv/msgs/uk.msg index f6f9553..56b5dd5 100644 --- a/priv/msgs/uk.msg +++ b/priv/msgs/uk.msg @@ -367,3 +367,33 @@ {"Your Jabber account was successfully created.","Ваш Jabber-акаунт було успішно створено."}. {"Your Jabber account was successfully deleted.","Ваш Jabber-акаунт було успішно видалено."}. {"Your messages to ~s are being blocked. To unblock them, visit ~s","Ваші повідомлення до ~s блокуються. Для розблокування відвідайте ~s"}. +% mod_logdb +{"Users Messages", "Повідомлення користувачів"}. +{"Date", "Дата"}. +{"Count", "Кількість"}. +{"Logged messages for ~s", "Збережені повідомлення для ~s"}. +{"Logged messages for ~s at ~s", "Збережені повідомлення для ~s за ~s"}. +{" at ", " за "}. +{"No logged messages for ~s", "Відсутні повідомлення для ~s"}. +{"No logged messages for ~s at ~s", "Відсутні повідомлення для ~s за ~s"}. +{"Date, Time", "Дата, Час"}. +{"Direction: Jid", "Напрямок: Jid"}. +{"Subject", "Тема"}. +{"Body", "Текст"}. +{"Messages", "Повідомлення"}. +{"Filter Selected", "Відфільтрувати виділені"}. +{"Do Not Log Messages", "Не зберігати повідомлення"}. +{"Log Messages", "Зберігати повідомлення"}. +{"Messages logging engine", "Система збереження повідомлень"}. +{"Default", "За замовчуванням"}. +{"Set logging preferences", "Вкажіть налагоджування збереження повідомлень"}. +{"Messages logging engine users", "Користувачі системи збереження повідомлень"}. +{"Messages logging engine settings", "Налагоджування системи збереження повідомлень"}. +{"Set run-time settings", "Вкажіть поточні налагоджування"}. +{"Groupchat messages logging", "Збереження повідомлень типу groupchat"}. +{"Jids/Domains to ignore", "Ігнорувати наступні jids/домени"}. +{"Purge messages older than (days)", "Видаляти повідомлення старіші ніж (дні)"}. +{"Poll users settings (seconds)", "Оновлювати налагоджування користувачів кожні (секунд)"}. +{"Drop", "Видаляти"}. +{"Do not drop", "Не видаляти"}. +{"Drop messages on user removal", "Видаляти повідомлення під час видалення користувача"}. diff --git a/src/gen_logdb.erl b/src/gen_logdb.erl new file mode 100644 index 0000000..06a894b --- /dev/null +++ b/src/gen_logdb.erl @@ -0,0 +1,164 @@ +%%%---------------------------------------------------------------------- +%%% File : gen_logdb.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : Describes generic behaviour for mod_logdb backends. +%%% Version : trunk +%%% Id : $Id: gen_logdb.erl 1273 2009-02-05 18:12:57Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(gen_logdb). +-author('o.palij@gmail.com'). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + % called from handle_info(start, _) + % it should logon database and return reference to started instance + % start(VHost, Opts) -> {ok, SPid} | error + % Options - list of options to connect to db + % Types: Options = list() -> [] | + % [{user, "logdb"}, + % {pass, "1234"}, + % {db, "logdb"}] | ... + % VHost = list() -> "jabber.example.org" + {start, 2}, + + % called from cleanup/1 + % it should logoff database and do cleanup + % stop(VHost) + % Types: VHost = list() -> "jabber.example.org" + {stop, 1}, + + % called from handle_call({addlog, _}, _, _) + % it should log messages to database + % log_message(VHost, Msg) -> ok | error + % Types: + % VHost = list() -> "jabber.example.org" + % Msg = record() -> #msg + {log_message, 2}, + + % called from ejabberdctl rebuild_stats + % it should rebuild stats table (if used) for vhost + % rebuild_stats(VHost) + % Types: + % VHost = list() -> "jabber.example.org" + {rebuild_stats, 1}, + + % it should rebuild stats table (if used) for vhost at Date + % rebuild_stats_at(VHost, Date) + % Types: + % VHost = list() -> "jabber.example.org" + % Date = list() -> "2007-02-12" + {rebuild_stats_at, 2}, + + % called from user_messages_at_parse_query/5 + % it should delete selected user messages at date + % delete_messages_by_user_at(VHost, Msgs, Date) -> ok | error + % Types: + % VHost = list() -> "jabber.example.org" + % Msgs = list() -> [ #msg1, msg2, ... ] + % Date = list() -> "2007-02-12" + {delete_messages_by_user_at, 3}, + + % called from user_messages_parse_query/4 | vhost_messages_at_parse_query/4 + % it should delete all user messages at date + % delete_all_messages_by_user_at(User, VHost, Date) -> ok | error + % Types: + % User = list() -> "admin" + % VHost = list() -> "jabber.example.org" + % Date = list() -> "2007-02-12" + {delete_all_messages_by_user_at, 3}, + + % called from vhost_messages_parse_query/3 + % it should delete messages for vhost at date and update stats + % delete_messages_at(VHost, Date) -> ok | error + % Types: + % VHost = list() -> "jabber.example.org" + % Date = list() -> "2007-02-12" + {delete_messages_at, 2}, + + % called from ejabberd_web_admin:vhost_messages_stats/3 + % it should return sorted list of count of messages by dates for vhost + % get_vhost_stats(VHost) -> {ok, [{Date1, Msgs_count1}, {Date2, Msgs_count2}, ... ]} | + % {error, Reason} + % Types: + % VHost = list() -> "jabber.example.org" + % DateN = list() -> "2007-02-12" + % Msgs_countN = number() -> 241 + {get_vhost_stats, 1}, + + % called from ejabberd_web_admin:vhost_messages_stats_at/4 + % it should return sorted list of count of messages by users at date for vhost + % get_vhost_stats_at(VHost, Date) -> {ok, [{User1, Msgs_count1}, {User2, Msgs_count2}, ....]} | + % {error, Reason} + % Types: + % VHost = list() -> "jabber.example.org" + % Date = list() -> "2007-02-12" + % UserN = list() -> "admin" + % Msgs_countN = number() -> 241 + {get_vhost_stats_at, 2}, + + % called from ejabberd_web_admin:user_messages_stats/4 + % it should return sorted list of count of messages by date for user at vhost + % get_user_stats(User, VHost) -> {ok, [{Date1, Msgs_count1}, {Date2, Msgs_count2}, ...]} | + % {error, Reason} + % Types: + % User = list() -> "admin" + % VHost = list() -> "jabber.example.org" + % DateN = list() -> "2007-02-12" + % Msgs_countN = number() -> 241 + {get_user_stats, 2}, + + % called from ejabberd_web_admin:user_messages_stats_at/5 + % it should return all user messages at date + % get_user_messages_at(User, VHost, Date) -> {ok, Msgs} | {error, Reason} + % Types: + % User = list() -> "admin" + % VHost = list() -> "jabber.example.org" + % Date = list() -> "2007-02-12" + % Msgs = list() -> [ #msg1, msg2, ... ] + {get_user_messages_at, 3}, + + % called from many places + % it should return list of dates for vhost + % get_dates(VHost) -> [Date1, Date2, ... ] + % Types: + % VHost = list() -> "jabber.example.org" + % DateN = list() -> "2007-02-12" + {get_dates, 1}, + + % called from start + % it should return list with users settings for VHost in db + % get_users_settings(VHost) -> [#user_settings1, #user_settings2, ... ] | error + % Types: + % VHost = list() -> "jabber.example.org" + {get_users_settings, 1}, + + % called from many places + % it should return User settings at VHost from db + % get_user_settings(User, VHost) -> error | {ok, #user_settings} + % Types: + % User = list() -> "admin" + % VHost = list() -> "jabber.example.org" + {get_user_settings, 2}, + + % called from web admin + % it should set User settings at VHost + % set_user_settings(User, VHost, #user_settings) -> ok | error + % Types: + % User = list() -> "admin" + % VHost = list() -> "jabber.example.org" + {set_user_settings, 3}, + + % called from remove_user (ejabberd hook) + % it should remove user messages and settings at VHost + % drop_user(User, VHost) -> ok | error + % Types: + % User = list() -> "admin" + % VHost = list() -> "jabber.example.org" + {drop_user, 2} + ]; +behaviour_info(_) -> + undefined. diff --git a/src/mod_logdb.erl b/src/mod_logdb.erl new file mode 100644 index 0000000..72f1982 --- /dev/null +++ b/src/mod_logdb.erl @@ -0,0 +1,2152 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : Frontend for log user messages to db +%%% Version : trunk +%%% Id : $Id: mod_logdb.erl 1360 2009-07-30 06:00:14Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb). +-author('o.palij@gmail.com'). + +-behaviour(gen_server). +-behaviour(gen_mod). + +% supervisor +-export([start_link/2]). +% gen_mod +-export([start/2,stop/1]). +% gen_server +-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]). +% hooks +-export([send_packet/3, receive_packet/4, remove_user/2]). +-export([get_local_identity/5, + get_local_features/5, + get_local_items/5, + adhoc_local_items/4, + adhoc_local_commands/4 +% get_sm_identity/5, +% get_sm_features/5, +% get_sm_items/5, +% adhoc_sm_items/4, +% adhoc_sm_commands/4]). + ]). +% ejabberdctl +-export([rebuild_stats/3, + copy_messages/1, copy_messages_ctl/3, copy_messages_int_tc/1]). +% +-export([get_vhost_stats/1, get_vhost_stats_at/2, + get_user_stats/2, get_user_messages_at/3, + get_dates/1, + sort_stats/1, + convert_timestamp/1, convert_timestamp_brief/1, + get_user_settings/2, set_user_settings/3, + user_messages_at_parse_query/4, user_messages_parse_query/3, + vhost_messages_parse_query/2, vhost_messages_at_parse_query/4, + list_to_bool/1, bool_to_list/1, + list_to_string/1, string_to_list/1, + get_module_settings/1, set_module_settings/2, + purge_old_records/2]). +% webadmin hooks +-export([webadmin_menu/3, + webadmin_user/4, + webadmin_page/3, + user_parse_query/5]). +% webadmin queries +-export([vhost_messages_stats/3, + vhost_messages_stats_at/4, + user_messages_stats/4, + user_messages_stats_at/5]). + +-include("mod_logdb.hrl"). +-include("ejabberd.hrl"). +-include("mod_roster.hrl"). +-include("jlib.hrl"). +-include("ejabberd_ctl.hrl"). +-include("adhoc.hrl"). +-include("ejabberd_web_admin.hrl"). +-include("ejabberd_http.hrl"). +-include("logger.hrl"). + +-define(PROCNAME, ejabberd_mod_logdb). +% gen_server call timeout +-define(CALL_TIMEOUT, 10000). + +-record(state, {vhost, dbmod, backendPid, monref, purgeRef, pollRef, dbopts, dbs, dolog_default, ignore_jids, groupchat, purge_older_days, poll_users_settings, drop_messages_on_user_removal}). + +ets_settings_table(VHost) -> list_to_atom("ets_logdb_settings_" ++ binary_to_list(VHost)). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_mod/gen_server callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% ejabberd starts module +start(VHost, Opts) -> + ChildSpec = + {gen_mod:get_module_proc(VHost, ?PROCNAME), + {?MODULE, start_link, [VHost, Opts]}, + permanent, + 1000, + worker, + [?MODULE]}, + % add child to ejabberd_sup + supervisor:start_child(ejabberd_sup, ChildSpec). + +% supervisor starts gen_server +start_link(VHost, Opts) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + {ok, Pid} = gen_server:start_link({local, Proc}, ?MODULE, [VHost, Opts], []), + Pid ! start, + {ok, Pid}. + +init([VHost, Opts]) -> + process_flag(trap_exit, true), + DBsRaw = gen_mod:get_opt(dbs, Opts, fun(A) -> A end, [{mnesia, []}]), + DBs = case lists:keysearch(mnesia, 1, DBsRaw) of + false -> lists:append(DBsRaw, [{mnesia,[]}]); + {value, _} -> DBsRaw + end, + VHostDB = gen_mod:get_opt(vhosts, Opts, fun(A) -> A end, [{VHost, mnesia}]), + % 10 is default becouse of using in clustered environment + PollUsersSettings = gen_mod:get_opt(poll_users_settings, Opts, fun(A) -> A end, 10), + + {DBName, DBOpts} = + case lists:keysearch(VHost, 1, VHostDB) of + false -> + ?WARNING_MSG("There is no logging backend defined for '~s', switching to mnesia", [VHost]), + {mnesia, []}; + {value,{_, DBNameResult}} -> + case lists:keysearch(DBNameResult, 1, DBs) of + false -> + ?WARNING_MSG("There is no such logging backend '~s' defined for '~s', switching to mnesia", [DBNameResult, VHost]), + {mnesia, []}; + {value, {_, DBOptsResult}} -> + {DBNameResult, DBOptsResult} + end + end, + + ?MYDEBUG("Starting mod_logdb for '~s' with '~s' backend", [VHost, DBName]), + + DBMod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(DBName)), + + {ok, #state{vhost=VHost, + dbmod=DBMod, + dbopts=DBOpts, + % dbs used for convert messages from one backend to other + dbs=DBs, + dolog_default=gen_mod:get_opt(dolog_default, Opts, fun(A) -> A end, true), + drop_messages_on_user_removal=gen_mod:get_opt(drop_messages_on_user_removal, Opts, fun(A) -> A end, true), + ignore_jids=gen_mod:get_opt(ignore_jids, Opts, fun(A) -> A end, []), + groupchat=gen_mod:get_opt(groupchat, Opts, fun(A) -> A end, none), + purge_older_days=gen_mod:get_opt(purge_older_days, Opts, fun(A) -> A end, never), + poll_users_settings=PollUsersSettings}}. + +cleanup(#state{vhost=VHost} = _State) -> + ?MYDEBUG("Stopping ~s for ~p", [?MODULE, VHost]), + + %ets:delete(ets_settings_table(VHost)), + + ejabberd_hooks:delete(remove_user, VHost, ?MODULE, remove_user, 90), + ejabberd_hooks:delete(user_send_packet, VHost, ?MODULE, send_packet, 90), + ejabberd_hooks:delete(user_receive_packet, VHost, ?MODULE, receive_packet, 90), + %ejabberd_hooks:delete(adhoc_sm_commands, VHost, ?MODULE, adhoc_sm_commands, 110), + %ejabberd_hooks:delete(adhoc_sm_items, VHost, ?MODULE, adhoc_sm_items, 110), + ejabberd_hooks:delete(adhoc_local_commands, VHost, ?MODULE, adhoc_local_commands, 50), + ejabberd_hooks:delete(adhoc_local_items, VHost, ?MODULE, adhoc_local_items, 50), + %ejabberd_hooks:delete(disco_sm_identity, VHost, ?MODULE, get_sm_identity, 50), + %ejabberd_hooks:delete(disco_sm_features, VHost, ?MODULE, get_sm_features, 50), + %ejabberd_hooks:delete(disco_sm_items, VHost, ?MODULE, get_sm_items, 50), + ejabberd_hooks:delete(disco_local_identity, VHost, ?MODULE, get_local_identity, 50), + ejabberd_hooks:delete(disco_local_features, VHost, ?MODULE, get_local_features, 50), + ejabberd_hooks:delete(disco_local_items, VHost, ?MODULE, get_local_items, 50), + + ejabberd_hooks:delete(webadmin_menu_host, VHost, ?MODULE, webadmin_menu, 70), + ejabberd_hooks:delete(webadmin_user, VHost, ?MODULE, webadmin_user, 50), + ejabberd_hooks:delete(webadmin_page_host, VHost, ?MODULE, webadmin_page, 50), + ejabberd_hooks:delete(webadmin_user_parse_query, VHost, ?MODULE, user_parse_query, 50), + + ?MYDEBUG("Removed hooks for ~p", [VHost]), + + %ejabberd_ctl:unregister_commands(VHost, [{"rebuild_stats", "rebuild mod_logdb module stats for vhost"}], ?MODULE, rebuild_stats), + %Supported_backends = lists:flatmap(fun({Backend, _Opts}) -> + % [atom_to_list(Backend), " "] + % end, State#state.dbs), + %ejabberd_ctl:unregister_commands( + % VHost, + % [{"copy_messages backend", "copy messages from backend to current backend. backends could be: " ++ Supported_backends }], + % ?MODULE, copy_messages_ctl), + ?MYDEBUG("Unregistered commands for ~p", [VHost]). + +stop(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + %gen_server:call(Proc, {cleanup}), + %?MYDEBUG("Cleanup in stop finished!!!!", []), + %timer:sleep(10000), + ok = supervisor:terminate_child(ejabberd_sup, Proc), + ok = supervisor:delete_child(ejabberd_sup, Proc). + +handle_call({cleanup}, _From, State) -> + cleanup(State), + ?MYDEBUG("Cleanup finished!!!!!", []), + {reply, ok, State}; +handle_call({get_dates}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:get_dates(VHost), + {reply, Reply, State}; +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% ejabberd_web_admin callbacks +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +handle_call({delete_messages_by_user_at, PMsgs, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:delete_messages_by_user_at(VHost, PMsgs, binary_to_list(Date)), + {reply, Reply, State}; +handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:delete_all_messages_by_user_at(binary_to_list(User), VHost, binary_to_list(Date)), + {reply, Reply, State}; +handle_call({delete_messages_at, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:delete_messages_at(VHost, Date), + {reply, Reply, State}; +handle_call({get_vhost_stats}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:get_vhost_stats(VHost), + {reply, Reply, State}; +handle_call({get_vhost_stats_at, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:get_vhost_stats_at(VHost, binary_to_list(Date)), + {reply, Reply, State}; +handle_call({get_user_stats, User}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:get_user_stats(binary_to_list(User), VHost), + {reply, Reply, State}; +handle_call({get_user_messages_at, User, Date}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Reply = DBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)), + {reply, Reply, State}; +handle_call({get_user_settings, User}, _From, #state{dbmod=_DBMod, vhost=VHost}=State) -> + Reply = case ets:match_object(ets_settings_table(VHost), + #user_settings{owner_name=User, _='_'}) of + [Set] -> Set; + _ -> #user_settings{owner_name=User, + dolog_default=State#state.dolog_default, + dolog_list=[], + donotlog_list=[]} + end, + {reply, Reply, State}; +% TODO: remove User ?? +handle_call({set_user_settings, User, GSet}, _From, #state{dbmod=DBMod, vhost=VHost}=State) -> + Set = GSet#user_settings{owner_name=User}, + Reply = + case ets:match_object(ets_settings_table(VHost), + #user_settings{owner_name=User, _='_'}) of + [Set] -> + ok; + _ -> + case DBMod:set_user_settings(binary_to_list(User), VHost, Set) of + error -> + error; + ok -> + true = ets:insert(ets_settings_table(VHost), Set), + ok + end + end, + {reply, Reply, State}; +handle_call({get_module_settings}, _From, State) -> + {reply, State, State}; +handle_call({set_module_settings, #state{purge_older_days=PurgeDays, + poll_users_settings=PollSec} = Settings}, + _From, + #state{purgeRef=PurgeRefOld, + pollRef=PollRefOld, + purge_older_days=PurgeDaysOld, + poll_users_settings=PollSecOld} = State) -> + PurgeRef = if + PurgeDays == never, PurgeDaysOld /= never -> + {ok, cancel} = timer:cancel(PurgeRefOld), + disabled; + is_integer(PurgeDays), PurgeDaysOld == never -> + set_purge_timer(PurgeDays); + true -> + PurgeRefOld + end, + + PollRef = if + PollSec == PollSecOld -> + PollRefOld; + PollSec == 0, PollSecOld /= 0 -> + {ok, cancel} = timer:cancel(PollRefOld), + disabled; + is_integer(PollSec), PollSecOld == 0 -> + set_poll_timer(PollSec); + is_integer(PollSec), PollSecOld /= 0 -> + {ok, cancel} = timer:cancel(PollRefOld), + set_poll_timer(PollSec) + end, + + NewState = State#state{dolog_default=Settings#state.dolog_default, + ignore_jids=Settings#state.ignore_jids, + groupchat=Settings#state.groupchat, + drop_messages_on_user_removal=Settings#state.drop_messages_on_user_removal, + purge_older_days=PurgeDays, + poll_users_settings=PollSec, + purgeRef=PurgeRef, + pollRef=PollRef}, + {reply, ok, NewState}; +handle_call(Msg, _From, State) -> + ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% end ejabberd_web_admin callbacks +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +% ejabberd_hooks call +handle_cast({addlog, Direction, Owner, Peer, Packet}, #state{dbmod=DBMod, vhost=VHost}=State) -> + case filter(Owner, Peer, State) of + true -> + case catch packet_parse(Owner, Peer, Packet, Direction, State) of + ignore -> + ok; + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to parse: ~p", [Reason]); + Msg -> + DBMod:log_message(VHost, Msg) + end; + false -> + ok + end, + {noreply, State}; +handle_cast({remove_user, User}, #state{dbmod=DBMod, vhost=VHost}=State) -> + case State#state.drop_messages_on_user_removal of + true -> + DBMod:drop_user(binary_to_list(User), VHost), + ?INFO_MSG("Launched ~s@~s removal", [User, VHost]); + false -> + ?INFO_MSG("Message removing is disabled. Keeping messages for ~s@~s", [User, VHost]) + end, + {noreply, State}; +% ejabberdctl rebuild_stats/3 +handle_cast({rebuild_stats}, #state{dbmod=DBMod, vhost=VHost}=State) -> + DBMod:rebuild_stats(VHost), + {noreply, State}; +handle_cast({copy_messages, Backend}, State) -> + spawn(?MODULE, copy_messages, [[State, Backend]]), + {noreply, State}; +handle_cast({copy_messages, Backend, Date}, State) -> + spawn(?MODULE, copy_messages, [[State, Backend, Date]]), + {noreply, State}; +handle_cast(Msg, State) -> + ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]), + {noreply, State}. + +% return: disabled | timer reference +set_purge_timer(PurgeDays) -> + case PurgeDays of + never -> disabled; + Days when is_integer(Days) -> + {ok, Ref1} = timer:send_interval(timer:hours(24), scheduled_purging), + Ref1 + end. + +% return: disabled | timer reference +set_poll_timer(PollSec) -> + if + PollSec > 0 -> + {ok, Ref2} = timer:send_interval(timer:seconds(PollSec), poll_users_settings), + Ref2; + % db polling disabled + PollSec == 0 -> + disabled; + true -> + {ok, Ref3} = timer:send_interval(timer:seconds(10), poll_users_settings), + Ref3 + end. + +% actual starting of logging +% from timer:send_after (in init) +handle_info(start, #state{dbmod=DBMod, vhost=VHost}=State) -> + case DBMod:start(VHost, State#state.dbopts) of + {error,{already_started,_}} -> + ?MYDEBUG("backend module already started - trying to stop it", []), + DBMod:stop(VHost), + {stop, already_started, State}; + {error, Reason} -> + timer:sleep(30000), + ?ERROR_MSG("Failed to start: ~p", [Reason]), + {stop, db_connection_failed, State}; + {ok, SPid} -> + ?INFO_MSG("~p connection established", [DBMod]), + + MonRef = erlang:monitor(process, SPid), + + ets:new(ets_settings_table(VHost), [named_table,public,set,{keypos, #user_settings.owner_name}]), + DoLog = case DBMod:get_users_settings(VHost) of + {ok, Settings} -> [Sett#user_settings{owner_name = iolist_to_binary(Sett#user_settings.owner_name)} || Sett <- Settings]; + {error, _Reason} -> [] + end, + ets:insert(ets_settings_table(VHost), DoLog), + + TrefPurge = set_purge_timer(State#state.purge_older_days), + TrefPoll = set_poll_timer(State#state.poll_users_settings), + + ejabberd_hooks:add(remove_user, VHost, ?MODULE, remove_user, 90), + ejabberd_hooks:add(user_send_packet, VHost, ?MODULE, send_packet, 90), + ejabberd_hooks:add(user_receive_packet, VHost, ?MODULE, receive_packet, 90), + + ejabberd_hooks:add(disco_local_items, VHost, ?MODULE, get_local_items, 50), + ejabberd_hooks:add(disco_local_features, VHost, ?MODULE, get_local_features, 50), + ejabberd_hooks:add(disco_local_identity, VHost, ?MODULE, get_local_identity, 50), + %ejabberd_hooks:add(disco_sm_items, VHost, ?MODULE, get_sm_items, 50), + %ejabberd_hooks:add(disco_sm_features, VHost, ?MODULE, get_sm_features, 50), + %ejabberd_hooks:add(disco_sm_identity, VHost, ?MODULE, get_sm_identity, 50), + ejabberd_hooks:add(adhoc_local_items, VHost, ?MODULE, adhoc_local_items, 50), + ejabberd_hooks:add(adhoc_local_commands, VHost, ?MODULE, adhoc_local_commands, 50), + %ejabberd_hooks:add(adhoc_sm_items, VHost, ?MODULE, adhoc_sm_items, 50), + %ejabberd_hooks:add(adhoc_sm_commands, VHost, ?MODULE, adhoc_sm_commands, 50), + + ejabberd_hooks:add(webadmin_menu_host, VHost, ?MODULE, webadmin_menu, 70), + ejabberd_hooks:add(webadmin_user, VHost, ?MODULE, webadmin_user, 50), + ejabberd_hooks:add(webadmin_page_host, VHost, ?MODULE, webadmin_page, 50), + ejabberd_hooks:add(webadmin_user_parse_query, VHost, ?MODULE, user_parse_query, 50), + + ?MYDEBUG("Added hooks for ~p", [VHost]), + + %ejabberd_ctl:register_commands( + % VHost, + % [{"rebuild_stats", "rebuild mod_logdb module stats for vhost"}], + % ?MODULE, rebuild_stats), + %Supported_backends = lists:flatmap(fun({Backend, _Opts}) -> + % [atom_to_list(Backend), " "] + % end, State#state.dbs), + %ejabberd_ctl:register_commands( + % VHost, + % [{"copy_messages backend", "copy messages from backend to current backend. backends could be: " ++ Supported_backends }], + % ?MODULE, copy_messages_ctl), + ?MYDEBUG("Registered commands for ~p", [VHost]), + + NewState=State#state{monref = MonRef, backendPid=SPid, purgeRef=TrefPurge, pollRef=TrefPoll}, + {noreply, NewState}; + Rez -> + ?ERROR_MSG("Rez=~p", [Rez]), + timer:sleep(30000), + {stop, db_connection_failed, State} + end; +% from timer:send_interval/2 (in start handle_info) +handle_info(scheduled_purging, #state{vhost=VHost, purge_older_days=Days} = State) -> + ?MYDEBUG("Starting scheduled purging of old records for ~p", [VHost]), + spawn(?MODULE, purge_old_records, [VHost, integer_to_list(Days)]), + {noreply, State}; +% from timer:send_interval/2 (in start handle_info) +handle_info(poll_users_settings, #state{dbmod=DBMod, vhost=VHost}=State) -> + {ok, DoLog} = DBMod:get_users_settings(VHost), + ?MYDEBUG("DoLog=~p", [DoLog]), + true = ets:delete_all_objects(ets_settings_table(VHost)), + ets:insert(ets_settings_table(VHost), DoLog), + {noreply, State}; +handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) -> + {stop, db_connection_dropped, State}; +handle_info({fetch_result, _, _}, State) -> + ?MYDEBUG("Got timed out mysql fetch result", []), + {noreply, State}; +handle_info(Info, State) -> + ?INFO_MSG("Got Info:~p, State:~p", [Info, State]), + {noreply, State}. + +terminate(db_connection_failed, _State) -> + ok; +terminate(db_connection_dropped, State) -> + ?MYDEBUG("Got terminate with db_connection_dropped", []), + cleanup(State), + ok; +terminate(Reason, #state{monref=undefined} = State) -> + ?MYDEBUG("Got terminate with undefined monref.~nReason: ~p", [Reason]), + cleanup(State), + ok; +terminate(Reason, #state{dbmod=DBMod, vhost=VHost, monref=MonRef, backendPid=Pid} = State) -> + ?INFO_MSG("Reason: ~p", [Reason]), + case erlang:is_process_alive(Pid) of + true -> + erlang:demonitor(MonRef, [flush]), + DBMod:stop(VHost); + false -> + ok + end, + cleanup(State), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% ejabberd_hooks callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% TODO: change to/from to list as sql stores it as list +send_packet(Owner, Peer, P) -> + VHost = Owner#jid.lserver, + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {addlog, to, Owner, Peer, P}). + +receive_packet(_JID, Peer, Owner, P) -> + VHost = Owner#jid.lserver, + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {addlog, from, Owner, Peer, P}). + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Proc = gen_mod:get_module_proc(LServer, ?PROCNAME), + gen_server:cast(Proc, {remove_user, LUser}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% ejabberdctl +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +rebuild_stats(_Val, VHost, ["rebuild_stats"]) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {rebuild_stats}), + {stop, ?STATUS_SUCCESS}; +rebuild_stats(Val, _VHost, _Args) -> + Val. + +copy_messages_ctl(_Val, VHost, ["copy_messages", Backend]) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {copy_messages, Backend}), + {stop, ?STATUS_SUCCESS}; +copy_messages_ctl(_Val, VHost, ["copy_messages", Backend, Date]) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {copy_messages, Backend, Date}), + {stop, ?STATUS_SUCCESS}; +copy_messages_ctl(Val, _VHost, _Args) -> + Val. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% misc operations +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +% handle_cast({addlog, E}, _) +% raw packet -> #msg +packet_parse(Owner, Peer, Packet, Direction, State) -> + case xml:get_subtag(Packet, <<"body">>) of + false -> + ignore; + Body_xml -> + Message_type = + case xml:get_tag_attr_s(<<"type">>, Packet) of + <<"error">> -> throw(ignore); + [] -> <<"normal">>; + MType -> MType + end, + + case Message_type of + <<"groupchat">> when State#state.groupchat == send, Direction == to -> + ok; + <<"groupchat">> when State#state.groupchat == send, Direction == from -> + throw(ignore); + <<"groupchat">> when State#state.groupchat == half -> + Rooms = ets:match(muc_online_room, '$1'), + Ni=lists:foldl(fun([{muc_online_room, {GName, GHost}, Pid}], Names) -> + case gen_fsm:sync_send_all_state_event(Pid, {get_jid_nick,Owner}) of + [] -> Names; + Nick -> + lists:append(Names, [jlib:jid_to_string({GName, GHost, Nick})]) + end + end, [], Rooms), + case lists:member(jlib:jid_to_string(Peer), Ni) of + true when Direction == from -> + throw(ignore); + _ -> + ok + end; + <<"groupchat">> when State#state.groupchat == none -> + throw(ignore); + _ -> + ok + end, + + Message_body = xml:get_tag_cdata(Body_xml), + Message_subject = + case xml:get_subtag(Packet, <<"subject">>) of + false -> + <<"">>; + Subject_xml -> + xml:get_tag_cdata(Subject_xml) + end, + + OwnerName = stringprep:tolower(Owner#jid.user), + PName = stringprep:tolower(Peer#jid.user), + PServer = stringprep:tolower(Peer#jid.server), + PResource = Peer#jid.resource, + + #msg{timestamp = get_timestamp(), + owner_name = OwnerName, + peer_name = PName, + peer_server = PServer, + peer_resource = PResource, + direction = Direction, + type = Message_type, + subject = Message_subject, + body = Message_body} + end. + +% called from handle_cast({addlog, _}, _) -> true (log messages) | false (do not log messages) +filter(Owner, Peer, State) -> + OwnerBin = << (Owner#jid.luser)/binary, "@", (Owner#jid.lserver)/binary >>, + OwnerServ = << "@", (Owner#jid.lserver)/binary >>, + PeerBin = << (Peer#jid.luser)/binary, "@", (Peer#jid.lserver)/binary >>, + PeerServ = << "@", (Peer#jid.lserver)/binary >>, + + LogTo = case ets:match_object(ets_settings_table(State#state.vhost), + #user_settings{owner_name=Owner#jid.luser, _='_'}) of + [#user_settings{dolog_default=Default, + dolog_list=DLL, + donotlog_list=DNLL}] -> + + A = lists:member(PeerBin, DLL), + B = lists:member(PeerBin, DNLL), + if + A -> true; + B -> false; + Default == true -> true; + Default == false -> false; + true -> State#state.dolog_default + end; + _ -> State#state.dolog_default + end, + lists:all(fun(O) -> O end, + [not lists:member(OwnerBin, State#state.ignore_jids), + not lists:member(PeerBin, State#state.ignore_jids), + not lists:member(OwnerServ, State#state.ignore_jids), + not lists:member(PeerServ, State#state.ignore_jids), + LogTo]). + +purge_old_records(VHost, Days) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + + Dates = ?MODULE:get_dates(VHost), + DateNow = calendar:datetime_to_gregorian_seconds({date(), {0,0,1}}), + DateDiff = list_to_integer(Days)*24*60*60, + ?MYDEBUG("Purging tables older than ~s days", [Days]), + lists:foreach(fun(Date) -> + [Year, Month, Day] = ejabberd_regexp:split(iolist_to_binary(Date), <<"[^0-9]+">>), + DateInSec = calendar:datetime_to_gregorian_seconds({{binary_to_integer(Year), binary_to_integer(Month), binary_to_integer(Day)}, {0,0,1}}), + if + (DateNow - DateInSec) > DateDiff -> + gen_server:call(Proc, {delete_messages_at, Date}); + true -> + ?MYDEBUG("Skipping messages at ~p", [Date]) + end + end, Dates). + +% called from get_vhost_stats/2, get_user_stats/3 +sort_stats(Stats) -> + % Stats = [{"2003-4-15",1}, {"2006-8-18",1}, ... ] + CFun = fun({TableName, Count}) -> + [Year, Month, Day] = ejabberd_regexp:split(iolist_to_binary(TableName), <<"[^0-9]+">>), + { calendar:datetime_to_gregorian_seconds({{binary_to_integer(Year), binary_to_integer(Month), binary_to_integer(Day)}, {0,0,1}}), Count } + end, + % convert to [{63364377601,1}, {63360662401,1}, ... ] + CStats = lists:map(CFun, Stats), + % sort by date + SortedStats = lists:reverse(lists:keysort(1, CStats)), + % convert to [{"2007-12-9",1}, {"2007-10-27",1}, ... ] sorted list + [{mod_logdb:convert_timestamp_brief(TableSec), Count} || {TableSec, Count} <- SortedStats]. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% Date/Time operations +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% return float seconds elapsed from "zero hour" as list +get_timestamp() -> + {MegaSec, Sec, MicroSec} = now(), + [List] = io_lib:format("~.5f", [MegaSec*1000000 + Sec + MicroSec/1000000]), + List. + +% convert float seconds elapsed from "zero hour" to local time "%Y-%m-%d %H:%M:%S" string +convert_timestamp(Seconds) when is_list(Seconds) -> + case string:to_float(Seconds++".0") of + {F,_} when is_float(F) -> convert_timestamp(F); + _ -> erlang:error(badarg, [Seconds]) + end; +convert_timestamp(Seconds) when is_float(Seconds) -> + GregSec = trunc(Seconds + 719528*86400), + UnivDT = calendar:gregorian_seconds_to_datetime(GregSec), + {{Year, Month, Day},{Hour, Minute, Sec}} = calendar:universal_time_to_local_time(UnivDT), + integer_to_list(Year) ++ "-" ++ integer_to_list(Month) ++ "-" ++ integer_to_list(Day) ++ " " ++ integer_to_list(Hour) ++ ":" ++ integer_to_list(Minute) ++ ":" ++ integer_to_list(Sec). + +% convert float seconds elapsed from "zero hour" to local time "%Y-%m-%d" string +convert_timestamp_brief(Seconds) when is_list(Seconds) -> + convert_timestamp_brief(list_to_float(Seconds)); +convert_timestamp_brief(Seconds) when is_float(Seconds) -> + GregSec = trunc(Seconds + 719528*86400), + UnivDT = calendar:gregorian_seconds_to_datetime(GregSec), + {{Year, Month, Day},{_Hour, _Minute, _Sec}} = calendar:universal_time_to_local_time(UnivDT), + integer_to_list(Year) ++ "-" ++ integer_to_list(Month) ++ "-" ++ integer_to_list(Day); +convert_timestamp_brief(Seconds) when is_integer(Seconds) -> + {{Year, Month, Day},{_Hour, _Minute, _Sec}} = calendar:gregorian_seconds_to_datetime(Seconds), + integer_to_list(Year) ++ "-" ++ integer_to_list(Month) ++ "-" ++ integer_to_list(Day). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% DB operations (get) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +get_vhost_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT). + +get_vhost_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT). + +get_user_stats(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_stats, User}, ?CALL_TIMEOUT). + +get_user_messages_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_messages_at, User, Date}, ?CALL_TIMEOUT). + +get_dates(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_dates}, ?CALL_TIMEOUT). + +get_user_settings(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_settings, User}, ?CALL_TIMEOUT). + +set_user_settings(User, VHost, Set) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {set_user_settings, User, Set}). + +get_module_settings(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_module_settings}). + +set_module_settings(VHost, Settings) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {set_module_settings, Settings}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% Web admin callbacks (delete) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +user_messages_at_parse_query(VHost, Date, Msgs, Query) -> + case lists:keysearch(<<"delete">>, 1, Query) of + {value, _} -> + PMsgs = lists:filter( + fun(Msg) -> + ID = jlib:encode_base64(term_to_binary(Msg#msg.timestamp)), + lists:member({<<"selected">>, ID}, Query) + end, Msgs), + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_by_user_at, PMsgs, Date}, ?CALL_TIMEOUT); + false -> + nothing + end. + +user_messages_parse_query(User, VHost, Query) -> + case lists:keysearch(<<"delete">>, 1, Query) of + {value, _} -> + Dates = get_dates(VHost), + PDates = lists:filter( + fun(Date) -> + ID = jlib:encode_base64( << User/binary, (iolist_to_binary(Date))/binary >> ), + lists:member({<<"selected">>, ID}, Query) + end, Dates), + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + Rez = lists:foldl( + fun(Date, Acc) -> + lists:append(Acc, + [gen_server:call(Proc, + {delete_all_messages_by_user_at, User, iolist_to_binary(Date)}, + ?CALL_TIMEOUT)]) + end, [], PDates), + case lists:member(error, Rez) of + true -> + error; + false -> + nothing + end; + false -> + nothing + end. + +vhost_messages_parse_query(VHost, Query) -> + case lists:keysearch(<<"delete">>, 1, Query) of + {value, _} -> + Dates = get_dates(VHost), + PDates = lists:filter( + fun(Date) -> + ID = jlib:encode_base64( << VHost/binary, (iolist_to_binary(Date))/binary >> ), + lists:member({<<"selected">>, ID}, Query) + end, Dates), + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + Rez = lists:foldl(fun(Date, Acc) -> + lists:append(Acc, [gen_server:call(Proc, + {delete_messages_at, Date}, + ?CALL_TIMEOUT)]) + end, [], PDates), + case lists:member(error, Rez) of + true -> + error; + false -> + nothing + end; + false -> + nothing + end. + +vhost_messages_at_parse_query(VHost, Date, Stats, Query) -> + case lists:keysearch(<<"delete">>, 1, Query) of + {value, _} -> + PStats = lists:filter( + fun({User, _Count}) -> + ID = jlib:encode_base64( << (iolist_to_binary(User))/binary, VHost/binary >> ), + lists:member({<<"selected">>, ID}, Query) + end, Stats), + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + Rez = lists:foldl(fun({User, _Count}, Acc) -> + lists:append(Acc, [gen_server:call(Proc, + {delete_all_messages_by_user_at, + iolist_to_binary(User), iolist_to_binary(Date)}, + ?CALL_TIMEOUT)]) + end, [], PStats), + case lists:member(error, Rez) of + true -> + error; + false -> + ok + end; + false -> + nothing + end. + +copy_messages([#state{vhost=VHost}=State, From]) -> + ?INFO_MSG("Going to copy messages from ~p for ~p", [From, VHost]), + + {FromDBName, FromDBOpts} = + case lists:keysearch(list_to_atom(From), 1, State#state.dbs) of + {value, {FN, FO}} -> + {FN, FO}; + false -> + ?ERROR_MSG("Failed to find record for ~p in dbs", [From]), + throw(error) + end, + + FromDBMod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(FromDBName)), + + {ok, _FromPid} = FromDBMod:start(VHost, FromDBOpts), + + Dates = FromDBMod:get_dates(VHost), + DatesLength = length(Dates), + + lists:foldl(fun(Date, Acc) -> + case copy_messages_int([FromDBMod, State#state.dbmod, VHost, Date]) of + ok -> + ?INFO_MSG("Copied messages at ~p (~p/~p)", [Date, Acc, DatesLength]); + Value -> + ?ERROR_MSG("Failed to copy messages at ~p (~p/~p): ~p", [Date, Acc, DatesLength, Value]), + FromDBMod:stop(VHost), + throw(error) + end, + Acc + 1 + end, 1, Dates), + ?INFO_MSG("Copied messages from ~p", [From]), + FromDBMod:stop(VHost); +copy_messages([#state{vhost=VHost}=State, From, Date]) -> + {value, {FromDBName, FromDBOpts}} = lists:keysearch(list_to_atom(From), 1, State#state.dbs), + FromDBMod = list_to_atom(atom_to_list(?MODULE) ++ "_" ++ atom_to_list(FromDBName)), + {ok, _FromPid} = FromDBMod:start(VHost, FromDBOpts), + case catch copy_messages_int([FromDBMod, State#state.dbmod, VHost, Date]) of + {'exit', Reason} -> + ?ERROR_MSG("Failed to copy messages at ~p: ~p", [Date, Reason]); + ok -> + ?INFO_MSG("Copied messages at ~p", [Date]); + Value -> + ?ERROR_MSG("Failed to copy messages at ~p: ~p", [Date, Value]) + end, + FromDBMod:stop(VHost). + +copy_messages_int([FromDBMod, ToDBMod, VHost, Date]) -> + ets:new(mod_logdb_temp, [named_table, set, public]), + {Time, Value} = timer:tc(?MODULE, copy_messages_int_tc, [[FromDBMod, ToDBMod, VHost, Date]]), + ets:delete_all_objects(mod_logdb_temp), + ets:delete(mod_logdb_temp), + ?INFO_MSG("copy_messages at ~p elapsed ~p sec", [Date, Time/1000000]), + Value. + +copy_messages_int_tc([FromDBMod, ToDBMod, VHost, Date]) -> + ?INFO_MSG("Going to copy messages from ~p for ~p at ~p", [FromDBMod, VHost, Date]), + + ok = FromDBMod:rebuild_stats_at(VHost, binary_to_list(Date)), + catch mod_logdb:rebuild_stats_at(VHost, Date), + {ok, FromStats} = FromDBMod:get_vhost_stats_at(VHost, binary_to_list(Date)), + ToStats = case mod_logdb:get_vhost_stats_at(VHost, Date) of + {ok, Stats} -> Stats; + {error, _} -> [] + end, + + FromStatsS = lists:keysort(1, FromStats), + ToStatsS = lists:keysort(1, ToStats), + + StatsLength = length(FromStats), + + CopyFun = if + % destination table is empty + FromDBMod /= mod_logdb_mnesia_old, ToStats == [] -> + fun({User, _Count}, Acc) -> + {ok, Msgs} = FromDBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)), + MAcc = + lists:foldl(fun(Msg, MFAcc) -> + ok = ToDBMod:log_message(VHost, Msg), + MFAcc + 1 + end, 0, Msgs), + NewAcc = Acc + 1, + ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]), + %timer:sleep(100), + NewAcc + end; + % destination table is not empty + FromDBMod /= mod_logdb_mnesia_old, ToStats /= [] -> + fun({User, _Count}, Acc) -> + {ok, ToMsgs} = ToDBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)), + lists:foreach(fun(#msg{timestamp=Tst}) when length(Tst) == 16 -> + ets:insert(mod_logdb_temp, {Tst}); + % mysql, pgsql removes final zeros after decimal point + (#msg{timestamp=Tst}) when length(Tst) < 16 -> + {F, _} = string:to_float(Tst++".0"), + [T] = io_lib:format("~.5f", [F]), + ets:insert(mod_logdb_temp, {T}) + end, ToMsgs), + {ok, Msgs} = FromDBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)), + MAcc = + lists:foldl(fun(#msg{timestamp=ToTimestamp} = Msg, MFAcc) -> + case ets:member(mod_logdb_temp, ToTimestamp) of + false -> + ok = ToDBMod:log_message(VHost, Msg), + ets:insert(mod_logdb_temp, {ToTimestamp}), + MFAcc + 1; + true -> + MFAcc + end + end, 0, Msgs), + NewAcc = Acc + 1, + ets:delete_all_objects(mod_logdb_temp), + ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]), + %timer:sleep(100), + NewAcc + end; + % copying from mod_logmnesia + true -> + fun({User, _Count}, Acc) -> + ToStats = + case ToDBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)) of + {ok, []} -> + ok; + {ok, ToMsgs} -> + lists:foreach(fun(#msg{timestamp=Tst}) when length(Tst) == 16 -> + ets:insert(mod_logdb_temp, {Tst}); + % mysql, pgsql removes final zeros after decimal point + (#msg{timestamp=Tst}) when length(Tst) < 15 -> + {F, _} = string:to_float(Tst++".0"), + [T] = io_lib:format("~.5f", [F]), + ets:insert(mod_logdb_temp, {T}) + end, ToMsgs); + {error, _} -> + ok + end, + {ok, Msgs} = FromDBMod:get_user_messages_at(binary_to_list(User), VHost, binary_to_list(Date)), + + MAcc = + lists:foldl( + fun({msg, TU, TS, TR, FU, FS, FR, Type, Subj, Body, Timest}, + MFAcc) -> + [Timestamp] = if is_float(Timest) == true -> + io_lib:format("~.5f", [Timest]); + % early versions of mod_logmnesia + is_integer(Timest) == true -> + io_lib:format("~.5f", [Timest-719528*86400.0]); + true -> + ?ERROR_MSG("Incorrect timestamp ~p", [Timest]), + throw(error) + end, + case ets:member(mod_logdb_temp, Timestamp) of + false -> + if + % from + TS == VHost -> + TMsg = #msg{timestamp=Timestamp, + owner_name=TU, + peer_name=FU, peer_server=FS, peer_resource=FR, + direction=from, + type=Type, + subject=Subj, body=Body}, + ok = ToDBMod:log_message(VHost, TMsg); + true -> ok + end, + if + % to + FS == VHost -> + FMsg = #msg{timestamp=Timestamp, + owner_name=FU, + peer_name=TU, peer_server=TS, peer_resource=TR, + direction=to, + type=Type, + subject=Subj, body=Body}, + ok = ToDBMod:log_message(VHost, FMsg); + true -> ok + end, + ets:insert(mod_logdb_temp, {Timestamp}), + MFAcc + 1; + true -> % not ets:member + MFAcc + end % case + end, 0, Msgs), % foldl + NewAcc = Acc + 1, + ?INFO_MSG("Copied ~p messages for ~p (~p/~p) at ~p", [MAcc, User, NewAcc, StatsLength, Date]), + %timer:sleep(100), + NewAcc + end % fun + end, % if FromDBMod /= mod_logdb_mnesia_old + + if + FromStats == [] -> + ?INFO_MSG("No messages were found at ~p", [Date]); + FromStatsS == ToStatsS -> + ?INFO_MSG("Stats are equal at ~p", [Date]); + FromStatsS /= ToStatsS -> + lists:foldl(CopyFun, 0, FromStats), + ok = ToDBMod:rebuild_stats_at(VHost, binary_to_list(Date)) + %timer:sleep(1000) + end, + + ok. + +list_to_bool(Num) when is_binary(Num) -> + list_to_bool(binary_to_list(Num)); +list_to_bool(Num) when is_list(Num) -> + case lists:member(Num, ["t", "true", "y", "yes", "1"]) of + true -> + true; + false -> + case lists:member(Num, ["f", "false", "n", "no", "0"]) of + true -> + false; + false -> + error + end + end. + +bool_to_list(true) -> + "TRUE"; +bool_to_list(false) -> + "FALSE". + +list_to_string([]) -> + ""; +list_to_string(List) when is_list(List) -> + Str = lists:flatmap(fun(Elm) when is_binary(Elm) -> + binary_to_list(Elm) ++ "\n"; + (Elm) when is_list(Elm) -> + Elm ++ "\n" + end, List), + lists:sublist(Str, length(Str)-1). + +string_to_list(null) -> + []; +string_to_list([]) -> + []; +string_to_list(String) -> + ejabberd_regexp:split(iolist_to_binary(String), <<"\n">>). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% ad-hoc (copy/pasted from mod_configure.erl) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-define(ITEMS_RESULT(Allow, LNode, Fallback), + case Allow of + deny -> + Fallback; + allow -> + case get_local_items(LServer, LNode, + jlib:jid_to_string(To), Lang) of + {result, Res} -> + {result, Res}; + {error, Error} -> + {error, Error} + end + end). + +get_local_items(Acc, From, #jid{lserver = LServer} = To, <<"">>, Lang) -> + case gen_mod:is_loaded(LServer, mod_adhoc) of + false -> + Acc; + _ -> + Items = case Acc of + {result, Its} -> Its; + empty -> [] + end, + AllowUser = acl:match_rule(LServer, mod_logdb, From), + AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From), + if + AllowUser == allow; AllowAdmin == allow -> + case get_local_items(LServer, [], + jlib:jid_to_string(To), Lang) of + {result, Res} -> + {result, Items ++ Res}; + {error, _Error} -> + {result, Items} + end; + true -> + {result, Items} + end + end; +get_local_items(Acc, From, #jid{lserver = LServer} = To, Node, Lang) -> + case gen_mod:is_loaded(LServer, mod_adhoc) of + false -> + Acc; + _ -> + LNode = str:tokens(Node, <<"/">>), + AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From), + case LNode of + [<<"mod_logdb">>] -> + ?ITEMS_RESULT(AllowAdmin, LNode, {error, ?ERR_FORBIDDEN}); + [<<"mod_logdb_users">>] -> + ?ITEMS_RESULT(AllowAdmin, LNode, {error, ?ERR_FORBIDDEN}); + [<<"mod_logdb_users">>, <<$@, _/binary>>] -> + ?ITEMS_RESULT(AllowAdmin, LNode, {error, ?ERR_FORBIDDEN}); + [<<"mod_logdb_users">>, _User] -> + ?ITEMS_RESULT(AllowAdmin, LNode, {error, ?ERR_FORBIDDEN}); + [<<"mod_logdb_settings">>] -> + ?ITEMS_RESULT(AllowAdmin, LNode, {error, ?ERR_FORBIDDEN}); + _ -> + Acc + end + end. + +-define(T(Lang, Text), translate:translate(Lang, Text)). + +-define(NODE(Name, Node), + #xmlel{name = <<"item">>, + attrs = + [{<<"jid">>, Server}, {<<"name">>, ?T(Lang, Name)}, + {<<"node">>, Node}], + children = []}). + +get_local_items(_Host, [], Server, Lang) -> + {result, + [?NODE(<<"Messages logging engine">>, <<"mod_logdb">>)] + }; +get_local_items(_Host, [<<"mod_logdb">>], Server, Lang) -> + {result, + [?NODE(<<"Messages logging engine users">>, <<"mod_logdb_users">>), + ?NODE(<<"Messages logging engine settings">>, <<"mod_logdb_settings">>)] + }; +get_local_items(Host, [<<"mod_logdb_users">>], Server, Lang) -> + {result, get_all_vh_users(Host, Server, Lang)}; +get_local_items(Host, [<<"mod_logdb_users">>, <<$@, Diap/binary>>], Server, Lang) -> + case catch ejabberd_auth:get_vh_registered_users(Host) of + {'EXIT', _Reason} -> + ?ERR_INTERNAL_SERVER_ERROR; + Users -> + SUsers = lists:sort([{S, U} || {U, S} <- Users]), + case catch begin + [S1, S2] = ejabberd_regexp:split(Diap, <<"-">>), + N1 = jlib:binary_to_integer(S1), + N2 = jlib:binary_to_integer(S2), + Sub = lists:sublist(SUsers, N1, N2 - N1 + 1), + lists:map(fun({S, U}) -> + ?NODE(<< U/binary, "@", S/binary >>, + << (iolist_to_binary("mod_logdb_users/"))/binary, U/binary, "@", S/binary >>) + end, Sub) + end of + {'EXIT', _Reason} -> + ?ERR_NOT_ACCEPTABLE; + Res -> + {result, Res} + end + end; +get_local_items(_Host, [<<"mod_logdb_users">>, _User], _Server, _Lang) -> + {result, []}; +get_local_items(_Host, [<<"mod_logdb_settings">>], _Server, _Lang) -> + {result, []}; +get_local_items(_Host, Item, _Server, _Lang) -> + ?MYDEBUG("asked for items in ~p", [Item]), + {error, ?ERR_ITEM_NOT_FOUND}. + +-define(INFO_RESULT(Allow, Feats), + case Allow of + deny -> {error, ?ERR_FORBIDDEN}; + allow -> {result, Feats} + end). + +get_local_features(Acc, From, #jid{lserver = LServer} = _To, Node, _Lang) -> + case gen_mod:is_loaded(LServer, mod_adhoc) of + false -> + Acc; + _ -> + LNode = str:tokens(Node, <<"/">>), + AllowUser = acl:match_rule(LServer, mod_logdb, From), + AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From), + case LNode of + [<<"mod_logdb">>] when AllowUser == allow; AllowAdmin == allow -> + ?INFO_RESULT(allow, [?NS_COMMANDS]); + [<<"mod_logdb">>] -> + ?INFO_RESULT(deny, [?NS_COMMANDS]); + [<<"mod_logdb_users">>] -> + ?INFO_RESULT(AllowAdmin, []); + [<<"mod_logdb_users">>, [$@ | _]] -> + ?INFO_RESULT(AllowAdmin, []); + [<<"mod_logdb_users">>, _User] -> + ?INFO_RESULT(AllowAdmin, [?NS_COMMANDS]); + [<<"mod_logdb_settings">>] -> + ?INFO_RESULT(AllowAdmin, [?NS_COMMANDS]); + [] -> + Acc; + _ -> + Acc + end + end. + +-define(INFO_IDENTITY(Category, Type, Name, Lang), + [#xmlel{name = <<"identity">>, + attrs = + [{<<"category">>, Category}, {<<"type">>, Type}, + {<<"name">>, ?T(Lang, Name)}], + children = []}]). + +-define(INFO_COMMAND(Name, Lang), + ?INFO_IDENTITY(<<"automation">>, <<"command-node">>, + Name, Lang)). + +get_local_identity(Acc, _From, _To, Node, Lang) -> + LNode = str:tokens(Node, <<"/">>), + case LNode of + [<<"mod_logdb">>] -> + ?INFO_COMMAND(<<"Messages logging engine">>, Lang); + [<<"mod_logdb_users">>] -> + ?INFO_COMMAND(<<"Messages logging engine users">>, Lang); + [<<"mod_logdb_users">>, [$@ | _]] -> + Acc; + [<<"mod_logdb_users">>, User] -> + ?INFO_COMMAND(User, Lang); + [<<"mod_logdb_settings">>] -> + ?INFO_COMMAND(<<"Messages logging engine settings">>, Lang); + [] -> + Acc; + _ -> + Acc + end. + +%get_sm_items(Acc, From, To, Node, Lang) -> +% ?MYDEBUG("get_sm_items Acc=~p From=~p To=~p Node=~p Lang=~p", [Acc, From, To, Node, Lang]), +% Acc. + +%get_sm_features(Acc, From, To, Node, Lang) -> +% ?MYDEBUG("get_sm_features Acc=~p From=~p To=~p Node=~p Lang=~p", [Acc, From, To, Node, Lang]), +% Acc. + +%get_sm_identity(Acc, From, To, Node, Lang) -> +% ?MYDEBUG("get_sm_identity Acc=~p From=~p To=~p Node=~p Lang=~p", [Acc, From, To, Node, Lang]), +% Acc. + +adhoc_local_items(Acc, From, #jid{lserver = LServer, server = Server} = To, + Lang) -> + Items = case Acc of + {result, Its} -> Its; + empty -> [] + end, + Nodes = recursively_get_local_items(LServer, "", Server, Lang), + Nodes1 = lists:filter( + fun(N) -> + Nd = xml:get_tag_attr_s("node", N), + F = get_local_features([], From, To, Nd, Lang), + case F of + {result, [?NS_COMMANDS]} -> + true; + _ -> + false + end + end, Nodes), + {result, Items ++ Nodes1}. + +recursively_get_local_items(_LServer, <<"mod_logdb_users">>, _Server, _Lang) -> + []; +recursively_get_local_items(LServer, Node, Server, Lang) -> + LNode = str:tokens(Node, <<"/">>), + Items = case get_local_items(LServer, LNode, Server, Lang) of + {result, Res} -> + Res; + {error, _Error} -> + [] + end, + Nodes = lists:flatten( + lists:map( + fun(N) -> + S = xml:get_tag_attr_s("jid", N), + Nd = xml:get_tag_attr_s("node", N), + if (S /= Server) or (Nd == "") -> + []; + true -> + [N, recursively_get_local_items( + LServer, Nd, Server, Lang)] + end + end, Items)), + Nodes. + +-define(COMMANDS_RESULT(Allow, From, To, Request), + case Allow of + deny -> + {error, ?ERR_FORBIDDEN}; + allow -> + adhoc_local_commands(From, To, Request) + end). + +adhoc_local_commands(Acc, From, #jid{lserver = LServer} = To, + #adhoc_request{node = Node} = Request) -> + LNode = str:tokens(Node, <<"/">>), + AllowUser = acl:match_rule(LServer, mod_logdb, From), + AllowAdmin = acl:match_rule(LServer, mod_logdb_admin, From), + case LNode of + [<<"mod_logdb">>] when AllowUser == allow; AllowAdmin == allow -> + ?COMMANDS_RESULT(allow, From, To, Request); + [<<"mod_logdb_users">>, _User] when AllowAdmin == allow -> + ?COMMANDS_RESULT(allow, From, To, Request); + [<<"mod_logdb_settings">>] when AllowAdmin == allow -> + ?COMMANDS_RESULT(allow, From, To, Request); + _ -> + Acc + end. + +adhoc_local_commands(From, #jid{lserver = LServer} = _To, + #adhoc_request{lang = Lang, + node = Node, + sessionid = SessionID, + action = Action, + xdata = XData} = Request) -> + LNode = str:tokens(Node, <<"/">>), + %% If the "action" attribute is not present, it is + %% understood as "execute". If there was no + %% element in the first response (which there isn't in our + %% case), "execute" and "complete" are equivalent. + ActionIsExecute = lists:member(Action, + [<<"">>, <<"execute">>, <<"complete">>]), + if Action == <<"cancel">> -> + %% User cancels request + adhoc:produce_response( + Request, + #adhoc_response{status = canceled}); + XData == false, ActionIsExecute -> + %% User requests form + case get_form(LServer, LNode, From, Lang) of + {result, Form} -> + adhoc:produce_response( + Request, + #adhoc_response{status = executing, + elements = Form}); + {error, Error} -> + {error, Error} + end; + XData /= false, ActionIsExecute -> + %% User returns form. + case jlib:parse_xdata_submit(XData) of + invalid -> + {error, ?ERR_BAD_REQUEST}; + Fields -> + case catch set_form(From, LServer, LNode, Lang, Fields) of + {result, _Res} -> + adhoc:produce_response( + #adhoc_response{lang = Lang, + node = Node, + sessionid = SessionID, + status = completed}); + {'EXIT', _} -> {error, ?ERR_BAD_REQUEST}; + {error, Error} -> {error, Error} + end + end; + true -> + {error, ?ERR_BAD_REQUEST} + end. + +-define(LISTLINE(Label, Value), + #xmlel{name = <<"option">>, + attrs = [{<<"label">>, ?T(Lang, Label)}], + children = [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, Value}] + }]}). +-define(DEFVAL(Value), #xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, Value}]}). + +get_user_form(LUser, LServer, Lang) -> + %From = jlib:jid_to_string(jlib:jid_remove_resource(Jid)), + #user_settings{dolog_default=DLD, + dolog_list=DLL, + donotlog_list=DNLL} = get_user_settings(LUser, LServer), + {result, + [#xmlel{name = <<"x">>, + attrs = [{<<"xmlns">>, ?NS_XDATA}], + children = [ + #xmlel{name = <<"title">>, attrs = [], + children = + [{xmlcdata, + ?T(Lang, <<"Messages logging engine settings">>)}]}, + #xmlel{name = <<"instructions">>, attrs = [], + children = + [{xmlcdata, + << (?T(Lang, <<"Set logging preferences">>))/binary, (iolist_to_binary(": "))/binary, + LUser/binary, "@", LServer/binary >> }]}, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"list-single">>}, + {<<"label">>, ?T(Lang, <<"Default">>)}, + {<<"var">>, <<"dolog_default">>}], + children = + [?DEFVAL(jlib:atom_to_binary(DLD)), + ?LISTLINE(<<"Log Messages">>, <<"true">>), + ?LISTLINE(<<"Do Not Log Messages">>, <<"false">>) + ]}, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"text-multi">>}, + {<<"label">>, ?T(Lang, <<"Log Messages">>)}, + {<<"var">>, <<"dolog_list">>}], + children = [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, iolist_to_binary(list_to_string(DLL))}]} + ] + }, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"text-multi">>}, + {<<"label">>, ?T(Lang, <<"Do Not Log Messages">>)}, + {<<"var">>, <<"donotlog_list">>}], + children = [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, iolist_to_binary(list_to_string(DNLL))}]} + ] + } + ]}]}. + +get_settings_form(Host, Lang) -> + #state{dbmod=_DBMod, + dbs=_DBs, + dolog_default=DLD, + ignore_jids=IgnoreJids, + groupchat=GroupChat, + purge_older_days=PurgeDaysT, + drop_messages_on_user_removal=MRemoval, + poll_users_settings=PollTime} = mod_logdb:get_module_settings(Host), + + %Backends = lists:map(fun({Backend, _Opts}) -> + % ?LISTLINE(jlib:atom_to_binary(Backend), jlib:atom_to_binary(Backend)) + % end, DBs), + %DB = iolist_to_binary(lists:sublist(atom_to_list(DBMod), length(atom_to_list(?MODULE)) + 2, length(atom_to_list(DBMod)))), + %DBsL = lists:append([?DEFVAL(DB)], Backends), + + PurgeDays = + case PurgeDaysT of + never -> <<"never">>; + Num when is_integer(Num) -> integer_to_binary(Num); + _ -> <<"unknown">> + end, + {result, + [#xmlel{name = <<"x">>, + attrs = [{<<"xmlns">>, ?NS_XDATA}], + children = [#xmlel{name = <<"title">>, attrs = [], + children = + [{xmlcdata, + <<(?T(Lang, <<"Messages logging engine settings">>))/binary, + (iolist_to_binary(" (run-time)"))/binary >>}]}, + #xmlel{name = <<"instructions">>, attrs = [], + children = + [{xmlcdata, ?T(Lang, <<"Set run-time settings">>)}]}, +% #xmlel{name = <<"field">>, +% attrs = [{<<"type">>, <<"list-single">>}, +% {<<"label">>, ?T(Lang, <<"Backend">>)}, +% {<<"var">>, <<"backend">>}], +% children = DBsL}, +% #xmlel{name = <<"field">>, +% attrs = [{<<"type">>, <<"text-multi">>}, +% {<<"label">>, ?T(Lang, <<"dbs">>)}, +% {<<"var">>, <<"dbs">>}], +% children = [#xmlel{name = <<"value">>, attrs = [], +% children = [{xmlcdata, iolist_to_binary(lists:flatten(io_lib:format("~p.",[DBs])))}]} +% ] +% }, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"list-single">>}, + {<<"label">>, ?T(Lang, <<"Default">>)}, + {<<"var">>, <<"dolog_default">>}], + children = + [?DEFVAL(jlib:atom_to_binary(DLD)), + ?LISTLINE(<<"Log Messages">>, <<"true">>), + ?LISTLINE(<<"Do Not Log Messages">>, <<"false">>) + ]}, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"list-single">>}, + {<<"label">>, ?T(Lang, <<"Drop messages on user removal">>)}, + {<<"var">>, <<"drop_messages_on_user_removal">>}], + children = + [?DEFVAL(jlib:atom_to_binary(MRemoval)), + ?LISTLINE(<<"Drop">>, <<"true">>), + ?LISTLINE(<<"Do not drop">>, <<"false">>) + ]}, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"list-single">>}, + {<<"label">>, ?T(Lang, <<"Groupchat messages logging">>)}, + {<<"var">>, <<"groupchat">>}], + children = + [?DEFVAL(jlib:atom_to_binary(GroupChat)), + ?LISTLINE(<<"all">>, <<"all">>), + ?LISTLINE(<<"none">>, <<"none">>), + ?LISTLINE(<<"send">>, <<"send">>), + ?LISTLINE(<<"half">>, <<"half">>) + ]}, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"text-multi">>}, + {<<"label">>, ?T(Lang, <<"Jids/Domains to ignore">>)}, + {<<"var">>, <<"ignore_list">>}], + children = [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, iolist_to_binary(list_to_string(IgnoreJids))}]} + ] + }, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"text-single">>}, + {<<"label">>, ?T(Lang, <<"Purge messages older than (days)">>)}, + {<<"var">>, <<"purge_older_days">>}], + children = [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, iolist_to_binary(PurgeDays)}]} + ] + }, + #xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"text-single">>}, + {<<"label">>, ?T(Lang, <<"Poll users settings (seconds)">>)}, + {<<"var">>, <<"poll_users_settings">>}], + children = [#xmlel{name = <<"value">>, attrs = [], + children = [{xmlcdata, integer_to_binary(PollTime)}]} + ] + } + ]} + ]}. + +%get_form(_Host, [<<"mod_logdb">>], #jid{luser = LUser, lserver = LServer} = _Jid, Lang) -> +% get_user_form(LUser, LServer, Lang); +get_form(_Host, [<<"mod_logdb_users">>, User], _JidFrom, Lang) -> + #jid{luser=LUser, lserver=LServer} = jlib:string_to_jid(User), + get_user_form(LUser, LServer, Lang); +get_form(Host, [<<"mod_logdb_settings">>], _JidFrom, Lang) -> + get_settings_form(Host, Lang); +get_form(_Host, Command, _, _Lang) -> + ?MYDEBUG("asked for form ~p", [Command]), + {error, ?ERR_SERVICE_UNAVAILABLE}. + +check_log_list([]) -> + ok; +check_log_list([<<>>]) -> + ok; +check_log_list([Head | Tail]) -> + case binary:match(Head, <<$@>>) of + nomatch -> throw(error); + {_, _} -> ok + end, + % this check for Head to be valid jid + case jlib:string_to_jid(Head) of + error -> + throw(error); + _ -> + check_log_list(Tail) + end. + +check_ignore_list([]) -> + ok; +check_ignore_list([<<>>]) -> + ok; +check_ignore_list([<<>> | Tail]) -> + check_ignore_list(Tail); +check_ignore_list([Head | Tail]) -> + case binary:match(Head, <<$@>>) of + {_, _} -> ok; + nomatch -> throw(error) + end, + % this check for Head to be valid jid + case jlib:string_to_jid(Head) of + error -> + % this check for Head to be valid domain "@domain.org" + case Head of + << $@, Rest/binary >> -> + % TODO: this allows spaces and special characters in Head. May be change to nodeprep? + case jlib:nameprep(Rest) of + error -> throw(error); + _ -> check_ignore_list(Tail) + end; + _ -> throw(error) + end; + _ -> + check_ignore_list(Tail) + end. + +parse_users_settings(XData) -> + DLD = case lists:keysearch(<<"dolog_default">>, 1, XData) of + {value, {_, [String]}} when String == <<"true">>; String == <<"false">> -> + list_to_bool(String); + _ -> + throw(bad_request) + end, + DLL = case lists:keysearch(<<"dolog_list">>, 1, XData) of + false -> + throw(bad_request); + {value, {_, [[]]}} -> + []; + {value, {_, List1}} -> + case catch check_log_list(List1) of + error -> + throw(bad_request); + ok -> + List1 + end + end, + DNLL = case lists:keysearch(<<"donotlog_list">>, 1, XData) of + false -> + throw(bad_request); + {value, {_, [[]]}} -> + []; + {value, {_, List2}} -> + case catch check_log_list(List2) of + error -> + throw(bad_request); + ok -> + List2 + end + end, + #user_settings{dolog_default=DLD, + dolog_list=DLL, + donotlog_list=DNLL}. + +parse_module_settings(XData) -> + DLD = case lists:keysearch(<<"dolog_default">>, 1, XData) of + {value, {_, [Str1]}} when Str1 == <<"true">>; Str1 == <<"false">> -> + list_to_bool(Str1); + _ -> + throw(bad_request) + end, + MRemoval = case lists:keysearch(<<"drop_messages_on_user_removal">>, 1, XData) of + {value, {_, [Str5]}} when Str5 == <<"true">>; Str5 == <<"false">> -> + list_to_bool(Str5); + _ -> + throw(bad_request) + end, + GroupChat = case lists:keysearch(<<"groupchat">>, 1, XData) of + {value, {_, [Str2]}} when Str2 == <<"none">>; + Str2 == <<"all">>; + Str2 == <<"send">>; + Str2 == <<"half">> -> + jlib:binary_to_atom(Str2); + _ -> + throw(bad_request) + end, + Ignore = case lists:keysearch(<<"ignore_list">>, 1, XData) of + {value, {_, List}} -> + case catch check_ignore_list(List) of + ok -> + List; + error -> + throw(bad_request) + end; + _ -> + throw(bad_request) + end, + Purge = case lists:keysearch(<<"purge_older_days">>, 1, XData) of + {value, {_, [<<"never">>]}} -> + never; + {value, {_, [Str3]}} -> + case catch binary_to_integer(Str3) of + {'EXIT', {badarg, _}} -> throw(bad_request); + Int1 -> Int1 + end; + _ -> + throw(bad_request) + end, + Poll = case lists:keysearch(<<"poll_users_settings">>, 1, XData) of + {value, {_, [Str4]}} -> + case catch binary_to_integer(Str4) of + {'EXIT', {badarg, _}} -> throw(bad_request); + Int2 -> Int2 + end; + _ -> + throw(bad_request) + end, + #state{dolog_default=DLD, + groupchat=GroupChat, + ignore_jids=Ignore, + purge_older_days=Purge, + drop_messages_on_user_removal=MRemoval, + poll_users_settings=Poll}. + +set_form(From, _Host, [<<"mod_logdb">>], _Lang, XData) -> + #jid{luser=LUser, lserver=LServer} = From, + case catch parse_users_settings(XData) of + bad_request -> + {error, ?ERR_BAD_REQUEST}; + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to set form ~p", [Reason]), + {error, ?ERR_BAD_REQUEST}; + UserSettings -> + case mod_logdb:set_user_settings(LUser, LServer, UserSettings) of + ok -> + {result, []}; + error -> + {error, ?ERR_INTERNAL_SERVER_ERROR} + end + end; +set_form(_From, _Host, [<<"mod_logdb_users">>, User], _Lang, XData) -> + #jid{luser=LUser, lserver=LServer} = jlib:string_to_jid(User), + case catch parse_users_settings(XData) of + bad_request -> {error, ?ERR_BAD_REQUEST}; + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to set form ~p", [Reason]), + {error, ?ERR_BAD_REQUEST}; + UserSettings -> + case mod_logdb:set_user_settings(LUser, LServer, UserSettings) of + ok -> + {result, []}; + error -> + {error, ?ERR_INTERNAL_SERVER_ERROR} + end + end; +set_form(_From, Host, [<<"mod_logdb_settings">>], _Lang, XData) -> + case catch parse_module_settings(XData) of + bad_request -> {error, ?ERR_BAD_REQUEST}; + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to set form ~p", [Reason]), + {error, ?ERR_BAD_REQUEST}; + Settings -> + case mod_logdb:set_module_settings(Host, Settings) of + ok -> + {result, []}; + error -> + {error, ?ERR_INTERNAL_SERVER_ERROR} + end + end; +set_form(From, _Host, Node, _Lang, XData) -> + User = jlib:jid_to_string(jlib:jid_remove_resource(From)), + ?MYDEBUG("set form for ~p at ~p XData=~p", [User, Node, XData]), + {error, ?ERR_SERVICE_UNAVAILABLE}. + +%adhoc_sm_items(Acc, From, To, Request) -> +% ?MYDEBUG("adhoc_sm_items Acc=~p From=~p To=~p Request=~p", [Acc, From, To, Request]), +% Acc. +% +%adhoc_sm_commands(Acc, From, To, Request) -> +% ?MYDEBUG("adhoc_sm_commands Acc=~p From=~p To=~p Request=~p", [Acc, From, To, Request]), +% Acc. + +get_all_vh_users(Host, Server, Lang) -> + case catch ejabberd_auth:get_vh_registered_users(Host) of + {'EXIT', _Reason} -> + []; + Users -> + SUsers = lists:sort([{S, U} || {U, S} <- Users]), + case length(SUsers) of + N when N =< 100 -> + lists:map(fun({S, U}) -> + ?NODE(<< U/binary, "@", S/binary >>, + << (iolist_to_binary("mod_logdb_users/"))/binary, U/binary, "@", S/binary >>) + end, + SUsers); + N -> + NParts = trunc(math:sqrt(N * 0.618)) + 1, + M = trunc(N / NParts) + 1, + lists:map(fun(K) -> + L = K + M - 1, + Node = <<"@", + (iolist_to_binary(integer_to_list(K)))/binary, + "-", + (iolist_to_binary(integer_to_list(L)))/binary + >>, + {FS, FU} = lists:nth(K, SUsers), + {LS, LU} = + if L < N -> lists:nth(L, SUsers); + true -> lists:last(SUsers) + end, + Name = + <>, + ?NODE(Name, << (iolist_to_binary("mod_logdb_users/"))/binary, Node/binary >>) + end, lists:seq(1, N, M)) + end + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% webadmin hooks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +webadmin_menu(Acc, _Host, Lang) -> + [{<<"messages">>, ?T(<<"Users Messages">>)} | Acc]. + +webadmin_user(Acc, User, Server, Lang) -> + Sett = get_user_settings(User, Server), + Log = + case Sett#user_settings.dolog_default of + false -> + ?INPUTT(<<"submit">>, <<"dolog">>, <<"Log Messages">>); + true -> + ?INPUTT(<<"submit">>, <<"donotlog">>, <<"Do Not Log Messages">>); + _ -> [] + end, + Acc ++ [?XE(<<"h3">>, [?ACT(<<"messages/">>, <<"Messages">>), ?C(<<" ">>), Log])]. + +webadmin_page(_, Host, + #request{path = [<<"messages">>], + q = Query, + lang = Lang}) -> + Res = vhost_messages_stats(Host, Query, Lang), + {stop, Res}; +webadmin_page(_, Host, + #request{path = [<<"messages">>, Date], + q = Query, + lang = Lang}) -> + Res = vhost_messages_stats_at(Host, Query, Lang, Date), + {stop, Res}; +webadmin_page(_, Host, + #request{path = [<<"user">>, U, <<"messages">>], + q = Query, + lang = Lang}) -> + Res = user_messages_stats(U, Host, Query, Lang), + {stop, Res}; +webadmin_page(_, Host, + #request{path = [<<"user">>, U, <<"messages">>, Date], + q = Query, + lang = Lang}) -> + Res = mod_logdb:user_messages_stats_at(U, Host, Query, Lang, Date), + {stop, Res}; +webadmin_page(Acc, _Host, _R) -> Acc. + +user_parse_query(_, <<"dolog">>, User, Server, _Query) -> + Sett = get_user_settings(User, Server), + % TODO: check returned value + set_user_settings(User, Server, Sett#user_settings{dolog_default=true}), + {stop, ok}; +user_parse_query(_, <<"donotlog">>, User, Server, _Query) -> + Sett = get_user_settings(User, Server), + % TODO: check returned value + set_user_settings(User, Server, Sett#user_settings{dolog_default=false}), + {stop, ok}; +user_parse_query(Acc, _Action, _User, _Server, _Query) -> + Acc. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% webadmin funcs +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +vhost_messages_stats(Server, Query, Lang) -> + Res = case catch vhost_messages_parse_query(Server, Query) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]), + error; + VResult -> VResult + end, + {Time, Value} = timer:tc(mod_logdb, get_vhost_stats, [Server]), + ?INFO_MSG("get_vhost_stats(~p) elapsed ~p sec", [Server, Time/1000000]), + %case get_vhost_stats(Server) of + case Value of + {'EXIT', CReason} -> + ?ERROR_MSG("Failed to get_vhost_stats: ~p", [CReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))]; + {error, GReason} -> + ?ERROR_MSG("Failed to get_vhost_stats: ~p", [GReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))]; + {ok, []} -> + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s">>), [Server])))]; + {ok, Dates} -> + Fun = fun({Date, Count}) -> + DateBin = iolist_to_binary(Date), + ID = jlib:encode_base64( << Server/binary, DateBin/binary >> ), + ?XE(<<"tr">>, + [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), + ?XE(<<"td">>, [?AC(DateBin, DateBin)]), + ?XC(<<"td">>, integer_to_binary(Count)) + ]) + end, + + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s">>), [Server])))] ++ + case Res of + ok -> [?CT(<<"Submitted">>), ?P]; + error -> [?CT(<<"Bad format">>), ?P]; + nothing -> [] + end ++ + [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}], + [?XE(<<"table">>, + [?XE(<<"thead">>, + [?XE(<<"tr">>, + [?X(<<"td">>), + ?XCT(<<"td">>, <<"Date">>), + ?XCT(<<"td">>, <<"Count">>) + ])]), + ?XE(<<"tbody">>, + lists:map(Fun, Dates) + )]), + ?BR, + ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>) + ])] + end. + +vhost_messages_stats_at(Server, Query, Lang, Date) -> + {Time, Value} = timer:tc(mod_logdb, get_vhost_stats_at, [Server, Date]), + ?INFO_MSG("get_vhost_stats_at(~p,~p) elapsed ~p sec", [Server, Date, Time/1000000]), + %case get_vhost_stats_at(Server, Date) of + case Value of + {'EXIT', CReason} -> + ?ERROR_MSG("Failed to get_vhost_stats_at: ~p", [CReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))]; + {error, GReason} -> + ?ERROR_MSG("Failed to get_vhost_stats_at: ~p", [GReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching list">>))]; + {ok, []} -> + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s at ~s">>), [Server, Date])))]; + {ok, Stats} -> + Res = case catch vhost_messages_at_parse_query(Server, Date, Stats, Query) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]), + error; + VResult -> VResult + end, + Fun = fun({User, Count}) -> + UserBin = iolist_to_binary(User), + ID = jlib:encode_base64( << UserBin/binary, Server/binary >> ), + ?XE(<<"tr">>, + [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), + ?XE(<<"td">>, [?AC(<< <<"../user/">>/binary, UserBin/binary, <<"/messages/">>/binary, Date/binary >>, UserBin)]), + ?XC(<<"td">>, integer_to_binary(Count)) + ]) + end, + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s at ~s">>), [Server, Date])))] ++ + case Res of + ok -> [?CT(<<"Submitted">>), ?P]; + error -> [?CT(<<"Bad format">>), ?P]; + nothing -> [] + end ++ + [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}], + [?XE(<<"table">>, + [?XE(<<"thead">>, + [?XE(<<"tr">>, + [?X(<<"td">>), + ?XCT(<<"td">>, <<"User">>), + ?XCT(<<"td">>, <<"Count">>) + ])]), + ?XE(<<"tbody">>, + lists:map(Fun, Stats) + )]), + ?BR, + ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>) + ])] + end. + +user_messages_stats(User, Server, Query, Lang) -> + Jid = jlib:jid_to_string({User, Server, ""}), + + Res = case catch user_messages_parse_query(User, Server, Query) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]), + error; + VResult -> VResult + end, + + {Time, Value} = timer:tc(mod_logdb, get_user_stats, [User, Server]), + ?INFO_MSG("get_user_stats(~p,~p) elapsed ~p sec", [User, Server, Time/1000000]), + + case Value of + {'EXIT', CReason} -> + ?ERROR_MSG("Failed to get_user_stats: ~p", [CReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching days">>))]; + {error, GReason} -> + ?ERROR_MSG("Failed to get_user_stats: ~p", [GReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching days">>))]; + {ok, []} -> + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s">>), [Jid])))]; + {ok, Dates} -> + Fun = fun({Date, Count}) -> + DateBin = iolist_to_binary(Date), + ID = jlib:encode_base64( << User/binary, DateBin/binary >> ), + ?XE(<<"tr">>, + [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), + ?XE(<<"td">>, [?AC(DateBin, DateBin)]), + ?XC(<<"td">>, iolist_to_binary(integer_to_list(Count))) + ]) + end, + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T("Logged messages for ~s"), [Jid])))] ++ + case Res of + ok -> [?CT(<<"Submitted">>), ?P]; + error -> [?CT(<<"Bad format">>), ?P]; + nothing -> [] + end ++ + [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}], + [?XE(<<"table">>, + [?XE(<<"thead">>, + [?XE(<<"tr">>, + [?X(<<"td">>), + ?XCT(<<"td">>, <<"Date">>), + ?XCT(<<"td">>, <<"Count">>) + ])]), + ?XE(<<"tbody">>, + lists:map(Fun, Dates) + )]), + ?BR, + ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>) + ])] + end. + +search_user_nick(User, List) -> + case lists:keysearch(User, 1, List) of + {value,{User, []}} -> + nothing; + {value,{User, Nick}} -> + Nick; + false -> + nothing + end. + +user_messages_stats_at(User, Server, Query, Lang, Date) -> + Jid = jlib:jid_to_string({User, Server, ""}), + + {Time, Value} = timer:tc(mod_logdb, get_user_messages_at, [User, Server, Date]), + ?INFO_MSG("get_user_messages_at(~p,~p,~p) elapsed ~p sec", [User, Server, Date, Time/1000000]), + case Value of + {'EXIT', CReason} -> + ?ERROR_MSG("Failed to get_user_messages_at: ~p", [CReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching messages">>))]; + {error, GReason} -> + ?ERROR_MSG("Failed to get_user_messages_at: ~p", [GReason]), + [?XC(<<"h1">>, ?T(<<"Error occupied while fetching messages">>))]; + {ok, []} -> + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"No logged messages for ~s at ~s">>), [Jid, Date])))]; + {ok, User_messages} -> + Res = case catch user_messages_at_parse_query(Server, + Date, + User_messages, + Query) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]), + error; + VResult -> VResult + end, + + UR = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]), + UserRoster = + lists:map(fun(Item) -> + {jlib:jid_to_string(Item#roster.jid), Item#roster.name} + end, UR), + + UniqUsers = lists:foldl(fun(#msg{peer_name=PName, peer_server=PServer}, List) -> + ToAdd = PName++"@"++PServer, + case lists:member(ToAdd, List) of + true -> List; + false -> lists:append([ToAdd], List) + end + end, [], User_messages), + + % Users to filter (sublist of UniqUsers) + CheckedUsers = case lists:keysearch(<<"filter">>, 1, Query) of + {value, _} -> + lists:filter(fun(UFUser) -> + ID = jlib:encode_base64(term_to_binary(UFUser)), + lists:member({<<"selected">>, ID}, Query) + end, UniqUsers); + false -> [] + end, + + % UniqUsers in html (noone selected -> everyone selected) + Users = lists:map(fun(UHUser) -> + ID = jlib:encode_base64(term_to_binary(UHUser)), + Input = case lists:member(UHUser, CheckedUsers) of + true -> [?INPUTC(<<"checkbox">>, <<"selected">>, ID)]; + false when CheckedUsers == [] -> [?INPUTC(<<"checkbox">>, <<"selected">>, ID)]; + false -> [?INPUT(<<"checkbox">>, <<"selected">>, ID)] + end, + Nick = + case search_user_nick(UHUser, UserRoster) of + nothing -> <<"">>; + N -> iolist_to_binary( " ("++ N ++")" ) + end, + ?XE(<<"tr">>, + [?XE(<<"td">>, Input), + ?XC(<<"td">>, iolist_to_binary(UHUser++Nick))]) + end, lists:sort(UniqUsers)), + % Messages to show (based on Users) + User_messages_filtered = case CheckedUsers of + [] -> User_messages; + _ -> lists:filter(fun(#msg{peer_name=PName, peer_server=PServer}) -> + lists:member(PName++"@"++PServer, CheckedUsers) + end, User_messages) + end, + + Msgs_Fun = fun(#msg{timestamp=Timestamp, + subject=Subject, + direction=Direction, + peer_name=PName, peer_server=PServer, peer_resource=PRes, + type=Type, + body=Body}) -> + Text = case Subject of + "" -> iolist_to_binary(Body); + _ -> iolist_to_binary([binary_to_list(?T(<<"Subject">>)) ++ ": " ++ Subject ++ "\n" ++ Body]) + end, + Resource = case PRes of + [] -> []; + undefined -> []; + R -> "/" ++ R + end, + UserNick = + case search_user_nick(PName++"@"++PServer, UserRoster) of + nothing when PServer == Server -> + PName; + nothing when Type == "groupchat", Direction == from -> + PName++"@"++PServer++Resource; + nothing -> + PName++"@"++PServer; + N -> N + end, + ID = jlib:encode_base64(term_to_binary(Timestamp)), + ?XE(<<"tr">>, + [?XE(<<"td">>, [?INPUT(<<"checkbox">>, <<"selected">>, ID)]), + ?XC(<<"td">>, iolist_to_binary(convert_timestamp(Timestamp))), + ?XC(<<"td">>, iolist_to_binary(atom_to_list(Direction)++": "++UserNick)), + ?XE(<<"td">>, [?XC(<<"pre">>, Text)])]) + end, + % Filtered user messages in html + Msgs = lists:map(Msgs_Fun, lists:sort(User_messages_filtered)), + + [?XC(<<"h1">>, list_to_binary(io_lib:format(?T(<<"Logged messages for ~s at ~s">>), [Jid, Date])))] ++ + case Res of + ok -> [?CT(<<"Submitted">>), ?P]; + error -> [?CT(<<"Bad format">>), ?P]; + nothing -> [] + end ++ + [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}], + [?XE(<<"table">>, + [?XE(<<"thead">>, + [?X(<<"td">>), + ?XCT(<<"td">>, <<"User">>) + ] + ), + ?XE(<<"tbody">>, + Users + )]), + ?INPUTT(<<"submit">>, <<"filter">>, <<"Filter Selected">>) + ] ++ + [?XE(<<"table">>, + [?XE(<<"thead">>, + [?XE(<<"tr">>, + [?X(<<"td">>), + ?XCT(<<"td">>, <<"Date, Time">>), + ?XCT(<<"td">>, <<"Direction: Jid">>), + ?XCT(<<"td">>, <<"Body">>) + ])]), + ?XE(<<"tbody">>, + Msgs + )]), + ?INPUTT(<<"submit">>, <<"delete">>, <<"Delete Selected">>), + ?BR + ] + )] + end. diff --git a/src/mod_logdb.hrl b/src/mod_logdb.hrl new file mode 100644 index 0000000..d44f0df --- /dev/null +++ b/src/mod_logdb.hrl @@ -0,0 +1,35 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb.hrl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : +%%% Version : trunk +%%% Id : $Id: mod_logdb.hrl 1273 2009-02-05 18:12:57Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-define(logdb_debug, true). + +-ifdef(logdb_debug). +-define(MYDEBUG(Format, Args), io:format("D(~p:~p:~p) : "++Format++"~n", + [calendar:local_time(),?MODULE,?LINE]++Args)). +-else. +-define(MYDEBUG(_F,_A),[]). +-endif. + +-record(msg, {timestamp, + owner_name, + peer_name, peer_server, peer_resource, + direction, + type, subject, + body}). + +-record(user_settings, {owner_name, + dolog_default, + dolog_list=[], + donotlog_list=[]}). + +-define(INPUTC(Type, Name, Value), + ?XA(<<"input">>, [{<<"type">>, Type}, + {<<"name">>, Name}, + {<<"value">>, Value}, + {<<"checked">>, <<"true">>}])). diff --git a/src/mod_logdb_mnesia.erl b/src/mod_logdb_mnesia.erl new file mode 100644 index 0000000..a8ae766 --- /dev/null +++ b/src/mod_logdb_mnesia.erl @@ -0,0 +1,557 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb_mnesia.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : mnesia backend for mod_logdb +%%% Version : trunk +%%% Id : $Id: mod_logdb_mnesia.erl 1273 2009-02-05 18:12:57Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb_mnesia). +-author('o.palij@gmail.com'). + +-include("mod_logdb.hrl"). +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("logger.hrl"). + +-behaviour(gen_logdb). +-behaviour(gen_server). + +% gen_server +-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]). +% gen_mod +-export([start/2, stop/1]). +% gen_logdb +-export([log_message/2, + rebuild_stats/1, + rebuild_stats_at/2, + delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2, + get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3, + get_dates/1, + get_users_settings/1, get_user_settings/2, set_user_settings/3, + drop_user/2]). + +-define(PROCNAME, mod_logdb_mnesia). +-define(CALL_TIMEOUT, 10000). + +-record(state, {vhost}). + +-record(stats, {user, at, count}). + +prefix() -> + "logdb_". + +suffix(VHost) -> + "_" ++ binary_to_list(VHost). + +stats_table(VHost) -> + list_to_atom(prefix() ++ "stats" ++ suffix(VHost)). + +table_name(VHost, Date) -> + list_to_atom(prefix() ++ "messages_" ++ Date ++ suffix(VHost)). + +settings_table(VHost) -> + list_to_atom(prefix() ++ "settings" ++ suffix(VHost)). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_mod callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start(VHost, Opts) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:start({local, Proc}, ?MODULE, [VHost, Opts], []). + +stop(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {stop}, ?CALL_TIMEOUT). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_server callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +init([VHost, _Opts]) -> + case mnesia:system_info(is_running) of + yes -> + ok = create_stats_table(VHost), + ok = create_settings_table(VHost), + {ok, #state{vhost=VHost}}; + no -> + ?ERROR_MSG("Mnesia not running", []), + {stop, db_connection_failed}; + Status -> + ?ERROR_MSG("Mnesia status: ~p", [Status]), + {stop, db_connection_failed} + end. + +handle_call({log_message, Msg}, _From, #state{vhost=VHost}=State) -> + {reply, log_message_int(VHost, Msg), State}; +handle_call({rebuild_stats}, _From, #state{vhost=VHost}=State) -> + {atomic, ok} = delete_nonexistent_stats(VHost), + Reply = + lists:foreach(fun(Date) -> + rebuild_stats_at_int(VHost, Date) + end, get_dates_int(VHost)), + {reply, Reply, State}; +handle_call({rebuild_stats_at, Date}, _From, #state{vhost=VHost}=State) -> + Reply = rebuild_stats_at_int(VHost, Date), + {reply, Reply, State}; +handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{vhost=VHost}=State) -> + Table = table_name(VHost, Date), + Fun = fun() -> + lists:foreach( + fun(Msg) -> + mnesia:write_lock_table(stats_table(VHost)), + mnesia:write_lock_table(Table), + mnesia:delete_object(Table, Msg, write) + end, Msgs) + end, + DRez = case mnesia:transaction(Fun) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to delete_messages_by_user_at at ~p for ~p: ~p", [Date, VHost, Reason]), + error; + _ -> + ok + end, + Reply = + case rebuild_stats_at_int(VHost, Date) of + error -> + error; + ok -> + DRez + end, + {reply, Reply, State}; +handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{vhost=VHost}=State) -> + {reply, delete_all_messages_by_user_at_int(User, VHost, Date), State}; +handle_call({delete_messages_at, Date}, _From, #state{vhost=VHost}=State) -> + Reply = + case mnesia:delete_table(table_name(VHost, Date)) of + {atomic, ok} -> + delete_stats_by_vhost_at_int(VHost, Date); + {aborted, Reason} -> + ?ERROR_MSG("Failed to delete_messages_at for ~p at ~p", [VHost, Date, Reason]), + error + end, + {reply, Reply, State}; +handle_call({get_vhost_stats}, _From, #state{vhost=VHost}=State) -> + Fun = fun(#stats{at=Date, count=Count}, Stats) -> + case lists:keysearch(Date, 1, Stats) of + false -> + lists:append(Stats, [{Date, Count}]); + {value, {_, TempCount}} -> + lists:keyreplace(Date, 1, Stats, {Date, TempCount+Count}) + end + end, + Reply = + case mnesia:transaction(fun() -> + mnesia:foldl(Fun, [], stats_table(VHost)) + end) of + {atomic, Result} -> {ok, mod_logdb:sort_stats(Result)}; + {aborted, Reason} -> {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_vhost_stats_at, Date}, _From, #state{vhost=VHost}=State) -> + Fun = fun() -> + Pat = #stats{user='$1', at=Date, count='$2'}, + mnesia:select(stats_table(VHost), [{Pat, [], [['$1', '$2']]}]) + end, + Reply = + case mnesia:transaction(Fun) of + {atomic, Result} -> + {ok, lists:reverse(lists:keysort(2, [{User, Count} || [User, Count] <- Result]))}; + {aborted, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_user_stats, User}, _From, #state{vhost=VHost}=State) -> + {reply, get_user_stats_int(User, VHost), State}; +handle_call({get_user_messages_at, User, Date}, _From, #state{vhost=VHost}=State) -> + Reply = + case mnesia:transaction(fun() -> + Pat = #msg{owner_name=User, _='_'}, + mnesia:select(table_name(VHost, Date), + [{Pat, [], ['$_']}]) + end) of + {atomic, Result} -> {ok, Result}; + {aborted, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_dates}, _From, #state{vhost=VHost}=State) -> + {reply, get_dates_int(VHost), State}; +handle_call({get_users_settings}, _From, #state{vhost=VHost}=State) -> + Reply = mnesia:dirty_match_object(settings_table(VHost), #user_settings{_='_'}), + {reply, {ok, Reply}, State}; +handle_call({get_user_settings, User}, _From, #state{vhost=VHost}=State) -> + Reply = + case mnesia:dirty_match_object(settings_table(VHost), #user_settings{owner_name=User, _='_'}) of + [] -> []; + [Setting] -> + Setting + end, + {reply, Reply, State}; +handle_call({set_user_settings, _User, Set}, _From, #state{vhost=VHost}=State) -> + ?MYDEBUG("~p~n~p", [settings_table(VHost), Set]), + Reply = mnesia:dirty_write(settings_table(VHost), Set), + ?MYDEBUG("~p", [Reply]), + {reply, Reply, State}; +handle_call({drop_user, User}, _From, #state{vhost=VHost}=State) -> + {ok, Dates} = get_user_stats_int(User, VHost), + MDResult = lists:map(fun({Date, _}) -> + delete_all_messages_by_user_at_int(User, VHost, Date) + end, Dates), + SDResult = delete_user_settings_int(User, VHost), + Reply = + case lists:all(fun(Result) when Result == ok -> + true; + (Result) when Result == error -> + false + end, lists:append(MDResult, [SDResult])) of + true -> + ok; + false -> + error + end, + {reply, Reply, State}; +handle_call({stop}, _From, State) -> + {stop, normal, ok, State}; +handle_call(Msg, _From, State) -> + ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. + +handle_cast(Msg, State) -> + ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]), + {noreply, State}. + +handle_info(Info, State) -> + ?INFO_MSG("Got Info:~p, State:~p", [Info, State]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +log_message(VHost, Msg) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {log_message, Msg}, ?CALL_TIMEOUT). +rebuild_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {rebuild_stats}, ?CALL_TIMEOUT). +rebuild_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {rebuild_stats_at, Date}, ?CALL_TIMEOUT). +delete_messages_by_user_at(VHost, Msgs, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT). +delete_all_messages_by_user_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT). +delete_messages_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT). +get_vhost_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT). +get_vhost_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT). +get_user_stats(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_stats, User}, ?CALL_TIMEOUT). +get_user_messages_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_messages_at, User, Date}, ?CALL_TIMEOUT). +get_dates(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_dates}, ?CALL_TIMEOUT). +get_user_settings(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_settings, User}, ?CALL_TIMEOUT). +get_users_settings(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_users_settings}, ?CALL_TIMEOUT). +set_user_settings(User, VHost, Set) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {set_user_settings, User, Set}, ?CALL_TIMEOUT). +drop_user(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {drop_user, User}, ?CALL_TIMEOUT). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +log_message_int(VHost, #msg{timestamp=Timestamp}=MsgBin) -> + Date = mod_logdb:convert_timestamp_brief(Timestamp), + + Msg = #msg{timestamp = MsgBin#msg.timestamp, + owner_name = binary_to_list(MsgBin#msg.owner_name), + peer_name = binary_to_list(MsgBin#msg.peer_name), + peer_server = binary_to_list(MsgBin#msg.peer_server), + peer_resource = binary_to_list(MsgBin#msg.peer_resource), + direction = MsgBin#msg.direction, + type = binary_to_list(MsgBin#msg.type), + subject = binary_to_list(MsgBin#msg.subject), + body = binary_to_list(MsgBin#msg.body)}, + + ATable = table_name(VHost, Date), + Fun = fun() -> + mnesia:write_lock_table(ATable), + mnesia:write(ATable, Msg, write) + end, + % log message, increment stats for both users + case mnesia:transaction(Fun) of + % if table does not exists - create it and try to log message again + {aborted,{no_exists, _Table}} -> + case create_msg_table(VHost, Date) of + {aborted, CReason} -> + ?ERROR_MSG("Failed to log message: ~p", [CReason]), + error; + {atomic, ok} -> + ?MYDEBUG("Created msg table for ~s at ~s", [VHost, Date]), + log_message_int(VHost, MsgBin) + end; + {aborted, TReason} -> + ?ERROR_MSG("Failed to log message: ~p", [TReason]), + error; + {atomic, _} -> + ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost], + [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]), + increment_user_stats(Msg#msg.owner_name, VHost, Date) + end. + +increment_user_stats(Owner, VHost, Date) -> + Fun = fun() -> + Pat = #stats{user=Owner, at=Date, count='$1'}, + mnesia:write_lock_table(stats_table(VHost)), + case mnesia:select(stats_table(VHost), [{Pat, [], ['$_']}]) of + [] -> + mnesia:write(stats_table(VHost), + #stats{user=Owner, + at=Date, + count=1}, + write); + [Stats] -> + mnesia:delete_object(stats_table(VHost), + #stats{user=Owner, + at=Date, + count=Stats#stats.count}, + write), + New = Stats#stats{count = Stats#stats.count+1}, + if + New#stats.count > 0 -> mnesia:write(stats_table(VHost), + New, + write); + true -> ok + end + end + end, + case mnesia:transaction(Fun) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to update stats for ~s@~s: ~p", [Owner, VHost, Reason]), + error; + {atomic, _} -> + ?MYDEBUG("Updated stats for ~s@~s", [Owner, VHost]), + ok + end. + +get_dates_int(VHost) -> + Tables = mnesia:system_info(tables), + lists:foldl(fun(ATable, Dates) -> + Table = term_to_binary(ATable), + case ejabberd_regexp:run( Table, << VHost/binary, <<"$">>/binary >> ) of + match -> + case re:run(Table, "_[0-9]+-[0-9]+-[0-9]+_") of + {match, [{S, E}]} -> + lists:append(Dates, [lists:sublist(binary_to_list(Table), S+2, E-2)]); + nomatch -> + Dates + end; + nomatch -> + Dates + end + end, [], Tables). + +rebuild_stats_at_int(VHost, Date) -> + Table = table_name(VHost, Date), + STable = stats_table(VHost), + CFun = fun(Msg, Stats) -> + Owner = Msg#msg.owner_name, + case lists:keysearch(Owner, 1, Stats) of + {value, {_, Count}} -> + lists:keyreplace(Owner, 1, Stats, {Owner, Count + 1}); + false -> + lists:append(Stats, [{Owner, 1}]) + end + end, + DFun = fun(#stats{at=SDate} = Stat, _Acc) + when SDate == Date -> + mnesia:delete_object(stats_table(VHost), Stat, write); + (_Stat, _Acc) -> ok + end, + % TODO: Maybe unregister hooks ? + case mnesia:transaction(fun() -> + mnesia:write_lock_table(Table), + mnesia:write_lock_table(STable), + % Delete all stats for VHost at Date + mnesia:foldl(DFun, [], STable), + % Calc stats for VHost at Date + case mnesia:foldl(CFun, [], Table) of + [] -> empty; + AStats -> + % Write new calc'ed stats + lists:foreach(fun({Owner, Count}) -> + WStat = #stats{user=Owner, at=Date, count=Count}, + mnesia:write(stats_table(VHost), WStat, write) + end, AStats), + ok + end + end) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to rebuild_stats_at for ~p at ~p: ~p", [VHost, Date, Reason]), + error; + {atomic, ok} -> + ok; + {atomic, empty} -> + {atomic,ok} = mnesia:delete_table(Table), + ?MYDEBUG("Dropped table at ~p", [Date]), + ok + end. + +delete_nonexistent_stats(VHost) -> + Dates = get_dates_int(VHost), + mnesia:transaction(fun() -> + mnesia:foldl(fun(#stats{at=Date} = Stat, _Acc) -> + case lists:member(Date, Dates) of + false -> mnesia:delete_object(Stat); + true -> ok + end + end, ok, stats_table(VHost)) + end). + +delete_stats_by_vhost_at_int(VHost, Date) -> + StatsDelete = fun(#stats{at=SDate} = Stat, _Acc) + when SDate == Date -> + mnesia:delete_object(stats_table(VHost), Stat, write), + ok; + (_Msg, _Acc) -> ok + end, + case mnesia:transaction(fun() -> + mnesia:write_lock_table(stats_table(VHost)), + mnesia:foldl(StatsDelete, ok, stats_table(VHost)) + end) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to update stats at ~p for ~p: ~p", [Date, VHost, Reason]), + rebuild_stats_at_int(VHost, Date); + _ -> + ?INFO_MSG("Updated stats at ~p for ~p", [Date, VHost]), + ok + end. + +get_user_stats_int(User, VHost) -> + case mnesia:transaction(fun() -> + Pat = #stats{user=User, at='$1', count='$2'}, + mnesia:select(stats_table(VHost), [{Pat, [], [['$1', '$2']]}]) + end) of + {atomic, Result} -> + {ok, mod_logdb:sort_stats([{Date, Count} || [Date, Count] <- Result])}; + {aborted, Reason} -> + {error, Reason} + end. + +delete_all_messages_by_user_at_int(User, VHost, Date) -> + Table = table_name(VHost, Date), + MsgDelete = fun(#msg{owner_name=Owner} = Msg, _Acc) + when Owner == User -> + mnesia:delete_object(Table, Msg, write), + ok; + (_Msg, _Acc) -> ok + end, + DRez = case mnesia:transaction(fun() -> + mnesia:foldl(MsgDelete, ok, Table) + end) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to delete_all_messages_by_user_at for ~p@~p at ~p: ~p", [User, VHost, Date, Reason]), + error; + _ -> + ok + end, + case rebuild_stats_at_int(VHost, Date) of + error -> + error; + ok -> + DRez + end. + +delete_user_settings_int(User, VHost) -> + STable = settings_table(VHost), + case mnesia:dirty_match_object(STable, #user_settings{owner_name=User, _='_'}) of + [] -> + ok; + [UserSettings] -> + mnesia:dirty_delete_object(STable, UserSettings) + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% tables internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +create_stats_table(VHost) -> + SName = stats_table(VHost), + case mnesia:create_table(SName, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, stats)}, + {record_name, stats} + ]) of + {atomic, ok} -> + ?MYDEBUG("Created stats table for ~p", [VHost]), + lists:foreach(fun(Date) -> + rebuild_stats_at_int(VHost, Date) + end, get_dates_int(VHost)), + ok; + {aborted, {already_exists, _}} -> + ?MYDEBUG("Stats table for ~p already exists", [VHost]), + ok; + {aborted, Reason} -> + ?ERROR_MSG("Failed to create stats table: ~p", [Reason]), + error + end. + +create_settings_table(VHost) -> + SName = settings_table(VHost), + case mnesia:create_table(SName, + [{disc_copies, [node()]}, + {type, set}, + {attributes, record_info(fields, user_settings)}, + {record_name, user_settings} + ]) of + {atomic, ok} -> + ?MYDEBUG("Created settings table for ~p", [VHost]), + ok; + {aborted, {already_exists, _}} -> + ?MYDEBUG("Settings table for ~p already exists", [VHost]), + ok; + {aborted, Reason} -> + ?ERROR_MSG("Failed to create settings table: ~p", [Reason]), + error + end. + +create_msg_table(VHost, Date) -> + mnesia:create_table( + table_name(VHost, Date), + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, msg)}, + {record_name, msg}]). diff --git a/src/mod_logdb_mnesia_old.erl b/src/mod_logdb_mnesia_old.erl new file mode 100644 index 0000000..e962d9a --- /dev/null +++ b/src/mod_logdb_mnesia_old.erl @@ -0,0 +1,259 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb_mnesia_old.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : mod_logmnesia backend for mod_logdb (should be used only for copy_tables functionality) +%%% Version : trunk +%%% Id : $Id: mod_logdb_mnesia_old.erl 1273 2009-02-05 18:12:57Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb_mnesia_old). +-author('o.palij@gmail.com'). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("logger.hrl"). + +-behaviour(gen_logdb). + +-export([start/2, stop/1, + log_message/2, + rebuild_stats/1, + rebuild_stats_at/2, + rebuild_stats_at1/2, + delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2, + get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3, + get_dates/1, + get_users_settings/1, get_user_settings/2, set_user_settings/3, + drop_user/2]). + +-record(stats, {user, server, table, count}). +-record(msg, {to_user, to_server, to_resource, from_user, from_server, from_resource, id, type, subject, body, timestamp}). + +tables_prefix() -> "messages_". +% stats_table should not start with tables_prefix(VHost) ! +% i.e. lists:prefix(tables_prefix(VHost), atom_to_list(stats_table())) must be /= true +stats_table() -> list_to_atom("messages-stats"). +% table name as atom from Date +-define(ATABLE(Date), list_to_atom(tables_prefix() ++ Date)). +-define(LTABLE(Date), tables_prefix() ++ Date). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start(_Opts, _VHost) -> + case mnesia:system_info(is_running) of + yes -> + ok = create_stats_table(), + {ok, ok}; + no -> + ?ERROR_MSG("Mnesia not running", []), + error; + Status -> + ?ERROR_MSG("Mnesia status: ~p", [Status]), + error + end. + +stop(_VHost) -> + ok. + +log_message(_VHost, _Msg) -> + error. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks (maintaince) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +rebuild_stats(_VHost) -> + ok. + +rebuild_stats_at(VHost, Date) -> + Table = ?LTABLE(Date), + {Time, Value}=timer:tc(?MODULE, rebuild_stats_at1, [VHost, Table]), + ?INFO_MSG("rebuild_stats_at ~p elapsed ~p sec: ~p~n", [Date, Time/1000000, Value]), + Value. +rebuild_stats_at1(VHost, Table) -> + CFun = fun(Msg, Stats) -> + To = Msg#msg.to_user ++ "@" ++ Msg#msg.to_server, + Stats_to = if + Msg#msg.to_server == VHost -> + case lists:keysearch(To, 1, Stats) of + {value, {Who_to, Count_to}} -> + lists:keyreplace(To, 1, Stats, {Who_to, Count_to + 1}); + false -> + lists:append(Stats, [{To, 1}]) + end; + true -> + Stats + end, + From = Msg#msg.from_user ++ "@" ++ Msg#msg.from_server, + Stats_from = if + Msg#msg.from_server == VHost -> + case lists:keysearch(From, 1, Stats_to) of + {value, {Who_from, Count_from}} -> + lists:keyreplace(From, 1, Stats_to, {Who_from, Count_from + 1}); + false -> + lists:append(Stats_to, [{From, 1}]) + end; + true -> + Stats_to + end, + Stats_from + end, + DFun = fun(#stats{table=STable, server=Server} = Stat, _Acc) + when STable == Table, Server == VHost -> + mnesia:delete_object(stats_table(), Stat, write); + (_Stat, _Acc) -> ok + end, + case mnesia:transaction(fun() -> + mnesia:write_lock_table(list_to_atom(Table)), + mnesia:write_lock_table(stats_table()), + % Calc stats for VHost at Date + AStats = mnesia:foldl(CFun, [], list_to_atom(Table)), + % Delete all stats for VHost at Date + mnesia:foldl(DFun, [], stats_table()), + % Write new calc'ed stats + lists:foreach(fun({Who, Count}) -> + Jid = jlib:string_to_jid(Who), + JUser = Jid#jid.user, + WStat = #stats{user=JUser, server=VHost, table=Table, count=Count}, + mnesia:write(stats_table(), WStat, write) + end, AStats) + end) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to rebuild_stats_at for ~p at ~p: ~p", [VHost, Table, Reason]), + error; + {atomic, _} -> + ok + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks (delete) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +delete_messages_by_user_at(_VHost, _Msgs, _Date) -> + error. + +delete_all_messages_by_user_at(_User, _VHost, _Date) -> + error. + +delete_messages_at(VHost, Date) -> + Table = list_to_atom(tables_prefix() ++ Date), + + DFun = fun(#msg{to_server=To_server, from_server=From_server}=Msg, _Acc) + when To_server == VHost; From_server == VHost -> + mnesia:delete_object(Table, Msg, write); + (_Msg, _Acc) -> ok + end, + + case mnesia:transaction(fun() -> + mnesia:foldl(DFun, [], Table) + end) of + {aborted, Reason} -> + ?ERROR_MSG("Failed to delete_messages_at for ~p at ~p: ~p", [VHost, Date, Reason]), + error; + {atomic, _} -> + ok + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks (get) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +get_vhost_stats(_VHost) -> + {error, "does not emplemented"}. + +get_vhost_stats_at(VHost, Date) -> + Fun = fun() -> + Pat = #stats{user='$1', server=VHost, table=tables_prefix()++Date, count = '$2'}, + mnesia:select(stats_table(), [{Pat, [], [['$1', '$2']]}]) + end, + case mnesia:transaction(Fun) of + {atomic, Result} -> + RFun = fun([User, Count]) -> + {User, Count} + end, + {ok, lists:reverse(lists:keysort(2, lists:map(RFun, Result)))}; + {aborted, Reason} -> {error, Reason} + end. + +get_user_stats(_User, _VHost) -> + {error, "does not emplemented"}. + +get_user_messages_at(User, VHost, Date) -> + Table_name = tables_prefix() ++ Date, + case mnesia:transaction(fun() -> + Pat_to = #msg{to_user=User, to_server=VHost, _='_'}, + Pat_from = #msg{from_user=User, from_server=VHost, _='_'}, + mnesia:select(list_to_atom(Table_name), + [{Pat_to, [], ['$_']}, + {Pat_from, [], ['$_']}]) + end) of + {atomic, Result} -> + Msgs = lists:map(fun(#msg{to_user=To_user, to_server=To_server, to_resource=To_res, + from_user=From_user, from_server=From_server, from_resource=From_res, + type=Type, + subject=Subj, + body=Body, timestamp=Timestamp} = _Msg) -> + Subject = case Subj of + "None" -> ""; + _ -> Subj + end, + {msg, To_user, To_server, To_res, From_user, From_server, From_res, Type, Subject, Body, Timestamp} + end, Result), + {ok, Msgs}; + {aborted, Reason} -> + {error, Reason} + end. + +get_dates(_VHost) -> + Tables = mnesia:system_info(tables), + MessagesTables = + lists:filter(fun(Table) -> + lists:prefix(tables_prefix(), atom_to_list(Table)) + end, + Tables), + lists:map(fun(Table) -> + lists:sublist(atom_to_list(Table), + length(tables_prefix())+1, + length(atom_to_list(Table))) + end, + MessagesTables). + +get_users_settings(_VHost) -> + {ok, []}. +get_user_settings(_User, _VHost) -> + {ok, []}. +set_user_settings(_User, _VHost, _Set) -> + ok. +drop_user(_User, _VHost) -> + ok. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% internal +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% called from db_logon/2 +create_stats_table() -> + SName = stats_table(), + case mnesia:create_table(SName, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, stats)}, + {record_name, stats} + ]) of + {atomic, ok} -> + ?INFO_MSG("Created stats table", []), + ok; + {aborted, {already_exists, _}} -> + ok; + {aborted, Reason} -> + ?ERROR_MSG("Failed to create stats table: ~p", [Reason]), + error + end. diff --git a/src/mod_logdb_mysql.erl b/src/mod_logdb_mysql.erl new file mode 100644 index 0000000..62f437c --- /dev/null +++ b/src/mod_logdb_mysql.erl @@ -0,0 +1,1055 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb_mysql.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : MySQL backend for mod_logdb +%%% Version : trunk +%%% Id : $Id: mod_logdb_mysql.erl 1360 2009-07-30 06:00:14Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb_mysql). +-author('o.palij@gmail.com'). + +-include("mod_logdb.hrl"). +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("logger.hrl"). + +-behaviour(gen_logdb). +-behaviour(gen_server). + +% gen_server +-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]). +% gen_mod +-export([start/2, stop/1]). +% gen_logdb +-export([log_message/2, + rebuild_stats/1, + rebuild_stats_at/2, + delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2, + get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3, + get_dates/1, + get_users_settings/1, get_user_settings/2, set_user_settings/3, + drop_user/2]). + +% gen_server call timeout +-define(CALL_TIMEOUT, 30000). +-define(MYSQL_TIMEOUT, 60000). +-define(INDEX_SIZE, integer_to_list(170)). +-define(PROCNAME, mod_logdb_mysql). + +-import(mod_logdb, [list_to_bool/1, bool_to_list/1, + list_to_string/1, string_to_list/1, + convert_timestamp_brief/1]). + +-record(state, {dbref, vhost, server, port, db, user, password}). + +% replace "." with "_" +escape_vhost(VHost) -> lists:map(fun(46) -> 95; + (A) -> A + end, binary_to_list(VHost)). +prefix() -> + "`logdb_". + +suffix(VHost) -> + "_" ++ escape_vhost(VHost) ++ "`". + +messages_table(VHost, Date) -> + prefix() ++ "messages_" ++ Date ++ suffix(VHost). + +stats_table(VHost) -> + prefix() ++ "stats" ++ suffix(VHost). + +temp_table(VHost) -> + prefix() ++ "temp" ++ suffix(VHost). + +settings_table(VHost) -> + prefix() ++ "settings" ++ suffix(VHost). + +users_table(VHost) -> + prefix() ++ "users" ++ suffix(VHost). +servers_table(VHost) -> + prefix() ++ "servers" ++ suffix(VHost). +resources_table(VHost) -> + prefix() ++ "resources" ++ suffix(VHost). + +ets_users_table(VHost) -> list_to_atom("logdb_users_" ++ binary_to_list(VHost)). +ets_servers_table(VHost) -> list_to_atom("logdb_servers_" ++ binary_to_list(VHost)). +ets_resources_table(VHost) -> list_to_atom("logdb_resources_" ++ binary_to_list(VHost)). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_mod callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start(VHost, Opts) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:start({local, Proc}, ?MODULE, [VHost, Opts], []). + +stop(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {stop}, ?CALL_TIMEOUT). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_server callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +init([VHost, Opts]) -> + crypto:start(), + + Server = gen_mod:get_opt(server, Opts, fun(A) -> A end, <<"localhost">>), + Port = gen_mod:get_opt(port, Opts, fun(A) -> A end, 3306), + DB = gen_mod:get_opt(db, Opts, fun(A) -> A end, <<"logdb">>), + User = gen_mod:get_opt(user, Opts, fun(A) -> A end, <<"root">>), + Password = gen_mod:get_opt(password, Opts, fun(A) -> A end, <<"">>), + + St = #state{vhost=VHost, + server=Server, port=Port, db=DB, + user=User, password=Password}, + + case open_mysql_connection(St) of + {ok, DBRef} -> + State = St#state{dbref=DBRef}, + ok = create_stats_table(State), + ok = create_settings_table(State), + ok = create_users_table(State), + % clear ets cache every ... + timer:send_interval(timer:hours(12), clear_ets_tables), + ok = create_servers_table(State), + ok = create_resources_table(State), + erlang:monitor(process, DBRef), + {ok, State}; + {error, Reason} -> + ?ERROR_MSG("MySQL connection failed: ~p~n", [Reason]), + {stop, db_connection_failed} + end. + +open_mysql_connection(#state{server=Server, port=Port, db=DB, + user=DBUser, password=Password} = _State) -> + LogFun = fun(debug, _Format, _Argument) -> + %?MYDEBUG(Format, Argument); + ok; + (error, Format, Argument) -> + ?ERROR_MSG(Format, Argument); + (Level, Format, Argument) -> + ?MYDEBUG("MySQL (~p)~n", [Level]), + ?MYDEBUG(Format, Argument) + end, + ?INFO_MSG("Opening mysql connection ~s@~s:~p/~s", [DBUser, Server, Port, DB]), + p1_mysql_conn:start(binary_to_list(Server), Port, + binary_to_list(DBUser), binary_to_list(Password), + binary_to_list(DB), LogFun). + +close_mysql_connection(DBRef) -> + ?MYDEBUG("Closing ~p mysql connection", [DBRef]), + catch p1_mysql_conn:stop(DBRef). + +handle_call({log_message, Msg}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Date = convert_timestamp_brief(Msg#msg.timestamp), + + Table = messages_table(VHost, Date), + Owner_id = get_user_id(DBRef, VHost, binary_to_list(Msg#msg.owner_name)), + Peer_name_id = get_user_id(DBRef, VHost, binary_to_list(Msg#msg.peer_name)), + Peer_server_id = get_server_id(DBRef, VHost, binary_to_list(Msg#msg.peer_server)), + Peer_resource_id = get_resource_id(DBRef, VHost, binary_to_list(Msg#msg.peer_resource)), + + Query = ["INSERT INTO ",Table," ", + "(owner_id,", + "peer_name_id,", + "peer_server_id,", + "peer_resource_id,", + "direction,", + "type,", + "subject,", + "body,", + "timestamp) ", + "VALUES ", + "('", Owner_id, "',", + "'", Peer_name_id, "',", + "'", Peer_server_id, "',", + "'", Peer_resource_id, "',", + "'", atom_to_list(Msg#msg.direction), "',", + "'", binary_to_list(Msg#msg.type), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.subject) ), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.body) ), "',", + "'", Msg#msg.timestamp, "');"], + + Reply = + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost], + [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]), + increment_user_stats(DBRef, Msg#msg.owner_name, Owner_id, VHost, Peer_name_id, Peer_server_id, Date); + {error, Reason} -> + case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#42S02">>) of + % Table doesn't exist + match -> + case create_msg_table(DBRef, VHost, Date) of + error -> + error; + ok -> + {updated, _} = sql_query_internal(DBRef, Query), + increment_user_stats(DBRef, binary_to_list(Msg#msg.owner_name), Owner_id, VHost, Peer_name_id, Peer_server_id, Date) + end; + _ -> + ?ERROR_MSG("Failed to log message: ~p", [Reason]), + error + end + end, + {reply, Reply, State}; +handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Reply = rebuild_stats_at_int(DBRef, VHost, Date), + {reply, Reply, State}; +handle_call({delete_messages_by_user_at, [], _Date}, _From, State) -> + {reply, error, State}; +handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) -> + ["\"",Timestamp,"\"",","] + end, Msgs), + + Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), + + Query = ["DELETE FROM ",messages_table(VHost, Date)," ", + "WHERE timestamp IN (", Temp1], + + Reply = + case sql_query_internal(DBRef, Query) of + {updated, Aff} -> + ?MYDEBUG("Aff=~p", [Aff]), + rebuild_stats_at_int(DBRef, VHost, Date); + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + ok = delete_all_messages_by_user_at_int(DBRef, User, VHost, Date), + ok = delete_stats_by_user_at_int(DBRef, User, VHost, Date), + {reply, ok, State}; +handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Reply = + case sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Date),";"]) of + {updated, _} -> + Query = ["DELETE FROM ",stats_table(VHost)," " + "WHERE at=\"",Date,"\";"], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ok; + {error, _} -> + error + end; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["SELECT at, sum(count) ", + "FROM ",SName," ", + "GROUP BY at ", + "ORDER BY DATE(at) DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]}; + {error, Reason} -> + % TODO: Duplicate error message ? + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["SELECT username, sum(count) AS allcount ", + "FROM ",SName," ", + "JOIN ",users_table(VHost)," ON owner_id=user_id " + "WHERE at=\"",Date,"\" " + "GROUP BY username ", + "ORDER BY allcount DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, lists:reverse( + lists:keysort(2, + [ {User, list_to_integer(Count)} || [User, Count] <- Result]))}; + {error, Reason} -> + % TODO: + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + {reply, get_user_stats_int(DBRef, User, VHost), State}; +handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + TName = messages_table(VHost, Date), + UName = users_table(VHost), + SName = servers_table(VHost), + RName = resources_table(VHost), + Query = ["SELECT users.username,", + "servers.server,", + "resources.resource,", + "messages.direction," + "messages.type," + "messages.subject," + "messages.body," + "messages.timestamp " + "FROM ",TName," AS messages " + "JOIN ",UName," AS users ON peer_name_id=user_id ", + "JOIN ",SName," AS servers ON peer_server_id=server_id ", + "JOIN ",RName," AS resources ON peer_resource_id=resource_id ", + "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\" ", + "ORDER BY timestamp ASC;"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + Fun = fun([Peer_name, Peer_server, Peer_resource, + Direction, + Type, + Subject, Body, + Timestamp]) -> + #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource, + direction=list_to_atom(Direction), + type=Type, + subject=Subject, body=Body, + timestamp=Timestamp} + end, + {ok, lists:map(Fun, Result)}; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["SELECT at ", + "FROM ",SName," ", + "GROUP BY at ", + "ORDER BY DATE(at) DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + [ Date || [Date] <- Result ]; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ", + "FROM ",settings_table(VHost)," ", + "JOIN ",users_table(VHost)," ON user_id=owner_id;"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, lists:map(fun([Owner, DoLogDef, DoLogL, DoNotLogL]) -> + #user_settings{owner_name=Owner, + dolog_default=list_to_bool(DoLogDef), + dolog_list=string_to_list(DoLogL), + donotlog_list=string_to_list(DoNotLogL) + } + end, Result)}; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Query = ["SELECT dolog_default,dolog_list,donotlog_list FROM ",settings_table(VHost)," ", + "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\";"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, []} -> + {ok, []}; + {data, [[Owner, DoLogDef, DoLogL, DoNotLogL]]} -> + {ok, #user_settings{owner_name=Owner, + dolog_default=list_to_bool(DoLogDef), + dolog_list=string_to_list(DoLogL), + donotlog_list=string_to_list(DoNotLogL)}}; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef, + dolog_list=DoLogL, + donotlog_list=DoNotLogL}}, + _From, #state{dbref=DBRef, vhost=VHost} = State) -> + User_id = get_user_id(DBRef, VHost, User), + + Query = ["UPDATE ",settings_table(VHost)," ", + "SET dolog_default=",bool_to_list(DoLogDef),", ", + "dolog_list='",list_to_string(DoLogL),"', ", + "donotlog_list='",list_to_string(DoNotLogL),"' ", + "WHERE owner_id=\"",User_id,"\";"], + + Reply = + case sql_query_internal(DBRef, Query) of + {updated, 0} -> + IQuery = ["INSERT INTO ",settings_table(VHost)," ", + "(owner_id, dolog_default, dolog_list, donotlog_list) ", + "VALUES ", + "('",User_id,"', ",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + ?MYDEBUG("New settings for ~s@~s", [User, VHost]), + ok; + {error, Reason} -> + case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>) of + % Already exists + match -> + ok; + _ -> + ?ERROR_MSG("Failed setup user ~p@~p: ~p", [User, VHost, Reason]), + error + end + end; + {updated, 1} -> + ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]), + ok; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({stop}, _From, #state{vhost=VHost}=State) -> + ets:delete(ets_users_table(VHost)), + ets:delete(ets_servers_table(VHost)), + ?MYDEBUG("Stoping mysql backend for ~p", [VHost]), + {stop, normal, ok, State}; +handle_call(Msg, _From, State) -> + ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. + +handle_cast({rebuild_stats}, State) -> + rebuild_all_stats_int(State), + {noreply, State}; +handle_cast({drop_user, User}, #state{vhost=VHost} = State) -> + Fun = fun() -> + {ok, DBRef} = open_mysql_connection(State), + {ok, Dates} = get_user_stats_int(DBRef, User, VHost), + MDResult = lists:map(fun({Date, _}) -> + delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) + end, Dates), + StDResult = delete_all_stats_by_user_int(DBRef, User, VHost), + SDResult = delete_user_settings_int(DBRef, User, VHost), + case lists:all(fun(Result) when Result == ok -> + true; + (Result) when Result == error -> + false + end, lists:append([MDResult, [StDResult], [SDResult]])) of + true -> + ?INFO_MSG("Removed ~s@~s", [User, VHost]); + false -> + ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost]) + end, + close_mysql_connection(DBRef) + end, + spawn(Fun), + {noreply, State}; +handle_cast(Msg, State) -> + ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]), + {noreply, State}. + +handle_info(clear_ets_tables, State) -> + ets:delete_all_objects(ets_users_table(State#state.vhost)), + ets:delete_all_objects(ets_resources_table(State#state.vhost)), + {noreply, State}; +handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) -> + {stop, connection_dropped, State}; +handle_info(Info, State) -> + ?INFO_MSG("Got Info:~p, State:~p", [Info, State]), + {noreply, State}. + +terminate(_Reason, #state{dbref=DBRef}=_State) -> + close_mysql_connection(DBRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +log_message(VHost, Msg) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {log_message, Msg}, ?CALL_TIMEOUT). +rebuild_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {rebuild_stats}). +rebuild_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {rebuild_stats_at, Date}, ?CALL_TIMEOUT). +delete_messages_by_user_at(VHost, Msgs, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT). +delete_all_messages_by_user_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT). +delete_messages_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT). +get_vhost_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT). +get_vhost_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT). +get_user_stats(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_stats, User}, ?CALL_TIMEOUT). +get_user_messages_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_messages_at, User, Date}, ?CALL_TIMEOUT). +get_dates(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_dates}, ?CALL_TIMEOUT). +get_users_settings(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_users_settings}, ?CALL_TIMEOUT). +get_user_settings(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_settings, User}, ?CALL_TIMEOUT). +set_user_settings(User, VHost, Set) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {set_user_settings, User, Set}, ?CALL_TIMEOUT). +drop_user(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {drop_user, User}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +increment_user_stats(DBRef, User_name, User_id, VHost, PNameID, PServerID, Date) -> + SName = stats_table(VHost), + UQuery = ["UPDATE ",SName," ", + "SET count=count+1 ", + "WHERE owner_id=\"",User_id,"\" AND peer_name_id=\"",PNameID,"\" AND peer_server_id=\"",PServerID,"\" AND at=\"",Date,"\";"], + + case sql_query_internal(DBRef, UQuery) of + {updated, 0} -> + IQuery = ["INSERT INTO ",SName," ", + "(owner_id, peer_name_id, peer_server_id, at, count) ", + "VALUES ", + "('",User_id,"', '",PNameID,"', '",PServerID,"', '",Date,"', '1');"], + case sql_query_internal(DBRef, IQuery) of + {updated, _} -> + ?MYDEBUG("New stats for ~s@~s at ~s", [User_name, VHost, Date]), + ok; + {error, _} -> + error + end; + {updated, _} -> + ?MYDEBUG("Updated stats for ~s@~s at ~s", [User_name, VHost, Date]), + ok; + {error, _} -> + error + end. + +get_dates_int(DBRef, VHost) -> + case sql_query_internal(DBRef, ["SHOW TABLES"]) of + {data, Tables} -> + lists:foldl(fun([Table], Dates) -> + Reg = lists:sublist(prefix(),2,length(prefix())) ++ ".*" ++ escape_vhost(VHost), + case re:run(Table, Reg) of + {match, [{1, _}]} -> + ?MYDEBUG("matched ~p against ~p", [Table, Reg]), + case re:run(Table,"[0-9]+-[0-9]+-[0-9]+") of + {match, [{S, E}]} -> + lists:append(Dates, [lists:sublist(Table,S,E)]); + nomatch -> + Dates + end; + _ -> + Dates + end + end, [], Tables); + {error, _} -> + [] + end. + +rebuild_all_stats_int(#state{vhost=VHost}=State) -> + Fun = fun() -> + {ok, DBRef} = open_mysql_connection(State), + ok = delete_nonexistent_stats(DBRef, VHost), + case lists:filter(fun(Date) -> + case catch rebuild_stats_at_int(DBRef, VHost, Date) of + ok -> false; + error -> true; + {'EXIT', _} -> true + end + end, get_dates_int(DBRef, VHost)) of + [] -> ok; + FTables -> + ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]), + error + end, + close_mysql_connection(DBRef) + end, + spawn(Fun). + +rebuild_stats_at_int(DBRef, VHost, Date) -> + TempTable = temp_table(VHost), + Fun = fun() -> + Table = messages_table(VHost, Date), + STable = stats_table(VHost), + + DQuery = [ "DELETE FROM ",STable," ", + "WHERE at='",Date,"';"], + + ok = create_temp_table(DBRef, TempTable), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," WRITE, ",TempTable," WRITE;"]), + SQuery = ["INSERT INTO ",TempTable," ", + "(owner_id,peer_name_id,peer_server_id,at,count) ", + "SELECT owner_id,peer_name_id,peer_server_id,\"",Date,"\",count(*) ", + "FROM ",Table," GROUP BY owner_id,peer_name_id,peer_server_id;"], + case sql_query_internal(DBRef, SQuery) of + {updated, 0} -> + Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]), + case Count of + {data, [["0"]]} -> + {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table,";"]), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE;"]), + {updated, _} = sql_query_internal(DBRef, DQuery), + ok; + _ -> + ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]), + error + end; + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]), + {updated, _} = sql_query_internal(DBRef, DQuery), + SQuery1 = ["INSERT INTO ",STable," ", + "(owner_id,peer_name_id,peer_server_id,at,count) ", + "SELECT owner_id,peer_name_id,peer_server_id,at,count ", + "FROM ",TempTable,";"], + case sql_query_internal(DBRef, SQuery1) of + {updated, _} -> ok; + {error, _} -> error + end; + {error, _} -> error + end + end, + + case catch apply(Fun, []) of + ok -> + ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]), + ok; + error -> + error; + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]), + error + end, + sql_query_internal(DBRef, ["UNLOCK TABLES;"]), + sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]), + ok. + + +delete_nonexistent_stats(DBRef, VHost) -> + Dates = get_dates_int(DBRef, VHost), + STable = stats_table(VHost), + + Temp = lists:flatmap(fun(Date) -> + ["\"",Date,"\"",","] + end, Dates), + + case Temp of + [] -> + ok; + _ -> + % replace last "," with ");" + Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), + Query = ["DELETE FROM ",STable," ", + "WHERE at NOT IN (", Temp1], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ok; + {error, _} -> + error + end + end. + +get_user_stats_int(DBRef, User, VHost) -> + SName = stats_table(VHost), + Query = ["SELECT at, sum(count) as allcount ", + "FROM ",SName," ", + "WHERE owner_id=\"",get_user_id(DBRef, VHost, User),"\" ", + "GROUP BY at " + "ORDER BY DATE(at) DESC;" + ], + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result]}; + {error, Result} -> + {error, Result} + end. + +delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) -> + DQuery = ["DELETE FROM ",messages_table(VHost, Date)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + case sql_query_internal(DBRef, DQuery) of + {updated, _} -> + ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]), + ok; + {error, _} -> + error + end. + +delete_all_stats_by_user_int(DBRef, User, VHost) -> + SQuery = ["DELETE FROM ",stats_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + case sql_query_internal(DBRef, SQuery) of + {updated, _} -> + ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]), + ok; + {error, _} -> error + end. + +delete_stats_by_user_at_int(DBRef, User, VHost, Date) -> + SQuery = ["DELETE FROM ",stats_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\") ", + "AND at=\"",Date,"\";"], + case sql_query_internal(DBRef, SQuery) of + {updated, _} -> + ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]), + ok; + {error, _} -> error + end. + +delete_user_settings_int(DBRef, User, VHost) -> + Query = ["DELETE FROM ",settings_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]), + ok; + {error, Reason} -> + ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Reason]), + error + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% tables internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +create_temp_table(DBRef, Name) -> + Query = ["CREATE TABLE ",Name," (", + "owner_id MEDIUMINT UNSIGNED, ", + "peer_name_id MEDIUMINT UNSIGNED, ", + "peer_server_id MEDIUMINT UNSIGNED, ", + "at VARCHAR(11), ", + "count INT(11) ", + ") ENGINE=MyISAM CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> ok; + {error, _Reason} -> error + end. + +create_stats_table(#state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["CREATE TABLE ",SName," (", + "owner_id MEDIUMINT UNSIGNED, ", + "peer_name_id MEDIUMINT UNSIGNED, ", + "peer_server_id MEDIUMINT UNSIGNED, ", + "at varchar(20), ", + "count int(11), ", + "INDEX(owner_id, peer_name_id, peer_server_id), ", + "INDEX(at)" + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + ?INFO_MSG("Created stats table for ~p", [VHost]), + rebuild_all_stats_int(State), + ok; + {error, Reason} -> + case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#42S01">>) of + match -> + ?MYDEBUG("Stats table for ~p already exists", [VHost]), + CheckQuery = ["SHOW COLUMNS FROM ",SName," LIKE 'peer_%_id';"], + case sql_query_internal(DBRef, CheckQuery) of + {data, Elems} when length(Elems) == 2 -> + ?MYDEBUG("Stats table structure is ok", []), + ok; + _ -> + ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []), + case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of + {updated, _} -> + ?INFO_MSG("Successfully dropped ~p", [SName]); + _ -> + ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName]) + end, + error + end; + _ -> + ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Reason]), + error + end + end. + +create_settings_table(#state{dbref=DBRef, vhost=VHost}) -> + SName = settings_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",SName," (", + "owner_id MEDIUMINT UNSIGNED PRIMARY KEY, ", + "dolog_default TINYINT(1) NOT NULL DEFAULT 1, ", + "dolog_list TEXT, ", + "donotlog_list TEXT ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created settings table for ~p", [VHost]), + ok; + {error, _} -> + error + end. + +create_users_table(#state{dbref=DBRef, vhost=VHost}) -> + SName = users_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",SName," (", + "username TEXT NOT NULL, ", + "user_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ", + "UNIQUE INDEX(username(",?INDEX_SIZE,")) ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created users table for ~p", [VHost]), + ets:new(ets_users_table(VHost), [named_table, set, public]), + %update_users_from_db(DBRef, VHost), + ok; + {error, _} -> + error + end. + +create_servers_table(#state{dbref=DBRef, vhost=VHost}) -> + SName = servers_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",SName," (", + "server TEXT NOT NULL, ", + "server_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ", + "UNIQUE INDEX(server(",?INDEX_SIZE,")) ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created servers table for ~p", [VHost]), + ets:new(ets_servers_table(VHost), [named_table, set, public]), + update_servers_from_db(DBRef, VHost), + ok; + {error, _} -> + error + end. + +create_resources_table(#state{dbref=DBRef, vhost=VHost}) -> + RName = resources_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",RName," (", + "resource TEXT NOT NULL, ", + "resource_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ", + "UNIQUE INDEX(resource(",?INDEX_SIZE,")) ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created resources table for ~p", [VHost]), + ets:new(ets_resources_table(VHost), [named_table, set, public]), + ok; + {error, _} -> + error + end. + +create_msg_table(DBRef, VHost, Date) -> + TName = messages_table(VHost, Date), + Query = ["CREATE TABLE ",TName," (", + "owner_id MEDIUMINT UNSIGNED, ", + "peer_name_id MEDIUMINT UNSIGNED, ", + "peer_server_id MEDIUMINT UNSIGNED, ", + "peer_resource_id MEDIUMINT(8) UNSIGNED, ", + "direction ENUM('to', 'from'), ", + "type ENUM('chat','error','groupchat','headline','normal') NOT NULL, ", + "subject TEXT, ", + "body TEXT, ", + "timestamp DOUBLE, ", + "INDEX search_i (owner_id, peer_name_id, peer_server_id, peer_resource_id), ", + "FULLTEXT (body) " + ") ENGINE=MyISAM CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _MySQLRes} -> + ?MYDEBUG("Created msg table for ~p at ~p", [VHost, Date]), + ok; + {error, _} -> + error + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% internal ets cache (users, servers, resources) +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +update_servers_from_db(DBRef, VHost) -> + ?INFO_MSG("Reading servers from db for ~p", [VHost]), + SQuery = ["SELECT server, server_id FROM ",servers_table(VHost),";"], + {data, Result} = sql_query_internal(DBRef, SQuery), + true = ets:delete_all_objects(ets_servers_table(VHost)), + true = ets:insert(ets_servers_table(VHost), [ {Server, Server_id} || [Server, Server_id] <- Result]). + +%update_users_from_db(DBRef, VHost) -> +% ?INFO_MSG("Reading users from db for ~p", [VHost]), +% SQuery = ["SELECT username, user_id FROM ",users_table(VHost),";"], +% {data, Result} = sql_query_internal(DBRef, SQuery), +% true = ets:delete_all_objects(ets_users_table(VHost)), +% true = ets:insert(ets_users_table(VHost), [ {Username, User_id} || [Username, User_id] <- Result]). + +%get_user_name(DBRef, VHost, User_id) -> +% case ets:match(ets_users_table(VHost), {'$1', User_id}) of +% [[User]] -> User; +% % this can be in clustered environment +% [] -> +% %update_users_from_db(DBRef, VHost), +% SQuery = ["SELECT username FROM ",users_table(VHost)," ", +% "WHERE user_id=\"",User_id,"\";"], +% {data, [[Name]]} = sql_query_internal(DBRef, SQuery), +% % cache {user, id} pair +% ets:insert(ets_users_table(VHost), {Name, User_id}), +% Name +% end. + +%get_server_name(DBRef, VHost, Server_id) -> +% case ets:match(ets_servers_table(VHost), {'$1', Server_id}) of +% [[Server]] -> Server; + % this can be in clustered environment +% [] -> +% update_servers_from_db(DBRef, VHost), +% [[Server1]] = ets:match(ets_servers_table(VHost), {'$1', Server_id}), +% Server1 +% end. + +get_user_id_from_db(DBRef, VHost, User) -> + SQuery = ["SELECT user_id FROM ",users_table(VHost)," ", + "WHERE username=\"",User,"\";"], + case sql_query_internal(DBRef, SQuery) of + % no such user in db + {data, []} -> + {ok, []}; + {data, [[DBId]]} -> + % cache {user, id} pair + ets:insert(ets_users_table(VHost), {User, DBId}), + {ok, DBId} + end. +get_user_id(DBRef, VHost, User) -> + % Look at ets + case ets:match(ets_users_table(VHost), {User, '$1'}) of + [] -> + % Look at db + case get_user_id_from_db(DBRef, VHost, User) of + % no such user in db + {ok, []} -> + IQuery = ["INSERT INTO ",users_table(VHost)," ", + "SET username=\"",User,"\";"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + {ok, NewId} = get_user_id_from_db(DBRef, VHost, User), + NewId; + {error, Reason} -> + % this can be in clustered environment + match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>), + ?ERROR_MSG("Duplicate key name for ~p", [User]), + {ok, ClID} = get_user_id_from_db(DBRef, VHost, User), + ClID + end; + {ok, DBId} -> + DBId + end; + [[EtsId]] -> EtsId + end. + +get_server_id(DBRef, VHost, Server) -> + case ets:match(ets_servers_table(VHost), {Server, '$1'}) of + [] -> + IQuery = ["INSERT INTO ",servers_table(VHost)," ", + "SET server=\"",Server,"\";"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + SQuery = ["SELECT server_id FROM ",servers_table(VHost)," ", + "WHERE server=\"",Server,"\";"], + {data, [[Id]]} = sql_query_internal(DBRef, SQuery), + ets:insert(ets_servers_table(VHost), {Server, Id}), + Id; + {error, Reason} -> + % this can be in clustered environment + match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>), + ?ERROR_MSG("Duplicate key name for ~p", [Server]), + update_servers_from_db(DBRef, VHost), + [[Id1]] = ets:match(ets_servers_table(VHost), {Server, '$1'}), + Id1 + end; + [[Id]] -> Id + end. + +get_resource_id_from_db(DBRef, VHost, Resource) -> + SQuery = ["SELECT resource_id FROM ",resources_table(VHost)," ", + "WHERE resource=\"",binary_to_list(ejabberd_odbc:escape(iolist_to_binary(Resource))),"\";"], + case sql_query_internal(DBRef, SQuery) of + % no such resource in db + {data, []} -> + {ok, []}; + {data, [[DBId]]} -> + % cache {resource, id} pair + ets:insert(ets_resources_table(VHost), {Resource, DBId}), + {ok, DBId} + end. +get_resource_id(DBRef, VHost, Resource) -> + % Look at ets + case ets:match(ets_resources_table(VHost), {Resource, '$1'}) of + [] -> + % Look at db + case get_resource_id_from_db(DBRef, VHost, Resource) of + % no such resource in db + {ok, []} -> + IQuery = ["INSERT INTO ",resources_table(VHost)," ", + "SET resource=\"",binary_to_list(ejabberd_odbc:escape(iolist_to_binary(Resource))),"\";"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + {ok, NewId} = get_resource_id_from_db(DBRef, VHost, Resource), + NewId; + {error, Reason} -> + % this can be in clustered environment + match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>), + ?ERROR_MSG("Duplicate key name for ~s", [Resource]), + {ok, ClID} = get_resource_id_from_db(DBRef, VHost, Resource), + ClID + end; + {ok, DBId} -> + DBId + end; + [[EtsId]] -> EtsId + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% SQL internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +sql_query_internal(DBRef, Query) -> + case sql_query_internal_silent(DBRef, Query) of + {error, Reason} -> + ?ERROR_MSG("~p while ~p", [Reason, lists:append(Query)]), + {error, Reason}; + Rez -> Rez + end. + +sql_query_internal_silent(DBRef, Query) -> + ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]), + get_result(p1_mysql_conn:fetch(DBRef, Query, self(), ?MYSQL_TIMEOUT)). + +get_result({updated, MySQLRes}) -> + {updated, p1_mysql:get_result_affected_rows(MySQLRes)}; +get_result({data, MySQLRes}) -> + {data, p1_mysql:get_result_rows(MySQLRes)}; +get_result({error, "query timed out"}) -> + {error, "query timed out"}; +get_result({error, MySQLRes}) -> + Reason = p1_mysql:get_result_reason(MySQLRes), + {error, Reason}. diff --git a/src/mod_logdb_mysql5.erl b/src/mod_logdb_mysql5.erl new file mode 100644 index 0000000..d1f399f --- /dev/null +++ b/src/mod_logdb_mysql5.erl @@ -0,0 +1,983 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_logdb_mysql5.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : MySQL 5 backend for mod_logdb +%%% Version : trunk +%%% Id : $Id: mod_logdb_mysql5.erl 1360 2009-07-30 06:00:14Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb_mysql5). +-author('o.palij@gmail.com'). + +-include("mod_logdb.hrl"). +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("logger.hrl"). + +-behaviour(gen_logdb). +-behaviour(gen_server). + +% gen_server +-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]). +% gen_mod +-export([start/2, stop/1]). +% gen_logdb +-export([log_message/2, + rebuild_stats/1, + rebuild_stats_at/2, + delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2, + get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3, + get_dates/1, + get_users_settings/1, get_user_settings/2, set_user_settings/3, + drop_user/2]). + +% gen_server call timeout +-define(CALL_TIMEOUT, 30000). +-define(MYSQL_TIMEOUT, 60000). +-define(INDEX_SIZE, integer_to_list(170)). +-define(PROCNAME, mod_logdb_mysql5). + +-import(mod_logdb, [list_to_bool/1, bool_to_list/1, + list_to_string/1, string_to_list/1, + convert_timestamp_brief/1]). + +-record(state, {dbref, vhost, server, port, db, user, password}). + +% replace "." with "_" +escape_vhost(VHost) -> lists:map(fun(46) -> 95; + (A) -> A + end, binary_to_list(VHost)). +prefix() -> + "`logdb_". + +suffix(VHost) -> + "_" ++ escape_vhost(VHost) ++ "`". + +messages_table(VHost, Date) -> + prefix() ++ "messages_" ++ Date ++ suffix(VHost). + +% TODO: this needs to be redone to unify view name in stored procedure and in delete_messages_at/2 +view_table(VHost, Date) -> + Table = messages_table(VHost, Date), + TablewoQ = lists:sublist(Table, 2, length(Table) - 2), + lists:append(["`v_", TablewoQ, "`"]). + +stats_table(VHost) -> + prefix() ++ "stats" ++ suffix(VHost). + +temp_table(VHost) -> + prefix() ++ "temp" ++ suffix(VHost). + +settings_table(VHost) -> + prefix() ++ "settings" ++ suffix(VHost). + +users_table(VHost) -> + prefix() ++ "users" ++ suffix(VHost). +servers_table(VHost) -> + prefix() ++ "servers" ++ suffix(VHost). +resources_table(VHost) -> + prefix() ++ "resources" ++ suffix(VHost). + +logmessage_name(VHost) -> + prefix() ++ "logmessage" ++ suffix(VHost). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_mod callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start(VHost, Opts) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:start({local, Proc}, ?MODULE, [VHost, Opts], []). + +stop(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {stop}, ?CALL_TIMEOUT). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_server callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +init([VHost, Opts]) -> + crypto:start(), + + Server = gen_mod:get_opt(server, Opts, fun(A) -> A end, <<"localhost">>), + Port = gen_mod:get_opt(port, Opts, fun(A) -> A end, 3306), + DB = gen_mod:get_opt(db, Opts, fun(A) -> A end, <<"logdb">>), + User = gen_mod:get_opt(user, Opts, fun(A) -> A end, <<"root">>), + Password = gen_mod:get_opt(password, Opts, fun(A) -> A end, <<"">>), + + St = #state{vhost=VHost, + server=Server, port=Port, db=DB, + user=User, password=Password}, + + case open_mysql_connection(St) of + {ok, DBRef} -> + State = St#state{dbref=DBRef}, + ok = create_internals(State), + ok = create_stats_table(State), + ok = create_settings_table(State), + ok = create_users_table(State), + ok = create_servers_table(State), + ok = create_resources_table(State), + erlang:monitor(process, DBRef), + {ok, State}; + {error, Reason} -> + ?ERROR_MSG("MySQL connection failed: ~p~n", [Reason]), + {stop, db_connection_failed} + end. + +open_mysql_connection(#state{server=Server, port=Port, db=DB, + user=DBUser, password=Password} = _State) -> + LogFun = fun(debug, _Format, _Argument) -> + %?MYDEBUG(Format, Argument); + ok; + (error, Format, Argument) -> + ?ERROR_MSG(Format, Argument); + (Level, Format, Argument) -> + ?MYDEBUG("MySQL (~p)~n", [Level]), + ?MYDEBUG(Format, Argument) + end, + ?INFO_MSG("Opening mysql connection ~s@~s:~p/~s", [DBUser, Server, Port, DB]), + p1_mysql_conn:start(binary_to_list(Server), Port, + binary_to_list(DBUser), binary_to_list(Password), + binary_to_list(DB), LogFun). + +close_mysql_connection(DBRef) -> + ?MYDEBUG("Closing ~p mysql connection", [DBRef]), + catch p1_mysql_conn:stop(DBRef). + +handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Reply = rebuild_stats_at_int(DBRef, VHost, Date), + {reply, Reply, State}; +handle_call({delete_messages_by_user_at, [], _Date}, _From, State) -> + {reply, error, State}; +handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) -> + ["\"",Timestamp,"\"",","] + end, Msgs), + + Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), + + Query = ["DELETE FROM ",messages_table(VHost, Date)," ", + "WHERE timestamp IN (", Temp1], + + Reply = + case sql_query_internal(DBRef, Query) of + {updated, Aff} -> + ?MYDEBUG("Aff=~p", [Aff]), + rebuild_stats_at_int(DBRef, VHost, Date); + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + ok = delete_all_messages_by_user_at_int(DBRef, User, VHost, Date), + ok = delete_stats_by_user_at_int(DBRef, User, VHost, Date), + {reply, ok, State}; +handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Fun = fun() -> + {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Date),";"]), + TQuery = ["DELETE FROM ",stats_table(VHost)," " + "WHERE at=\"",Date,"\";"], + {updated, _} = sql_query_internal(DBRef, TQuery), + VQuery = ["DROP VIEW IF EXISTS ",view_table(VHost,Date),";"], + {updated, _} = sql_query_internal(DBRef, VQuery), + ok + end, + Reply = + case catch apply(Fun, []) of + ok -> + ok; + {'EXIT', _} -> + error + end, + {reply, Reply, State}; +handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["SELECT at, sum(count) ", + "FROM ",SName," ", + "GROUP BY at ", + "ORDER BY DATE(at) DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]}; + {error, Reason} -> + % TODO: Duplicate error message ? + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["SELECT username, sum(count) as allcount ", + "FROM ",SName," ", + "JOIN ",users_table(VHost)," ON owner_id=user_id " + "WHERE at=\"",Date,"\" ", + "GROUP BY username ", + "ORDER BY allcount DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, [ {User, list_to_integer(Count)} || [User, Count] <- Result ]}; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + {reply, get_user_stats_int(DBRef, User, VHost), State}; +handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Query = ["SELECT peer_name,", + "peer_server,", + "peer_resource,", + "direction," + "type," + "subject," + "body," + "timestamp " + "FROM ",view_table(VHost, Date)," " + "WHERE owner_name=\"",User,"\";"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + Fun = fun([Peer_name, Peer_server, Peer_resource, + Direction, + Type, + Subject, Body, + Timestamp]) -> + #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource, + direction=list_to_atom(Direction), + type=Type, + subject=Subject, body=Body, + timestamp=Timestamp} + end, + {ok, lists:map(Fun, Result)}; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["SELECT at ", + "FROM ",SName," ", + "GROUP BY at ", + "ORDER BY DATE(at) DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + [ Date || [Date] <- Result ]; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ", + "FROM ",settings_table(VHost)," ", + "JOIN ",users_table(VHost)," ON user_id=owner_id;"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, lists:map(fun([Owner, DoLogDef, DoLogL, DoNotLogL]) -> + #user_settings{owner_name=Owner, + dolog_default=list_to_bool(DoLogDef), + dolog_list=string_to_list(DoLogL), + donotlog_list=string_to_list(DoNotLogL) + } + end, Result)}; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost}=State) -> + Query = ["SELECT dolog_default,dolog_list,donotlog_list FROM ",settings_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, []} -> + {ok, []}; + {data, [[Owner, DoLogDef, DoLogL, DoNotLogL]]} -> + {ok, #user_settings{owner_name=Owner, + dolog_default=list_to_bool(DoLogDef), + dolog_list=string_to_list(DoLogL), + donotlog_list=string_to_list(DoNotLogL)}}; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef, + dolog_list=DoLogL, + donotlog_list=DoNotLogL}}, + _From, #state{dbref=DBRef, vhost=VHost} = State) -> + User_id = get_user_id(DBRef, VHost, User), + Query = ["UPDATE ",settings_table(VHost)," ", + "SET dolog_default=",bool_to_list(DoLogDef),", ", + "dolog_list='",list_to_string(DoLogL),"', ", + "donotlog_list='",list_to_string(DoNotLogL),"' ", + "WHERE owner_id=",User_id,";"], + + Reply = + case sql_query_internal(DBRef, Query) of + {updated, 0} -> + IQuery = ["INSERT INTO ",settings_table(VHost)," ", + "(owner_id, dolog_default, dolog_list, donotlog_list) ", + "VALUES ", + "(",User_id,",",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + ?MYDEBUG("New settings for ~s@~s", [User, VHost]), + ok; + {error, Reason} -> + case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>) of + % Already exists + match -> + ok; + _ -> + ?ERROR_MSG("Failed setup user ~p@~p: ~p", [User, VHost, Reason]), + error + end + end; + {updated, 1} -> + ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]), + ok; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({stop}, _From, #state{vhost=VHost}=State) -> + ?MYDEBUG("Stoping mysql5 backend for ~p", [VHost]), + {stop, normal, ok, State}; +handle_call(Msg, _From, State) -> + ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. + +handle_cast({log_message, Msg}, #state{dbref=DBRef, vhost=VHost}=State) -> + Fun = fun() -> + Date = convert_timestamp_brief(Msg#msg.timestamp), + TableName = messages_table(VHost, Date), + + Query = [ "CALL ",logmessage_name(VHost)," " + "('", TableName, "',", + "'", Date, "',", + "'", binary_to_list(Msg#msg.owner_name), "',", + "'", binary_to_list(Msg#msg.peer_name), "',", + "'", binary_to_list(Msg#msg.peer_server), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.peer_resource) ), "',", + "'", atom_to_list(Msg#msg.direction), "',", + "'", binary_to_list(Msg#msg.type), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.subject) ), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.body) ), "',", + "'", Msg#msg.timestamp, "');"], + + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost], + [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]), + ok; + {error, _Reason} -> + error + end + end, + spawn(Fun), + {noreply, State}; +handle_cast({rebuild_stats}, State) -> + rebuild_all_stats_int(State), + {noreply, State}; +handle_cast({drop_user, User}, #state{vhost=VHost} = State) -> + Fun = fun() -> + {ok, DBRef} = open_mysql_connection(State), + {ok, Dates} = get_user_stats_int(DBRef, User, VHost), + MDResult = lists:map(fun({Date, _}) -> + delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) + end, Dates), + StDResult = delete_all_stats_by_user_int(DBRef, User, VHost), + SDResult = delete_user_settings_int(DBRef, User, VHost), + case lists:all(fun(Result) when Result == ok -> + true; + (Result) when Result == error -> + false + end, lists:append([MDResult, [StDResult], [SDResult]])) of + true -> + ?INFO_MSG("Removed ~s@~s", [User, VHost]); + false -> + ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost]) + end, + close_mysql_connection(DBRef) + end, + spawn(Fun), + {noreply, State}; +handle_cast(Msg, State) -> + ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]), + {noreply, State}. + +handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) -> + {stop, connection_dropped, State}; +handle_info(Info, State) -> + ?INFO_MSG("Got Info:~p, State:~p", [Info, State]), + {noreply, State}. + +terminate(_Reason, #state{dbref=DBRef}=_State) -> + close_mysql_connection(DBRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +log_message(VHost, Msg) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {log_message, Msg}). +rebuild_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {rebuild_stats}). +rebuild_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {rebuild_stats_at, Date}, ?CALL_TIMEOUT). +delete_messages_by_user_at(VHost, Msgs, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT). +delete_all_messages_by_user_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT). +delete_messages_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT). +get_vhost_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT). +get_vhost_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT). +get_user_stats(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_stats, User}, ?CALL_TIMEOUT). +get_user_messages_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_messages_at, User, Date}, ?CALL_TIMEOUT). +get_dates(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_dates}, ?CALL_TIMEOUT). +get_users_settings(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_users_settings}, ?CALL_TIMEOUT). +get_user_settings(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_settings, User}, ?CALL_TIMEOUT). +set_user_settings(User, VHost, Set) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {set_user_settings, User, Set}, ?CALL_TIMEOUT). +drop_user(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {drop_user, User}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +get_dates_int(DBRef, VHost) -> + case sql_query_internal(DBRef, ["SHOW TABLES"]) of + {data, Tables} -> + lists:foldl(fun([Table], Dates) -> + Reg = lists:sublist(prefix(),2,length(prefix())) ++ ".*" ++ escape_vhost(VHost), + case re:run(Table, Reg) of + {match, [{1, _}]} -> + case re:run(Table,"[0-9]+-[0-9]+-[0-9]+") of + {match, [{S, E}]} -> + lists:append(Dates, [lists:sublist(Table,S,E)]); + nomatch -> + Dates + end; + _ -> + Dates + end + end, [], Tables); + {error, _} -> + [] + end. + +rebuild_all_stats_int(#state{vhost=VHost}=State) -> + Fun = fun() -> + {ok, DBRef} = open_mysql_connection(State), + ok = delete_nonexistent_stats(DBRef, VHost), + case lists:filter(fun(Date) -> + case catch rebuild_stats_at_int(DBRef, VHost, Date) of + ok -> false; + error -> true; + {'EXIT', _} -> true + end + end, get_dates_int(DBRef, VHost)) of + [] -> ok; + FTables -> + ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]), + error + end, + close_mysql_connection(DBRef) + end, + spawn(Fun). + +rebuild_stats_at_int(DBRef, VHost, Date) -> + TempTable = temp_table(VHost), + Fun = fun() -> + Table = messages_table(VHost, Date), + STable = stats_table(VHost), + + DQuery = [ "DELETE FROM ",STable," ", + "WHERE at='",Date,"';"], + + ok = create_temp_table(DBRef, TempTable), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," WRITE, ",TempTable," WRITE;"]), + SQuery = ["INSERT INTO ",TempTable," ", + "(owner_id,peer_name_id,peer_server_id,at,count) ", + "SELECT owner_id,peer_name_id,peer_server_id,\"",Date,"\",count(*) ", + "FROM ",Table," WHERE ext is NULL GROUP BY owner_id,peer_name_id,peer_server_id;"], + case sql_query_internal(DBRef, SQuery) of + {updated, 0} -> + Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]), + case Count of + {data, [["0"]]} -> + {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table,";"]), + sql_query_internal(DBRef, ["UNLOCK TABLES;"]), + {updated, _} = sql_query_internal(DBRef, ["DROP VIEW IF EXISTS ",view_table(VHost,Date),";"]), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]), + {updated, _} = sql_query_internal(DBRef, DQuery), + ok; + _ -> + ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]), + error + end; + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," WRITE, ",TempTable," WRITE;"]), + {updated, _} = sql_query_internal(DBRef, DQuery), + SQuery1 = ["INSERT INTO ",STable," ", + "(owner_id,peer_name_id,peer_server_id,at,count) ", + "SELECT owner_id,peer_name_id,peer_server_id,at,count ", + "FROM ",TempTable,";"], + case sql_query_internal(DBRef, SQuery1) of + {updated, _} -> ok; + {error, _} -> error + end; + {error, _} -> error + end + end, + + case catch apply(Fun, []) of + ok -> + ?INFO_MSG("Rebuilded stats for ~p at ~p", [VHost, Date]), + ok; + error -> + error; + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]), + error + end, + sql_query_internal(DBRef, ["UNLOCK TABLES;"]), + sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]), + ok. + +delete_nonexistent_stats(DBRef, VHost) -> + Dates = get_dates_int(DBRef, VHost), + STable = stats_table(VHost), + + Temp = lists:flatmap(fun(Date) -> + ["\"",Date,"\"",","] + end, Dates), + case Temp of + [] -> + ok; + _ -> + % replace last "," with ");" + Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), + Query = ["DELETE FROM ",STable," ", + "WHERE at NOT IN (", Temp1], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ok; + {error, _} -> + error + end + end. + +get_user_stats_int(DBRef, User, VHost) -> + SName = stats_table(VHost), + UName = users_table(VHost), + Query = ["SELECT stats.at, sum(stats.count) ", + "FROM ",UName," AS users ", + "JOIN ",SName," AS stats ON owner_id=user_id " + "WHERE users.username=\"",User,"\" ", + "GROUP BY stats.at " + "ORDER BY DATE(stats.at) DESC;" + ], + case sql_query_internal(DBRef, Query) of + {data, Result} -> + {ok, [ {Date, list_to_integer(Count)} || [Date, Count] <- Result ]}; + {error, Result} -> + {error, Result} + end. + +delete_all_messages_by_user_at_int(DBRef, User, VHost, Date) -> + DQuery = ["DELETE FROM ",messages_table(VHost, Date)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + case sql_query_internal(DBRef, DQuery) of + {updated, _} -> + ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]), + ok; + {error, _} -> + error + end. + +delete_all_stats_by_user_int(DBRef, User, VHost) -> + SQuery = ["DELETE FROM ",stats_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + case sql_query_internal(DBRef, SQuery) of + {updated, _} -> + ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]), + ok; + {error, _} -> error + end. + +delete_stats_by_user_at_int(DBRef, User, VHost, Date) -> + SQuery = ["DELETE FROM ",stats_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\") ", + "AND at=\"",Date,"\";"], + case sql_query_internal(DBRef, SQuery) of + {updated, _} -> + ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]), + ok; + {error, _} -> error + end. + +delete_user_settings_int(DBRef, User, VHost) -> + Query = ["DELETE FROM ",settings_table(VHost)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost)," WHERE username=\"",User,"\");"], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]), + ok; + {error, Reason} -> + ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Reason]), + error + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% tables internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +create_temp_table(DBRef, Name) -> + Query = ["CREATE TABLE ",Name," (", + "owner_id MEDIUMINT UNSIGNED, ", + "peer_name_id MEDIUMINT UNSIGNED, ", + "peer_server_id MEDIUMINT UNSIGNED, ", + "at VARCHAR(11), ", + "count INT(11) ", + ") ENGINE=MyISAM CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> ok; + {error, _Reason} -> error + end. + +create_stats_table(#state{dbref=DBRef, vhost=VHost}=State) -> + SName = stats_table(VHost), + Query = ["CREATE TABLE ",SName," (", + "owner_id MEDIUMINT UNSIGNED, ", + "peer_name_id MEDIUMINT UNSIGNED, ", + "peer_server_id MEDIUMINT UNSIGNED, ", + "at VARCHAR(11), ", + "count INT(11), ", + "ext INTEGER DEFAULT NULL, " + "INDEX ext_i (ext), " + "INDEX(owner_id,peer_name_id,peer_server_id), ", + "INDEX(at) ", + ") ENGINE=MyISAM CHARACTER SET utf8;" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created stats table for ~p", [VHost]), + rebuild_all_stats_int(State), + ok; + {error, Reason} -> + case ejabberd_regexp:run(iolist_to_binary(Reason), <<"#42S01">>) of + match -> + ?MYDEBUG("Stats table for ~p already exists", [VHost]), + CheckQuery = ["SHOW COLUMNS FROM ",SName," LIKE 'peer_%_id';"], + case sql_query_internal(DBRef, CheckQuery) of + {data, Elems} when length(Elems) == 2 -> + ?MYDEBUG("Stats table structure is ok", []), + ok; + _ -> + ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []), + case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of + {updated, _} -> + ?INFO_MSG("Successfully dropped ~p", [SName]); + _ -> + ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName]) + end, + error + end; + _ -> + ?ERROR_MSG("Failed to create stats table for ~p: ~p", [VHost, Reason]), + error + end + end. + +create_settings_table(#state{dbref=DBRef, vhost=VHost}) -> + SName = settings_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",SName," (", + "owner_id MEDIUMINT UNSIGNED PRIMARY KEY, ", + "dolog_default TINYINT(1) NOT NULL DEFAULT 1, ", + "dolog_list TEXT, ", + "donotlog_list TEXT ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created settings table for ~p", [VHost]), + ok; + {error, _} -> + error + end. + +create_users_table(#state{dbref=DBRef, vhost=VHost}) -> + SName = users_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",SName," (", + "username TEXT NOT NULL, ", + "user_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ", + "UNIQUE INDEX(username(",?INDEX_SIZE,")) ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created users table for ~p", [VHost]), + ok; + {error, _} -> + error + end. + +create_servers_table(#state{dbref=DBRef, vhost=VHost}) -> + SName = servers_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",SName," (", + "server TEXT NOT NULL, ", + "server_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ", + "UNIQUE INDEX(server(",?INDEX_SIZE,")) ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created servers table for ~p", [VHost]), + ok; + {error, _} -> + error + end. + +create_resources_table(#state{dbref=DBRef, vhost=VHost}) -> + RName = resources_table(VHost), + Query = ["CREATE TABLE IF NOT EXISTS ",RName," (", + "resource TEXT NOT NULL, ", + "resource_id MEDIUMINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE, ", + "UNIQUE INDEX(resource(",?INDEX_SIZE,")) ", + ") ENGINE=InnoDB CHARACTER SET utf8;" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created resources table for ~p", [VHost]), + ok; + {error, _} -> + error + end. + +create_internals(#state{dbref=DBRef, vhost=VHost}) -> + sql_query_internal(DBRef, ["DROP PROCEDURE IF EXISTS ",logmessage_name(VHost),";"]), + case sql_query_internal(DBRef, [get_logmessage(VHost)]) of + {updated, _} -> + ?MYDEBUG("Created logmessage for ~p", [VHost]), + ok; + {error, _} -> + error + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% SQL internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +sql_query_internal(DBRef, Query) -> + case sql_query_internal_silent(DBRef, Query) of + {error, Reason} -> + ?ERROR_MSG("~p while ~p", [Reason, lists:append(Query)]), + {error, Reason}; + Rez -> Rez + end. + +sql_query_internal_silent(DBRef, Query) -> + ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]), + get_result(p1_mysql_conn:fetch(DBRef, Query, self(), ?MYSQL_TIMEOUT)). + +get_result({updated, MySQLRes}) -> + {updated, p1_mysql:get_result_affected_rows(MySQLRes)}; +get_result({data, MySQLRes}) -> + {data, p1_mysql:get_result_rows(MySQLRes)}; +get_result({error, "query timed out"}) -> + {error, "query timed out"}; +get_result({error, MySQLRes}) -> + Reason = p1_mysql:get_result_reason(MySQLRes), + {error, Reason}. + +get_user_id(DBRef, VHost, User) -> + SQuery = ["SELECT user_id FROM ",users_table(VHost)," ", + "WHERE username=\"",User,"\";"], + case sql_query_internal(DBRef, SQuery) of + {data, []} -> + IQuery = ["INSERT INTO ",users_table(VHost)," ", + "SET username=\"",User,"\";"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + {data, [[DBIdNew]]} = sql_query_internal(DBRef, SQuery), + DBIdNew; + {error, Reason} -> + % this can be in clustered environment + match = ejabberd_regexp:run(iolist_to_binary(Reason), <<"#23000">>), + ?ERROR_MSG("Duplicate key name for ~p", [User]), + {data, [[ClID]]} = sql_query_internal(DBRef, SQuery), + ClID + end; + {data, [[DBId]]} -> + DBId + end. + +get_logmessage(VHost) -> + UName = users_table(VHost), + SName = servers_table(VHost), + RName = resources_table(VHost), + StName = stats_table(VHost), + io_lib:format(" +CREATE PROCEDURE ~s(tablename TEXT, atdate TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(10), msubject TEXT, mbody TEXT, mtimestamp DOUBLE) +BEGIN + DECLARE ownerID MEDIUMINT UNSIGNED; + DECLARE peer_nameID MEDIUMINT UNSIGNED; + DECLARE peer_serverID MEDIUMINT UNSIGNED; + DECLARE peer_resourceID MEDIUMINT UNSIGNED; + DECLARE Vmtype VARCHAR(10); + DECLARE Vmtimestamp DOUBLE; + DECLARE Vmdirection VARCHAR(4); + DECLARE Vmbody TEXT; + DECLARE Vmsubject TEXT; + DECLARE iq TEXT; + DECLARE cq TEXT; + DECLARE viewname TEXT; + DECLARE notable INT; + DECLARE CONTINUE HANDLER FOR SQLSTATE '42S02' SET @notable = 1; + + SET @notable = 0; + SET @ownerID = NULL; + SET @peer_nameID = NULL; + SET @peer_serverID = NULL; + SET @peer_resourceID = NULL; + + SET @Vmtype = mtype; + SET @Vmtimestamp = mtimestamp; + SET @Vmdirection = mdirection; + SET @Vmbody = mbody; + SET @Vmsubject = msubject; + + SELECT user_id INTO @ownerID FROM ~s WHERE username=owner; + IF @ownerID IS NULL THEN + INSERT INTO ~s SET username=owner; + SET @ownerID = LAST_INSERT_ID(); + END IF; + + SELECT user_id INTO @peer_nameID FROM ~s WHERE username=peer_name; + IF @peer_nameID IS NULL THEN + INSERT INTO ~s SET username=peer_name; + SET @peer_nameID = LAST_INSERT_ID(); + END IF; + + SELECT server_id INTO @peer_serverID FROM ~s WHERE server=peer_server; + IF @peer_serverID IS NULL THEN + INSERT INTO ~s SET server=peer_server; + SET @peer_serverID = LAST_INSERT_ID(); + END IF; + + SELECT resource_id INTO @peer_resourceID FROM ~s WHERE resource=peer_resource; + IF @peer_resourceID IS NULL THEN + INSERT INTO ~s SET resource=peer_resource; + SET @peer_resourceID = LAST_INSERT_ID(); + END IF; + + SET @iq = CONCAT(\"INSERT INTO \",tablename,\" (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (@ownerID,@peer_nameID,@peer_serverID,@peer_resourceID,@Vmdirection,@Vmtype,@Vmsubject,@Vmbody,@Vmtimestamp);\"); + PREPARE insertmsg FROM @iq; + + IF @notable = 1 THEN + SET @cq = CONCAT(\"CREATE TABLE \",tablename,\" ( + owner_id MEDIUMINT UNSIGNED NOT NULL, + peer_name_id MEDIUMINT UNSIGNED NOT NULL, + peer_server_id MEDIUMINT UNSIGNED NOT NULL, + peer_resource_id MEDIUMINT(8) UNSIGNED NOT NULL, + direction ENUM('to', 'from') NOT NULL, + type ENUM('chat','error','groupchat','headline','normal') NOT NULL, + subject TEXT, + body TEXT, + timestamp DOUBLE NOT NULL, + ext INTEGER DEFAULT NULL, + INDEX search_i (owner_id, peer_name_id, peer_server_id, peer_resource_id), + INDEX ext_i (ext), + FULLTEXT (body) + ) ENGINE=MyISAM + PACK_KEYS=1 + CHARACTER SET utf8;\"); + PREPARE createtable FROM @cq; + EXECUTE createtable; + DEALLOCATE PREPARE createtable; + + SET @viewname = CONCAT(\"`v_\", TRIM(BOTH '`' FROM tablename), \"`\"); + SET @cq = CONCAT(\"CREATE OR REPLACE VIEW \",@viewname,\" AS + SELECT owner.username AS owner_name, + peer.username AS peer_name, + servers.server AS peer_server, + resources.resource AS peer_resource, + messages.direction, + messages.type, + messages.subject, + messages.body, + messages.timestamp + FROM + ~s owner, + ~s peer, + ~s servers, + ~s resources, + \", tablename,\" messages + WHERE + owner.user_id=messages.owner_id and + peer.user_id=messages.peer_name_id and + servers.server_id=messages.peer_server_id and + resources.resource_id=messages.peer_resource_id + ORDER BY messages.timestamp;\"); + PREPARE createview FROM @cq; + EXECUTE createview; + DEALLOCATE PREPARE createview; + + SET @notable = 0; + PREPARE insertmsg FROM @iq; + EXECUTE insertmsg; + ELSEIF @notable = 0 THEN + EXECUTE insertmsg; + END IF; + + DEALLOCATE PREPARE insertmsg; + + IF @notable = 0 THEN + UPDATE ~s SET count=count+1 WHERE owner_id=@ownerID AND peer_name_id=@peer_nameID AND peer_server_id=@peer_serverID AND at=atdate; + IF ROW_COUNT() = 0 THEN + INSERT INTO ~s (owner_id, peer_name_id, peer_server_id, at, count) VALUES (@ownerID, @peer_nameID, @peer_serverID, atdate, 1); + END IF; + END IF; +END;", [logmessage_name(VHost),UName,UName,UName,UName,SName,SName,RName,RName,UName,UName,SName,RName,StName,StName]). diff --git a/src/mod_logdb_pgsql.erl b/src/mod_logdb_pgsql.erl new file mode 100644 index 0000000..3c2ae95 --- /dev/null +++ b/src/mod_logdb_pgsql.erl @@ -0,0 +1,1108 @@ +% {ok, DBRef} = pgsql:connect([{host, "127.0.0.1"}, {database, "logdb"}, {user, "logdb"}, {password, "logdb"}, {port, 5432}, {as_binary, true}]). +% Schema = "test". +% pgsql:squery(DBRef, "CREATE TABLE test.\"logdb_stats_test\" (owner_id INTEGER, peer_name_id INTEGER, peer_server_id INTEGER, at VARCHAR(20), count integer);" ). +%%%---------------------------------------------------------------------- +%%% File : mod_logdb_pgsql.erl +%%% Author : Oleg Palij (mailto,xmpp:o.palij@gmail.com) +%%% Purpose : Posgresql backend for mod_logdb +%%% Version : trunk +%%% Id : $Id: mod_logdb_pgsql.erl 1360 2009-07-30 06:00:14Z malik $ +%%% Url : http://www.dp.uz.gov.ua/o.palij/mod_logdb/ +%%%---------------------------------------------------------------------- + +-module(mod_logdb_pgsql). +-author('o.palij@gmail.com'). + +-include("mod_logdb.hrl"). +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("logger.hrl"). + +-behaviour(gen_logdb). +-behaviour(gen_server). + +% gen_server +-export([code_change/3,handle_call/3,handle_cast/2,handle_info/2,init/1,terminate/2]). +% gen_mod +-export([start/2, stop/1]). +% gen_logdb +-export([log_message/2, + rebuild_stats/1, + rebuild_stats_at/2, + delete_messages_by_user_at/3, delete_all_messages_by_user_at/3, delete_messages_at/2, + get_vhost_stats/1, get_vhost_stats_at/2, get_user_stats/2, get_user_messages_at/3, + get_dates/1, + get_users_settings/1, get_user_settings/2, set_user_settings/3, + drop_user/2]). + +-export([view_table/3]). + +% gen_server call timeout +-define(CALL_TIMEOUT, 30000). +-define(PGSQL_TIMEOUT, 60000). +-define(PROCNAME, mod_logdb_pgsql). + +-import(mod_logdb, [list_to_bool/1, bool_to_list/1, + list_to_string/1, string_to_list/1, + convert_timestamp_brief/1]). + +-record(state, {dbref, vhost, server, port, db, user, password, schema}). + +% replace "." with "_" +escape_vhost(VHost) -> lists:map(fun(46) -> 95; + (A) -> A + end, binary_to_list(VHost)). + +prefix(Schema) -> + Schema ++ ".\"" ++ "logdb_". + +suffix(VHost) -> + "_" ++ escape_vhost(VHost) ++ "\"". + +messages_table(VHost, Schema, Date) -> + prefix(Schema) ++ "messages_" ++ Date ++ suffix(VHost). + +view_table(VHost, Schema, Date) -> + Table = messages_table(VHost, Schema, Date), + TablewoS = lists:sublist(Table, length(Schema) + 3, length(Table) - length(Schema) - 3), + lists:append([Schema, ".\"v_", TablewoS, "\""]). + +stats_table(VHost, Schema) -> + prefix(Schema) ++ "stats" ++ suffix(VHost). + +temp_table(VHost, Schema) -> + prefix(Schema) ++ "temp" ++ suffix(VHost). + +settings_table(VHost, Schema) -> + prefix(Schema) ++ "settings" ++ suffix(VHost). + +users_table(VHost, Schema) -> + prefix(Schema) ++ "users" ++ suffix(VHost). +servers_table(VHost, Schema) -> + prefix(Schema) ++ "servers" ++ suffix(VHost). +resources_table(VHost, Schema) -> + prefix(Schema) ++ "resources" ++ suffix(VHost). + +logmessage_name(VHost, Schema) -> + prefix(Schema) ++ "logmessage" ++ suffix(VHost). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_mod callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start(VHost, Opts) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:start({local, Proc}, ?MODULE, [VHost, Opts], []). + +stop(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {stop}, ?CALL_TIMEOUT). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_server callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +init([VHost, Opts]) -> + Server = gen_mod:get_opt(server, Opts, fun(A) -> A end, <<"localhost">>), + DB = gen_mod:get_opt(db, Opts, fun(A) -> A end, <<"ejabberd_logdb">>), + User = gen_mod:get_opt(user, Opts, fun(A) -> A end, <<"root">>), + Port = gen_mod:get_opt(port, Opts, fun(A) -> A end, 5432), + Password = gen_mod:get_opt(password, Opts, fun(A) -> A end, <<"">>), + Schema = binary_to_list(gen_mod:get_opt(schema, Opts, fun(A) -> A end, <<"public">>)), + + ?MYDEBUG("Starting pgsql backend for ~s", [VHost]), + + St = #state{vhost=VHost, + server=Server, port=Port, db=DB, + user=User, password=Password, + schema=Schema}, + + case open_pgsql_connection(St) of + {ok, DBRef} -> + State = St#state{dbref=DBRef}, + ok = create_internals(State), + ok = create_stats_table(State), + ok = create_settings_table(State), + ok = create_users_table(State), + ok = create_servers_table(State), + ok = create_resources_table(State), + erlang:monitor(process, DBRef), + {ok, State}; + % this does not work + {error, Reason} -> + ?ERROR_MSG("PgSQL connection failed: ~p~n", [Reason]), + {stop, db_connection_failed}; + % and this too, becouse pgsql_conn do exit() which can not be catched + {'EXIT', Rez} -> + ?ERROR_MSG("Rez: ~p~n", [Rez]), + {stop, db_connection_failed} + end. + +open_pgsql_connection(#state{server=Server, port=Port, db=DB, schema=Schema, + user=User, password=Password} = _State) -> + ?INFO_MSG("Opening pgsql connection ~s@~s:~p/~s", [User, Server, Port, DB]), + {ok, DBRef} = pgsql:connect(Server, DB, User, Password, Port), + {updated, _} = sql_query_internal(DBRef, ["SET SEARCH_PATH TO ",Schema,";"]), + {ok, DBRef}. + +close_pgsql_connection(DBRef) -> + ?MYDEBUG("Closing ~p pgsql connection", [DBRef]), + pgsql:terminate(DBRef). + +handle_call({log_message, Msg}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + Date = convert_timestamp_brief(Msg#msg.timestamp), + TableName = messages_table(VHost, Schema, Date), + ViewName = view_table(VHost, Schema, Date), + + Query = [ "SELECT ", logmessage_name(VHost, Schema)," " + "('", TableName, "',", + "'", ViewName, "',", + "'", Date, "',", + "'", binary_to_list(Msg#msg.owner_name), "',", + "'", binary_to_list(Msg#msg.peer_name), "',", + "'", binary_to_list(Msg#msg.peer_server), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.peer_resource) ), "',", + "'", atom_to_list(Msg#msg.direction), "',", + "'", binary_to_list(Msg#msg.type), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.subject) ), "',", + "'", binary_to_list( ejabberd_odbc:escape(Msg#msg.body) ), "',", + "'", Msg#msg.timestamp, "');"], + + case sql_query_internal_silent(DBRef, Query) of + % TODO: change this + {data, [{"0"}]} -> + ?MYDEBUG("Logged ok for ~s, peer: ~s", [ [Msg#msg.owner_name, <<"@">>, VHost], + [Msg#msg.peer_name, <<"@">>, Msg#msg.peer_server] ]), + ok; + {error, _Reason} -> + error + end, + {reply, ok, State}; +handle_call({rebuild_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + Reply = rebuild_stats_at_int(DBRef, VHost, Schema, Date), + {reply, Reply, State}; +handle_call({delete_messages_by_user_at, [], _Date}, _From, State) -> + {reply, error, State}; +handle_call({delete_messages_by_user_at, Msgs, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + Temp = lists:flatmap(fun(#msg{timestamp=Timestamp} = _Msg) -> + ["'",Timestamp,"'",","] + end, Msgs), + + Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), + + Query = ["DELETE FROM ",messages_table(VHost, Schema, Date)," ", + "WHERE timestamp IN (", Temp1], + + Reply = + case sql_query_internal(DBRef, Query) of + {updated, _} -> + rebuild_stats_at_int(DBRef, VHost, Schema, Date); + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({delete_all_messages_by_user_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + ok = delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date), + ok = delete_stats_by_user_at_int(DBRef, Schema, User, VHost, Date), + {reply, ok, State}; +handle_call({delete_messages_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + {updated, _} = sql_query_internal(DBRef, ["DROP VIEW ",view_table(VHost, Schema, Date),";"]), + Reply = + case sql_query_internal(DBRef, ["DROP TABLE ",messages_table(VHost, Schema, Date)," CASCADE;"]) of + {updated, _} -> + Query = ["DELETE FROM ",stats_table(VHost, Schema)," " + "WHERE at='",Date,"';"], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ok; + {error, _} -> + error + end; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({get_vhost_stats}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + SName = stats_table(VHost, Schema), + Query = ["SELECT at, sum(count) ", + "FROM ",SName," ", + "GROUP BY at ", + "ORDER BY DATE(at) DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Recs} -> + {ok, [ {Date, list_to_integer(Count)} || {Date, Count} <- Recs]}; + {error, Reason} -> + % TODO: Duplicate error message ? + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_vhost_stats_at, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + SName = stats_table(VHost, Schema), + Query = ["SELECT username, sum(count) AS allcount ", + "FROM ",SName," ", + "JOIN ",users_table(VHost, Schema)," ON owner_id=user_id ", + "WHERE at='",Date,"' ", + "GROUP BY username ", + "ORDER BY allcount DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Recs} -> + RFun = fun({User, Count}) -> + {User, list_to_integer(Count)} + end, + {ok, lists:reverse(lists:keysort(2, lists:map(RFun, Recs)))}; + {error, Reason} -> + % TODO: + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_user_stats, User}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + {reply, get_user_stats_int(DBRef, Schema, User, VHost), State}; +handle_call({get_user_messages_at, User, Date}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + Query = ["SELECT peer_name,", + "peer_server,", + "peer_resource,", + "direction," + "type," + "subject," + "body," + "timestamp " + "FROM ",view_table(VHost, Schema, Date)," " + "WHERE owner_name='",User,"';"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Recs} -> + Fun = fun({Peer_name, Peer_server, Peer_resource, + Direction, + Type, + Subject, Body, + Timestamp}) -> + #msg{peer_name=Peer_name, peer_server=Peer_server, peer_resource=Peer_resource, + direction=list_to_atom(Direction), + type=Type, + subject=Subject, body=Body, + timestamp=Timestamp} + end, + {ok, lists:map(Fun, Recs)}; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_dates}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + SName = stats_table(VHost, Schema), + Query = ["SELECT at ", + "FROM ",SName," ", + "GROUP BY at ", + "ORDER BY at DESC;" + ], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Result} -> + [ Date || {Date} <- Result ]; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_users_settings}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + Query = ["SELECT username,dolog_default,dolog_list,donotlog_list ", + "FROM ",settings_table(VHost, Schema)," ", + "JOIN ",users_table(VHost, Schema)," ON user_id=owner_id;"], + Reply = + case sql_query_internal(DBRef, Query) of + {data, Recs} -> + {ok, [#user_settings{owner_name=Owner, + dolog_default=list_to_bool(DoLogDef), + dolog_list=string_to_list(DoLogL), + donotlog_list=string_to_list(DoNotLogL) + } || {Owner, DoLogDef, DoLogL, DoNotLogL} <- Recs]}; + {error, Reason} -> + {error, Reason} + end, + {reply, Reply, State}; +handle_call({get_user_settings, User}, _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + Query = ["SELECT dolog_default,dolog_list,donotlog_list ", + "FROM ",settings_table(VHost, Schema)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"');"], + Reply = + case sql_query_internal_silent(DBRef, Query) of + {data, []} -> + {ok, []}; + {data, [{DoLogDef, DoLogL, DoNotLogL}]} -> + {ok, #user_settings{owner_name=User, + dolog_default=list_to_bool(DoLogDef), + dolog_list=string_to_list(DoLogL), + donotlog_list=string_to_list(DoNotLogL)}}; + {error, Reason} -> + ?ERROR_MSG("Failed to get_user_settings for ~s@~s: ~p", [User, VHost, Reason]), + error + end, + {reply, Reply, State}; +handle_call({set_user_settings, User, #user_settings{dolog_default=DoLogDef, + dolog_list=DoLogL, + donotlog_list=DoNotLogL}}, + _From, #state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + User_id = get_user_id(DBRef, VHost, Schema, User), + Query = ["UPDATE ",settings_table(VHost, Schema)," ", + "SET dolog_default=",bool_to_list(DoLogDef),", ", + "dolog_list='",list_to_string(DoLogL),"', ", + "donotlog_list='",list_to_string(DoNotLogL),"' ", + "WHERE owner_id=",User_id,";"], + + Reply = + case sql_query_internal(DBRef, Query) of + {updated, 0} -> + IQuery = ["INSERT INTO ",settings_table(VHost, Schema)," ", + "(owner_id, dolog_default, dolog_list, donotlog_list) ", + "VALUES ", + "(",User_id,", ",bool_to_list(DoLogDef),",'",list_to_string(DoLogL),"','",list_to_string(DoNotLogL),"');"], + case sql_query_internal(DBRef, IQuery) of + {updated, 1} -> + ?MYDEBUG("New settings for ~s@~s", [User, VHost]), + ok; + {error, _} -> + error + end; + {updated, 1} -> + ?MYDEBUG("Updated settings for ~s@~s", [User, VHost]), + ok; + {error, _} -> + error + end, + {reply, Reply, State}; +handle_call({stop}, _From, State) -> + ?MYDEBUG("Stoping pgsql backend for ~p", [State#state.vhost]), + {stop, normal, ok, State}; +handle_call(Msg, _From, State) -> + ?INFO_MSG("Got call Msg: ~p, State: ~p", [Msg, State]), + {noreply, State}. + + +handle_cast({rebuild_stats}, State) -> + rebuild_all_stats_int(State), + {noreply, State}; +handle_cast({drop_user, User}, #state{vhost=VHost, schema=Schema}=State) -> + Fun = fun() -> + {ok, DBRef} = open_pgsql_connection(State), + {ok, Dates} = get_user_stats_int(DBRef, Schema, User, VHost), + MDResult = lists:map(fun({Date, _}) -> + delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date) + end, Dates), + StDResult = delete_all_stats_by_user_int(DBRef, Schema, User, VHost), + SDResult = delete_user_settings_int(DBRef, Schema, User, VHost), + case lists:all(fun(Result) when Result == ok -> + true; + (Result) when Result == error -> + false + end, lists:append([MDResult, [StDResult], [SDResult]])) of + true -> + ?INFO_MSG("Removed ~s@~s", [User, VHost]); + false -> + ?ERROR_MSG("Failed to remove ~s@~s", [User, VHost]) + end, + close_pgsql_connection(DBRef) + end, + spawn(Fun), + {noreply, State}; +handle_cast(Msg, State) -> + ?INFO_MSG("Got cast Msg:~p, State:~p", [Msg, State]), + {noreply, State}. + +handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, State) -> + {stop, connection_dropped, State}; +handle_info(Info, State) -> + ?INFO_MSG("Got Info:~p, State:~p", [Info, State]), + {noreply, State}. + +terminate(_Reason, #state{dbref=DBRef}=_State) -> + close_pgsql_connection(DBRef), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% gen_logdb callbacks +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +log_message(VHost, Msg) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {log_message, Msg}, ?CALL_TIMEOUT). +rebuild_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {rebuild_stats}). +rebuild_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {rebuild_stats_at, Date}, ?CALL_TIMEOUT). +delete_messages_by_user_at(VHost, Msgs, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_by_user_at, Msgs, Date}, ?CALL_TIMEOUT). +delete_all_messages_by_user_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_all_messages_by_user_at, User, Date}, ?CALL_TIMEOUT). +delete_messages_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {delete_messages_at, Date}, ?CALL_TIMEOUT). +get_vhost_stats(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats}, ?CALL_TIMEOUT). +get_vhost_stats_at(VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_vhost_stats_at, Date}, ?CALL_TIMEOUT). +get_user_stats(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_stats, User}, ?CALL_TIMEOUT). +get_user_messages_at(User, VHost, Date) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_messages_at, User, Date}, ?CALL_TIMEOUT). +get_dates(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_dates}, ?CALL_TIMEOUT). +get_users_settings(VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_users_settings}, ?CALL_TIMEOUT). +get_user_settings(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {get_user_settings, User}, ?CALL_TIMEOUT). +set_user_settings(User, VHost, Set) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:call(Proc, {set_user_settings, User, Set}, ?CALL_TIMEOUT). +drop_user(User, VHost) -> + Proc = gen_mod:get_module_proc(VHost, ?PROCNAME), + gen_server:cast(Proc, {drop_user, User}). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +get_dates_int(DBRef, VHost) -> + Query = ["SELECT n.nspname as \"Schema\", + c.relname as \"Name\", + CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' END as \"Type\", + r.rolname as \"Owner\" + FROM pg_catalog.pg_class c + JOIN pg_catalog.pg_roles r ON r.oid = c.relowner + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind IN ('r','') + AND n.nspname NOT IN ('pg_catalog', 'pg_toast') + AND c.relname ~ '^(.*",escape_vhost(VHost),".*)$' + AND pg_catalog.pg_table_is_visible(c.oid) + ORDER BY 1,2;"], + case sql_query_internal(DBRef, Query) of + {data, Recs} -> + lists:foldl(fun({_Schema, Table, _Type, _Owner}, Dates) -> + case re:run(Table,"[0-9]+-[0-9]+-[0-9]+") of + {match, [{S, E}]} -> + lists:append(Dates, [lists:sublist(Table,S,E)]); + nomatch -> + Dates + end + end, [], Recs); + {error, _} -> + [] + end. + +rebuild_all_stats_int(#state{vhost=VHost, schema=Schema}=State) -> + Fun = fun() -> + {ok, DBRef} = open_pgsql_connection(State), + ok = delete_nonexistent_stats(DBRef, Schema, VHost), + case lists:filter(fun(Date) -> + case catch rebuild_stats_at_int(DBRef, VHost, Schema, Date) of + ok -> false; + error -> true; + {'EXIT', _} -> true + end + end, get_dates_int(DBRef, VHost)) of + [] -> ok; + FTables -> + ?ERROR_MSG("Failed to rebuild stats for ~p dates", [FTables]), + error + end, + close_pgsql_connection(DBRef) + end, + spawn(Fun). + +rebuild_stats_at_int(DBRef, VHost, Schema, Date) -> + TempTable = temp_table(VHost, Schema), + Fun = + fun() -> + Table = messages_table(VHost, Schema, Date), + STable = stats_table(VHost, Schema), + + DQuery = [ "DELETE FROM ",STable," ", + "WHERE at='",Date,"';"], + + ok = create_temp_table(DBRef, VHost, Schema), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",Table," IN ACCESS EXCLUSIVE MODE;"]), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",TempTable," IN ACCESS EXCLUSIVE MODE;"]), + SQuery = ["INSERT INTO ",TempTable," ", + "(owner_id,peer_name_id,peer_server_id,at,count) ", + "SELECT owner_id,peer_name_id,peer_server_id,'",Date,"'",",count(*) ", + "FROM ",Table," GROUP BY owner_id,peer_name_id,peer_server_id;"], + case sql_query_internal(DBRef, SQuery) of + {updated, 0} -> + Count = sql_query_internal(DBRef, ["SELECT count(*) FROM ",Table,";"]), + case Count of + {data, [{"0"}]} -> + {updated, _} = sql_query_internal(DBRef, ["DROP VIEW ",view_table(VHost, Schema, Date),";"]), + {updated, _} = sql_query_internal(DBRef, ["DROP TABLE ",Table," CASCADE;"]), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," IN ACCESS EXCLUSIVE MODE;"]), + {updated, _} = sql_query_internal(DBRef, DQuery), + ok; + _ -> + ?ERROR_MSG("Failed to calculate stats for ~s table! Count was ~p.", [Date, Count]), + error + end; + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",STable," IN ACCESS EXCLUSIVE MODE;"]), + {updated, _} = sql_query_internal(DBRef, ["LOCK TABLE ",TempTable," IN ACCESS EXCLUSIVE MODE;"]), + {updated, _} = sql_query_internal(DBRef, DQuery), + SQuery1 = ["INSERT INTO ",STable," ", + "(owner_id,peer_name_id,peer_server_id,at,count) ", + "SELECT owner_id,peer_name_id,peer_server_id,at,count ", + "FROM ",TempTable,";"], + case sql_query_internal(DBRef, SQuery1) of + {updated, _} -> ok; + {error, _} -> error + end; + {error, _} -> error + end + end, % fun + + case sql_transaction_internal(DBRef, Fun) of + {atomic, _} -> + ?INFO_MSG("Rebuilded stats for ~s at ~s", [VHost, Date]), + ok; + {aborted, Reason} -> + ?ERROR_MSG("Failed to rebuild stats for ~s table: ~p.", [Date, Reason]), + error + end, + sql_query_internal(DBRef, ["DROP TABLE ",TempTable,";"]), + ok. + +delete_nonexistent_stats(DBRef, Schema, VHost) -> + Dates = get_dates_int(DBRef, VHost), + STable = stats_table(VHost, Schema), + + Temp = lists:flatmap(fun(Date) -> + ["'",Date,"'",","] + end, Dates), + + case Temp of + [] -> + ok; + _ -> + % replace last "," with ");" + Temp1 = lists:append([lists:sublist(Temp, length(Temp)-1), ");"]), + Query = ["DELETE FROM ",STable," ", + "WHERE at NOT IN (", Temp1], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ok; + {error, _} -> + error + end + end. + +get_user_stats_int(DBRef, Schema, User, VHost) -> + SName = stats_table(VHost, Schema), + UName = users_table(VHost, Schema), + Query = ["SELECT stats.at, sum(stats.count) ", + "FROM ",UName," AS users ", + "JOIN ",SName," AS stats ON owner_id=user_id " + "WHERE users.username='",User,"' ", + "GROUP BY stats.at " + "ORDER BY DATE(at) DESC;" + ], + case sql_query_internal(DBRef, Query) of + {data, Recs} -> + {ok, [ {Date, list_to_integer(Count)} || {Date, Count} <- Recs ]}; + {error, Result} -> + {error, Result} + end. + +delete_all_messages_by_user_at_int(DBRef, Schema, User, VHost, Date) -> + DQuery = ["DELETE FROM ",messages_table(VHost, Schema, Date)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"');"], + case sql_query_internal(DBRef, DQuery) of + {updated, _} -> + ?INFO_MSG("Dropped messages for ~s@~s at ~s", [User, VHost, Date]), + ok; + {error, _} -> + error + end. + +delete_all_stats_by_user_int(DBRef, Schema, User, VHost) -> + SQuery = ["DELETE FROM ",stats_table(VHost, Schema)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"');"], + case sql_query_internal(DBRef, SQuery) of + {updated, _} -> + ?INFO_MSG("Dropped all stats for ~s@~s", [User, VHost]), + ok; + {error, _} -> error + end. + +delete_stats_by_user_at_int(DBRef, Schema, User, VHost, Date) -> + SQuery = ["DELETE FROM ",stats_table(VHost, Schema)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"') ", + "AND at='",Date,"';"], + case sql_query_internal(DBRef, SQuery) of + {updated, _} -> + ?INFO_MSG("Dropped stats for ~s@~s at ~s", [User, VHost, Date]), + ok; + {error, _} -> error + end. + +delete_user_settings_int(DBRef, Schema, User, VHost) -> + Query = ["DELETE FROM ",settings_table(VHost, Schema)," ", + "WHERE owner_id=(SELECT user_id FROM ",users_table(VHost, Schema)," WHERE username='",User,"');"], + case sql_query_internal(DBRef, Query) of + {updated, _} -> + ?INFO_MSG("Dropped ~s@~s settings", [User, VHost]), + ok; + {error, Reason} -> + ?ERROR_MSG("Failed to drop ~s@~s settings: ~p", [User, VHost, Reason]), + error + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% tables internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +create_temp_table(DBRef, VHost, Schema) -> + TName = temp_table(VHost, Schema), + Query = ["CREATE TABLE ",TName," (", + "owner_id INTEGER, ", + "peer_name_id INTEGER, ", + "peer_server_id INTEGER, ", + "at VARCHAR(20), ", + "count INTEGER ", + ");" + ], + case sql_query_internal(DBRef, Query) of + {updated, _} -> ok; + {error, _Reason} -> error + end. + +create_stats_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + SName = stats_table(VHost, Schema), + + Fun = + fun() -> + Query = ["CREATE TABLE ",SName," (", + "owner_id INTEGER, ", + "peer_name_id INTEGER, ", + "peer_server_id INTEGER, ", + "at VARCHAR(20), ", + "count integer", + ");" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"s_search_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (owner_id, peer_name_id, peer_server_id);"]), + {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"s_at_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (at);"]), + created; + {error, Reason} -> + case lists:keysearch(code, 1, Reason) of + {value, {code, "42P07"}} -> + exists; + _ -> + ?ERROR_MSG("Failed to create stats table for ~s: ~p", [VHost, Reason]), + error + end + end + end, + case sql_transaction_internal(DBRef, Fun) of + {atomic, created} -> + ?MYDEBUG("Created stats table for ~s", [VHost]), + rebuild_all_stats_int(State), + ok; + {atomic, exists} -> + ?MYDEBUG("Stats table for ~s already exists", [VHost]), + {match, [{F, L}]} = re:run(SName, "\".*\""), + QTable = lists:sublist(SName, F+2, L-2), + OIDQuery = ["SELECT c.oid FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE c.relname='",QTable,"' AND pg_catalog.pg_table_is_visible(c.oid);"], + {data,[{OID}]} = sql_query_internal(DBRef, OIDQuery), + CheckQuery = ["SELECT a.attname FROM pg_catalog.pg_attribute a WHERE a.attrelid = '",OID,"' AND a.attnum > 0 AND NOT a.attisdropped AND a.attname ~ '^peer_.*_id$';"], + case sql_query_internal(DBRef, CheckQuery) of + {data, Elems} when length(Elems) == 2 -> + ?MYDEBUG("Stats table structure is ok", []), + ok; + _ -> + ?INFO_MSG("It seems like stats table structure is invalid. I will drop it and recreate", []), + case sql_query_internal(DBRef, ["DROP TABLE ",SName,";"]) of + {updated, _} -> + ?INFO_MSG("Successfully dropped ~p", [SName]); + _ -> + ?ERROR_MSG("Failed to drop ~p. You should drop it and restart module", [SName]) + end, + error + end; + {error, _} -> error + end. + +create_settings_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) -> + SName = settings_table(VHost, Schema), + Query = ["CREATE TABLE ",SName," (", + "owner_id INTEGER PRIMARY KEY, ", + "dolog_default BOOLEAN, ", + "dolog_list TEXT DEFAULT '', ", + "donotlog_list TEXT DEFAULT ''", + ");" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + ?MYDEBUG("Created settings table for ~s", [VHost]), + ok; + {error, Reason} -> + case lists:keysearch(code, 1, Reason) of + {value, {code, "42P07"}} -> + ?MYDEBUG("Settings table for ~s already exists", [VHost]), + ok; + _ -> + ?ERROR_MSG("Failed to create settings table for ~s: ~p", [VHost, Reason]), + error + end + end. + +create_users_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) -> + SName = users_table(VHost, Schema), + + Fun = + fun() -> + Query = ["CREATE TABLE ",SName," (", + "username TEXT UNIQUE, ", + "user_id SERIAL PRIMARY KEY", + ");" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"username_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (username);"]), + created; + {error, Reason} -> + case lists:keysearch(code, 1, Reason) of + {value, {code, "42P07"}} -> + exists; + _ -> + ?ERROR_MSG("Failed to create users table for ~s: ~p", [VHost, Reason]), + error + end + end + end, + case sql_transaction_internal(DBRef, Fun) of + {atomic, created} -> + ?MYDEBUG("Created users table for ~s", [VHost]), + ok; + {atomic, exists} -> + ?MYDEBUG("Users table for ~s already exists", [VHost]), + ok; + {aborted, _} -> error + end. + +create_servers_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) -> + SName = servers_table(VHost, Schema), + Fun = + fun() -> + Query = ["CREATE TABLE ",SName," (", + "server TEXT UNIQUE, ", + "server_id SERIAL PRIMARY KEY", + ");" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"server_i_",Schema,"_",escape_vhost(VHost),"\" ON ",SName," (server);"]), + created; + {error, Reason} -> + case lists:keysearch(code, 1, Reason) of + {value, {code, "42P07"}} -> + exists; + _ -> + ?ERROR_MSG("Failed to create servers table for ~s: ~p", [VHost, Reason]), + error + end + end + end, + case sql_transaction_internal(DBRef, Fun) of + {atomic, created} -> + ?MYDEBUG("Created servers table for ~s", [VHost]), + ok; + {atomic, exists} -> + ?MYDEBUG("Servers table for ~s already exists", [VHost]), + ok; + {aborted, _} -> error + end. + +create_resources_table(#state{dbref=DBRef, vhost=VHost, schema=Schema}) -> + RName = resources_table(VHost, Schema), + Fun = fun() -> + Query = ["CREATE TABLE ",RName," (", + "resource TEXT UNIQUE, ", + "resource_id SERIAL PRIMARY KEY", + ");" + ], + case sql_query_internal_silent(DBRef, Query) of + {updated, _} -> + {updated, _} = sql_query_internal(DBRef, ["CREATE INDEX \"resource_i_",Schema,"_",escape_vhost(VHost),"\" ON ",RName," (resource);"]), + created; + {error, Reason} -> + case lists:keysearch(code, 1, Reason) of + {value, {code, "42P07"}} -> + exists; + _ -> + ?ERROR_MSG("Failed to create users table for ~s: ~p", [VHost, Reason]), + error + end + end + end, + case sql_transaction_internal(DBRef, Fun) of + {atomic, created} -> + ?MYDEBUG("Created resources table for ~s", [VHost]), + ok; + {atomic, exists} -> + ?MYDEBUG("Resources table for ~s already exists", [VHost]), + ok; + {aborted, _} -> error + end. + +create_internals(#state{dbref=DBRef, vhost=VHost, schema=Schema}=State) -> + sql_query_internal(DBRef, ["DROP FUNCTION IF EXISTS ",logmessage_name(VHost,Schema)," (tbname TEXT, atdt TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(9), msubj TEXT, mbody TEXT, mtimestamp DOUBLE PRECISION);"]), + case sql_query_internal(DBRef, [get_logmessage(VHost, Schema)]) of + {updated, _} -> + ?MYDEBUG("Created logmessage for ~p", [VHost]), + ok; + {error, Reason} -> + case lists:keysearch(code, 1, Reason) of + {value, {code, "42704"}} -> + ?ERROR_MSG("plpgsql language must be installed into database '~s'. Use CREATE LANGUAGE...", [State#state.db]), + error; + _ -> + error + end + end. + +get_user_id(DBRef, VHost, Schema, User) -> + SQuery = ["SELECT user_id FROM ",users_table(VHost, Schema)," ", + "WHERE username='",User,"';"], + case sql_query_internal(DBRef, SQuery) of + {data, []} -> + IQuery = ["INSERT INTO ",users_table(VHost, Schema)," ", + "VALUES ('",User,"');"], + case sql_query_internal_silent(DBRef, IQuery) of + {updated, _} -> + {data, [{DBIdNew}]} = sql_query_internal(DBRef, SQuery), + DBIdNew; + {error, Reason} -> + % this can be in clustered environment + {value, {code, "23505"}} = lists:keysearch(code, 1, Reason), + ?ERROR_MSG("Duplicate key name for ~p", [User]), + {data, [{ClID}]} = sql_query_internal(DBRef, SQuery), + ClID + end; + {data, [{DBId}]} -> + DBId + end. + +get_logmessage(VHost,Schema) -> + UName = users_table(VHost,Schema), + SName = servers_table(VHost,Schema), + RName = resources_table(VHost,Schema), + StName = stats_table(VHost,Schema), + io_lib:format("CREATE OR REPLACE FUNCTION ~s (tbname TEXT, vname TEXT, atdt TEXT, owner TEXT, peer_name TEXT, peer_server TEXT, peer_resource TEXT, mdirection VARCHAR(4), mtype VARCHAR(9), msubj TEXT, mbody TEXT, mtimestamp DOUBLE PRECISION) RETURNS INTEGER AS $$ +DECLARE + ownerID INTEGER; + peer_nameID INTEGER; + peer_serverID INTEGER; + peer_resourceID INTEGER; + tablename ALIAS for $1; + viewname ALIAS for $2; + atdate ALIAS for $3; +BEGIN + SELECT INTO ownerID user_id FROM ~s WHERE username = owner; + IF NOT FOUND THEN + INSERT INTO ~s (username) VALUES (owner); + ownerID := lastval(); + END IF; + + SELECT INTO peer_nameID user_id FROM ~s WHERE username = peer_name; + IF NOT FOUND THEN + INSERT INTO ~s (username) VALUES (peer_name); + peer_nameID := lastval(); + END IF; + + SELECT INTO peer_serverID server_id FROM ~s WHERE server = peer_server; + IF NOT FOUND THEN + INSERT INTO ~s (server) VALUES (peer_server); + peer_serverID := lastval(); + END IF; + + SELECT INTO peer_resourceID resource_id FROM ~s WHERE resource = peer_resource; + IF NOT FOUND THEN + INSERT INTO ~s (resource) VALUES (peer_resource); + peer_resourceID := lastval(); + END IF; + + BEGIN + EXECUTE 'INSERT INTO ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (' || ownerID || ',' || peer_nameID || ',' || peer_serverID || ',' || peer_resourceID || ',''' || mdirection || ''',''' || mtype || ''',' || quote_literal(msubj) || ',' || quote_literal(mbody) || ',' || mtimestamp || ')'; + EXCEPTION WHEN undefined_table THEN + EXECUTE 'CREATE TABLE ' || tablename || ' (' || + 'owner_id INTEGER, ' || + 'peer_name_id INTEGER, ' || + 'peer_server_id INTEGER, ' || + 'peer_resource_id INTEGER, ' || + 'direction VARCHAR(4) CHECK (direction IN (''to'',''from'')), ' || + 'type VARCHAR(9) CHECK (type IN (''chat'',''error'',''groupchat'',''headline'',''normal'')), ' || + 'subject TEXT, ' || + 'body TEXT, ' || + 'timestamp DOUBLE PRECISION)'; + EXECUTE 'CREATE INDEX \"search_i_' || '~s' || '_' || atdate || '_' || '~s' || '\"' || ' ON ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id)'; + + EXECUTE 'CREATE OR REPLACE VIEW ' || viewname || ' AS ' || + 'SELECT owner.username AS owner_name, ' || + 'peer.username AS peer_name, ' || + 'servers.server AS peer_server, ' || + 'resources.resource AS peer_resource, ' || + 'messages.direction, ' || + 'messages.type, ' || + 'messages.subject, ' || + 'messages.body, ' || + 'messages.timestamp ' || + 'FROM ' || + '~s owner, ' || + '~s peer, ' || + '~s servers, ' || + '~s resources, ' || + tablename || ' messages ' || + 'WHERE ' || + 'owner.user_id=messages.owner_id and ' || + 'peer.user_id=messages.peer_name_id and ' || + 'servers.server_id=messages.peer_server_id and ' || + 'resources.resource_id=messages.peer_resource_id ' || + 'ORDER BY messages.timestamp'; + + EXECUTE 'INSERT INTO ' || tablename || ' (owner_id, peer_name_id, peer_server_id, peer_resource_id, direction, type, subject, body, timestamp) VALUES (' || ownerID || ',' || peer_nameID || ',' || peer_serverID || ',' || peer_resourceID || ',''' || mdirection || ''',''' || mtype || ''',' || quote_literal(msubj) || ',' || quote_literal(mbody) || ',' || mtimestamp || ')'; + END; + + UPDATE ~s SET count=count+1 where at=atdate and owner_id=ownerID and peer_name_id=peer_nameID and peer_server_id=peer_serverID; + IF NOT FOUND THEN + INSERT INTO ~s (owner_id, peer_name_id, peer_server_id, at, count) VALUES (ownerID, peer_nameID, peer_serverID, atdate, 1); + END IF; + RETURN 0; +END; +$$ LANGUAGE plpgsql; +", [logmessage_name(VHost,Schema),UName,UName,UName,UName,SName,SName,RName,RName,Schema,escape_vhost(VHost),UName,UName,SName,RName,StName,StName]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% +% SQL internals +% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +% like do_transaction/2 in mysql_conn.erl (changeset by Yariv Sadan ) +sql_transaction_internal(DBRef, Fun) -> + case sql_query_internal(DBRef, ["BEGIN;"]) of + {updated, _} -> + case catch Fun() of + error = Err -> + rollback_internal(DBRef, Err); + {error, _} = Err -> + rollback_internal(DBRef, Err); + {'EXIT', _} = Err -> + rollback_internal(DBRef, Err); + Res -> + case sql_query_internal(DBRef, ["COMMIT;"]) of + {error, _} -> rollback_internal(DBRef, {commit_error}); + {updated, _} -> + case Res of + {atomic, _} -> Res; + _ -> {atomic, Res} + end + end + end; + {error, _} -> + {aborted, {begin_error}} + end. + +% like rollback/2 in mysql_conn.erl (changeset by Yariv Sadan ) +rollback_internal(DBRef, Reason) -> + Res = sql_query_internal(DBRef, ["ROLLBACK;"]), + {aborted, {Reason, {rollback_result, Res}}}. + +sql_query_internal(DBRef, Query) -> + case sql_query_internal_silent(DBRef, Query) of + {error, undefined, Rez} -> + ?ERROR_MSG("Got undefined result: ~p while ~p", [Rez, lists:append(Query)]), + {error, undefined}; + {error, Error} -> + ?ERROR_MSG("Failed: ~p while ~p", [Error, lists:append(Query)]), + {error, Error}; + Rez -> Rez + end. + +sql_query_internal_silent(DBRef, Query) -> + ?MYDEBUG("DOING: \"~s\"", [lists:append(Query)]), + % TODO: use pquery? + get_result(pgsql:squery(DBRef, Query)). + +get_result({ok, ["CREATE TABLE"]}) -> + {updated, 1}; +get_result({ok, ["DROP TABLE"]}) -> + {updated, 1}; +get_result({ok, ["ALTER TABLE"]}) -> + {updated, 1}; +get_result({ok,["DROP VIEW"]}) -> + {updated, 1}; +get_result({ok,["DROP FUNCTION"]}) -> + {updated, 1}; +get_result({ok, ["CREATE INDEX"]}) -> + {updated, 1}; +get_result({ok, ["CREATE FUNCTION"]}) -> + {updated, 1}; +get_result({ok, [{[$S, $E, $L, $E, $C, $T, $ | _Rest], _Rows, Recs}]}) -> + Fun = fun(Rec) -> + list_to_tuple( + lists:map(fun(Elem) when is_binary(Elem) -> + binary_to_list(Elem); + (Elem) when is_list(Elem) -> + Elem; + (Elem) when is_integer(Elem) -> + integer_to_list(Elem); + (Elem) when is_float(Elem) -> + float_to_list(Elem); + (Elem) when is_boolean(Elem) -> + atom_to_list(Elem); + (Elem) -> + ?ERROR_MSG("Unknown element type ~p", [Elem]), + Elem + end, Rec)) + end, + Res = lists:map(Fun, Recs), + %{data, [list_to_tuple(Rec) || Rec <- Recs]}; + {data, Res}; +get_result({ok, ["INSERT " ++ OIDN]}) -> + [_OID, N] = string:tokens(OIDN, " "), + {updated, list_to_integer(N)}; +get_result({ok, ["DELETE " ++ N]}) -> + {updated, list_to_integer(N)}; +get_result({ok, ["UPDATE " ++ N]}) -> + {updated, list_to_integer(N)}; +get_result({ok, ["BEGIN"]}) -> + {updated, 1}; +get_result({ok, ["LOCK TABLE"]}) -> + {updated, 1}; +get_result({ok, ["ROLLBACK"]}) -> + {updated, 1}; +get_result({ok, ["COMMIT"]}) -> + {updated, 1}; +get_result({ok, ["SET"]}) -> + {updated, 1}; +get_result({ok, [{error, Error}]}) -> + {error, Error}; +get_result(Rez) -> + {error, undefined, Rez}. + diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index df06bce..b460f46 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -757,6 +757,12 @@ handle_sync_event({change_state, NewStateData}, _From, handle_sync_event({process_item_change, Item, UJID}, _From, StateName, StateData) -> NSD = process_item_change(Item, StateData, UJID), {reply, {ok, NSD}, StateName, NSD}; +handle_sync_event({get_jid_nick, Jid}, _From, StateName, StateData) -> + R = case (?DICT):find(jlib:jid_tolower(Jid), StateData#state.users) of + error -> []; + {ok, {user, _, Nick, _, _}} -> Nick + end, + {reply, R, StateName, StateData}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, {reply, Reply, StateName, StateData}. diff --git a/src/mod_roster.erl b/src/mod_roster.erl index 31fbeb1..764a628 100644 --- a/src/mod_roster.erl +++ b/src/mod_roster.erl @@ -63,6 +63,8 @@ -include("ejabberd_web_admin.hrl"). +-include("mod_logdb.hrl"). + -export_type([subscription/0]). start(Host, Opts) -> @@ -1426,6 +1428,14 @@ user_roster(User, Server, Query, Lang) -> Query), Items = get_roster(LUser, LServer), SItems = lists:sort(Items), + + Settings = case gen_mod:is_loaded(Server, mod_logdb) of + true -> + mod_logdb:get_user_settings(User, Server); + false -> + [] + end, + FItems = case SItems of [] -> [?CT(<<"None">>)]; _ -> @@ -1483,7 +1493,33 @@ user_roster(User, Server, Query, Lang) -> [?INPUTT(<<"submit">>, <<"remove", (ejabberd_web_admin:term_to_id(R#roster.jid))/binary>>, - <<"Remove">>)])]) + <<"Remove">>)]), + case gen_mod:is_loaded(Server, mod_logdb) of + true -> + Peer = jlib:jid_to_string(R#roster.jid), + A = lists:member(Peer, Settings#user_settings.dolog_list), + B = lists:member(Peer, Settings#user_settings.donotlog_list), + {Name, Value} = + if + A -> + {<<"donotlog">>, <<"Do Not Log Messages">>}; + B -> + {<<"dolog">>, <<"Log Messages">>}; + Settings#user_settings.dolog_default == true -> + {<<"donotlog">>, <<"Do Not Log Messages">>}; + Settings#user_settings.dolog_default == false -> + {<<"dolog">>, <<"Log Messages">>} + end, + + ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], + [?INPUTT(<<"submit">>, + <>, + Value)]); + false -> + ?X([]) + end + ]) end, SItems)))])] end, @@ -1608,9 +1644,42 @@ user_roster_item_parse_query(User, Server, Items, = []}]}}), throw(submitted); - false -> ok - end - end + false -> + case lists:keysearch( + <<"donotlog">>, (ejabberd_web_admin:term_to_id(JID))/binary, 1, Query) of + {value, _} -> + Peer = jlib:jid_to_string(JID), + Settings = mod_logdb:get_user_settings(User, Server), + DNLL = case lists:member(Peer, Settings#user_settings.donotlog_list) of + false -> lists:append(Settings#user_settings.donotlog_list, [Peer]); + true -> Settings#user_settings.donotlog_list + end, + DLL = lists:delete(jlib:jid_to_string(JID), Settings#user_settings.dolog_list), + Sett = Settings#user_settings{donotlog_list=DNLL, dolog_list=DLL}, + % TODO: check returned value + ok = mod_logdb:set_user_settings(User, Server, Sett), + throw(nothing); + false -> + case lists:keysearch( + <<"dolog">>, (ejabberd_web_admin:term_to_id(JID))/binary, 1, Query) of + {value, _} -> + Peer = jlib:jid_to_string(JID), + Settings = mod_logdb:get_user_settings(User, Server), + DLL = case lists:member(Peer, Settings#user_settings.dolog_list) of + false -> lists:append(Settings#user_settings.dolog_list, [Peer]); + true -> Settings#user_settings.dolog_list + end, + DNLL = lists:delete(jlib:jid_to_string(JID), Settings#user_settings.donotlog_list), + Sett = Settings#user_settings{donotlog_list=DNLL, dolog_list=DLL}, + % TODO: check returned value + ok = mod_logdb:set_user_settings(User, Server, Sett), + throw(nothing); + false -> + ok + end % dolog + end % donotlog + end % remove + end % validate end, Items), nothing.