Just For Coding

Keep learning, keep living …

ejabberd中ACL实现

ejbberd中多个模块或组件都允许管理员在配置文件中基于用户JID进行访问限制, 如在ejabberd_c2s模块中禁用某用户.

ejabberd实现了一套通用的ACL机制来满足各模块的需求.

配置方法如下:

  • 添加acl规则, 如:
1
{acl, blocked, {user, "test"}}.

acl元组第2个元素为该条acl规则的名称, 第3个元素为JID的过滤规则. 示例中的过滤规则表示用户JID的User部分为”test”.

  • 添加access规则, 如:
1
{access, c2s, [{deny, blocked}, {allow, all}]}.

access元组第2个元素为access规则的名称, 第3个元素中的每个元素为一个指定值和一个acl规则名字. 当JID满足某条acl规则时, 该条access规则的值则为该acl规则的对应值. 示例中, 当用户JID中的User部分为”test”时, access规则c2s值为deny, 否则为allow.

  • 为模块或组件指定access规则(不同的模块或组件所用的配置指令可能不同) 如:
1
2
3
4
5
{5222, ejabberd_c2s, [
                        {access, c2s},
                        {shaper, c2s_shaper},
                        {max_stanza_size, 65536}
                       ]}.

按示例配置后, ejabberd_c2s会根据access规则c2s的值禁用相应用户, 如JID中User为”test”的用户全部被禁用.

PowerDNS中PacketCache实现

PowerDNS中DNS解析由各类Backend模块处理。如果解析相关的数据存储于MySQL, Postgres等数据库,Backend需要在这些数据库中查询相应的记录是否存在。查询数据库的性能很低。因而PowerDNS中实现了PacketCache来提高性能。PowerDNS接收到请求后,先在PacketCache中查询是否已经有相应的DNS响应。如果有则直接返回该缓存。否则交由backend处理,处理后再添加到PacketCache中。

PacketCache实现主要位于packetcache.hh和packetcache.cc中。

Flashcache简介及模块初始化

flashcache是Facebook开源的用SSD来缓存机械磁盘数据的Linux内核模块。

简单来说,Linux的I/O栈层次结构可以表示为:

token data
1
2
3
4
5
6
7
+----------------------------+
| VFS(Virtual File System)   |
+----------------------------+
| Block I/O Layer            |
+----------------------------+
| Devices                    |
+----------------------------+

文件系统I/O操作传递给块设备,抽象块设备最终传递给真实的设备驱动,完成I/O操作。Linux在块设备层实现了Device Mapper(DM)的映射机制。DM机制可以将一个或多个块设备再映射成一个虚拟块设备。具体的映射规则由mapping table来实现。到达虚拟块设备的I/O请求,DM机制会根据映射表找到正确的目标设备,将请求放到目标设备的请求队列中,目标设备根据目标类型进行处理。结构图如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
      +---------------+
      | Mapped Device |
      +-------+-------+
              |
              v
      +---------------+
      | mapping table |
      +--+---------+--+
         |         |
         |         |
         v         v
 +---------------+  +-----------------+
 |Physical Device|  | Physical Device |
 +---------------+  +-----------------+

DM机制的映射目标类型可以通过内核模块进行扩展,flashcache就是通过定义一种名称为”flashcache”的映射目标类型来实现SSD做为机械磁盘的缓存。结构示意图如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+---------------+
| Mapped Device |
+-------+-------+
        |
        v
+---------------+
| mapper table  |
+-------+-------+
        |
        | flashcache
        |
    +---+---+
    |       |
    v       v
+-----+  +------+
| SSD |  | DISK |
+-----+  +------+

flashcache做为缓存层,提供了三种写入模式:

  • writeback
  • writethrough
  • writearound

三种模式的区别如下图所示:

token data
1
2
3
4
5
6
7
 writethrough  writearound  writeback
      |            |           |
 -----|------------|-----------|------------
      v            |           v       SSD
 -----|------------|------------------------
      v            v                   DISK
 -------------------------------------------
  • writethrough:进行写操作时,会同时写入SSD和磁盘
  • writearound: 进行写操作时,只对磁盘进行写操作
  • writeback: 进行写操作时,只写入SSD,flashcache异步地将内容写入磁盘

下面分析flashcache模块的初始化。

flashcache内核模块的入口函数为flashcache_init。

1
module_init(flashcache_init);

flashcache_init首先会对job相关的变量及结构进行初始化:

1
2
3
4
5
r = flashcache_jobs_init();
if (r)
    return r;
atomic_set(&nr_cache_jobs, 0);
atomic_set(&nr_pending_jobs, 0);

flashcache_jobs_init会分配job操作所需的内存池,简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int
flashcache_jobs_init(void)
{
    _job_cache = kmem_cache_create("kcached-jobs",
                                   sizeof(struct kcached_job),
                                   __alignof__(struct kcached_job),
                                   0, NULL);
    ...

    _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
                               mempool_free_slab, _job_cache);

    _pending_job_cache = kmem_cache_create("pending-jobs",
                           sizeof(struct pending_job),
                           __alignof__(struct pending_job),
                           0, NULL);

    ...

    _pending_job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
                       mempool_free_slab, _pending_job_cache);
    ...

    return 0;
}

然后,flashcache_init调用dm_io_client_create创建一个dm_io_client结构,它为块设备I/O请求执行过程提供内存池。

1
flashcache_io_client = dm_io_client_create();

接着,flashcache_init调用dm_kcopyd_client_create创建了一个dm_kcopyd_client结构。kcopyd是一个内核进程,它用于异步地copy一个块设备的区域到一个或多个其他的块设备。dm_kcopyd_client用于向kcopyd提交任务。

1
flashcache_kcp_client = dm_kcopyd_client_create(NULL);

然后,初始化了一个内核工作队列,执行do_work去处理各种job, 本文略过。

1
INIT_WORK(&_kcached_wq, do_work);

接着,注册flashcache的DM机制目标类型, 目标类型中具体回调本文略过。

1
r = dm_register_target(&flashcache_target);

再来看flashcache_init的最后部分:

1
2
3
4
5
6
flashcache_module_procfs_init();
flashcache_control = (struct flashcache_control_s *)
    kmalloc(sizeof(struct flashcache_control_s), GFP_KERNEL);
flashcache_control->synch_flags = 0;
register_reboot_notifier(&flashcache_notifier);
return 0;

首先,调用flashcache_module_procfs_init创建”/proc/flashcache”目录及文件”/proc/flashcache/flashcache_version”。 flachcache_module_procfs_init简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void
flashcache_module_procfs_init(void)
{
#ifdef CONFIG_PROC_FS
    struct proc_dir_entry *entry;

    if (proc_mkdir("flashcache", NULL)) {
        entry = create_proc_entry("flashcache/flashcache_version", 0, NULL);
        if (entry)
            entry->proc_fops =  &flashcache_version_operations;
    }
#endif /* CONFIG_PROC_FS */
}

最后,通过register_reboot_notifier注册函数flashcache_notify_reboot。这函数会在机器重启或关闭时被调用。

1
2
3
4
5
static struct notifier_block flashcache_notifier = {
    .notifier_call  = flashcache_notify_reboot,
    .next       = NULL,
    .priority   = INT_MAX, /* should be > ssd pri's and disk dev pri's */
};

至此,flashcache内核模块初始化完成,可以使用dmsetup指定flashcache目标类型创建虚拟块设备了。

Ejabberd配置模块分析

ejabberd_config模块负责加载ejabberd配置文件,存储相应的配置选项,并提供添加和获取配置选项的API。

比如, ejabberd_app:start_modules函数会使用ejabber_config:get_local_option获取配置文件中的modules选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
%% Start all the modules in all the hosts
start_modules() ->
    lists:foreach(
      fun(Host) ->
          case ejabberd_config:get_local_option({modules, Host}) of
          undefined ->
              ok;
          Modules ->
              lists:foreach(
            fun({Module, Args}) ->
                gen_mod:start_module(Host, Module, Args)
            end, Modules)
          end
      end, ?MYHOSTS).

下面分析ejbberd_config模块实现。

ejabberd启动时,ejabberd_app:start/0会调用ejabberd_config:start/0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
start() ->
    mnesia:create_table(config,
            [{disc_copies, [node()]},
             {attributes, record_info(fields, config)}]),
    mnesia:add_table_copy(config, node(), ram_copies),
    mnesia:create_table(local_config,
            [{disc_copies, [node()]},
             {local_content, true},
             {attributes, record_info(fields, local_config)}]),
    mnesia:add_table_copy(local_config, node(), ram_copies),
    Config = get_ejabberd_config_path(),
    load_file(Config),
    %% This start time is used by mod_last:
    add_local_option(node_start, now()),
    ok.

start函数首先创建config和local_config两个mnesia表,接着调用get_ejabberd_config_path获取配置文件路径。

1
2
3
4
5
6
7
8
9
10
11
get_ejabberd_config_path() ->
    case application:get_env(config) of
    {ok, Path} -> Path;
    undefined ->
        case os:getenv("EJABBERD_CONFIG_PATH") of
        false ->
            ?CONFIG_PATH;
        Path ->
            Path
        end
    end.

get_ejabberd_config_path首先使用application:get_env从ejabberd.app的env或erlang命令行中的config选项中获取值: 如:

1
{env, [{config, "/etc/ejabberd/ejabberd.cfg"]}

或者:

1
erl -config "/path/to/ejabberd.cfg"

如果没有设置这两个选项,则尝试从系统环境变量”EJABBERD_CONFIG_PATH”读取文件路径。ejabberdctl会设置该环境变量。若也没有设置该环境变量,get_ejabberd_config_path则返回?CONFIG_PATH, 这个宏被定义为”ejabberd.cfg”。

1
-define(CONFIG_PATH, "ejabberd.cfg").

接下来, start函数调用load_file来加载配置文件:

1
2
3
4
5
6
load_file(File) ->
    Terms = get_plain_terms_file(File),
    State = lists:foldl(fun search_hosts/2, #state{}, Terms),
    Terms_macros = replace_macros(Terms),
    Res = lists:foldl(fun process_term/2, State, Terms_macros),
    set_opts(Res).

load_file首先调用get_plain_terms_file来获取所有配置选项的列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
get_plain_terms_file(File1) ->
    File = get_absolute_path(File1),
    case file:consult(File) of
    {ok, Terms} ->
        include_config_files(Terms);
    {error, {LineNumber, erl_parse, _ParseMessage} = Reason} ->
        ExitText = describe_config_problem(File, Reason, LineNumber),
        ?ERROR_MSG(ExitText, []),
        exit_or_halt(ExitText);
    {error, Reason} ->
        ExitText = describe_config_problem(File, Reason),
        ?ERROR_MSG(ExitText, []),
        exit_or_halt(ExitText)
    end.

我们来看get_plain_terms_file实现。首先,调用get_absolute_path,期望得到配置文件的绝对路径。不过,get_absolute_path实现上存在BUG:

1
2
3
4
5
6
7
8
9
get_absolute_path(File) ->
    case filename:pathtype(File) of
    absolute ->
        File;
    relative ->
        Config_path = get_ejabberd_config_path(),
        Config_dir = filename:dirname(Config_path),
        filename:absname_join(Config_dir, File)
    end.

当File为相对路径时,使用filename:absname_join不能得到绝对路径。我提了一个PATCH,使用当前目录来转成绝对路径。官方已接受。 PATCH地址: https://github.com/processone/ejabberd/commit/62ccf1cf0e13954ee5207bc6288afbc669247d14

接着,get_plain_terms_file调用file:consult读取配置文件中的所有Erlang Terms到列表中,再调用include_config_files函数来处理include_config_file选项。

1
2
3
4
5
6
7
include_config_files([{include_config_file, Filename, Options} | Terms], Res) ->
    Included_terms = get_plain_terms_file(Filename),
    Disallow = proplists:get_value(disallow, Options, []),
    Included_terms2 = delete_disallowed(Disallow, Included_terms),
    Allow_only = proplists:get_value(allow_only, Options, all),
    Included_terms3 = keep_only_allowed(Allow_only, Included_terms2),
    include_config_files(Terms, Res ++ Included_terms3);

include_config_file选项格式为:

1
{include_config_file, [{disallow, foo}, {allow_only, bar}], "/path/to/included_config"}.

include_config_files递归调用get_plain_terms_file获取被引用的配置文件中所有配置,接着检查include_config_file选项中是否有disallow选项。如果有,调用delete_disallowed将disallow指定的配置选项从被引用文件的配置列表中删除。接着检查其中是否存在allow_only选项,如果有,则调用keep_only_allowed只保留下allow_only中指定的配置,将其和外部配置合并,再递归调用include_config_files/2处理剩余的选项,最终返回所有配置文件中所有选项列表。

1
2
include_config_files([], Res) ->
    Res;

load_file接着遍历配置列表调用search_host, 最终调用add_option来添加hosts选项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
add_option(Opt, Val, State) ->
    Table = case Opt of
        hosts ->
            config;
        language ->
            config;
        _ ->
            local_config
        end,
    case Table of
    config ->
        State#state{opts = [#config{key = Opt, value = Val} |
                State#state.opts]};
    local_config ->
        case Opt of
        {{add, OptName}, Host} ->
            State#state{opts = compact({OptName, Host}, Val,
                           State#state.opts, [])};
        _ ->
            State#state{opts = [#local_config{key = Opt, value = Val} |
                    State#state.opts]}
        end
    end.

add_option将指定选项以Key/Value形式添加进状态结构的opts域中。其中,hosts和language使用记录config, 其他选项使用local_config。这与最初创建的MNESIA表相对应。 状态结构如下:

1
2
3
4
5
-record(state, {opts = [],
        hosts = [],
        override_local = false,
        override_global = false,
        override_acls = false}).

接下来,load_file调用replace_macros来替换配置中的宏为相应的值。我们来看replace_macros实现。

1
2
3
replace_macros(Terms) ->
    {TermsOthers, Macros} = split_terms_macros(Terms),
    replace(TermsOthers, Macros).

首先, 调用split_terms_macros将宏选项和其他选项分开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
split_terms_macros(Terms) ->
    lists:foldl(
      fun(Term, {TOs, Ms}) ->
          case Term of
          {define_macro, Key, Value} ->
              case is_atom(Key) and is_all_uppercase(Key) of
              true ->
                  {TOs, Ms++[{Key, Value}]};
              false ->
                  exit({macro_not_properly_defined, Term})
              end;
          Term ->
              {TOs ++ [Term], Ms}
          end
      end,
      {[], []},
      Terms).

宏定义选项格式为:

1
{define_macro, 'KEY', bar}.

其中key必须为atom类型且必须全部为大写字母,得到的宏选项列表为{key, value}格式的列表。 接着, replace_macros调用replace/2。

1
2
3
4
replace([], _) ->
    [];
replace([Term|Terms], Macros) ->
    [replace_term(Term, Macros) | replace(Terms, Macros)].

replace通过递归对每个选项调用replace_term/2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
replace_term(Key, Macros) when is_atom(Key) ->
    case is_all_uppercase(Key) of
    true ->
        case proplists:get_value(Key, Macros) of
        undefined -> exit({undefined_macro, Key});
        Value -> Value
        end;
    false ->
        Key
    end;
replace_term({use_macro, Key, Value}, Macros) ->
    proplists:get_value(Key, Macros, Value);
replace_term(Term, Macros) when is_list(Term) ->
    replace(Term, Macros);
replace_term(Term, Macros) when is_tuple(Term) ->
    List = tuple_to_list(Term),
    List2 = replace(List, Macros),
    list_to_tuple(List2);
replace_term(Term, _) ->
    Term.

replace_term遍历选项中的所有子项,如果在宏列表中查找到相应的值,则替换该子项为找到的值。 另外,如果配置中某子项指定了{use_macro, Key, Value}这种格式的配置,在替换时优先从宏列表中查找相应的值,找不到再使用use_macro指定的Value来替换。

至此,获得了所有配置选项的列表,接下来对每个选项调用process_term。

选项存储主要分为3种类型, process_term分别进行不同处理:

  • 在状态结构中以独立域进行存储,如override_global选项:
1
2
override_global ->
    State#state{override_global = true};
  • 以选项名做为key进行存储, 如max_fsm_queue选项:
1
2
{max_fsm_queue, N} ->
    add_option(max_fsm_queue, N, State);
  • 以选项名和Host一起做为key进行存储,如domain_certfile选项:
1
2
3
4
5
6
7
8
{domain_certfile, Domain, CertFile} ->
    case ejabberd_config:is_file_readable(CertFile) of
    true -> add_option({domain_certfile, Domain}, CertFile, State);
    false ->
        ErrorText = "There is a problem in the configuration: "
        "the specified file is not readable: ",
        throw({error, ErrorText ++ CertFile})
    end;

其中由host_config指定的绝大多数配置选项都以这种方式存储:

1
2
3
{host_config, Host, Terms} ->
    lists:foldl(fun(T, S) -> process_host_term(T, Host, S) end,
        State, Terms);

process_terms对于没有明确列出的选项,给配置的每个HOST都调用process_host_term添加了一个以选项名和HOST一起做为Key的配置选项。

1
2
3
{_Opt, _Val} ->
    lists:foldl(fun(Host, S) -> process_host_term(Term, Host, S) end,
        State, State#state.hosts)

这样,如果我们自定义配置选项dummy_config:

1
{dummy_config, [foo, bar]}.

查询时应使用如下提供Host参数的语句:

1
ejabberd_config:get_local_option({dummy_config, Host})

此外,acl, accessshaper三个选项比较特殊。 acl存储acl记录结构在状态结构的opts域中, 在set_opts中这些记录会被写入acl表中。 accessshaper选项的KEY中除了选项名,HOST,还包括了规则名称。

至此,load_file将所有选项保存到了状态结构的opts域中,最后调用set_opts进行存储:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
set_opts(State) ->
    Opts = lists:reverse(State#state.opts),
    F = fun() ->
        if
            State#state.override_global ->
            Ksg = mnesia:all_keys(config),
            lists:foreach(fun(K) ->
                          mnesia:delete({config, K})
                      end, Ksg);
            true ->
            ok
        end,
        if
            State#state.override_local ->
            Ksl = mnesia:all_keys(local_config),
            lists:foreach(fun(K) ->
                          mnesia:delete({local_config, K})
                      end, Ksl);
            true ->
            ok
        end,
        if
            State#state.override_acls ->
            Ksa = mnesia:all_keys(acl),
            lists:foreach(fun(K) ->
                          mnesia:delete({acl, K})
                      end, Ksa);
            true ->
            ok
        end,
        lists:foreach(fun(R) ->
                      mnesia:write(R)
                  end, Opts)
    end,
    case mnesia:transaction(F) of
    ...
    end.

如果配置了override_global, override_local, override_acls选项,set_opts首先会分别删除表config, local_config和acl中的所有内容。接着分别将状态结构opts域中的配置写入config, local_config和acl三个表中。

配置加载过程结束。

ejabberd_config模块提供了添加和查询选项的API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
add_global_option(Opt, Val) ->
    mnesia:transaction(fun() ->
                   mnesia:write(#config{key = Opt,
                            value = Val})
               end).

add_local_option(Opt, Val) ->
    mnesia:transaction(fun() ->
                   mnesia:write(#local_config{key = Opt,
                              value = Val})
               end).

get_global_option(Opt) ->
    case ets:lookup(config, Opt) of
    [#config{value = Val}] ->
        Val;
    _ ->
        undefined
    end.

get_local_option(Opt) ->
    case ets:lookup(local_config, Opt) of
    [#local_config{value = Val}] ->
        Val;
    _ ->
        undefined
    end.

add_global_option和add_local_option分别向config和local_config表中添加选项记录。get_global_option和get_local_option直接使用ets:lookup查找相应配置。这是由于mnesia底层由ETS实现,直接使用ets:lookup性能会更高。不过,我个人不太欣赏这种写法。

Ejabberd中hook机制分析

ejabberd中的hook机制是ejabberd XMPP模块的基础。XMPP模块需要根据需求在相应的hook点上注册自己的处理函数,在处理函数的逻辑中实现需求。ejabberd执行到hook点时,会按注册的顺序号由小到大来执行各模块所注册的处理函数。

下面来分析具体实现。

ejabberd启动时,ejabberd_sup:init/1会通过调用ejabberd_hooks:start_link/0启动名称为ejabberd_hooks的worker进程。

1
2
3
4
5
6
7
8
9
10
11
12
init([]) ->
    Hooks =
    {ejabberd_hooks,
     {ejabberd_hooks, start_link, []},
     permanent,
     brutal_kill,
     worker,
     [ejabberd_hooks]},
    ...
    {ok, {{one_for_one, 10, 1},
      [Hooks,
       ...]}}.

ejabberd_hooks进程初始化时执行init/1函数创建了名为hooks的ETS表。这个表用来存储在各注册点和域名下注册的hook函数。

1
2
3
init([]) ->
    ets:new(hooks, [named_table]),
    {ok, #state{}}.

模块一般使用ejabberd_hooks:add/5注册hook函数。

1
2
add(Hook, Host, Module, Function, Seq) ->
    gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}).

参数:

  • Hook: 注册的hook点位置
  • Host: 注册的域名
  • Module: hook函数所在模块
  • Function: hook函数名
  • Seq: hook函数顺序号,顺序号越小函数越早被执行

add函数发送add消息给ejabberd_hooks进程。ejabberd_hooks进程调用handle_call处理消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) ->
    Reply = case ets:lookup(hooks, {Hook, Host}) of
        [{_, Ls}] ->
            El = {Seq, Module, Function},
            case lists:member(El, Ls) of
            true ->
                ok;
            false ->
                NewLs = lists:merge(Ls, [El]),
                ets:insert(hooks, {{Hook, Host}, NewLs}),
                ok
            end;
        [] ->
            NewLs = [{Seq, Module, Function}],
            ets:insert(hooks, {{Hook, Host}, NewLs}),
            ok
        end,
    {reply, Reply, State};

handle_call首先从hooks表中查找该hook点和域名下是否已经注册了函数。若不存在,则将顺序号、模块、函数名添加到表中。若已存在,再检查是否为重复添加。如果不是,则将顺序号、模块、函数名和之前的函数信息按顺序号排序后一并添加。

如果hook函数必须在集群内特定节点上执行,可以调用ejabberd_hooks:add_dist注册。它的处理逻辑与add函数类似,只是在hooks表中多存储了node信息,此处略过。

当需要删除hook函数时(一般是模块停止时),调用ejabberd_hooks:delete/5。

1
2
delete(Hook, Host, Module, Function, Seq) ->
    gen_server:call(ejabberd_hooks, {delete, Hook, Host, Module, Function, Seq}).

delete函数发送delete消息给ejabberd_hooks进程。进程执行handle_call处理。

1
2
3
4
5
6
7
8
9
10
handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
    Reply = case ets:lookup(hooks, {Hook, Host}) of
        [{_, Ls}] ->
            NewLs = lists:delete({Seq, Module, Function}, Ls),
            ets:insert(hooks, {{Hook, Host}, NewLs}),
            ok;
        [] ->
            ok
        end,
    {reply, Reply, State};

handle_call从hooks表中获取注册在该hook点和域名上的所有函数,从中删除指定的函数,再将结果保存。 删除注册在特定节点上的函数要使用delete_dist,处理逻辑类似,略过。

ejabberd执行到hook点时会调用ejabberd_hooks:run/3或ejabberd_hooks:run_fold/4来执行注册的HOOK函数。如果这个hook点不关心各hook函数的返回结果,则调用run函数,否则调用run_fold函数。 首先看run函数:

1
2
3
4
5
6
7
run(Hook, Host, Args) ->
    case ets:lookup(hooks, {Hook, Host}) of
    [{_, Ls}] ->
        run1(Ls, Hook, Args);
    [] ->
        ok
    end.

run函数从hooks表中查找注册在该hook点和域名上的所有函数,然后调用run1/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
    Res = if is_function(Function) ->
          catch apply(Function, Args);
         true ->
          catch apply(Module, Function, Args)
      end,
    case Res of
    {'EXIT', Reason} ->
        ?ERROR_MSG("~p~nrunning hook: ~p",
               [Reason, {Hook, Args}]),
        run1(Ls, Hook, Args);
    stop ->
        ok;
    _ ->
        run1(Ls, Hook, Args)
    end.

run1依次执行注册的hook函数,如果某个hook函数返回stop, 则run1结束返回,之后的hook函数不再被执行。

如果需要执行的是注册在某个节点上的hook函数,则通过rpc:call在该节点上执行函数,其他逻辑类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) ->
    case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of
    timeout ->
        ?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
               [Node, {Hook, Args}]),
        run1(Ls, Hook, Args);
    {badrpc, Reason} ->
        ?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p",
               [Node, Reason, {Hook, Args}]),
        run1(Ls, Hook, Args);
    stop ->
        ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
              "Stop.", [self(), node(), Node]), % debug code
        ok;
    Res ->
        ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
              "The response is:~n~s", [self(), node(), Node, Res]), % debug code
        run1(Ls, Hook, Args)
    end;

再来看run_fold函数。和run函数相比,run_fold还需要一个参数,表示默认的返回结果。

1
2
3
4
5
6
7
run_fold(Hook, Host, Val, Args) ->
    case ets:lookup(hooks, {Hook, Host}) of
    [{_, Ls}] ->
        run_fold1(Ls, Hook, Val, Args);
    [] ->
        Val
    end.

run_fold首先找到注册在该hook点和域名上的所有函数,如果没有,则返回默认结果。否则调用run_fold1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) ->
    Res = if is_function(Function) ->
          catch apply(Function, [Val | Args]);
         true ->
          catch apply(Module, Function, [Val | Args])
      end,
    case Res of
    {'EXIT', Reason} ->
        ?ERROR_MSG("~p~nrunning hook: ~p",
               [Reason, {Hook, Args}]),
        run_fold1(Ls, Hook, Val, Args);
    stop ->
        stopped;
    {stop, NewVal} ->
        NewVal;
    NewVal ->
        run_fold1(Ls, Hook, NewVal, Args)
    end.

run_fold1会将传入的结果Val(或者来自默认结果,或者来自前一hook函数的返回结果)和参数Args组成新的lists做为参数传给hook函数,依次递归调用。若某hook函数返回stop,结束递归调用,返回stopped。若hook函数返回{stop, NewVal},则直接返回该hook函数的结果NewVal。这两种情况下,其余的hook函数不再被执行。否则,返回结果做为Val参数再次递归调用run_fold1。

注册在特定节点上的函数处理逻辑类似,只是使用rpc:call在相应节点上执行,略过。

具体的hook点和hook函数原型可以参考官方文档:

https://www.process-one.net/en/wiki/ejabberd_events_and_hooks/

这个文档写地不是很详细。不确定的地方需要参考源码。

当这些内置hook点不能满足需求时,可以在ejabberd中合适位置调用ejabberd_hooks:run或ejabberd_hooks:run_fold添加hook点。

如:

1
ejabberd_hooks:run(dummy_hook, []),

另外,需要注意的有: ejabberd执行某些hook点时,调用不同参数版本的run或run_fold。这种情况Host参数为global。注册这种hook点时,Host参数也应该使用global。 如:

1
2
case ejabberd_hooks:run_fold(filter_packet,
             {OrigFrom, OrigTo, OrigPacket}, []) of
1
ejabberd_hooks:add(filter_packet, global, ?MODULE, on_filter_packet, 120),

注: 文中代码版本为:ejabberd-2.1.13。

Abstract Database和Skeleton Database

TokyoCabinet(TC)提供了6种不同结构的数据库,包括:

  • (MDB) on-memory hash database
  • (NDB) on-memory tree database
  • (HDB) hash database
  • (BDB) B+ tree database
  • (FDB) fixed-length database
  • (TDB) table database

每种数据库都有各自一套API来进行各种操作。

为了简化使用,TC还提供了一套通用的API来操作以上所有类型数据库,叫做Abstract Database API.

Abstract Database API通过数据库名称来区分各类型数据库:

  • “*” on-memory hash database
  • “+” on-memory tree database
  • “.tch” hash database
  • “.tcb” B+ tree database
  • “.tcf” fixed-length database
  • “.tct” table database

不仅如此,TC更进一步进行了抽象,在Abstract Database中还提供了一种Skeleton Database。 通过实现Skeleton Database指定的API,可以使用自定义的数据库类型。 Skeleton Database API结构体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
typedef struct {                        /* type of structure for a extra database skeleton */
  void *opq;                            /* opaque pointer */
  void (*del)(void *);                  /* destructor */
  bool (*open)(void *, const char *);
  bool (*close)(void *);
  bool (*put)(void *, const void *, int, const void *, int);
  bool (*putkeep)(void *, const void *, int, const void *, int);
  bool (*putcat)(void *, const void *, int, const void *, int);
  bool (*out)(void *, const void *, int);
  void *(*get)(void *, const void *, int, int *);
  int (*vsiz)(void *, const void *, int);
  bool (*iterinit)(void *);
  void *(*iternext)(void *, int *);
  TCLIST *(*fwmkeys)(void *, const void *, int, int);
  int (*addint)(void *, const void *, int, int);
  double (*adddouble)(void *, const void *, int, double);
  bool (*sync)(void *);
  bool (*optimize)(void *, const char *);
  bool (*vanish)(void *);
  bool (*copy)(void *, const char *);
  bool (*tranbegin)(void *);
  bool (*trancommit)(void *);
  bool (*tranabort)(void *);
  const char *(*path)(void *);
  uint64_t (*rnum)(void *);
  uint64_t (*size)(void *);
  TCLIST *(*misc)(void *, const char *, const TCLIST *);
  bool (*putproc)(void *, const void *, int, const void *, int, TCPDPROC, void *);
  bool (*foreach)(void *, TCITER, void *);
} ADBSKEL;

各成员与其它类型API相应成员意义一致。在开发时,只需实现功能必需的相应函数,忽略其他成员。

使用示例:

1
2
3
4
5
6
7
8
9
10
ADBSKEL skel;
memset(0, &skel, sizeof(skel));
skel.opq = mydbnew();
skel.del = mydbdel;
skel.open = mydbopen;
skel.close = mydbclose;
...
TCADB *adb = tcadbnew();
tcadbsetskel(adb, &skel);
tcadbopen(adb, "foobarbaz");

为了解决多进程共享访问和远程访问TC数据库的不便与繁琐,TC作者开发了一个网络访问层,叫做TokyoTyrant(TT)。它使用TC的Abstract Database API来访问TC数据库。因而内置支持skeleton database扩展。 TT提供了-skel命令行选项来指定skeleton database,启动时它会加载传入的Shared Object(SO)文件,使用SO中定制的数据库实现。

1
ttserver -skel ttskelfoo.so

我们可以根据需求实现特定的SO文件,就可以完整利用TT本身已经实现的各种特性,如主备同步,memcache协议支持,HTTP协议支持等。在性能满足需求的情况,这将大大减少开发量。SO文件必须导出一个名字为initialize的函数,TT启动时会从SO文件中查找该函数来初始化skeleton database。 该函数原型为:

1
bool (*initfunc)(ADBSKEL *);

该函数传入一个指向skeleton database的指针。initialize函数中需要将skeleton database定制的数据库操作的API实现赋值到相应函数指针。 由于initialize函数没有参数传递TT本身相关信息,如命令行选项,配置结构等,而TT将一些信息存储在全局变量g_serv指向的TTSERV结构体中,因而SO中可以声明g_serv外部变量来引用。

1
extern TTSERV  *g_serv;

不过较为遗憾的是TTSERV中的信息较少,如有需要的话可以自行扩展。如果SO中逻辑需要依赖命令行选项,可以通过使用启动TT时传入的数据库名来做不同处理。skeleton database的open函数会传入该参数。

具体例子可以参考: https://github.com/flygoast/ttskeliplist

Opentracker扩展

opentracker是一个开源P2P tracker服务器.之前我们系统中主要使用的是PHP+MYSQL实现的peertracker。随着业务增长,peertracker的性能已经不能满足系统需要。因而我们决定引入性能更好的opentracker。不过, opentracker在功能上并不能完全满足我们的需求,因而我对它进行了一些扩展。

  • UDP单播同步数据

opentracker本身支持cluster模式。cluster内各节点之间会同步数据。这样可以通过添加节点提高整体的集群性能。然而,opentracker原生通过UDP多播进行数据同步。我们不具备多播IP,因而开发了单播模式进行同步。实现非常简单,就是依次向cluster内其他节点发送数据。缺点是当节点数较多时,会影响性能。

  • 持久化支持

opentracker将torrent和peer信息保存在内存中。当opentracker重启时,所有的torrent和peer信息就都丢失了。这会导致我们的系统一段时间内不能进行正常的P2P传输。因而我扩展了持久化功能。opentracker架构上通过多个不同的线程执行不同的任务。我添加了一个线程,周期性地将内存中的torrent和peer信息保存到磁盘文件中。这个线程很像Redis中进行数据dump的进程。磁盘文件格式定义为ODB(opentracker database),它主要借鉴自Redis的RDB格式。目前,没有处理IPV6格式,因而只支持IPV4。 我还提供了一个工具支持流式地对odb文件进行处理。

具体用法请参考: perldoc OdbParser

格式规范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
----------------------------------- # ODB is a binary format. There are no new lines or spaces in the file.
4f 50 45 4e 54 52 41 43 4b 45 52    # Magic String "OPENTRACKER"
30 30 30 33                         # ODB Version Number in ASCII characters. In this case, version = "0001" = 1
-----------------------------------
FE                                  # FE = Opcode that indicates following is a torrent information.
----------------------------------- # Torrent information starts from here.
00 02 2e 33 01 c4 a1 df bb 82
1f 51 0f b5 b6 02 6f 93 6e 9f       # 20 byte info_hash of torrent
-----------------------------------
e2 37 59 01 00 00 00 00             # Last access time in minutes since UNIX epoch, 8 bytes.
                                    # At present, when loading ODB file, just use current clock, ignore this.
-----------------------------------
00 00 00 00 00 00 00 00             # Seeding peer count for current torrent. 8 bytes long integer in little endian.
                                    # NOT used at present.
-----------------------------------
00 00 00 00 00 00 00 00             # Total peer count for current torrent. 8 bytes long integer in little endian.
                                    # NOT used at present.
-----------------------------------
00 00 00 00 00 00 00 00             # Download times of files in current torrent. 8 bytes long integer in little endian.
                                    # NOT used at present.
-----------------------------------
01 00 00 00                         # Peer count in current peer, 4 bytes integer in little endian.
----------------------------------- # Peers information starts from here.
7f 00 00 01                         # 4 bytes ip address in network byte order. In this case, 0x7f000001 = "127.0.0.1"
1b 31                               # 2 bytes port in network byte order. In this case, 0x1b31 = 6961.
80                                  # Flag of peers. SEEDING = 0x80, COMPLETED = 0x40, STOPPED = 0x20, LEECHING = 0x00
00                                  # Reserved. Just set zero.
-----------------------------------
...                                 # Other peers information.
-----------------------------------
FE
-----------------------------------
...                                 # Other torrent information.
-----------------------------------
FF                                  # EOF opcode.
  • HTTP debug接口

由于opentracker响应内容是按BENCODE编码过的,调试时不太方便。因而扩展了一个返回human-readable内容的调试接口。

具体参看: https://github.com/flygoast/opentracker

Passenger导致NGINX Master进程阻塞定位及解决

我们线上服务使用nginx+passenger-3.0.11来运行Rails程序。我们发现当执行若干次nginx -s reload或者kill -HUP cat /usr/local/nginx/logs/nginx.pid之后,再执行以上命令不再生效。

用pstack观察master进程,master进程阻塞在read()操作上。 使用gcore生成core文件,然后用gdb查看,查看栈帧所在文件及行号。 查看passenger源码。readExact()逻辑为一直读到size大小返回。readArrayMessage()代码逻辑为先从传入fd中读取两个字节做为长度,再从fd中读取相应长度的内容。Passenger::AgentsStarter::start函数的过程为创建一个socketpair,然后fork()子进程,子进程执行PassengerWatchdog。父子进程通过该socketpair进行通信。master就是卡在读取socketpair上。

从gdb中看到readExact()传入的size为12848,即从socketpair中读到的长度为12848,而实际读取的内容长度为621,并且读取的内容也很诡异。应该是某个地方向socketpair中写入了错误内容。 继续查看passenger代码中子进程的逻辑。关闭socketpair一端。然后将socketpair另一端通过dup2()调用复制到3上。然后关闭除了0-3以外的所有文件描述符。接着解除信号阻塞,重置信号处理函数。

通过strace追踪master进程及其子进程行为,发现子进程确实发送了错误内容。”20”的十六进制表示为0x3230,按网络字节序读出正好是12848。从调用位置看是在SIGCHLD的信号处理函数中。而从写入内容看像是nginx在写日志。 继续追查为什么会有SIGCHLD产生。最后发现getHighestFileDescriptor()函数中子进程会fork()出孙进程,获取当前打开的最大fd,子进程等待孙进程退出后返回。由于孙进程退出,子进程收到了SIGCHLD信号。但是由于子进程是由master进程fork()出来的,SIGCHLD信号是被阻塞的。当执行sigprocmask()时,被阻塞的SIGCHLD被处理了,而这时的信号处理函数是ngx_signal_handler(), 该函数会调用ngx_log_error()向error log写入日志。若error log的fd为3,就发生我们上述的情况。

为了避免这种情况,应该先重置信号处理函数,再解除信号阻塞。

patch:

https://github.com/flygoast/passenger/commit/0b62808943aba65432f0b492f4ef941499fad02c

PassengerHelperAgent开机启动异常分析

异常现象:

在/etc/rc.local中添加/usr/local/nginx/sbin/nginx来开机自动启动NGINX时,PassengerHelperAgent进程不停反复重启,而从shell上手动启动NGINX时一切正常。

追查过程:

查阅异常时的error.log日志发现以下错误:

1
2
3
4
5
[ pid=3413 thr=140583025772288 file=ext/nginx/HelperAgent.cpp:963 time=2014-09-30 11:05:20.925 ]: Uncaught exception in PassengerServer client thread:
   exception: Cannot accept new connection: Too many open files (24)
   backtrace:
     in 'Passenger::FileDescriptor Client::acceptConnection()' (HelperAgent.cpp:429)
     in 'void Client::threadMain()' (HelperAgent.cpp:952)

根据日志错误信息,可以确定是PassengerHelperAgent进程文件描述符达到了上限。

找到ext/nginx/HelperAgent.cpp文件的963行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    void threadMain() {
            TRACE_POINT();
            try {
                    while (true) {
                            UPDATE_TRACE_POINT();
                            inactivityTimer.start();
                            FileDescriptor fd(acceptConnection());
                            inactivityTimer.stop();
                            handleRequest(fd);
                    }
            } catch (const boost::thread_interrupted &) {
                    P_TRACE(2, "Client thread " << this << " interrupted.");
            } catch (const tracable_exception &e) {
                    P_ERROR("Uncaught exception in PassengerServer client thread:\n"
                            << "   exception: " << e.what() << "\n"
                            << "   backtrace:\n" << e.backtrace());
                    abort();
            }
    }

通过上下文可以确定是调用acceptConnection()出错,查看acceptConnection()代码,确定是由该函数抛出的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
    FileDescriptor acceptConnection() {
            TRACE_POINT();
            struct sockaddr_un addr;
            socklen_t addrlen = sizeof(addr);
            int fd = syscalls::accept(serverSocket,
                    (struct sockaddr *) &addr,
                    &addrlen);
            if (fd == -1) {
                    throw SystemException("Cannot accept new connection", errno);
            } else {
                    return FileDescriptor(fd);
            }
    }

syscalls::accept是对系统调用accept的简单封装。

1
2
3
4
5
6
7
8
syscalls::accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
        int ret;
        CHECK_INTERRUPTION(
                ret == -1,
                ret = ::accept(sockfd, addr, addrlen)
        );
        return ret;
}

错误原因就是accept由于进程文件描述符达到上限而出错返回了。

接下来追查为什么进程文件描述符数会达到上限。

首先看一下PassengerHelperAgent的整体代码逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int
main(int argc, char *argv[]) {
        TRACE_POINT();
        VariantMap options = initializeAgent(argc, argv, "PassengerHelperAgent");
        pid_t   webServerPid  = options.getPid("web_server_pid");
        string  tempDir       = options.get("temp_dir");
        bool    userSwitching = options.getBool("user_switching");
        string  defaultUser   = options.get("default_user");
        string  defaultGroup  = options.get("default_group");
        string  passengerRoot = options.get("passenger_root");
        string  rubyCommand   = options.get("ruby");
        unsigned int generationNumber   = options.getInt("generation_number");
        unsigned int maxPoolSize        = options.getInt("max_pool_size");
        unsigned int maxInstancesPerApp = options.getInt("max_instances_per_app");
        unsigned int poolIdleTime       = options.getInt("pool_idle_time");

        try {
                UPDATE_TRACE_POINT();
                Server server(FEEDBACK_FD, webServerPid, tempDir,
                        userSwitching, defaultUser, defaultGroup,
                        passengerRoot, rubyCommand, generationNumber,
                        maxPoolSize, maxInstancesPerApp, poolIdleTime,
                        options);
                P_DEBUG("Passenger helper agent started on PID " << getpid());

                UPDATE_TRACE_POINT();
                server.mainLoop();
        } catch (const tracable_exception &e) {
                P_ERROR(e.what() << "\n" << e.backtrace());
                return 1;
        } catch (const std::exception &e) {
                P_ERROR(e.what());
                return 1;
        }

        P_TRACE(2, "Helper agent exited.");
        return 0;
}

首先创建一个Server对象,然后调用Server对象的mainLoop成员函数。Server对象构造函数会调用成员函数startListening。

1
2
3
4
5
6
7
8
9
10
11
12
    void startListening() {
            this_thread::disable_syscall_interruption dsi;
            requestSocket = createUnixServer(getRequestSocketFilename().c_str());

            int ret;
            do {
                    ret = chmod(getRequestSocketFilename().c_str(), S_ISVTX |
                            S_IRUSR | S_IWUSR | S_IXUSR |
                            S_IRGRP | S_IWGRP | S_IXGRP |
                            S_IROTH | S_IWOTH | S_IXOTH);
            } while (ret == -1 && errno == EINTR);
    }

createUnixServer函数会创建一个socket文件,然后监听这个文件。NGINX收到请求后,会由Passenger模块转发请求到该socket文件。 mainLoop会调用成员函数startClientHandlerThreads,它会创建numberOfThreads个Client对象。

1
2
3
4
5
6
7
8
    void startClientHandlerThreads() {
            for (unsigned int i = 0; i < numberOfThreads; i++) {
                    ClientPtr client(new Client(i + 1, pool, requestSocketPassword,
                            defaultUser, defaultGroup, requestSocket,
                            analyticsLogger));
                    clients.insert(client);
            }
    }

Client对象构造函数会启动一个线程执行threadMain。threadMain就是我们上面出错的函数。每个线程等待接收通过socket文件发来的请求,接收请求后调用handleRequest进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    Client(unsigned int number, ApplicationPool::Ptr pool,
           const string &password, const string &defaultUser,
           const string &defaultGroup, int serverSocket,
           const AnalyticsLoggerPtr &logger)
            : inactivityTimer(false)
    {
            this->number = number;
            this->pool = pool;
            this->password = password;
            this->defaultUser = defaultUser;
            this->defaultGroup = defaultGroup;
            this->serverSocket = serverSocket;
            this->analyticsLogger = logger;

            sbmh_init(&statusFinder.ctx, NULL, NULL, 0);
            sbmh_init(&transferEncodingFinder.ctx, NULL, NULL, 0);

            thr = new oxt::thread(
                    boost::bind(&Client::threadMain, this),
                    "Client thread " + toString(number),
                    CLIENT_THREAD_STACK_SIZE
            );
    }

问题出在线程调用accept等待接收请求时。我们所创建的线程数量numberOfThreads是在Server对象被创建时指定的。

1
numberOfThreads     = maxPoolSize * 4;

而maxPoolSize由passenger_max_pool_size配置项指定,我们指定的是256。256 × 4 = 1024,开机启动时PassengerHelperAgent进程的文件描述符上限就是1024。这个数字值得怀疑。因而我将配置修改为128,果然正常了。

1
passenger_max_pool_size 256;

可以确定每个线程中占用了文件描述符。然而从代码中并没有找到打开文件相关的逻辑。当accept成功返回时,会返回一个新的文件描述符。开始怀疑accept在还没有接收到请求时就预先占用了一个文件描述符。通过一个简单程序来验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <assert.h>

void *start_accept(void *arg) {
    int                 fd = (int) arg;
    int                 newfd;
    struct sockaddr_un  addr;
    socklen_t           addrlen;

    newfd = accept(fd, (struct sockaddr *)&addr, &addrlen);

    if (newfd < 0) {
        fprintf(stderr, "accept failed: %u: %s\n",
                pthread_self(), strerror(errno));
    }
}

int main() {
    int                 i, fd;
    struct sockaddr_un  addr;
    socklen_t           addrlen = sizeof(addr);
    pthread_t           pt;

    assert((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) > 0);

    for (i = 0; i < 1020; i++) {
        assert(open("emfile.c", O_RDONLY) > 0);
    }

    addr.sun_family = AF_LOCAL;
    strncpy(addr.sun_path, "emfile.socket", sizeof("emfile.socket") - 1);
    addr.sun_path[sizeof("emfile.socket") - 1] = '\0';

    assert(bind(fd, (const struct sockaddr *)&addr, sizeof(addr)) == 0);

    assert(listen(fd, 512) == 0);

    pthread_create(&pt, NULL, start_accept, (void *) fd);

    pthread_join(pt, NULL);

    exit(0);
}

使用ulimit将shell文件打开数上限修改为1024:

1
$ ulimit -n 1024

编译验证程序,并执行

1
2
$ gcc emfile.c -lpthread
$ ./a.out

得到结果:

1
accept failed: 591611648: Too many open files

确实如些,那再来看一下accept的实现。accept系统调用的内核实现是sys_accept,而sys_accept是对sys_accept4的简单封装。

1
2
3
4
5
SYSCALL_DEFINE3(accept, int, fd, struct sockaddr __user *, upeer_sockaddr,
        int __user *, upeer_addrlen)
{
    return sys_accept4(fd, upeer_sockaddr, upeer_addrlen, 0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
        int __user *, upeer_addrlen, int, flags)
{
    ...

    sock = sockfd_lookup_light(fd, &err, &fput_needed);
    if (!sock)
        goto out;

    err = -ENFILE;
    if (!(newsock = sock_alloc()))
        goto out_put;

    newsock->type = sock->type;
    newsock->ops = sock->ops;

    ...

    newfd = sock_alloc_file(newsock, &newfile, flags);

    ...

    err = sock->ops->accept(sock, newsock, sock->file->f_flags);
    if (err < 0)
        goto out_fd;

    ...

    fd_install(newfd, newfile);
    err = newfd;

out_put:
    fput_light(sock->file, fput_needed);
out:
    return err;
out_fd:
    fput(newfile);
    put_unused_fd(newfd);
    goto out_put;
}

sys_accept4中在调用sock->ops->accept去接收网络请求前就调用sock_alloc_file来分配文件描述符。再来看sock_alloc_file这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
static int sock_alloc_file(struct socket *sock, struct file **f, int flags)
{
    struct qstr name = { .name = "" };
    struct path path;
    struct file *file;
    int fd;

    fd = get_unused_fd_flags(flags);
    if (unlikely(fd < 0))
        return fd;
    ...
}

sock_alloc_file会调用get_unused_fd_flags,这是一个宏,实际会调用函数alloc_fd, 而alloc_fd又会调用函数expand_files:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int expand_files(struct files_struct *files, int nr)
{
    struct fdtable *fdt;

    fdt = files_fdtable(files);

    /*
     * N.B. For clone tasks sharing a files structure, this test
     * will limit the total number of files that can be opened.
     */
    if (nr >= current->signal->rlim[RLIMIT_NOFILE].rlim_cur)
        return -EMFILE;

    /* Do we need to expand? */
    if (nr < fdt->max_fds)
        return 0;

    /* Can we expand? */
    if (nr >= sysctl_nr_open)
        return -EMFILE;

    /* All good, so we try */
    return expand_fdtable(files, nr);
}

可以看到expand_files进行文件描述符限制的检查,当超过限制时返回”EMFILE”。”EMFILE”错误的提示就是”Too many open files”。

1
#define EMFILE      24  /* Too many open files */

结合上面的测试程序,使用一个systemtap脚本可以捕获到上述调用路径。

1
2
3
4
5
6
7
8
9
10
11
12
probe  kernel.function("expand_files").return {
    if ($return == -24) {
        println("expand_files");
        printf("%d\n", $nr);
        print_backtrace();
        exit();
    }
}

probe begin {
    println("start\n");
}

执行stap:

1
$sudo stap emfile.stap

捕获结果为:

1
2
3
4
5
6
7
8
9
10
start

expand_files
1024
Returning from:  0xffffffff811931d0 : expand_files+0x0/0x220 [kernel]
Returning to  :  0xffffffff81193443 : alloc_fd+0x53/0x160 [kernel]
 0xffffffff814187f3 : sock_alloc_file+0x43/0x150 [kernel]
 0xffffffff8141b3dd : sys_accept4+0x11d/0x2b0 [kernel]
 0xffffffff8141b580 : sys_accept+0x10/0x20 [kernel]
 0xffffffff8100b0f2 : system_call_fastpath+0x16/0x1b [kernel]

最终确定异常原因:

开机自启时,进程的打开文件数限制为1024,而创建1024个线程执行accept()时。每个accept会占用一个文件描述符, 达到了进程的文件描述符上限而异常。而从shell启动时,我们shell进程的文件描述符限制是32768,因而不会出现问题。

解决方法: 创建一个启动脚本,在执行/usr/local/nginx/sbin/nginx前执行ulimit修改文件描述符限制。

1
2
ulimit -SHn 65535
/usr/local/nginx/sbin/nginx &

注意:

  • /etc/security/limits.conf中的设置只针对登录动作发生时才生效,因而对于开机自动启动进程这种情况,这种修改该文件的方式不生效。
  • Passenger版本为3.0.11
  • kernel版本为CentOS 6.2内核,kernel-2.6.32-220.4.2.el6