1
0
Fork 0
mirror of https://github.com/processone/ejabberd synced 2025-10-03 17:59:31 +02:00

* src/ejabberd.cfg.example: Updated

* src/ejabberd_s2s_in.erl: Added support for shapers

* src/ejabberd_c2s.erl: Moved receiver functions to
ejabberd_receiver module
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_receiver.erl: Likewise

SVN Revision: 173
This commit is contained in:
Alexey Shchepin 2003-11-10 21:25:36 +00:00
parent 592d738114
commit b18ed05e92
5 changed files with 122 additions and 76 deletions

View file

@ -1,5 +1,14 @@
2003-11-10 Alexey Shchepin <alexey@sevcom.net> 2003-11-10 Alexey Shchepin <alexey@sevcom.net>
* src/ejabberd.cfg.example: Updated
* src/ejabberd_s2s_in.erl: Added support for shapers
* src/ejabberd_c2s.erl: Moved receiver functions to
ejabberd_receiver module
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_receiver.erl: Likewise
* src/mod_muc/mod_muc_room.erl: Bugfix * src/mod_muc/mod_muc_room.erl: Bugfix
* src/ejabberd_sm.erl (route_message): Bugfix * src/ejabberd_sm.erl (route_message): Bugfix

View file

@ -48,10 +48,16 @@
% Set shaper with name "normal" to limit traffic speed to 1000B/s % Set shaper with name "normal" to limit traffic speed to 1000B/s
{shaper, normal, {maxrate, 1000}}. {shaper, normal, {maxrate, 1000}}.
% Set shaper with name "fast" to limit traffic speed to 50000B/s
{shaper, fast, {maxrate, 50000}}.
% For all users except admins used "normal" shaper % For all users except admins used "normal" shaper
{access, c2s_shaper, [{none, admin}, {access, c2s_shaper, [{none, admin},
{normal, all}]}. {normal, all}]}.
% For all S2S connections used "fast" shaper
{access, s2s_shaper, [{fast, all}]}.
% Admins of this server are also admins of MUC service: % Admins of this server are also admins of MUC service:
{access, muc_admin, [{allow, admin}]}. {access, muc_admin, [{allow, admin}]}.
@ -64,7 +70,7 @@
{shaper, c2s_shaper}]}, {shaper, c2s_shaper}]},
{5223, ejabberd_c2s, [{access, c2s}, {5223, ejabberd_c2s, [{access, c2s},
{ssl, [{certfile, "./ssl.pem"}]}]}, {ssl, [{certfile, "./ssl.pem"}]}]},
{5269, ejabberd_s2s_in, []}, {5269, ejabberd_s2s_in, [{shaper, s2s_shaper}]},
{8888, ejabberd_service, [{hosts, {8888, ejabberd_service, [{hosts,
["conference.e.localhost", ["conference.e.localhost",
"muc.e.localhost"], "muc.e.localhost"],

View file

@ -15,7 +15,6 @@
%% External exports %% External exports
-export([start/2, -export([start/2,
start_link/2, start_link/2,
receiver/4,
send_text/2, send_text/2,
send_element/2]). send_element/2]).
@ -102,8 +101,7 @@ start_link(SockData, Opts) ->
%% {stop, StopReason} %% {stop, StopReason}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
init([{SockMod, Socket}, Opts]) -> init([{SockMod, Socket}, Opts]) ->
ReceiverPid = proc_lib:spawn( ReceiverPid = ejabberd_receiver:start(Socket, SockMod, none),
?MODULE, receiver, [Socket, SockMod, none, self()]),
Access = case lists:keysearch(access, 1, Opts) of Access = case lists:keysearch(access, 1, Opts) of
{value, {_, A}} -> A; {value, {_, A}} -> A;
_ -> all _ -> all
@ -303,7 +301,7 @@ wait_for_sasl_auth({xmlstreamelement, El}, StateData) ->
Mech, Mech,
ClientIn) of ClientIn) of
{ok, Props} -> {ok, Props} ->
StateData#state.receiver ! reset_stream, ejabberd_receiver:reset_stream(StateData#state.receiver),
send_element(StateData, send_element(StateData,
{xmlelement, "success", {xmlelement, "success",
[{"xmlns", ?NS_SASL}], []}), [{"xmlns", ?NS_SASL}], []}),
@ -366,7 +364,7 @@ wait_for_sasl_response({xmlstreamelement, El}, StateData) ->
case cyrsasl:server_step(StateData#state.sasl_state, case cyrsasl:server_step(StateData#state.sasl_state,
ClientIn) of ClientIn) of
{ok, Props} -> {ok, Props} ->
StateData#state.receiver ! reset_stream, ejabberd_receiver:reset_stream(StateData#state.receiver),
send_element(StateData, send_element(StateData,
{xmlelement, "success", {xmlelement, "success",
[{"xmlns", ?NS_SASL}], []}), [{"xmlns", ?NS_SASL}], []}),
@ -812,55 +810,9 @@ terminate(Reason, StateName, StateData) ->
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
receiver(Socket, SockMod, Shaper, C2SPid) ->
XMLStreamPid = xml_stream:start(C2SPid),
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
ssl ->
20;
_ ->
infinity
end,
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout).
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout) ->
case catch SockMod:recv(Socket, 0, Timeout) of
{ok, Text} ->
ShaperSt1 = receive
{change_shaper, Shaper} ->
shaper:new(Shaper)
after 0 ->
ShaperState
end,
NewShaperState = shaper:update(ShaperSt1, size(Text)),
XMLStreamPid1 = receive
reset_stream ->
exit(XMLStreamPid, closed),
xml_stream:start(C2SPid)
after 0 ->
XMLStreamPid
end,
xml_stream:send_text(XMLStreamPid1, Text),
receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid1,
Timeout);
{error, timeout} ->
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid,
Timeout);
{error, Reason} ->
exit(XMLStreamPid, closed),
gen_fsm:send_event(C2SPid, closed),
ok;
{'EXIT', Reason} ->
?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
[Socket, SockMod, Reason]),
exit(XMLStreamPid, closed),
gen_fsm:send_event(C2SPid, closed),
ok
end.
change_shaper(StateData, JID) -> change_shaper(StateData, JID) ->
Shaper = acl:match_rule(StateData#state.shaper, JID), Shaper = acl:match_rule(StateData#state.shaper, JID),
StateData#state.receiver ! {change_shaper, Shaper}. ejabberd_receiver:change_shaper(StateData#state.receiver, Shaper).
send_text(StateData, Text) -> send_text(StateData, Text) ->
(StateData#state.sockmod):send(StateData#state.socket, Text). (StateData#state.sockmod):send(StateData#state.socket, Text).

78
src/ejabberd_receiver.erl Normal file
View file

@ -0,0 +1,78 @@
%%%----------------------------------------------------------------------
%%% File : ejabberd_receiver.erl
%%% Author : Alexey Shchepin <alexey@sevcom.net>
%%% Purpose : Socket receiver for C2S and S2S connections
%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@sevcom.net>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(ejabberd_receiver).
-author('alexey@sevcom.net').
-vsn('$Revision$ ').
-export([start/3,
receiver/4,
change_shaper/2,
reset_stream/1]).
-include("ejabberd.hrl").
start(Socket, SockMod, Shaper) ->
proc_lib:spawn(?MODULE, receiver, [Socket, SockMod, Shaper, self()]).
receiver(Socket, SockMod, Shaper, C2SPid) ->
XMLStreamPid = xml_stream:start(C2SPid),
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
ssl ->
20;
_ ->
infinity
end,
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout).
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout) ->
case catch SockMod:recv(Socket, 0, Timeout) of
{ok, Text} ->
ShaperSt1 = receive
{change_shaper, Shaper} ->
shaper:new(Shaper)
after 0 ->
ShaperState
end,
NewShaperState = shaper:update(ShaperSt1, size(Text)),
XMLStreamPid1 = receive
reset_stream ->
exit(XMLStreamPid, closed),
xml_stream:start(C2SPid)
after 0 ->
XMLStreamPid
end,
xml_stream:send_text(XMLStreamPid1, Text),
receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid1,
Timeout);
{error, timeout} ->
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid,
Timeout);
{error, Reason} ->
exit(XMLStreamPid, closed),
gen_fsm:send_event(C2SPid, closed),
ok;
{'EXIT', Reason} ->
?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
[Socket, SockMod, Reason]),
exit(XMLStreamPid, closed),
gen_fsm:send_event(C2SPid, closed),
ok
end.
change_shaper(Pid, Shaper) ->
Pid ! {change_shaper, Shaper}.
reset_stream(Pid) ->
Pid ! reset_stream.

View file

@ -13,7 +13,10 @@
-behaviour(gen_fsm). -behaviour(gen_fsm).
%% External exports %% External exports
-export([start/2, start_link/2, receiver/2, send_text/2, send_element/2]). -export([start/2,
start_link/2,
send_text/2,
send_element/2]).
%% gen_fsm callbacks %% gen_fsm callbacks
-export([init/1, -export([init/1,
@ -30,7 +33,10 @@
-define(DICT, dict). -define(DICT, dict).
-record(state, {socket, receiver, streamid, -record(state, {socket,
receiver,
streamid,
shaper,
connections = ?DICT:new()}). connections = ?DICT:new()}).
@ -68,8 +74,8 @@
start(SockData, Opts) -> start(SockData, Opts) ->
supervisor:start_child(ejabberd_s2s_in_sup, [SockData, Opts]). supervisor:start_child(ejabberd_s2s_in_sup, [SockData, Opts]).
start_link(SockData, _Opts) -> start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_s2s_in, [SockData], ?FSMOPTS). gen_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS).
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm %%% Callback functions from gen_fsm
@ -82,13 +88,18 @@ start_link(SockData, _Opts) ->
%% ignore | %% ignore |
%% {stop, StopReason} %% {stop, StopReason}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
init([{SockMod, Socket}]) -> init([{SockMod, Socket}, Opts]) ->
?INFO_MSG("started: ~p", [{SockMod, Socket}]), ?INFO_MSG("started: ~p", [{SockMod, Socket}]),
ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]), ReceiverPid = ejabberd_receiver:start(Socket, SockMod, none),
Shaper = case lists:keysearch(shaper, 1, Opts) of
{value, {_, S}} -> S;
_ -> none
end,
{ok, wait_for_stream, {ok, wait_for_stream,
#state{socket = Socket, #state{socket = Socket,
receiver = ReceiverPid, receiver = ReceiverPid,
streamid = new_id()}, streamid = new_id(),
shaper = Shaper},
?S2STIMEOUT}. ?S2STIMEOUT}.
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
@ -133,6 +144,7 @@ stream_established({xmlstreamelement, El}, StateData) ->
Key, StateData#state.streamid}), Key, StateData#state.streamid}),
Conns = ?DICT:store({LFrom, LTo}, wait_for_verification, Conns = ?DICT:store({LFrom, LTo}, wait_for_verification,
StateData#state.connections), StateData#state.connections),
change_shaper(StateData, jlib:make_jid("", LFrom, "")),
{next_state, {next_state,
stream_established, stream_established,
StateData#state{connections = Conns}, StateData#state{connections = Conns},
@ -306,21 +318,6 @@ terminate(Reason, _StateName, StateData) ->
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
receiver(Socket, C2SPid) ->
XMLStreamPid = xml_stream:start(C2SPid),
receiver(Socket, C2SPid, XMLStreamPid).
receiver(Socket, C2SPid, XMLStreamPid) ->
case gen_tcp:recv(Socket, 0) of
{ok, Text} ->
xml_stream:send_text(XMLStreamPid, Text),
receiver(Socket, C2SPid, XMLStreamPid);
{error, _Reason} ->
exit(XMLStreamPid, closed),
gen_fsm:send_event(C2SPid, closed),
ok
end.
send_text(Socket, Text) -> send_text(Socket, Text) ->
gen_tcp:send(Socket,Text). gen_tcp:send(Socket,Text).
@ -328,6 +325,10 @@ send_element(Socket, El) ->
send_text(Socket, xml:element_to_string(El)). send_text(Socket, xml:element_to_string(El)).
change_shaper(StateData, JID) ->
Shaper = acl:match_rule(StateData#state.shaper, JID),
ejabberd_receiver:change_shaper(StateData#state.receiver, Shaper).
new_id() -> new_id() ->
randoms:get_string(). randoms:get_string().