Keep learning, keep living...

0%

NGINX中使用ngx_http_output_filter()向一个请求发送响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ngx_int_t
ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
ngx_int_t rc;
ngx_connection_t *c;

c = r->connection;

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http output filter "%V?%V"", &r->uri, &r->args);

rc = ngx_http_top_body_filter(r, in);

if (rc == NGX_ERROR) {
/* NGX_ERROR may be returned by any filter */
c->error = 1;
}

return rc;
}

其中,r是请求结构体,in为以ngx_chain_t结构链接起来的需要发送的内容。ngx_http_output_filter()会调用ngx_http_top_body_filter()。NGINX采用filter机制对响应进行处理。filter分为header filterbody filter。NGINX分别将两种filter各构建成一个链表。全局变量ngx_http_top_header_filter指向header filter链表的头结点,而全局变量ngx_http_top_body_filter指向body filter链表的头结点。每个filter中用一个变量记录下一个filter,依据情况决定是调用下一个filter,还是直接返回。

body filter链表顺序如下图:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

+--------------------------+
|ngx_http_range_body_filter|
+----------+---------------+
|
v
+----------+---------+
|ngx_http_copy_filter|
+----------+---------+
|
v
+----------+-----------------+
|ngx_http_charset_body_filter|
+----------+-----------------+
|
v
+----------+-------------+
|ngx_http_ssi_body_filter|
+----------+-------------+
|
v
+----------+-------------+
|ngx_http_postpone_filter|
+----------+-------------+
|
v
+----------+--------------+
|ngx_http_gzip_body_filter|
+----------+--------------+
|
v
+----------+-----------------+
|ngx_http_chunked_body_filter|
+----------+-----------------+
|
v
+---------------------+
|ngx_http_write_filter|
+---------------------+

header filter链表顺序如图:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

+----------------------------+
|ngx_http_not_modified_filter|
+----------+-----------------+
|
v
+----------+------------+
|ngx_http_headers_filter|
+----------+------------+
|
v
+----------+-----------+
|ngx_http_userid_filter|
+----------+-----------+
|
v
+----------+-------------------+
|ngx_http_charset_header_filter|
+----------+-------------------+
|
v
+----------+---------------+
|ngx_http_ssi_header_filter|
+----------+---------------+
|
v
+----------+----------------+
|ngx_http_gzip_header_filter|
+----------+----------------+
|
v
+----------+-----------------+
|ngx_http_range_header_filter|
+----------+-----------------+
|
v
+----------+-------------------+
|ngx_http_chunked_header_filter|
+----------+-------------------+
|
v
+----------+-----------+
|ngx_http_header_filter|
+----------------------+

ngx_http_write_filter()是最后一个被调用的body filter,它进行真正的网络I/O操作,将响应发送给客户端。实际上, ngx_http_header_filter()也是调用ngx_http_write_filter()来发送响应中的headers。

ngx_http_write_filter()的简化流程如下:

  • 检查之前是否有错误发生(c->error被置位)。如果有错误发生,则没有必要再进行网络I/O操作,直接返回NGX_ERROR。
    1
    2
    3
    if (c->error) {
    return NGX_ERROR;
    }
  • 计算之前没有发送完成的内容大小并检查是否存在特殊标志。为了优化性能,当没有必要立即发送响应且响应内容大小没有达到设置的阀值时,NGINX可以暂时推迟发送该部分响应。参看:postpone_output指令。flush标志表示需要立即发送响应。recycled表示该buffer需要循环使用,因而需要立即发送以释放该buffer被重新使用。last标志表示该buffer是响应的最后一部分内容,因而也需要立即发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (cl = r->out; cl; cl = cl->next) {
ll = &cl->next;

......

size += ngx_buf_size(cl->buf);

if (cl->buf->flush || cl->buf->recycled) {
flush = 1;
}

if (cl->buf->last_buf) {
last = 1;
}
}
  • 计算本次将发送的内容大小,检查是否存在特殊标志,并将内容链接到r->out上。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    for (ln = in; ln; ln = ln->next) {
    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
    return NGX_ERROR;
    }

    cl->buf = ln->buf;
    *ll = cl;
    ll = &cl->next;

    ...

    size += ngx_buf_size(cl->buf);

    if (cl->buf->flush || cl->buf->recycled) {
    flush = 1;
    }

    if (cl->buf->last_buf) {
    last = 1;
    }
    }
  • 根据情况决定是需要真正进行网络I/O操作, 还是直接返回。
    1
    2
    3
    if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
    return NGX_OK;
    }
  • 真正进行网络I/O操作,发送内容。
    1
    chain = c->send_chain(c, r->out, limit);
  • 回收发送完成内容的buffer和chain结构, 将没有发送完成的内容存入r->out
    1
    2
    3
    4
    5
    6
    7
    for (cl = r->out; cl && cl != chain; /* void */) {
    ln = cl;
    cl = cl->next;
    ngx_free_chain(r->pool, ln);
    }

    r->out = chain;
  • 根据发送是否完成,返回NGX_OKNGX_AGAIN
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    if (chain) {
    c->buffered |= NGX_HTTP_WRITE_BUFFERED;
    return NGX_AGAIN;
    }

    c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

    if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
    return NGX_AGAIN;
    }

    return NGX_OK;
    此外,ngx_http_write_filter()中也处理了限速发送的逻辑,本文不详述。

XMPP使用SASL进行登录验证。ejabberd中模仿CyrusSASL库自己实现了SASL协议,API用法与CyrusSASL类似。

ejabberd启动时,ejabberd_app:start/0会调用cyrsasl:start/0。它创建了一个ets表sasl_mechanism,然后调用各个SASL机制模块的start函数。

1
2
3
4
5
6
7
8
9
start() ->
ets:new(sasl_mechanism, [named_table,
public,
{keypos, #sasl_mechanism.mechanism}]),
cyrsasl_plain:start([]),
cyrsasl_digest:start([]),
cyrsasl_scram:start([]),
cyrsasl_anonymous:start([]),
ok.

各个SASL机制模块的start/1函数会调用cyrsasl:register_mechanism/3, 参数分别为SASL机制名称,机制处理模块,机制支持的密码存储类型。register_mechanism会将机制信息添加到sasl_mechanism表中。

1
2
3
start(_Opts) ->
cyrsasl:register_mechanism("PLAIN", ?MODULE, plain),
ok.
1
2
3
4
5
register_mechanism(Mechanism, Module, PasswordType) ->
ets:insert(sasl_mechanism,
#sasl_mechanism{mechanism = Mechanism,
module = Module,
password_type = PasswordType}).

客户端连接到ejabberd后,ejabberd会创建两个进程。一个ejabberd_c2s进程来处理连接状态并向客户端发送数据,另一个ejabberd_receiver进程接收客户端数据。ejabberd_c2s进程实现了gen_fsm行为,有限状态机的初始状态为wait_for_stream。ejabberd_c2s:init/1的简化代码逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
init([{SockMod, Socket}, Opts]) ->
...
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
true ->
?INFO_MSG("Connection attempt from blacklisted IP: ~s (~w)",
[jlib:ip_to_list(IP), IP]),
{stop, normal};
false ->
...
{ok, wait_for_stream, #state{socket = Socket1,
sockmod = SockMod,
socket_monitor = SocketMonitor,
xml_socket = XMLSocket,
zlib = Zlib,
tls = TLS,
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled,
tls_options = TLSOpts,
streamid = new_id(),
access = Access,
shaper = Shaper,
ip = IP},
?C2S_OPEN_TIMEOUT}
end.

ejabberd_receiver进程解析到XML流头之后会调用gen_fsm:send_event向ejabberd_c2s进程发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
process_data([Element|Els], #state{c2s_pid = C2SPid} = State)
when element(1, Element) == xmlelement;
element(1, Element) == xmlstreamstart;
element(1, Element) == xmlstreamelement;
element(1, Element) == xmlstreamend ->
if
C2SPid == undefined ->
State;
true ->
catch gen_fsm:send_event(C2SPid, element_wrapper(Element)),
process_data(Els, State)
end;

ejabberd_c2s进程接收到{xmlstreamstart, _Name, Attrs}消息后,调用状态函数wait_for_stream/2来处理。wait_for_stream在一系列正确性校验通过之后,回应给客户端XML流头。如果这个XML流之前还没有通过登录验证,则进行登录验证过程。简化的代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
case StateData#state.authenticated of
false ->
SASLState = cyrsasl:server_new(
"jabber", Server, "", [],
fun(U) ->
ejabberd_auth:get_password_with_authmodule(U, Server)
end,
fun(U, P) ->
ejabberd_auth:check_password_with_authmodule(U, Server, P)
end,
fun(U, P, D, DG) ->
ejabberd_auth:check_password_with_authmodule(U, Server, P, D, DG)
end),
Mechs = lists:map(
fun(S) ->
{xmlelement, "mechanism", [], [{xmlcdata, S}]}
end, cyrsasl:listmech(Server)),
...

send_element(StateData,
{xmlelement, "stream:features", [],
TLSFeature ++ CompressFeature ++
[{xmlelement, "mechanisms",
[{"xmlns", ?NS_SASL}],
Mechs}] ++
ejabberd_hooks:run_fold(
c2s_stream_features,
Server,
[], [Server])}),
fsm_next_state(wait_for_feature_request,
StateData#state{
server = Server,
sasl_state = SASLState,
lang = Lang});
_ ->
...
end

首先调用cyrsasl:server_new/7创建一个SASL验证状态, 其中存储了3个用于密码校验的回调函数。

1
2
3
4
5
6
7
8
server_new(Service, ServerFQDN, UserRealm, _SecFlags,
GetPassword, CheckPassword, CheckPasswordDigest) ->
#sasl_state{service = Service,
myname = ServerFQDN,
realm = UserRealm,
get_password = GetPassword,
check_password = CheckPassword,
check_password_digest= CheckPasswordDigest}.

wait_for_stream函数接着调用cyrsasl:listmech/1获取当前域名所支持的SASL验证机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
listmech(Host) ->
Mechs = ets:select(sasl_mechanism,
[{#sasl_mechanism{mechanism = '$1',
password_type = '$2',
_ = '_'},
case catch ejabberd_auth:store_type(Host) of
external ->
[{'==', '$2', plain}];
scram ->
[{'/=', '$2', digest}];
{'EXIT',{undef,[{Module,store_type,[]} | _]}} ->
?WARNING_MSG("~p doesn't implement the function store_type/0", [Module]),
[];
_Else ->
[]
end,
['$1']}]),
filter_anonymous(Host, Mechs).

listmech函数调用ejabberd_auth:store_type/1从ejabberd.cfg文件获取密码存储格式(auth_password_format)的配置。从sasl_mechanism表中查询出支持该密码存储格式的SASL机制。wait_for_stream函数将这些机制组织成XMPP协议格式发送回客户端,将当前进程状态改为wait_for_feature_request。比如,发送的机制列表为:

1
2
3
4
5
6
7
8
9
<stream:features>
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
<mechanism>DIGEST-MD5</mechanism>
<mechanism>SCRAM-SHA-1</mechanism>
</mechanisms>
<c xmlns="http://jabber.org/protocol/caps" node="http://www.process-one.net/en/ejabberd/" ver="yy7di5kE0syuCXOQTXNBTclpNTo=" hash="sha-1"/>
<register xmlns="http://jabber.org/features/iq-register"/>
</stream:features>

客户端从其中选择一个机制并发送给ejabberd服务器。如:

1
<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="PLAIN">AGFhYQAxMjM=</auth>

ejabberd_receiver进程解析完这个XML元素后,发送消息{xmlstreamelement, El}给ejabberd_c2s进程。ejabberd_c2s进程调用wait_for_feature_request函数进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Mech = xml:get_attr_s("mechanism", Attrs),
ClientIn = jlib:decode_base64(xml:get_cdata(Els)),
case cyrsasl:server_start(StateData#state.sasl_state,
Mech,
ClientIn) of
{ok, Props} ->
(StateData#state.sockmod):reset_stream(StateData#state.socket),
send_element(StateData, {xmlelement, "success", [{"xmlns", ?NS_SASL}], []}),
U = xml:get_attr_s(username, Props),
AuthModule = xml:get_attr_s(auth_module, Props),
...
fsm_next_state(wait_for_stream,
StateData#state{
streamid = new_id(),
authenticated = true,
auth_module = AuthModule,
user = U });
{continue, ServerOut, NewSASLState} ->
send_element(StateData,
{xmlelement, "challenge",
[{"xmlns", ?NS_SASL}],
[{xmlcdata,
jlib:encode_base64(ServerOut)}]}),
fsm_next_state(wait_for_sasl_response,
StateData#state{
sasl_state = NewSASLState});
{error, Error, Username} ->
IP = peerip(StateData#state.sockmod, StateData#state.socket),
...
send_element(StateData,
{xmlelement, "failure",
[{"xmlns", ?NS_SASL}],
[{xmlelement, Error, [], []}]}),
{next_state, wait_for_feature_request, StateData, ?C2S_OPEN_TIMEOUT};
{error, Error} ->
send_element(StateData,
{xmlelement, "failure",
[{"xmlns", ?NS_SASL}],
[{xmlelement, Error, [], []}]}),
fsm_next_state(wait_for_feature_request, StateData)
end;

ejabberd_c2s进程判断收到的XML元素是auth请求后,从请求中获取客户端选择的机制mechanism,读取客户端发送的信息,然后调用cyrsasl:server_start/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server_start(State, Mech, ClientIn) ->
case lists:member(Mech, listmech(State#sasl_state.myname)) of
true ->
case ets:lookup(sasl_mechanism, Mech) of
[#sasl_mechanism{module = Module}] ->
{ok, MechState} = Module:mech_new(
State#sasl_state.myname,
State#sasl_state.get_password,
State#sasl_state.check_password,
State#sasl_state.check_password_digest),
server_step(State#sasl_state{mech_mod = Module,
mech_state = MechState},
ClientIn);
_ ->
{error, "no-mechanism"}
end;
false ->
{error, "no-mechanism"}
end.

server_start从sasl_mechanism表中查询出机制模块,并调用机制模块的mech_new/4。这个函数会创建一个机制本身的状态结构。如,PLAIN机制模块的mech_new/4:

1
2
mech_new(_Host, _GetPassword, CheckPassword, _CheckPasswordDigest) ->
{ok, #state{check_password = CheckPassword}}.

server_start函数将这个机制状态保存到SASL状态结构里的mech_state字段,调用server_step。而server_step则调用机制模块的mech_step。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
server_step(State, ClientIn) ->
Module = State#sasl_state.mech_mod,
MechState = State#sasl_state.mech_state,
case Module:mech_step(MechState, ClientIn) of
{ok, Props} ->
case check_credentials(State, Props) of
ok ->
{ok, Props};
{error, Error} ->
{error, Error}
end;
{ok, Props, ServerOut} ->
case check_credentials(State, Props) of
ok ->
{ok, Props, ServerOut};
{error, Error} ->
{error, Error}
end;
{continue, ServerOut, NewMechState} ->
{continue, ServerOut,
State#sasl_state{mech_state = NewMechState}};
{error, Error, Username} ->
{error, Error, Username};
{error, Error} ->
{error, Error}
end.

Module:mech_step根据自身机制状态,返回不同的值。当验证通过时,返回ok信息。如若还需要其他信息继续验证,则返回continue信息。验证出错时,返回error信息。wait_for_feature_request函数根据不同的返回值,进行不同的处理。

  • 当返回ok信息时,ejabberd_c2s进程向客户端发送验证成功的消息,登录验证流程结束。
  • 当返回continue信息时,表示验证流程需要继续,因而向客户端返回服务端的chelange信息,ejabberd_c2s进程状态变为wait_for_sasl_response。当客户端再回应验证信息后,wait_for_sasl_response函数再次调用server_step进行验证处理。
  • 当返回error信息时,ejabberd_c2s进程向客户端发送验证失败的消息,登录验证流程结束。

PLAIN机制只需要用户名和密码,这些信息附在客户端选择机制的AUTH请求中。因而只需要调用一次mech_step函数,mech_step也因而不会返回continue。cyrsasl_plain:mech_step调用mech_new中传入的check_password回调函数(不同机制会调用不同的回调函数)来检查用户名和密码是否正确。这部分逻辑由ejabberd_auth模块封装不同的模块完成,本文略过。

1
2
3
4
5
6
7
8
9
10
11
12
13
mech_step(State, ClientIn) ->
case prepare(ClientIn) of
[AuthzId, User, Password] ->
case (State#state.check_password)(User, Password) of
{true, AuthModule} ->
{ok, [{username, User}, {authzid, AuthzId},
{auth_module, AuthModule}]};
_ ->
{error, "not-authorized", User}
end;
_ ->
{error, "bad-protocol"}
end.

SCRAM机制需要多次Chelange/Response交互,需要多次调用它的mech_step。因而它在机制状态内部来分步骤完成,具体可以参考cyrsasl_scram.erl,本文不详述。

当机制mech_step返回ok或error时,ejabberd_c2s进程返回给客户端相应的回应,登录验证的流程结束。

注: ejabberd代码版本为2.1.13。

身份验证是很多C/S模式应用协议的通用需求,为了避免每个协议都单独实现一套验证逻辑,SASL(Simple Authentication and Secure Layer)被提出了, 它定位成为基于可靠连接的应用协议提供身份验证和数据安全服务的通用框架。SASL定义了通用的身份验证信息交换的流程, 并且包含一系列验证机制。这些验证机制完成具体的身份验证逻辑。这样,SASL就成为了一个将应用协议和验证机制相连接的抽象层,如下图所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-------------------------------------------------------------

+----+ +----+ +----+ +-------------------+
|SMTP| |LDAP| |XMPP| |Other protocols ...|
+--+-+ +--+-+ +--+-+ +--+----------------+
| | | |
| | | |
------+--------+---------+--------+------------------
SASL abstraction layer
------+--------+---------+--------+------------------
| | | |
| | | |
+-----+--+ +--+---+ +--+--+ +--+-----------------+
|EXTERNAL| |GSSAPI| |PLAIN| |Other machanisms ...|
+--------+ +------+ +-----+ +--------------------+

-------------------------------------------------------------

任何应用协议都可以使用任何验证机制,而具体使用哪个机制则由应用协议的客户端和服务器进行协商。

分别以”C:”和”S:”代表客户端和服务端,SASL规定的验证信息交换的基本流程为:

1
2
3
4
5
C: 请求验证交换
S: 最初的挑战码
C: 最初的响应消息
<额外的挑战码/响应消息>
S: 身份验证结果

根据机制不同,流程略有差异。

具体应用哪个机制进行身份验证由使用SASL的应用协议来协商。服务器向客户端通告服务器所支持的机制, 客户端从中选择一个它支持且最合适的机制并通知服务器,请求开始身份验证。接下来便是上述的一系列Chalenges/Responses信息交换。这些信息载体的形式由应用协议指定。最终服务器发送回身份验证的结果。

SASL机制注册由IANA维护:
http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

下面说明几个具体的SASL机制:

  • EXTERNAL:

EXTERNAL机制允许客户端请求服务器使用其他途径获取的验证信息来验证该客户端。如通过TLS获取的验证信息。以ACAP(Application Configuration Access Protocol)协议来举例:

1
2
3
4
5
6
7
8
9
S: * ACAP (SASL "DIGEST-MD5")
C: a001 STARTTLS
S: a001 OK "Begin TLS negotiation now"
<TLS negotiation, further commands are under TLS layer>
S: * ACAP (SASL "DIGEST-MD5" "EXTERNAL")
C: a002 AUTHENTICATE "EXTERNAL"
S: + ""
C: + ""
S: a002 OK "Authenticated"

在TLS安全层建立后,服务端通告它支持DIGEST-MD5和EXTERNAL机制,客户端选择使用EXTERNAL机制,并且不使用其他授权实体。服务器使用外部信息验证通过后,返回成功的响应。

  • PLAIN

PLAIN机制只需要传递一条消息,这个消息由授权实体,验证实体和密码三部分组成。如下图所示:

1
authzid<NUL>authcid<NUL>passwd

授权实体authzid为可选的。如果提供了它,身份验证通过后,如果权限允许,将以authzid身份进行操作。如果权限不允许,则服务器返回授权失败。由于PLAIN机制直接传递密码本身,因而不应该在没有私密性保护的连接上使用。
同样以ACAP协议举例:

1
2
3
4
5
6
7
8
S: * ACAP (SASL "CRAM-MD5") (STARTTLS)
C: a001 STARTTLS
S: a001 OK "Begin TLS negotiation now"
<TLS negotiation, further commands are under TLS layer>
S: * ACAP (SASL "CRAM-MD5" "PLAIN")
C: a002 AUTHENTICATE "PLAIN" {20+}
C: Ursel<NUL>Kurt<NUL>xipj3plmq
S: a002 NO "Not authorized to requested authorization identity"

TLS安全层建立后,服务器通告它支持CRAM-MD5和PLAIN机制,客户端选择PLAIN机制,并发送身份验证消息,服务器返回授权失败,即Kurt身份验证通过,但不能以Ursel的身份进行操作。

  • SCRAM-SHA-1

SCRAM是一系统机制的统称,具体机制名称后缀上算法所使用的HASH函数。我们以SCRAM-SHA-1举例,它使用SHA-1哈希函数。PLAIN机制在网络上传输的是密码本身,因而只应该用在TLS等安全层之上。SCRAM机制则没有这个限制。

下面的例子略去机制协商的过程, 用户名为”user”, 密码为”pencil”:

1
2
3
4
C: n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL
S: r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92, i=4096
C: c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=
S: v=rmF9pqV8S7suAoZWja4dJRkFsKQ=

SCRAM机制的消息由多个属性构成,每个属性为”a=xxx”的形式,而且属性有顺序要求。

客户端发送的首条消息包括了以下内容:

  1. 一个GS2头,它包括一个字符,只能为”n”, “y”, “p”,是通道绑定的标识,和一个授权实体(例子中没有提供,因而为”,,”,当需要指定时使用属性a, 如”a=dummy”)。
  2. 属性n, 表示身份验证的用户名。
  3. 属性r, 表示客户端nonce, 一个随机的可打印字符串(不能包括”,”)。

我们把后两部分称为client_first_message_bare, 后面算法中要使用它。

然后服务器回应首条消息。属性r为客户端NONCE拼接上服务器的随机NONCE值,属性s为使用BASE64编码后的用户密码的salt值,属性i为迭代次数。在后面介绍的算法中可以看到具体用途。这条消息我们称为server_first_message, 后面算法需要使用它。

接着,客户端发送末条消息。属性c为使用BASE64编码的GS2头及通道绑定数据。例子中的”biws”解码后为”n,,”, 即客户端首条消息的第一部分,属性r与服务器回应的属性r必须相同。属性p为使用BASE64编码的客户端证明信息(ClientProof)。它由客户端使用后面介绍的算法计算得到。我们把前两个属性称为client_final_message_without_proof, 后面算法要使用它。

服务端验证客户端发送的NONCE值和证明信息(ClientProof),如果提供了授权实体,则也需要验证是否可以授权给该实体,然后发送服务端末条消息。属性v为服务器签名(ServerSignature)。客户端通过比较计算得到的和从服务端所收到的签名是否相同来对服务器进行身份验证。如果服务器验证失败,将回应属性e, 它可以用来诊断错误原因。

下面介绍客户端和服务器签名的具体算法:

1
2
3
4
5
6
7
8
SaltedPassword := Hi(Normalize(password), salt, i)
ClientKey := HMAC(SaltedPassword, "Client Key")
StoredKey := H(ClientKey)
AuthMessage := client-first-message-bare + "," + server-first-message + "," + client-final-message-without-proof
ClientSignature := HMAC(StoredKey, AuthMessage)
ClientProof := ClientKey XOR ClientSignature
ServerKey := HMAC(SaltedPassword, "Server Key")
ServerSignature := HMAC(ServerKey, AuthMessage)

其中HMAC原型为HMAC(key, str), Hi函数算法为:

1
2
3
4
5
6
7
8
Hi(str, salt, i):

U1 := HMAC(str, salt + INT(1))
U2 := HMAC(str, U1)
...
Ui-1 := HMAC(str, Ui-2)
Ui := HMAC(str, Ui-1)
Hi := U1 XOR U2 XOR ... XOR Ui

用PHP实现该算法来验证上述例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
function hi($str, $salt, $i) {
$int1 = "\0\0\0\1";
$ui = hash_hmac("sha1", $salt . $int1, $str, true);
$result = $ui;

for ($k = 1; $k < $i; $k++) {
$ui = hash_hmac("sha1", $ui, $str, true);
$result = $result ^ $ui;
}

return $result;
}

$password = "pencil";
$salt = base64_decode('QSXCR+Q6sek8bf92');
$i = 4096;
$client_first_message_bare = 'n=user,r=fyko+d2lbbFgONRv9qkxdawL';
$server_first_message = 'r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096';
$client_final_message_without_proof = 'c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j';

$salted_password = hi($password, $salt, $i);
$client_key = hash_hmac("sha1", "Client Key", $salted_password, true);
$stored_key = sha1($client_key, true);
$auth_message = $client_first_message_bare . ","
. $server_first_message . ","
. $client_final_message_without_proof;
$client_signature = hash_hmac("sha1", $auth_message, $stored_key, true);
$client_proof = $client_key ^ $client_signature;

$server_key = hash_hmac("sha1", "Server Key", $salted_password, true);
$server_signature = hash_hmac("sha1", $auth_message, $server_key, true);

echo "p=" . base64_encode($client_proof) . "\n";
echo "v=" . base64_encode($server_signature) . "\n";
?>

输出结果为:

1
2
p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=
v=rmF9pqV8S7suAoZWja4dJRkFsKQ=

与上述例子中值相符。

在实际项目中,一般不需要自己来实现这些验证算法。C语言可直接使用CyrusSASL库或GNU的libgsasl。

ejabberd的程序入口为ejabberd_app.erl中的start/2,简化后逻辑如下:

1
2
3
4
5
6
7
8
9
start(normal, _Args) ->
...
Sup = ejabberd_sup:start_link(),
...
ejabberd_listener:start_listeners(),
?INFO_MSG("ejabberd ~s is started in the node ~p", [?VERSION, node()]),
Sup;
start(_, _) ->
{error, badarg}.

ejabberd_sup是一个supervisor,调用start_link时,它会生成各功能组件进程,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
------------------------------------------------

+------------+
|ejabberd_sup|
+-----+------+
|
+---------------------+------------------------+
| |
| +-----+ +-------+ |
| |hooks| |c2s_sup| |
| +-----+ +-------+ |
| |
| +-----------+ +---------+ |
| |node_groups| |s2sin_sup| |
| +-----------+ +---------+ |
| |
| +--------------+ +----------+ |
| |system_monitor| |s2sout_sup| |
| +--------------+ +----------+ |
| |
| +------+ +-----------+ |
| |router| |service_sup| |
| +------+ +-----------+ |
| |
| +--+ +--------+ |
| |sm| |http_sup| |
| +--+ +--------+ |
| |
| +---+ +-------------+ |
| |s2s| |http_poll_sup| |
| +---+ +-------------+ |
| |
| +-----+ +-------------------+ |
| |local| |frontend_socket_sup| |
| +-----+ +-------------------+ |
| |
| +-------+ +------+ |
| |captcha| |iq_sup| |
| +-------+ +------+ |
| |
| +--------+ +--------+ |
| |listener| |stun_sup| |
| +--------+ +--------+ |
| |
| +------------+ +-------------+ |
| |receiver_sup| |cache_tab_sup| |
| +------------+ +-------------+ |
| |
+----------------------------------------------+

部分关键组件功能:

  • hooks: 执行各模块注册的HOOK函数
  • router: 分发用户发送的消息
  • sm: session manager, 处理用户到用户的消息分发
  • local: 处理发送给Server本身的消息。
  • listener: supervisor进程,创建子进程监听网络端口
  • receiver_sup: supervisor进程, 它为每一个TCP连接创建一个进程来接收该TCP连接的数据
  • c2s_sup: supervisor进程, 它为每一个TCP连接创建一个进程来处理协议状态,并负责向TCP连接发送数据

ejabberd_listener:start_listerners/0会从配置文件(ejabberd.cfg)中获取“listen”选项。
listen配置的一般形式为:

1
2
3
4
5
6
7
8
9
10
{listen,
[
{5222, ejabberd_c2s, [
{access, c2s},
{shaper, c2s_shaper},
{max_stanza_size, 65536}
]},
...
]
}

每一个端口需要指定一个处理模块。

start_listeners/0遍历listen配置中指定的所有端口,为每个端口调用start_listener/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
start_listeners() ->
case ejabberd_config:get_local_option(listen) of
undefined ->
ignore;
Ls ->
Ls2 = lists:map(
fun({Port, Module, Opts}) ->
case start_listener(Port, Module, Opts) of
{ok, _Pid} = R -> R;
{error, Error} ->
throw(Error)
end
end, Ls),
report_duplicated_portips(Ls),
{ok, {{one_for_one, 10, 1}, Ls2}}
end.

start_listerner最终会调用start_listener_sup/3。它通过supervisor:start_child令ejabberd_listeners进程创建子进程执行ejabberd_listener:start/3。

1
2
3
4
5
6
7
8
start_listener_sup(Port, Module, Opts) ->
ChildSpec = {Port,
{?MODULE, start, [Port, Module, Opts]},
transient,
brutal_kill,
worker,
[?MODULE]},
supervisor:start_child(ejabberd_listeners, ChildSpec).

ejabberd_listener:start/3依据该端口处理模块的socket_type/0函数的返回值进行相应处理。如果返回值为independent时,则表示该处理模块自行处理监听端口a。否则,调用start_dependent/3。

1
2
3
4
5
6
7
start(Port, Module, Opts) ->
%% Check if the module is an ejabberd listener or an independent listener
ModuleRaw = strip_frontend(Module),
case ModuleRaw:socket_type() of
independent -> ModuleRaw:start_listener(Port, Opts);
_ -> start_dependent(Port, Module, Opts)
end.

start_dependent/3中会创建子进程执行init/3, init/3函数根据配置参数调用init_tcp/6或init_udp/6。
init_tcp/6开始监听相应端口,然后调用accept/0等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
...
Res = gen_tcp:listen(Port, [binary,
{packet, 0},
{active, false},
{reuseaddr, true},
{nodelay, true},
{send_timeout, ?TCP_SEND_TIMEOUT},
{keepalive, true} |
SockOpts2]),
case Res of
{ok, ListenSocket} ->
%% Inform my parent that this port was opened succesfully
proc_lib:init_ack({ok, self()}),
%% And now start accepting connection attempts
accept(ListenSocket, Module, Opts);
{error, Reason} ->
socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
end.

如果监听UDP端口,则init/3调用init_udp/6。init/6打开一个UDP端口,调用udp_recv/3等待接收UDP数据。

1
2
3
4
5
6
7
8
9
10
11
12
init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
case gen_udp:open(Port, [binary,
{active, false},
{reuseaddr, true} |
SockOpts]) of
{ok, Socket} ->
%% Inform my parent that this port was opened succesfully
proc_lib:init_ack({ok, self()}),
udp_recv(Socket, Module, Opts);
{error, Reason} ->
socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
end.

至此ejabberd启动完成。

当UDP数据到来后,udp_recv/3调用该端口处理模块的udp_recv/5处理,接着递归调用udp_recv继续等待接收UDP数据。处理模块的udp_recv/5函数中需要处理返回UDP响应等逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
udp_recv(Socket, Module, Opts) ->
case gen_udp:recv(Socket, 0) of
{ok, {Addr, Port, Packet}} ->
case catch Module:udp_recv(Socket, Addr, Port, Packet, Opts) of
{'EXIT', Reason} ->
?ERROR_MSG("failed to process UDP packet:~n"
"** Source: {~p, ~p}~n"
"** Reason: ~p~n** Packet: ~p",
[Addr, Port, Reason, Packet]);
_ ->
ok
end,
udp_recv(Socket, Module, Opts);
{error, Reason} ->
?ERROR_MSG("unexpected UDP error: ~s", [format_error(Reason)]),
throw({error, Reason})
end.

当有TCP连接成功后,gen_tcp:accept/1返回,accept/3根据配置中处理模块是否指定了frontend, 选择调用ejabberd_frontend_socket:start/4或ejabberd_socket:start/4, 然后递归调用accept再次等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
accept(ListenSocket, Module, Opts) ->
case gen_tcp:accept(ListenSocket) of
{ok, Socket} ->
case {inet:sockname(Socket), inet:peername(Socket)} of
{{ok, Addr}, {ok, PAddr}} ->
?INFO_MSG("(~w) Accepted connection ~w -> ~w",
[Socket, PAddr, Addr]);
_ ->
ok
end,
CallMod = case is_frontend(Module) of
true -> ejabberd_frontend_socket;
false -> ejabberd_socket
end,
CallMod:start(strip_frontend(Module), gen_tcp, Socket, Opts),
accept(ListenSocket, Module, Opts);
{error, Reason} ->
?INFO_MSG("(~w) Failed TCP accept: ~w",
[ListenSocket, Reason]),
accept(ListenSocket, Module, Opts)
end.

ejabberd_socket:start/4函数根据处理模块的socket_type/0的返回值进行不同处理。5222端口的处理模块是ejabberd_c2s,该模块socket_type/0返回xml_stream。

ejabberd_socket:start/4对于xml_stream的处理的简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
RecPid = ejabberd_receiver:start(
Socket, SockMod, none, MaxStanzaSize),

{ReceiverMod, Receiver, RecRef} =
{ejabberd_receiver, RecPid, RecPid}

SocketData = #socket_state{sockmod = SockMod,
socket = Socket,
receiver = RecRef},

case Module:start({?MODULE, SocketData}, Opts) of
{ok, Pid} ->
case SockMod:controlling_process(Socket, Receiver) of
ok ->
ok;
{error, _Reason} ->
SockMod:close(Socket)
end,
ReceiverMod:become_controller(Receiver, Pid);
{error, _Reason} ->
SockMod:close(Socket)
end;

首先,它调用ejabberd_receiver:start/4, 它创建一个receiver进程,接着调用处理模块的start/2创建一个处理模块子进程。然后将新的receiver进程设为该TCP Socket的控制进程,以后从该Socket收到数据将以消息形式发送给该receiver进程。最后调用ejabberd_receiver:become_controller/2, 向该receiver进程发送become_controller消息。receiver进程处理该消息,生成一个XML解析状态并将处理进程Pid保存在状态中。

1
2
3
4
5
6
7
handle_call({become_controller, C2SPid}, _From, State) ->
XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size),
NewState = State#state{c2s_pid = C2SPid,
xml_stream_state = XMLStreamState},
activate_socket(NewState),
Reply = ok,
{reply, Reply, NewState, ?HIBERNATE_TIMEOUT};

当该Socket有数据可读时,receiver进程将收到TCP消息,receiver进程调用process_data/1函数来处理收到的数据。它调用xml_stream:parse进行解析,当解析出XML元素后,它会通过gen_fsm:send_event向该TCP的处理进程发送消息。处理进程根据消息进行协议状态的转换。

当XMPP协商完成后,处理进程状态为session_established。此时收到XMPP消息,处理进程解析出From和To属性, 调用ejabberd_router:route/3分发消息。ejabberd_router:route/3调用do_route进行分发。它查询route表中是否已经注册了JID所在域名。ejabberd_local进程启动时会在route表中注册配置中所添加的域名。如果已经注册,该消息则应该由当前服务器处理,否则路由至其他Server。

处理本地域名时,do_route首先获取ejabbred_local进程的Pid,如果只有一个进程,并且该进程位于当前节点,则直接调用ejabberd_local:route/3进行处理,否则发送route消息至相应的Pid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
LDstDomain = To#jid.lserver,
case mnesia:dirty_read(route, LDstDomain) of
[] ->
ejabberd_s2s:route(From, To, Packet);
[R] ->
Pid = R#route.pid,
if
node(Pid) == node() ->
case R#route.local_hint of
{apply, Module, Function} ->
Module:Function(From, To, Packet);
_ ->
Pid ! {route, From, To, Packet}
end;
is_pid(Pid) ->
Pid ! {route, From, To, Packet};
true ->
drop
end;
...
end

这两种方式都会调用ejabberd_local:do_route来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
do_route(From, To, Packet) ->
?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
[From, To, Packet, 8]),
if
To#jid.luser /= "" ->
ejabberd_sm:route(From, To, Packet);
To#jid.lresource == "" ->
{xmlelement, Name, _Attrs, _Els} = Packet,
case Name of
"iq" ->
process_iq(From, To, Packet);
"message" ->
ok;
"presence" ->
ok;
_ ->
ok
end;
true ->
{xmlelement, _Name, Attrs, _Els} = Packet,
case xml:get_attr_s("type", Attrs) of
"error" -> ok;
"result" -> ok;
_ ->
ejabberd_hooks:run(local_send_to_resource_hook,
To#jid.lserver,
[From, To, Packet])
end
end.
end

如果”To”属性的JID指定了user, 则该消息应该分发至用户,调用ejabberd_sm:route/3进行分发。ejabberd根据JID的Resource是否为空进行了不同处理。JID的Resource为空的情况本文略过。JID的Resource不为空时,ejabberd_sm:route/3的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr) of
[] ->
case Name of
"message" ->
route_message(From, To, Packet);
"iq" ->
case xml:get_attr_s("type", Attrs) of
"error" -> ok;
"result" -> ok;
_ ->
Err =
jlib:make_error_reply(
Packet, ?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err)
end;
_ ->
?DEBUG("packet droped~n", [])
end;
Ss ->
Session = lists:max(Ss),
Pid = element(2, Session#session.sid),
?DEBUG("sending to process ~p~n", [Pid]),
Pid ! {route, From, To, Packet}
end

它从session表中读取出该用户的信息,这些信息是用户连接上由该TCP的处理进程添加到session表中的。从中获得为该用户TCP连接的处理进程,向该进程发送route消息。处理进程处理该route消息,向该用户Socket发送消息。这便完成了一个用户到另一个用户的消息传递。

本文中ejabberd代码版本为2.1.3。

PowerDNS是一个开源DNS服务器。它包括了授权DNS服务器和递归DNS服务器。本文只讨论授权DNS服务器,主要分析PowerDNS的架构及Backend模块机制的实现。

PowerDNS由各种各样的backend来处理DNS解析相关数据,数据可以存储在BIND模式的ZONE文件中,也可以存储在关系型数据库中。官方实现了很多backend,如Bind, Postgresql, Mysql等。每种backend实现为一个SO文件。PowerDNS启动时根据配置中的launch语句加载指定的so文件。这种方式使得PowerDNS的可扩展性非常高。
简略的PowerDNS架构图如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
+-----------------------------------------------------------------------+
| | | |
| +------------+ | +-----------+ | |
| |main thread | | |PacketCache| | |
| +------------+ | +-----------+ | |
| | | |
| +------------+ | | |
| |DynListerner| | | |
| +------------+ | | +-----------+ +----------+ |
| | +---------+--|Distributor|---|DNSBackend| |
| +--------+ | | | +-----------+ +----------+ |
| |receiver|------+-------+ | |
| +--------+ | | | +-----------+ +----------+ |
| | +---------+--|Distributor|---|DNSBackend| |
| ... | | +-----------+ +----------+ |
| | | ... |
| | | |
| +--------+ | | +-----------+ +----------+ |
| |receiver|------+-------+---------+--|Distributor|---|DNSBackend| |
| +--------+ | | | +-----------+ +----------+ |
| | | | |
| | | | +-----------+ +----------+ |
| | +---------+--|Distributor|---|DNSBackend| |
| | | +-----------+ +----------+ |
| |
+-----------------------------------------------------------------------+

下面具体分析PowerDNS的启动流程。

1
2
3
4
5
6
7
8
9
10
11
12
if(::arg().mustDo("guardian") && !isGuarded(argv)) {
if(::arg().mustDo("daemon")) {
L.toConsole(Logger::Critical);
daemonize();
}
guardian(argc, argv);
// never get here, guardian will reinvoke process
cerr<<"Um, we did get here!"<<endl;
}

// we really need to do work - either standalone or as an instance
...

如果启动参数中指定了“–guardian=yes”,则PowerDNS运行在guardian模式,主进程fork出一个子进程执行正常的启动流程,而父进程则一秒钟探测一次子进程是否存活,如果子进程异常退出则重新启动子进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
for(;;) {
int ret=waitpid(pid,&status,WNOHANG);

if(ret<0) {
L<<Logger::Error<<"In guardian loop, waitpid returned error: "<<strerror(errno)<<endl;
L<<Logger::Error<<"Dying"<<endl;
exit(1);
}
else if(ret) // something exited
break;
else { // child is alive
// execute some kind of ping here
if(DLQuitPlease())
takedown(1); // needs a parameter..
setStatus("Child running on pid "+itoa(pid));
sleep(1);
}
}

pthread_mutex_lock(&g_guardian_lock);
close(g_fd1[1]);
fclose(g_fp);
g_fp=0;

if(WIFEXITED(status)) {
int ret=WEXITSTATUS(status);

if(ret==99) {
L<<Logger::Error<<"Child requested a stop, exiting"<<endl;
exit(1);
}
setStatus("Child died with code "+itoa(ret));
L<<Logger::Error<<"Our pdns instance exited with code "<<ret<<endl;
L<<Logger::Error<<"Respawning"<<endl;

goto respawn;
}

子进程或非guardian模式的原生进程执行正常的启动流程。

1
2
loadModules();
BackendMakers().launch(::arg()["launch"]); // vrooooom!

loadModules()会根据”load-modules”配置语句调用dlopen来加载相应的SO文件。BackendMakers().launch(::arg()[“launch”]);来加载launch语句指定的modules。然后开始监听指定的UDP和TCP端口。

1
2
3
4
N=new UDPNameserver; // this fails when we are not root, throws exception

if(!::arg().mustDo("disable-tcp"))
TN=new TCPNameserver;

接着调用mainthread()来初始化若干网络包接收线程。

1
2
3
unsigned int max_rthreads= ::arg().asNum("receiver-threads");
for(unsigned int n=0; n < max_rthreads; ++n)
pthread_create(&qtid,0,qthread, reinterpret_cast<void *>(n)); // receives packets

而每个接收线程则会创建若干自己的distributor线程。

1
DNSDistributor *distributor = DNSDistributor::Create(::arg().asNum("distributor-threads")); // the big dispatcher!

接下来接收线程开始自己处理网络数据。接收到DNS查询包后,首先在PacketCache中查询是否有相应的响应。PacketCache是PowerDNS中Backend响应的Cache,它减少对于Backend的访问,可以大大提高性能。如果在PacketCache中没有找到,则调用distributor->question分发给distributor线程。示意代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
for(;;) {
if(!(P=NS->receive(&question))) { // receive a packet inline
continue; // packet was broken, try again
}

if(P->couldBeCached() && PC.get(P, &cached)) { // short circuit - does the PacketCache recognize this question?
NS->send(&cached); // answer it then
continue;
}

distributor->question(P, &sendout);
}

在调用question方法时会传递一个回调函数sendout。distributor线程处理完DNS请求后将调用该回调函数将响应返回给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void sendout(const DNSDistributor::AnswerData &AD)
{
static unsigned int &numanswered=*S.getPointer("udp-answers");
static unsigned int &numanswered4=*S.getPointer("udp4-answers");
static unsigned int &numanswered6=*S.getPointer("udp6-answers");

if(!AD.A)
return;

N->send(AD.A);
numanswered++;

if(AD.A->d_remote.getSocklen()==sizeof(sockaddr_in))
numanswered4++;
else
numanswered6++;

int diff=AD.A->d_dt.udiff();
avg_latency=(int)(1023*avg_latency/1024+diff/1024);

delete AD.A;
}

distributor线程中调用backend来处理DNS响应。示意代码如下:

1
2
3
4
5
B.lookup;
DNSResourceRecord rr;
while(yb.get(rr))
cout<<"Found cname pointing to '"+rr.content+"'"<<endl;
}

其中B为UeberBackend实例。UeberBackend是一个特殊的Backend。启动时被加载的其他backend向它注册,被保存在一个vector中。UeberBackend按注册顺序依次调用其他backend的相应成员方法。以reload操作为例:

1
2
3
4
5
6
7
void UeberBackend::reload()
{
for ( vector< DNSBackend * >::iterator i = backends.begin(); i != backends.end(); ++i )
{
( *i )->reload();
}
}

lookup和get成员方法实现比较特殊。UeberBackend有一个成员变量d_handle表示当前处理请求的真实backend.在lookup中,会将第一个注册的backend赋给d_handle, 并调用该backend的lookup。

1
(d_handle.d_hinterBackend=backends[d_handle.i++])->lookup(qtype, qname,pkt_p,zoneId);

UeberBackend的get方法会调用d_handle实例的get方法。而d_handle.get首先调用第一个backend的get方法。如果成功,则返回。否则调用下一个backend的lookup和get方法。直到get方法返回成功或者遍历完所有backend.这种实现保证了get方法所获取的响应是由本backend的lookup所生成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
bool UeberBackend::handle::get(DNSResourceRecord &r)
{
DLOG(L << "Ueber get() was called for a "<<qtype.getName()<<" record" << endl);
bool isMore=false;
while(d_hinterBackend && !(isMore=d_hinterBackend->get(r))) { // this backend out of answers
if(i<parent->backends.size()) {
DLOG(L<<"Backend #"<<i<<" of "<<parent->backends.size()
<<" out of answers, taking next"<<endl);

d_hinterBackend=parent->backends[i++];
d_hinterBackend->lookup(qtype,qname,pkt_p,parent->domain_id);
}
else
break;

DLOG(L<<"Now asking backend #"<<i<<endl);
}

if(!isMore && i==parent->backends.size()) {
DLOG(L<<"UeberBackend reached end of backends"<<endl);
return false;
}

DLOG(L<<"Found an answering backend - will not try another one"<<endl);
i=parent->backends.size(); // don't go on to the next backend
return true;
}

下面介绍module加载的过程。PowerDNS的module实现需要包含三个部分。

  1. Backend本身,它需要继承父类DNSBackend.
1
2
/* FIRST PART */
class RandomBackend : public DNSBackend{}
  1. 工厂类,它用于产生Backend实例。
1
2
/* SECOND PART */
class RandomFactory : public BackendFactory{}
  1. 加载器类和该类的一个static实例。
1
2
3
/* THIRD PART */
class RandomLoader{};
static RandomLoader randomloader;

当进程调用dlopen时,该static loader实例的构造函数被调用,将其注册在UeberBackend中。

可以参考我写的一个简单的PowerDNS模块: remoteipbackend, 用于返回到达它的递归DNS的出口IP地址。

NGINX只在商业版中支持proxy_cache_purge指令清除缓存,开源的ngx_cache_purge模块只支持单一key的缓存清除。为了实现按目录清除缓存只能自己开发。

NGINX作为Cache服务器时将资源内容以文件形式进行缓存,缓存元信息存储于共享内存中,组织成一棵红黑树。红黑树中的每个节点代表一个Cache元信息。NGINX将Cache Key的HASH值作为红黑树节点的KEY。内容缓存文件以该HASH值作为文件名存储在磁盘上。

NGINX的处理流程简化描述是这样的:当请求到达时,根据Cache Key的HASH值在红黑树中进行查找。如果找到,并查看相关信息,如果Cache可用,返回相应的Cache文件。否则,则回源抓取。

因为元信息是以Cache Key的HASH值作为Key存储的,因而红黑树中并不能保留Cache Key中有层级关系. 如”/uri/foo”和”/uri/bar”在元信息红黑树中完全没有关系。要实现按照目录清除缓存,需要将Cache Key中层次关系存储起来。

我选择的方案是在共享内存中建立一棵目录树来存储层级关系。将Cache Key类比于文件系统中的路径, 每级路径存储为树中的一个节点。当需要清除某一目录下的所有缓存时,将该节点子树的中的所有缓存清除即可。

目录树采用Child-Sibling链表实现。节点结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct ngx_http_file_cache_path_node_s ngx_http_file_cache_path_node_t;

struct ngx_http_file_cache_path_node_s {
ngx_hlist_node_t node;
ngx_http_file_cache_path_node_t *parent;
ngx_http_file_cache_path_node_t *child;
ngx_http_file_cache_path_node_t *next;
ngx_http_file_cache_path_node_t *prev;
ngx_http_file_cache_node_t *rb_node;
unsigned count:32;
unsigned depth:8;
unsigned isdir:1;
/* 23 unused bits */
ngx_uint_t len;
u_char data[0];
};

以纵向表示孩子关系,横向表示兄弟关系,则Cache key “/uri/foo”则生成如下结构:

1
2
3
4
5
6
7
+-----+
| uri |
+--+--+
|
+--+--+
| foo |
+-----+

再缓存Cache Key “/uri/bar”之后则结构为:

1
2
3
4
5
6
7
+-----+
| uri |
+--+--+
|
+--+--+ +-----+
| bar +----+ foo |
+-----+ +-----+

其中node字段用于将节点加入HASH表中。考虑到如果一个目录下子目录或文件太多,则遍历兄弟链表则非常耗时。因而引入一个HASH表,将所有树结点以路径节点名字作为key加入HASH表。当子节点少,直接遍历。子节点过多时,则从HASH中进行查找。由于不同位置的路径节点会重名,如”/uri/foo/dummy”和”/uri/bar/dummy”两个名字为”dummy”的路径节点分别指向不同的Cache。因而在HASH表中查找时需要考虑路径节点的父节点。
如,判断一个中间目录路径节点时代码:

1
2
3
4
5
6
7
8
if (pos->isdir
&& pos->parent == parent
&& pos->len == path[i].len
&& ngx_strncmp(pos->data, path[i].data, path[i].len)
== 0)
{
......
}

非叶子节点代表路径。叶子节点可能为目录或是缓存文件。代表缓存文件的叶子节点中则保存该Cache的元信息节点的地址。同时在元信息中也添加上该叶子节点的地址。因而当找到代表要清除的目录节点时,遍历子树便可以找到所有缓存信息的元信息,对元信息进行相关操作,完成清除操作。
当删除Cache文件时,将Cache key的路径节点也一并删除,回收内存。