Just For Coding

Keep learning, keep living …

ejabberd中XMPP协议PING实现

根据XEP-0199, XMPP客户端和服务器都可以在XML流上发送应用层PING请求。因为XMPP依赖底层的TCP连接,有可能TCP连接意外中断,而上层的XMPP并不知晓,从而影响消息传递。通过发送应用层PING请求可以来确认对端的连接可用性。

以服务器发给客户端为例,协议如下:

发送的PING请求:

1
2
3
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='s2c1' type='get'>
  <ping xmlns='urn:xmpp:ping'/>
</iq>

如果对端支持PING请求,则返回对应的”PONG”回应。

1
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='result'/>

如果对端不支持则返回错误。

1
2
3
4
5
6
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='error'>
  <ping xmlns='urn:xmpp:ping'/>
  <error type='cancel'>
    <service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
  </error>
</iq>

ejabberd中PING功能实现位于mod_ping.erl。它主要支持3个配置:

  • send_pings: true|false

如果这个选项设置为true, 当客户端在给定时间间隔内没有活动,则向客户端发送一个ping请求。

  • ping_interval: Seconds

设置上述send_pings选项中客户端没有活动的时间间隔。

  • timeout_action: none|kill

表示当PING请求发出32秒后,ejabberd依然没有收到PING响应,服务端如何处理。none表示什么也不做,kill表示关闭客户端连接。

当ejabberd启动时会调用mod_ping:start/2。

1
2
3
4
5
start(Host, Opts) ->
    Proc = gen_mod:get_module_proc(Host, ?MODULE),
    PingSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
                transient, 2000, worker, [?MODULE]},
    supervisor:start_child(?SUPERVISOR, PingSpec).

start函数调用supervisor:start_child/2为每个支持的host创建一个负责该host的worker进程。

进程树模型如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
+------------+
|ejabberd_sup|
+-----+------+
      |
      |    +------------------+
      +--->|Other processes...|
      |    +------------------+
      |
      |    +------------------------+
      +--->|ping(im.just4coding.com)|
      |    +------------------------+
      |
      |    +------------------------+
      +--->|ping(localhost)         |
      |    +------------------------+
      |
      |    +------------------------+
      +--->|ping(Other host)        |
           +------------------------+

每个worker是一个gen_server进程,进程调用init函数进行初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
init([Host, Opts]) ->
    SendPings = gen_mod:get_opt(send_pings, Opts, ?DEFAULT_SEND_PINGS),
    PingInterval = gen_mod:get_opt(ping_interval, Opts, ?DEFAULT_PING_INTERVAL),
    TimeoutAction = gen_mod:get_opt(timeout_action, Opts, none),
    IQDisc = gen_mod:get_opt(iqdisc, Opts, no_queue),
    mod_disco:register_feature(Host, ?NS_PING),
    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PING,
                                  ?MODULE, iq_ping, IQDisc),
    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_PING,
                                  ?MODULE, iq_ping, IQDisc),
    case SendPings of
    true ->
        %% Ping requests are sent to all entities, whether they
        %% announce 'urn:xmpp:ping' in their caps or not
        ejabberd_hooks:add(sm_register_connection_hook, Host,
                           ?MODULE, user_online, 100),
        ejabberd_hooks:add(sm_remove_connection_hook, Host,
                           ?MODULE, user_offline, 100),
        ejabberd_hooks:add(user_send_packet, Host,
                           ?MODULE, user_send, 100);
    _ ->
        ok
    end,
    {ok, #state{host = Host,
                send_pings = SendPings,
                ping_interval = PingInterval,
                timeout_action = TimeoutAction,
                timers = ?DICT:new()}}.
  • 首先获取相关配置

  • 接着调用mod_disco:register_feature注册PING功能的XMLNS。这样当客户端请求”Service Discovery”信息时,ejabberd返回的特征中会包括”urn:xmpp:ping”。

ServiceDiscovery请求:

1
2
3
4
5
6
<iq type='get'
    from='juliet@capulet.lit/balcony'
    to='capulet.lit'
    id='disco1'>
  <query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>

ServiceDiscovery响应:

1
2
3
4
5
6
7
8
9
10
<iq type='result'
    from='capulet.lit'
    to='juliet@capulet.lit/balcony'
    id='disco1'>
  <query xmlns='http://jabber.org/protocol/disco#info'>
    ...
    <feature var='urn:xmpp:ping'/>
    ...
  </query>
</iq>

ServiceDiscovery相关信息参考XEP-0030

  • 接下来,注册IQ处理器,令XMLNS为”urn:xmpp:ping”的IQ请求由函数iq_ping处理。iq_ping简单地返回相应响应或者错误。
1
2
3
4
5
6
7
iq_ping(_From, _To, #iq{type = Type, sub_el = SubEl} = IQ) ->
    case {Type, SubEl} of
        {get, {xmlelement, "ping", _, _}} ->
            IQ#iq{type = result, sub_el = []};
        _ ->
            IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}
    end.

如果send_pings配置为true, mod_ping在ejabberd中注册n以下3个hook函数:

  • sm_register_connection_hook: 它在客户端完成登录验证,建立session信息时调用。
1
2
3
4
5
6
7
8
open_session(SID, User, Server, Resource, Info) ->
    set_session(SID, User, Server, Resource, undefined, Info),
    mnesia:dirty_update_counter(session_counter,
                jlib:nameprep(Server), 1),
    check_for_sessions_to_replace(User, Server, Resource),
    JID = jlib:make_jid(User, Server, Resource),
    ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver,
               [SID, JID, Info]).
  • sm_remove_connection_hook: 在用户退出,关闭session时调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
close_session(SID, User, Server, Resource) ->
    Info = case mnesia:dirty_read({session, SID}) of
    [] -> [];
    [#session{info=I}] -> I
    end,
    F = fun() ->
        mnesia:delete({session, SID}),
        mnesia:dirty_update_counter(session_counter,
                        jlib:nameprep(Server), -1)
    end,
    mnesia:sync_dirty(F),
    JID = jlib:make_jid(User, Server, Resource),
    ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver,
               [SID, JID, Info]).
  • user_send_packet: 在C2S进程收到客户端发送的消息时被调用。

sm_register_connection_hook的hook函数user_onlineuser_send_packet的hook函数user_send都会调用start_ping函数。

1
2
3
start_ping(Host, JID) ->
    Proc = gen_mod:get_module_proc(Host, ?MODULE),
    gen_server:cast(Proc, {start_ping, JID}).

start_ping向该HOST的worker进程发送一个{start_ping, JID}消息。worker进程调用handle_cast进行处理:

1
2
3
handle_cast({start_ping, JID}, State) ->
    Timers = add_timer(JID, State#state.ping_interval, State#state.timers),
    {noreply, State#state{timers = Timers}};

handle_cast调用add_timer为该客户端创建一个timer。

1
2
3
4
5
6
7
8
9
10
11
add_timer(JID, Interval, Timers) ->
    LJID = jlib:jid_tolower(JID),
    NewTimers = case ?DICT:find(LJID, Timers) of
            {ok, OldTRef} ->
            cancel_timer(OldTRef),
            ?DICT:erase(LJID, Timers);
            _ ->
            Timers
        end,
    TRef = erlang:start_timer(Interval * 1000, self(), {ping, JID}),
    ?DICT:store(LJID, TRef, NewTimers).

由于用户每次发送消息时都会调用add_timer函数,因而add_timer中需要检查之前是否已经存在timer。如果存在,则先取消旧的timer, 再创建新的Timer。

当timer超时后,即客户若干时间内没有活动,进程收到{ping, JID}消息,此时ejabberd应向客户端发送PING消息。进程调用handle_info处理。

1
2
3
4
5
6
7
8
9
10
11
handle_info({timeout, _TRef, {ping, JID}}, State) ->
    IQ = #iq{type = get,
             sub_el = [{xmlelement, "ping", [{"xmlns", ?NS_PING}], []}]},
    Pid = self(),
    F = fun(Response) ->
        gen_server:cast(Pid, {iq_pong, JID, Response})
    end,
    From = jlib:make_jid("", State#state.host, ""),
    ejabberd_local:route_iq(From, JID, IQ, F),
    Timers = add_timer(JID, State#state.ping_interval, State#state.timers),
    {noreply, State#state{timers = Timers}};

handle_info创建IQ消息后,设置回调函数F,调用ejabberd_local:route_iq/4消息IQ消息发送给客户端。当收到该IQ消息的响应或者超过32秒依然没有收到客户端的响应,回调函数F将会被调用。如果响应超时,Response为timeout,F将向进程发送{iq_pong, JID, timeout}消息。进程调用handle_cast处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
handle_cast({iq_pong, JID, timeout}, State) ->
    Timers = del_timer(JID, State#state.timers),
    ejabberd_hooks:run(user_ping_timeout, State#state.host, [JID]),
    case State#state.timeout_action of
    kill ->
        #jid{user = User, server = Server, resource = Resource} = JID,
        case ejabberd_sm:get_session_pid(User, Server, Resource) of
        Pid when is_pid(Pid) ->
            ejabberd_c2s:stop(Pid);
        _ ->
            ok
        end;
    _ ->
        ok
    end,
    {noreply, State#state{timers = Timers}};

如果timeout_action设置为kill, 则调用ejabberd_c2s:stop关闭相应的客户端连接。

因为在sm_remove_connection_hook注册了hook函数user_offline, 当用户退出时会调用stop_ping函数,向worker进程发送{stop_ping, JID}消息。

1
2
3
stop_ping(Host, JID) ->
    Proc = gen_mod:get_module_proc(Host, ?MODULE),
    gen_server:cast(Proc, {stop_ping, JID}).

worker进程调用del_timer函数将该客户端的timer删除。

1
2
3
handle_cast({stop_ping, JID}, State) ->
    Timers = del_timer(JID, State#state.timers),
    {noreply, State#state{timers = Timers}};
1
2
3
4
5
6
7
8
9
del_timer(JID, Timers) ->
    LJID = jlib:jid_tolower(JID),
    case ?DICT:find(LJID, Timers) of
        {ok, TRef} ->
        cancel_timer(TRef),
        ?DICT:erase(LJID, Timers);
        _ ->
        Timers
    end.

模块及进程停止的逻辑与模块和进程初始化的逻辑相反,本文略过。