Just For Coding

Keep learning, keep living …

Ejabberd消息处理流程分析

ejabberd的程序入口为ejabberd_app.erl中的start/2,简化后逻辑如下:

1
2
3
4
5
6
7
8
9
start(normal, _Args) ->
    ...
    Sup = ejabberd_sup:start_link(),
    ...
    ejabberd_listener:start_listeners(),
    ?INFO_MSG("ejabberd ~s is started in the node ~p", [?VERSION, node()]),
    Sup;
start(_, _) ->
    {error, badarg}.

ejabberd_sup是一个supervisor,调用start_link时,它会生成各功能组件进程,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
------------------------------------------------

                +------------+
                |ejabberd_sup|
                +-----+------+
                      |
+---------------------+------------------------+
|                                              |
|  +-----+              +-------+              |
|  |hooks|              |c2s_sup|              |
|  +-----+              +-------+              |
|                                              |
|  +-----------+        +---------+            |
|  |node_groups|        |s2sin_sup|            |
|  +-----------+        +---------+            |
|                                              |
|  +--------------+     +----------+           |
|  |system_monitor|     |s2sout_sup|           |
|  +--------------+     +----------+           |
|                                              |
|  +------+             +-----------+          |
|  |router|             |service_sup|          |
|  +------+             +-----------+          |
|                                              |
|  +--+                 +--------+             |
|  |sm|                 |http_sup|             |
|  +--+                 +--------+             |
|                                              |
|  +---+                +-------------+        |
|  |s2s|                |http_poll_sup|        |
|  +---+                +-------------+        |
|                                              |
|  +-----+              +-------------------+  |
|  |local|              |frontend_socket_sup|  |
|  +-----+              +-------------------+  |
|                                              |
|  +-------+            +------+               |
|  |captcha|            |iq_sup|               |
|  +-------+            +------+               |
|                                              |
|  +--------+           +--------+             |
|  |listener|           |stun_sup|             |
|  +--------+           +--------+             |
|                                              |
|  +------------+       +-------------+        |
|  |receiver_sup|       |cache_tab_sup|        |
|  +------------+       +-------------+        |
|                                              |
+----------------------------------------------+

部分关键组件功能:

  • hooks: 执行各模块注册的HOOK函数
  • router: 分发用户发送的消息
  • sm: session manager, 处理用户到用户的消息分发
  • local: 处理发送给Server本身的消息。
  • listener: supervisor进程,创建子进程监听网络端口
  • receiver_sup: supervisor进程, 它为每一个TCP连接创建一个进程来接收该TCP连接的数据
  • c2s_sup: supervisor进程, 它为每一个TCP连接创建一个进程来处理协议状态,并负责向TCP连接发送数据

ejabberd_listener:start_listerners/0会从配置文件(ejabberd.cfg)中获取“listen”选项。 listen配置的一般形式为:

1
2
3
4
5
6
7
8
9
10
{listen,
 [
  {5222, ejabberd_c2s, [
            {access, c2s},
            {shaper, c2s_shaper},
            {max_stanza_size, 65536}
               ]},
  ...
 ]
}

每一个端口需要指定一个处理模块。

start_listeners/0遍历listen配置中指定的所有端口,为每个端口调用start_listener/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
start_listeners() ->
    case ejabberd_config:get_local_option(listen) of
    undefined ->
        ignore;
    Ls ->
        Ls2 = lists:map(
            fun({Port, Module, Opts}) ->
                case start_listener(Port, Module, Opts) of
                {ok, _Pid} = R -> R;
                {error, Error} ->
                throw(Error)
            end
        end, Ls),
        report_duplicated_portips(Ls),
        {ok, {{one_for_one, 10, 1}, Ls2}}
    end.

start_listerner最终会调用start_listener_sup/3。它通过supervisor:start_child令ejabberd_listeners进程创建子进程执行ejabberd_listener:start/3。

1
2
3
4
5
6
7
8
start_listener_sup(Port, Module, Opts) ->
    ChildSpec = {Port,
         {?MODULE, start, [Port, Module, Opts]},
         transient,
         brutal_kill,
         worker,
         [?MODULE]},
    supervisor:start_child(ejabberd_listeners, ChildSpec).

ejabberd_listener:start/3依据该端口处理模块的socket_type/0函数的返回值进行相应处理。如果返回值为independent时,则表示该处理模块自行处理监听端口a。否则,调用start_dependent/3。

1
2
3
4
5
6
7
start(Port, Module, Opts) ->
    %% Check if the module is an ejabberd listener or an independent listener
    ModuleRaw = strip_frontend(Module),
    case ModuleRaw:socket_type() of
    independent -> ModuleRaw:start_listener(Port, Opts);
    _ -> start_dependent(Port, Module, Opts)
    end.

start_dependent/3中会创建子进程执行init/3, init/3函数根据配置参数调用init_tcp/6或init_udp/6。 init_tcp/6开始监听相应端口,然后调用accept/0等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
    ...
    Res = gen_tcp:listen(Port, [binary,
                {packet, 0},
                {active, false},
                {reuseaddr, true},
                {nodelay, true},
                {send_timeout, ?TCP_SEND_TIMEOUT},
                {keepalive, true} |
                SockOpts2]),
    case Res of
    {ok, ListenSocket} ->
        %% Inform my parent that this port was opened succesfully
        proc_lib:init_ack({ok, self()}),
        %% And now start accepting connection attempts
        accept(ListenSocket, Module, Opts);
    {error, Reason} ->
        socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
    end.

如果监听UDP端口,则init/3调用init_udp/6。init/6打开一个UDP端口,调用udp_recv/3等待接收UDP数据。

1
2
3
4
5
6
7
8
9
10
11
12
init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
    case gen_udp:open(Port, [binary,
                 {active, false},
                 {reuseaddr, true} |
                 SockOpts]) of
    {ok, Socket} ->
        %% Inform my parent that this port was opened succesfully
        proc_lib:init_ack({ok, self()}),
        udp_recv(Socket, Module, Opts);
    {error, Reason} ->
        socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
    end.

至此ejabberd启动完成。

当UDP数据到来后,udp_recv/3调用该端口处理模块的udp_recv/5处理,接着递归调用udp_recv继续等待接收UDP数据。处理模块的udp_recv/5函数中需要处理返回UDP响应等逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
udp_recv(Socket, Module, Opts) ->
    case gen_udp:recv(Socket, 0) of
    {ok, {Addr, Port, Packet}} ->
        case catch Module:udp_recv(Socket, Addr, Port, Packet, Opts) of
        {'EXIT', Reason} ->
            ?ERROR_MSG("failed to process UDP packet:~n"
                   "** Source: {~p, ~p}~n"
                   "** Reason: ~p~n** Packet: ~p",
                   [Addr, Port, Reason, Packet]);
        _ ->
            ok
        end,
        udp_recv(Socket, Module, Opts);
    {error, Reason} ->
        ?ERROR_MSG("unexpected UDP error: ~s", [format_error(Reason)]),
        throw({error, Reason})
    end.

当有TCP连接成功后,gen_tcp:accept/1返回,accept/3根据配置中处理模块是否指定了frontend, 选择调用ejabberd_frontend_socket:start/4或ejabberd_socket:start/4, 然后递归调用accept再次等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
accept(ListenSocket, Module, Opts) ->
    case gen_tcp:accept(ListenSocket) of
    {ok, Socket} ->
        case {inet:sockname(Socket), inet:peername(Socket)} of
        {{ok, Addr}, {ok, PAddr}} ->
            ?INFO_MSG("(~w) Accepted connection ~w -> ~w",
                  [Socket, PAddr, Addr]);
        _ ->
            ok
        end,
        CallMod = case is_frontend(Module) of
              true -> ejabberd_frontend_socket;
              false -> ejabberd_socket
              end,
        CallMod:start(strip_frontend(Module), gen_tcp, Socket, Opts),
        accept(ListenSocket, Module, Opts);
    {error, Reason} ->
        ?INFO_MSG("(~w) Failed TCP accept: ~w",
              [ListenSocket, Reason]),
        accept(ListenSocket, Module, Opts)
    end.

ejabberd_socket:start/4函数根据处理模块的socket_type/0的返回值进行不同处理。5222端口的处理模块是ejabberd_c2s,该模块socket_type/0返回xml_stream。

ejabberd_socket:start/4对于xml_stream的处理的简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
RecPid = ejabberd_receiver:start(
       Socket, SockMod, none, MaxStanzaSize),

{ReceiverMod, Receiver, RecRef} =
    {ejabberd_receiver, RecPid, RecPid}

SocketData = #socket_state{sockmod = SockMod,
               socket = Socket,
               receiver = RecRef},

case Module:start({?MODULE, SocketData}, Opts) of
{ok, Pid} ->
    case SockMod:controlling_process(Socket, Receiver) of
    ok ->
        ok;
    {error, _Reason} ->
        SockMod:close(Socket)
    end,
    ReceiverMod:become_controller(Receiver, Pid);
{error, _Reason} ->
    SockMod:close(Socket)
end;

首先,它调用ejabberd_receiver:start/4, 它创建一个receiver进程,接着调用处理模块的start/2创建一个处理模块子进程。然后将新的receiver进程设为该TCP Socket的控制进程,以后从该Socket收到数据将以消息形式发送给该receiver进程。最后调用ejabberd_receiver:become_controller/2, 向该receiver进程发送become_controller消息。receiver进程处理该消息,生成一个XML解析状态并将处理进程Pid保存在状态中。

1
2
3
4
5
6
7
handle_call({become_controller, C2SPid}, _From, State) ->
    XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size),
    NewState = State#state{c2s_pid = C2SPid,
               xml_stream_state = XMLStreamState},
    activate_socket(NewState),
    Reply = ok,
    {reply, Reply, NewState, ?HIBERNATE_TIMEOUT};

当该Socket有数据可读时,receiver进程将收到TCP消息,receiver进程调用process_data/1函数来处理收到的数据。它调用xml_stream:parse进行解析,当解析出XML元素后,它会通过gen_fsm:send_event向该TCP的处理进程发送消息。处理进程根据消息进行协议状态的转换。

当XMPP协商完成后,处理进程状态为session_established。此时收到XMPP消息,处理进程解析出From和To属性, 调用ejabberd_router:route/3分发消息。ejabberd_router:route/3调用do_route进行分发。它查询route表中是否已经注册了JID所在域名。ejabberd_local进程启动时会在route表中注册配置中所添加的域名。如果已经注册,该消息则应该由当前服务器处理,否则路由至其他Server。

处理本地域名时,do_route首先获取ejabbred_local进程的Pid,如果只有一个进程,并且该进程位于当前节点,则直接调用ejabberd_local:route/3进行处理,否则发送route消息至相应的Pid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
LDstDomain = To#jid.lserver,
case mnesia:dirty_read(route, LDstDomain) of
[] ->
    ejabberd_s2s:route(From, To, Packet);
[R] ->
    Pid = R#route.pid,
    if
    node(Pid) == node() ->
        case R#route.local_hint of
        {apply, Module, Function} ->
            Module:Function(From, To, Packet);
        _ ->
            Pid ! {route, From, To, Packet}
        end;
    is_pid(Pid) ->
        Pid ! {route, From, To, Packet};
    true ->
        drop
    end;
...
end

这两种方式都会调用ejabberd_local:do_route来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
do_route(From, To, Packet) ->
    ?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
       [From, To, Packet, 8]),
    if
    To#jid.luser /= "" ->
        ejabberd_sm:route(From, To, Packet);
    To#jid.lresource == "" ->
        {xmlelement, Name, _Attrs, _Els} = Packet,
        case Name of
        "iq" ->
            process_iq(From, To, Packet);
        "message" ->
            ok;
        "presence" ->
            ok;
        _ ->
            ok
        end;
    true ->
        {xmlelement, _Name, Attrs, _Els} = Packet,
        case xml:get_attr_s("type", Attrs) of
        "error" -> ok;
        "result" -> ok;
        _ ->
            ejabberd_hooks:run(local_send_to_resource_hook,
                       To#jid.lserver,
                       [From, To, Packet])
        end
    end.
end

如果”To”属性的JID指定了user, 则该消息应该分发至用户,调用ejabberd_sm:route/3进行分发。ejabberd根据JID的Resource是否为空进行了不同处理。JID的Resource为空的情况本文略过。JID的Resource不为空时,ejabberd_sm:route/3的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr) of
[] ->
    case Name of
    "message" ->
        route_message(From, To, Packet);
    "iq" ->
        case xml:get_attr_s("type", Attrs) of
        "error" -> ok;
        "result" -> ok;
        _ ->
            Err =
            jlib:make_error_reply(
              Packet, ?ERR_SERVICE_UNAVAILABLE),
            ejabberd_router:route(To, From, Err)
        end;
    _ ->
        ?DEBUG("packet droped~n", [])
    end;
Ss ->
    Session = lists:max(Ss),
    Pid = element(2, Session#session.sid),
    ?DEBUG("sending to process ~p~n", [Pid]),
    Pid ! {route, From, To, Packet}
end

它从session表中读取出该用户的信息,这些信息是用户连接上由该TCP的处理进程添加到session表中的。从中获得为该用户TCP连接的处理进程,向该进程发送route消息。处理进程处理该route消息,向该用户Socket发送消息。这便完成了一个用户到另一个用户的消息传递。

本文中ejabberd代码版本为2.1.3。

PowerDNS模块机制分析

PowerDNS是一个开源DNS服务器。它包括了授权DNS服务器和递归DNS服务器。本文只讨论授权DNS服务器,主要分析PowerDNS的架构及Backend模块机制的实现。

PowerDNS由各种各样的backend来处理DNS解析相关数据,数据可以存储在BIND模式的ZONE文件中,也可以存储在关系型数据库中。官方实现了很多backend,如Bind, Postgresql, Mysql等。每种backend实现为一个SO文件。PowerDNS启动时根据配置中的launch语句加载指定的so文件。这种方式使得PowerDNS的可扩展性非常高。 简略的PowerDNS架构图如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  +-----------------------------------------------------------------------+
  |                    |                 |                                |
  |    +------------+  |  +-----------+  |                                |
  |    |main thread |  |  |PacketCache|  |                                |
  |    +------------+  |  +-----------+  |                                |
  |                    |                 |                                |
  |    +------------+  |                 |                                |
  |    |DynListerner|  |                 |                                |
  |    +------------+  |                 |  +-----------+   +----------+  |
  |                    |       +---------+--|Distributor|---|DNSBackend|  |
  |    +--------+      |       |         |  +-----------+   +----------+  |
  |    |receiver|------+-------+         |                                |
  |    +--------+      |       |         |  +-----------+   +----------+  |
  |                    |       +---------+--|Distributor|---|DNSBackend|  |
  |       ...          |                 |  +-----------+   +----------+  |
  |                    |                 |     ...                        |
  |                    |                 |                                |
  |    +--------+      |                 |  +-----------+   +----------+  |
  |    |receiver|------+-------+---------+--|Distributor|---|DNSBackend|  |
  |    +--------+      |       |         |  +-----------+   +----------+  |
  |                    |       |         |                                |
  |                    |       |         |  +-----------+   +----------+  |
  |                    |       +---------+--|Distributor|---|DNSBackend|  |
  |                    |                 |  +-----------+   +----------+  |
  |                                                                       |
  +-----------------------------------------------------------------------+

下面具体分析PowerDNS的启动流程。

1
2
3
4
5
6
7
8
9
10
11
12
if(::arg().mustDo("guardian") && !isGuarded(argv)) {
  if(::arg().mustDo("daemon")) {
    L.toConsole(Logger::Critical);
    daemonize();
  }
  guardian(argc, argv);
  // never get here, guardian will reinvoke process
  cerr<<"Um, we did get here!"<<endl;
}

// we really need to do work - either standalone or as an instance
...

如果启动参数中指定了“–guardian=yes”,则PowerDNS运行在guardian模式,主进程fork出一个子进程执行正常的启动流程,而父进程则一秒钟探测一次子进程是否存活,如果子进程异常退出则重新启动子进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  for(;;) {
    int ret=waitpid(pid,&status,WNOHANG);

    if(ret<0) {
      L<<Logger::Error<<"In guardian loop, waitpid returned error: "<<strerror(errno)<<endl;
      L<<Logger::Error<<"Dying"<<endl;
      exit(1);
    }
    else if(ret) // something exited
      break;
    else { // child is alive
      // execute some kind of ping here
      if(DLQuitPlease())
        takedown(1); // needs a parameter..
      setStatus("Child running on pid "+itoa(pid));
      sleep(1);
    }
  }

  pthread_mutex_lock(&g_guardian_lock);
  close(g_fd1[1]);
  fclose(g_fp);
  g_fp=0;

  if(WIFEXITED(status)) {
    int ret=WEXITSTATUS(status);

    if(ret==99) {
      L<<Logger::Error<<"Child requested a stop, exiting"<<endl;
      exit(1);
    }
    setStatus("Child died with code "+itoa(ret));
    L<<Logger::Error<<"Our pdns instance exited with code "<<ret<<endl;
    L<<Logger::Error<<"Respawning"<<endl;

    goto respawn;
  }

子进程或非guardian模式的原生进程执行正常的启动流程。

1
2
loadModules();
BackendMakers().launch(::arg()["launch"]); // vrooooom!

loadModules()会根据”load-modules”配置语句调用dlopen来加载相应的SO文件。BackendMakers().launch(::arg()[“launch”]);来加载launch语句指定的modules。然后开始监听指定的UDP和TCP端口。

1
2
3
4
N=new UDPNameserver; // this fails when we are not root, throws exception

if(!::arg().mustDo("disable-tcp"))
  TN=new TCPNameserver;

接着调用mainthread()来初始化若干网络包接收线程。

1
2
3
  unsigned int max_rthreads= ::arg().asNum("receiver-threads");
  for(unsigned int n=0; n < max_rthreads; ++n)
    pthread_create(&qtid,0,qthread, reinterpret_cast<void *>(n)); // receives packets

而每个接收线程则会创建若干自己的distributor线程。

1
DNSDistributor *distributor = DNSDistributor::Create(::arg().asNum("distributor-threads")); // the big dispatcher!

接下来接收线程开始自己处理网络数据。接收到DNS查询包后,首先在PacketCache中查询是否有相应的响应。PacketCache是PowerDNS中Backend响应的Cache,它减少对于Backend的访问,可以大大提高性能。如果在PacketCache中没有找到,则调用distributor->question分发给distributor线程。示意代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
  for(;;) {
    if(!(P=NS->receive(&question))) { // receive a packet         inline
      continue;                    // packet was broken, try again
    }

    if(P->couldBeCached() && PC.get(P, &cached)) { // short circuit - does the PacketCache recognize this question?
      NS->send(&cached);   // answer it then
      continue;
    }

    distributor->question(P, &sendout);
  }

在调用question方法时会传递一个回调函数sendout。distributor线程处理完DNS请求后将调用该回调函数将响应返回给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void sendout(const DNSDistributor::AnswerData &AD)
{
  static unsigned int &numanswered=*S.getPointer("udp-answers");
  static unsigned int &numanswered4=*S.getPointer("udp4-answers");
  static unsigned int &numanswered6=*S.getPointer("udp6-answers");

  if(!AD.A)
    return;

  N->send(AD.A);
  numanswered++;

  if(AD.A->d_remote.getSocklen()==sizeof(sockaddr_in))
    numanswered4++;
  else
    numanswered6++;

  int diff=AD.A->d_dt.udiff();
  avg_latency=(int)(1023*avg_latency/1024+diff/1024);

  delete AD.A;
}

distributor线程中调用backend来处理DNS响应。示意代码如下:

1
2
3
4
5
B.lookup;
DNSResourceRecord rr;
while(yb.get(rr))
    cout<<"Found cname pointing to '"+rr.content+"'"<<endl;
}

其中B为UeberBackend实例。UeberBackend是一个特殊的Backend。启动时被加载的其他backend向它注册,被保存在一个vector中。UeberBackend按注册顺序依次调用其他backend的相应成员方法。以reload操作为例:

1
2
3
4
5
6
7
void UeberBackend::reload()
{
  for ( vector< DNSBackend * >::iterator i = backends.begin(); i != backends.end(); ++i )
  {
    ( *i )->reload();
  }
}

lookup和get成员方法实现比较特殊。UeberBackend有一个成员变量d_handle表示当前处理请求的真实backend.在lookup中,会将第一个注册的backend赋给d_handle, 并调用该backend的lookup。

1
(d_handle.d_hinterBackend=backends[d_handle.i++])->lookup(qtype, qname,pkt_p,zoneId);

UeberBackend的get方法会调用d_handle实例的get方法。而d_handle.get首先调用第一个backend的get方法。如果成功,则返回。否则调用下一个backend的lookup和get方法。直到get方法返回成功或者遍历完所有backend.这种实现保证了get方法所获取的响应是由本backend的lookup所生成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bool UeberBackend::handle::get(DNSResourceRecord &r)
{
  DLOG(L << "Ueber get() was called for a "<<qtype.getName()<<" record" << endl);
  bool isMore=false;
  while(d_hinterBackend && !(isMore=d_hinterBackend->get(r))) { // this backend out of answers
    if(i<parent->backends.size()) {
      DLOG(L<<"Backend #"<<i<<" of "<<parent->backends.size()
           <<" out of answers, taking next"<<endl);

      d_hinterBackend=parent->backends[i++];
      d_hinterBackend->lookup(qtype,qname,pkt_p,parent->domain_id);
    }
    else
      break;

    DLOG(L<<"Now asking backend #"<<i<<endl);
  }

  if(!isMore && i==parent->backends.size()) {
    DLOG(L<<"UeberBackend reached end of backends"<<endl);
    return false;
  }

  DLOG(L<<"Found an answering backend - will not try another one"<<endl);
  i=parent->backends.size(); // don't go on to the next backend
  return true;
}

下面介绍module加载的过程。PowerDNS的module实现需要包含三个部分。

1) Backend本身,它需要继承父类DNSBackend.

1
2
/* FIRST PART */
class RandomBackend : public DNSBackend{}

2) 工厂类,它用于产生Backend实例。

1
2
/* SECOND PART */
class RandomFactory : public BackendFactory{}

3) 加载器类和该类的一个static实例。

1
2
3
/* THIRD PART */
class RandomLoader{};
static RandomLoader randomloader;

当进程调用dlopen时,该static loader实例的构造函数被调用,将其注册在UeberBackend中。

可以参考我写的一个简单的PowerDNS模块: remoteipbackend, 用于返回到达它的递归DNS的出口IP地址。

NGINX按目录清除缓存

NGINX只在商业版中支持proxy_cache_purge指令清除缓存,开源的ngx_cache_purge模块只支持单一key的缓存清除。为了实现按目录清除缓存只能自己开发。

NGINX作为Cache服务器时将资源内容以文件形式进行缓存,缓存元信息存储于共享内存中,组织成一棵红黑树。红黑树中的每个节点代表一个Cache元信息。NGINX将Cache Key的HASH值作为红黑树节点的KEY。内容缓存文件以该HASH值作为文件名存储在磁盘上。

NGINX的处理流程简化描述是这样的:当请求到达时,根据Cache Key的HASH值在红黑树中进行查找。如果找到,并查看相关信息,如果Cache可用,返回相应的Cache文件。否则,则回源抓取。

因为元信息是以Cache Key的HASH值作为Key存储的,因而红黑树中并不能保留Cache Key中有层级关系. 如”/uri/foo”和”/uri/bar”在元信息红黑树中完全没有关系。要实现按照目录清除缓存,需要将Cache Key中层次关系存储起来。

我选择的方案是在共享内存中建立一棵目录树来存储层级关系。将Cache Key类比于文件系统中的路径, 每级路径存储为树中的一个节点。当需要清除某一目录下的所有缓存时,将该节点子树的中的所有缓存清除即可。

目录树采用Child-Sibling链表实现。节点结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct ngx_http_file_cache_path_node_s ngx_http_file_cache_path_node_t;

struct ngx_http_file_cache_path_node_s {
    ngx_hlist_node_t                  node;
    ngx_http_file_cache_path_node_t  *parent;
    ngx_http_file_cache_path_node_t  *child;
    ngx_http_file_cache_path_node_t  *next;
    ngx_http_file_cache_path_node_t  *prev;
    ngx_http_file_cache_node_t       *rb_node;
    unsigned                          count:32;
    unsigned                          depth:8;
    unsigned                          isdir:1;
                                      /* 23 unused bits */
    ngx_uint_t                        len;
    u_char                            data[0];
};

以纵向表示孩子关系,横向表示兄弟关系,则Cache key “/uri/foo”则生成如下结构:

1
2
3
4
5
6
7
+-----+
| uri |
+--+--+
   |
+--+--+
| foo |
+-----+

再缓存Cache Key “/uri/bar”之后则结构为:

1
2
3
4
5
6
7
+-----+
| uri |
+--+--+
   |
+--+--+    +-----+
| bar +----+ foo |
+-----+    +-----+

其中node字段用于将节点加入HASH表中。考虑到如果一个目录下子目录或文件太多,则遍历兄弟链表则非常耗时。因而引入一个HASH表,将所有树结点以路径节点名字作为key加入HASH表。当子节点少,直接遍历。子节点过多时,则从HASH中进行查找。由于不同位置的路径节点会重名,如”/uri/foo/dummy”和”/uri/bar/dummy”两个名字为”dummy”的路径节点分别指向不同的Cache。因而在HASH表中查找时需要考虑路径节点的父节点。 如,判断一个中间目录路径节点时代码:

1
2
3
4
5
6
7
8
if (pos->isdir
    && pos->parent == parent
    && pos->len == path[i].len
    && ngx_strncmp(pos->data, path[i].data, path[i].len)
       == 0)
{
   ......
}

非叶子节点代表路径。叶子节点可能为目录或是缓存文件。代表缓存文件的叶子节点中则保存该Cache的元信息节点的地址。同时在元信息中也添加上该叶子节点的地址。因而当找到代表要清除的目录节点时,遍历子树便可以找到所有缓存信息的元信息,对元信息进行相关操作,完成清除操作。 当删除Cache文件时,将Cache key的路径节点也一并删除,回收内存。