mirror of
https://github.com/processone/ejabberd
synced 2025-10-04 10:19:31 +02:00
Handle Redis connection in a separate module
This commit is contained in:
parent
4717d64d7a
commit
068db1a2d9
4 changed files with 196 additions and 48 deletions
|
@ -63,6 +63,7 @@ start(normal, _Args) ->
|
||||||
Sup = ejabberd_sup:start_link(),
|
Sup = ejabberd_sup:start_link(),
|
||||||
ejabberd_rdbms:start(),
|
ejabberd_rdbms:start(),
|
||||||
ejabberd_riak_sup:start(),
|
ejabberd_riak_sup:start(),
|
||||||
|
ejabberd_redis:start(),
|
||||||
ejabberd_sm:start(),
|
ejabberd_sm:start(),
|
||||||
cyrsasl:start(),
|
cyrsasl:start(),
|
||||||
% Profiling
|
% Profiling
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
add_global_option/2, add_local_option/2,
|
add_global_option/2, add_local_option/2,
|
||||||
get_global_option/2, get_local_option/2,
|
get_global_option/2, get_local_option/2,
|
||||||
get_global_option/3, get_local_option/3,
|
get_global_option/3, get_local_option/3,
|
||||||
get_option/2, get_option/3, add_option/2,
|
get_option/2, get_option/3, add_option/2, has_option/1,
|
||||||
get_vh_by_auth_method/1, is_file_readable/1,
|
get_vh_by_auth_method/1, is_file_readable/1,
|
||||||
get_version/0, get_myhosts/0, get_mylang/0,
|
get_version/0, get_myhosts/0, get_mylang/0,
|
||||||
prepare_opt_val/4, convert_table_to_binary/5,
|
prepare_opt_val/4, convert_table_to_binary/5,
|
||||||
|
@ -838,6 +838,10 @@ get_option(Opt, F, Default) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec has_option(atom() | {atom(), global | binary()}) -> any().
|
||||||
|
has_option(Opt) ->
|
||||||
|
get_option(Opt, fun(_) -> true end, false).
|
||||||
|
|
||||||
init_module_db_table(Modules) ->
|
init_module_db_table(Modules) ->
|
||||||
catch ets:new(module_db, [named_table, public, bag]),
|
catch ets:new(module_db, [named_table, public, bag]),
|
||||||
%% Dirty hack for mod_pubsub
|
%% Dirty hack for mod_pubsub
|
||||||
|
|
179
src/ejabberd_redis.erl
Normal file
179
src/ejabberd_redis.erl
Normal file
|
@ -0,0 +1,179 @@
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||||
|
%%% @copyright (C) 2016, Evgeny Khramtsov
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 8 May 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(ejabberd_redis).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
-behaviour(ejabberd_config).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start/0, start_link/0, q/1, qp/1, opt_type/1]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
-define(PROCNAME, 'ejabberd_redis_client').
|
||||||
|
|
||||||
|
-include("logger.hrl").
|
||||||
|
-include("ejabberd.hrl").
|
||||||
|
|
||||||
|
-record(state, {}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
start() ->
|
||||||
|
case lists:any(
|
||||||
|
fun(Host) ->
|
||||||
|
is_redis_configured(Host)
|
||||||
|
end, ?MYHOSTS) of
|
||||||
|
true ->
|
||||||
|
Spec = {?MODULE, {?MODULE, start_link, []},
|
||||||
|
permanent, 2000, worker, [?MODULE]},
|
||||||
|
supervisor:start_child(ejabberd_sup, Spec);
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
q(Command) ->
|
||||||
|
try eredis:q(?PROCNAME, Command)
|
||||||
|
catch _:Reason -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
qp(Pipeline) ->
|
||||||
|
try eredis:qp(?PROCNAME, Pipeline)
|
||||||
|
catch _:Reason -> {error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
init([]) ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
connect(),
|
||||||
|
{ok, #state{}}.
|
||||||
|
|
||||||
|
handle_call(_Request, _From, State) ->
|
||||||
|
Reply = ok,
|
||||||
|
{reply, Reply, State}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, State) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(connect, State) ->
|
||||||
|
connect(),
|
||||||
|
{noreply, State};
|
||||||
|
handle_info({'DOWN', _MRef, _Type, _Pid, Reason}, State) ->
|
||||||
|
?INFO_MSG("Redis connection has failed: ~p", [Reason]),
|
||||||
|
connect(),
|
||||||
|
{noreply, State};
|
||||||
|
handle_info({'EXIT', _, _}, State) ->
|
||||||
|
{noreply, State};
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?INFO_MSG("unexpected info = ~p", [Info]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, _State) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
is_redis_configured(Host) ->
|
||||||
|
ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
|
||||||
|
PortConfigured = ejabberd_config:has_option({redis_port, Host}),
|
||||||
|
DBConfigured = ejabberd_config:has_option({redis_db, Host}),
|
||||||
|
PassConfigured = ejabberd_config:has_option({redis_password, Host}),
|
||||||
|
ReconnTimeoutConfigured = ejabberd_config:has_option(
|
||||||
|
{redis_reconnect_timeout, Host}),
|
||||||
|
ConnTimeoutConfigured = ejabberd_config:has_option(
|
||||||
|
{redis_connect_timeout, Host}),
|
||||||
|
Modules = ejabberd_config:get_option(
|
||||||
|
{modules, Host},
|
||||||
|
fun(L) when is_list(L) -> L end, []),
|
||||||
|
SMConfigured = ejabberd_config:get_option(
|
||||||
|
{sm_db_type, Host},
|
||||||
|
fun(V) -> V end) == redis,
|
||||||
|
ModuleWithRedisDBConfigured =
|
||||||
|
lists:any(
|
||||||
|
fun({Module, Opts}) ->
|
||||||
|
gen_mod:db_type(Host, Opts, Module) == redis
|
||||||
|
end, Modules),
|
||||||
|
ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
|
||||||
|
ReconnTimeoutConfigured or ConnTimeoutConfigured or
|
||||||
|
SMConfigured or ModuleWithRedisDBConfigured.
|
||||||
|
|
||||||
|
iolist_to_list(IOList) ->
|
||||||
|
binary_to_list(iolist_to_binary(IOList)).
|
||||||
|
|
||||||
|
connect() ->
|
||||||
|
Server = ejabberd_config:get_option(redis_server,
|
||||||
|
fun iolist_to_list/1,
|
||||||
|
"localhost"),
|
||||||
|
Port = ejabberd_config:get_option(redis_port,
|
||||||
|
fun(P) when is_integer(P),
|
||||||
|
P>0, P<65536 ->
|
||||||
|
P
|
||||||
|
end, 6379),
|
||||||
|
DB = ejabberd_config:get_option(redis_db,
|
||||||
|
fun(I) when is_integer(I), I >= 0 ->
|
||||||
|
I
|
||||||
|
end, 0),
|
||||||
|
Pass = ejabberd_config:get_option(redis_password,
|
||||||
|
fun iolist_to_list/1,
|
||||||
|
""),
|
||||||
|
ReconnTimeout = timer:seconds(
|
||||||
|
ejabberd_config:get_option(
|
||||||
|
redis_reconnect_timeout,
|
||||||
|
fun(I) when is_integer(I), I>0 -> I end,
|
||||||
|
1)),
|
||||||
|
ConnTimeout = timer:seconds(
|
||||||
|
ejabberd_config:get_option(
|
||||||
|
redis_connect_timeout,
|
||||||
|
fun(I) when is_integer(I), I>0 -> I end,
|
||||||
|
1)),
|
||||||
|
try case eredis:start_link(Server, Port, DB, Pass,
|
||||||
|
ReconnTimeout, ConnTimeout) of
|
||||||
|
{ok, Client} ->
|
||||||
|
?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]),
|
||||||
|
unlink(Client),
|
||||||
|
erlang:monitor(process, Client),
|
||||||
|
register(?PROCNAME, Client),
|
||||||
|
{ok, Client};
|
||||||
|
{error, Why} ->
|
||||||
|
erlang:error(Why)
|
||||||
|
end
|
||||||
|
catch _:Reason ->
|
||||||
|
Timeout = 10,
|
||||||
|
?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; "
|
||||||
|
"reconnecting in ~p seconds",
|
||||||
|
[Server, Port, Reason, Timeout]),
|
||||||
|
erlang:send_after(timer:seconds(Timeout), self(), connect)
|
||||||
|
end.
|
||||||
|
|
||||||
|
opt_type(redis_connect_timeout) ->
|
||||||
|
fun (I) when is_integer(I), I > 0 -> I end;
|
||||||
|
opt_type(redis_db) ->
|
||||||
|
fun (I) when is_integer(I), I >= 0 -> I end;
|
||||||
|
opt_type(redis_password) -> fun iolist_to_list/1;
|
||||||
|
opt_type(redis_port) ->
|
||||||
|
fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
|
||||||
|
opt_type(redis_reconnect_timeout) ->
|
||||||
|
fun (I) when is_integer(I), I > 0 -> I end;
|
||||||
|
opt_type(redis_server) -> fun iolist_to_list/1;
|
||||||
|
opt_type(_) ->
|
||||||
|
[redis_connect_timeout, redis_db, redis_password,
|
||||||
|
redis_port, redis_reconnect_timeout, redis_server].
|
|
@ -21,48 +21,12 @@
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("jlib.hrl").
|
-include("jlib.hrl").
|
||||||
|
|
||||||
-define(PROCNAME, 'ejabberd_redis_client').
|
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
-spec init() -> ok | {error, any()}.
|
-spec init() -> ok | {error, any()}.
|
||||||
init() ->
|
init() ->
|
||||||
Server = ejabberd_config:get_option(redis_server,
|
clean_table().
|
||||||
fun iolist_to_list/1,
|
|
||||||
"localhost"),
|
|
||||||
Port = ejabberd_config:get_option(redis_port,
|
|
||||||
fun(P) when is_integer(P),
|
|
||||||
P>0, P<65536 ->
|
|
||||||
P
|
|
||||||
end, 6379),
|
|
||||||
DB = ejabberd_config:get_option(redis_db,
|
|
||||||
fun(I) when is_integer(I), I >= 0 ->
|
|
||||||
I
|
|
||||||
end, 0),
|
|
||||||
Pass = ejabberd_config:get_option(redis_password,
|
|
||||||
fun iolist_to_list/1,
|
|
||||||
""),
|
|
||||||
ReconnTimeout = timer:seconds(
|
|
||||||
ejabberd_config:get_option(
|
|
||||||
redis_reconnect_timeout,
|
|
||||||
fun(I) when is_integer(I), I>0 -> I end,
|
|
||||||
1)),
|
|
||||||
ConnTimeout = timer:seconds(
|
|
||||||
ejabberd_config:get_option(
|
|
||||||
redis_connect_timeout,
|
|
||||||
fun(I) when is_integer(I), I>0 -> I end,
|
|
||||||
1)),
|
|
||||||
case eredis:start_link(Server, Port, DB, Pass,
|
|
||||||
ReconnTimeout, ConnTimeout) of
|
|
||||||
{ok, Client} ->
|
|
||||||
register(?PROCNAME, Client),
|
|
||||||
clean_table(),
|
|
||||||
ok;
|
|
||||||
{error, _} = Err ->
|
|
||||||
?ERROR_MSG("failed to start redis client: ~p", [Err]),
|
|
||||||
Err
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec set_session(#session{}) -> ok.
|
-spec set_session(#session{}) -> ok.
|
||||||
set_session(Session) ->
|
set_session(Session) ->
|
||||||
|
@ -71,8 +35,8 @@ set_session(Session) ->
|
||||||
SIDKey = sid_to_key(Session#session.sid),
|
SIDKey = sid_to_key(Session#session.sid),
|
||||||
ServKey = server_to_key(element(2, Session#session.us)),
|
ServKey = server_to_key(element(2, Session#session.us)),
|
||||||
USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
|
USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
|
||||||
case eredis:qp(?PROCNAME, [["HSET", USKey, SIDKey, T],
|
case ejabberd_redis:qp([["HSET", USKey, SIDKey, T],
|
||||||
["HSET", ServKey, USSIDKey, T]]) of
|
["HSET", ServKey, USSIDKey, T]]) of
|
||||||
[{ok, _}, {ok, _}] ->
|
[{ok, _}, {ok, _}] ->
|
||||||
ok;
|
ok;
|
||||||
Err ->
|
Err ->
|
||||||
|
@ -83,7 +47,7 @@ set_session(Session) ->
|
||||||
{ok, #session{}} | {error, notfound}.
|
{ok, #session{}} | {error, notfound}.
|
||||||
delete_session(LUser, LServer, _LResource, SID) ->
|
delete_session(LUser, LServer, _LResource, SID) ->
|
||||||
USKey = us_to_key({LUser, LServer}),
|
USKey = us_to_key({LUser, LServer}),
|
||||||
case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
|
case ejabberd_redis:q(["HGETALL", USKey]) of
|
||||||
{ok, Vals} ->
|
{ok, Vals} ->
|
||||||
Ss = decode_session_list(Vals),
|
Ss = decode_session_list(Vals),
|
||||||
case lists:keyfind(SID, #session.sid, Ss) of
|
case lists:keyfind(SID, #session.sid, Ss) of
|
||||||
|
@ -93,8 +57,8 @@ delete_session(LUser, LServer, _LResource, SID) ->
|
||||||
SIDKey = sid_to_key(SID),
|
SIDKey = sid_to_key(SID),
|
||||||
ServKey = server_to_key(element(2, Session#session.us)),
|
ServKey = server_to_key(element(2, Session#session.us)),
|
||||||
USSIDKey = us_sid_to_key(Session#session.us, SID),
|
USSIDKey = us_sid_to_key(Session#session.us, SID),
|
||||||
eredis:qp(?PROCNAME, [["HDEL", USKey, SIDKey],
|
ejabberd_redis:qp([["HDEL", USKey, SIDKey],
|
||||||
["HDEL", ServKey, USSIDKey]]),
|
["HDEL", ServKey, USSIDKey]]),
|
||||||
{ok, Session}
|
{ok, Session}
|
||||||
end;
|
end;
|
||||||
Err ->
|
Err ->
|
||||||
|
@ -112,7 +76,7 @@ get_sessions() ->
|
||||||
-spec get_sessions(binary()) -> [#session{}].
|
-spec get_sessions(binary()) -> [#session{}].
|
||||||
get_sessions(LServer) ->
|
get_sessions(LServer) ->
|
||||||
ServKey = server_to_key(LServer),
|
ServKey = server_to_key(LServer),
|
||||||
case eredis:q(?PROCNAME, ["HGETALL", ServKey]) of
|
case ejabberd_redis:q(["HGETALL", ServKey]) of
|
||||||
{ok, Vals} ->
|
{ok, Vals} ->
|
||||||
decode_session_list(Vals);
|
decode_session_list(Vals);
|
||||||
Err ->
|
Err ->
|
||||||
|
@ -123,7 +87,7 @@ get_sessions(LServer) ->
|
||||||
-spec get_sessions(binary(), binary()) -> [#session{}].
|
-spec get_sessions(binary(), binary()) -> [#session{}].
|
||||||
get_sessions(LUser, LServer) ->
|
get_sessions(LUser, LServer) ->
|
||||||
USKey = us_to_key({LUser, LServer}),
|
USKey = us_to_key({LUser, LServer}),
|
||||||
case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
|
case ejabberd_redis:q(["HGETALL", USKey]) of
|
||||||
{ok, Vals} when is_list(Vals) ->
|
{ok, Vals} when is_list(Vals) ->
|
||||||
decode_session_list(Vals);
|
decode_session_list(Vals);
|
||||||
Err ->
|
Err ->
|
||||||
|
@ -135,7 +99,7 @@ get_sessions(LUser, LServer) ->
|
||||||
[#session{}].
|
[#session{}].
|
||||||
get_sessions(LUser, LServer, LResource) ->
|
get_sessions(LUser, LServer, LResource) ->
|
||||||
USKey = us_to_key({LUser, LServer}),
|
USKey = us_to_key({LUser, LServer}),
|
||||||
case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
|
case ejabberd_redis:q(["HGETALL", USKey]) of
|
||||||
{ok, Vals} when is_list(Vals) ->
|
{ok, Vals} when is_list(Vals) ->
|
||||||
[S || S <- decode_session_list(Vals),
|
[S || S <- decode_session_list(Vals),
|
||||||
element(3, S#session.usr) == LResource];
|
element(3, S#session.usr) == LResource];
|
||||||
|
@ -172,7 +136,7 @@ clean_table() ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(LServer) ->
|
fun(LServer) ->
|
||||||
ServKey = server_to_key(LServer),
|
ServKey = server_to_key(LServer),
|
||||||
case eredis:q(?PROCNAME, ["HKEYS", ServKey]) of
|
case ejabberd_redis:q(["HKEYS", ServKey]) of
|
||||||
{ok, []} ->
|
{ok, []} ->
|
||||||
ok;
|
ok;
|
||||||
{ok, Vals} ->
|
{ok, Vals} ->
|
||||||
|
@ -189,7 +153,7 @@ clean_table() ->
|
||||||
SIDKey = sid_to_key(SID),
|
SIDKey = sid_to_key(SID),
|
||||||
["HDEL", USKey, SIDKey]
|
["HDEL", USKey, SIDKey]
|
||||||
end, Vals1),
|
end, Vals1),
|
||||||
Res = eredis:qp(?PROCNAME, [Q1|Q2]),
|
Res = ejabberd_redis:qp([Q1|Q2]),
|
||||||
case lists:filter(
|
case lists:filter(
|
||||||
fun({ok, _}) -> false;
|
fun({ok, _}) -> false;
|
||||||
(_) -> true
|
(_) -> true
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue