Just For Coding

Keep learning, keep living …

Passenger导致NGINX Master进程阻塞定位及解决

我们线上服务使用nginx+passenger-3.0.11来运行Rails程序。我们发现当执行若干次nginx -s reload或者kill -HUP cat /usr/local/nginx/logs/nginx.pid之后,再执行以上命令不再生效。

用pstack观察master进程,master进程阻塞在read()操作上。 使用gcore生成core文件,然后用gdb查看,查看栈帧所在文件及行号。 查看passenger源码。readExact()逻辑为一直读到size大小返回。readArrayMessage()代码逻辑为先从传入fd中读取两个字节做为长度,再从fd中读取相应长度的内容。Passenger::AgentsStarter::start函数的过程为创建一个socketpair,然后fork()子进程,子进程执行PassengerWatchdog。父子进程通过该socketpair进行通信。master就是卡在读取socketpair上。

从gdb中看到readExact()传入的size为12848,即从socketpair中读到的长度为12848,而实际读取的内容长度为621,并且读取的内容也很诡异。应该是某个地方向socketpair中写入了错误内容。 继续查看passenger代码中子进程的逻辑。关闭socketpair一端。然后将socketpair另一端通过dup2()调用复制到3上。然后关闭除了0-3以外的所有文件描述符。接着解除信号阻塞,重置信号处理函数。

通过strace追踪master进程及其子进程行为,发现子进程确实发送了错误内容。”20”的十六进制表示为0x3230,按网络字节序读出正好是12848。从调用位置看是在SIGCHLD的信号处理函数中。而从写入内容看像是nginx在写日志。 继续追查为什么会有SIGCHLD产生。最后发现getHighestFileDescriptor()函数中子进程会fork()出孙进程,获取当前打开的最大fd,子进程等待孙进程退出后返回。由于孙进程退出,子进程收到了SIGCHLD信号。但是由于子进程是由master进程fork()出来的,SIGCHLD信号是被阻塞的。当执行sigprocmask()时,被阻塞的SIGCHLD被处理了,而这时的信号处理函数是ngx_signal_handler(), 该函数会调用ngx_log_error()向error log写入日志。若error log的fd为3,就发生我们上述的情况。

为了避免这种情况,应该先重置信号处理函数,再解除信号阻塞。

patch:

https://github.com/flygoast/passenger/commit/0b62808943aba65432f0b492f4ef941499fad02c

PassengerHelperAgent开机启动异常分析

异常现象:

在/etc/rc.local中添加/usr/local/nginx/sbin/nginx来开机自动启动NGINX时,PassengerHelperAgent进程不停反复重启,而从shell上手动启动NGINX时一切正常。

追查过程:

查阅异常时的error.log日志发现以下错误:

1
2
3
4
5
[ pid=3413 thr=140583025772288 file=ext/nginx/HelperAgent.cpp:963 time=2014-09-30 11:05:20.925 ]: Uncaught exception in PassengerServer client thread:
   exception: Cannot accept new connection: Too many open files (24)
   backtrace:
     in 'Passenger::FileDescriptor Client::acceptConnection()' (HelperAgent.cpp:429)
     in 'void Client::threadMain()' (HelperAgent.cpp:952)

根据日志错误信息,可以确定是PassengerHelperAgent进程文件描述符达到了上限。

找到ext/nginx/HelperAgent.cpp文件的963行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    void threadMain() {
            TRACE_POINT();
            try {
                    while (true) {
                            UPDATE_TRACE_POINT();
                            inactivityTimer.start();
                            FileDescriptor fd(acceptConnection());
                            inactivityTimer.stop();
                            handleRequest(fd);
                    }
            } catch (const boost::thread_interrupted &) {
                    P_TRACE(2, "Client thread " << this << " interrupted.");
            } catch (const tracable_exception &e) {
                    P_ERROR("Uncaught exception in PassengerServer client thread:\n"
                            << "   exception: " << e.what() << "\n"
                            << "   backtrace:\n" << e.backtrace());
                    abort();
            }
    }

通过上下文可以确定是调用acceptConnection()出错,查看acceptConnection()代码,确定是由该函数抛出的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
    FileDescriptor acceptConnection() {
            TRACE_POINT();
            struct sockaddr_un addr;
            socklen_t addrlen = sizeof(addr);
            int fd = syscalls::accept(serverSocket,
                    (struct sockaddr *) &addr,
                    &addrlen);
            if (fd == -1) {
                    throw SystemException("Cannot accept new connection", errno);
            } else {
                    return FileDescriptor(fd);
            }
    }

syscalls::accept是对系统调用accept的简单封装。

1
2
3
4
5
6
7
8
syscalls::accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) {
        int ret;
        CHECK_INTERRUPTION(
                ret == -1,
                ret = ::accept(sockfd, addr, addrlen)
        );
        return ret;
}

错误原因就是accept由于进程文件描述符达到上限而出错返回了。

接下来追查为什么进程文件描述符数会达到上限。

首先看一下PassengerHelperAgent的整体代码逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int
main(int argc, char *argv[]) {
        TRACE_POINT();
        VariantMap options = initializeAgent(argc, argv, "PassengerHelperAgent");
        pid_t   webServerPid  = options.getPid("web_server_pid");
        string  tempDir       = options.get("temp_dir");
        bool    userSwitching = options.getBool("user_switching");
        string  defaultUser   = options.get("default_user");
        string  defaultGroup  = options.get("default_group");
        string  passengerRoot = options.get("passenger_root");
        string  rubyCommand   = options.get("ruby");
        unsigned int generationNumber   = options.getInt("generation_number");
        unsigned int maxPoolSize        = options.getInt("max_pool_size");
        unsigned int maxInstancesPerApp = options.getInt("max_instances_per_app");
        unsigned int poolIdleTime       = options.getInt("pool_idle_time");

        try {
                UPDATE_TRACE_POINT();
                Server server(FEEDBACK_FD, webServerPid, tempDir,
                        userSwitching, defaultUser, defaultGroup,
                        passengerRoot, rubyCommand, generationNumber,
                        maxPoolSize, maxInstancesPerApp, poolIdleTime,
                        options);
                P_DEBUG("Passenger helper agent started on PID " << getpid());

                UPDATE_TRACE_POINT();
                server.mainLoop();
        } catch (const tracable_exception &e) {
                P_ERROR(e.what() << "\n" << e.backtrace());
                return 1;
        } catch (const std::exception &e) {
                P_ERROR(e.what());
                return 1;
        }

        P_TRACE(2, "Helper agent exited.");
        return 0;
}

首先创建一个Server对象,然后调用Server对象的mainLoop成员函数。Server对象构造函数会调用成员函数startListening。

1
2
3
4
5
6
7
8
9
10
11
12
    void startListening() {
            this_thread::disable_syscall_interruption dsi;
            requestSocket = createUnixServer(getRequestSocketFilename().c_str());

            int ret;
            do {
                    ret = chmod(getRequestSocketFilename().c_str(), S_ISVTX |
                            S_IRUSR | S_IWUSR | S_IXUSR |
                            S_IRGRP | S_IWGRP | S_IXGRP |
                            S_IROTH | S_IWOTH | S_IXOTH);
            } while (ret == -1 && errno == EINTR);
    }

createUnixServer函数会创建一个socket文件,然后监听这个文件。NGINX收到请求后,会由Passenger模块转发请求到该socket文件。 mainLoop会调用成员函数startClientHandlerThreads,它会创建numberOfThreads个Client对象。

1
2
3
4
5
6
7
8
    void startClientHandlerThreads() {
            for (unsigned int i = 0; i < numberOfThreads; i++) {
                    ClientPtr client(new Client(i + 1, pool, requestSocketPassword,
                            defaultUser, defaultGroup, requestSocket,
                            analyticsLogger));
                    clients.insert(client);
            }
    }

Client对象构造函数会启动一个线程执行threadMain。threadMain就是我们上面出错的函数。每个线程等待接收通过socket文件发来的请求,接收请求后调用handleRequest进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    Client(unsigned int number, ApplicationPool::Ptr pool,
           const string &password, const string &defaultUser,
           const string &defaultGroup, int serverSocket,
           const AnalyticsLoggerPtr &logger)
            : inactivityTimer(false)
    {
            this->number = number;
            this->pool = pool;
            this->password = password;
            this->defaultUser = defaultUser;
            this->defaultGroup = defaultGroup;
            this->serverSocket = serverSocket;
            this->analyticsLogger = logger;

            sbmh_init(&statusFinder.ctx, NULL, NULL, 0);
            sbmh_init(&transferEncodingFinder.ctx, NULL, NULL, 0);

            thr = new oxt::thread(
                    boost::bind(&Client::threadMain, this),
                    "Client thread " + toString(number),
                    CLIENT_THREAD_STACK_SIZE
            );
    }

问题出在线程调用accept等待接收请求时。我们所创建的线程数量numberOfThreads是在Server对象被创建时指定的。

1
numberOfThreads     = maxPoolSize * 4;

而maxPoolSize由passenger_max_pool_size配置项指定,我们指定的是256。256 × 4 = 1024,开机启动时PassengerHelperAgent进程的文件描述符上限就是1024。这个数字值得怀疑。因而我将配置修改为128,果然正常了。

1
passenger_max_pool_size 256;

可以确定每个线程中占用了文件描述符。然而从代码中并没有找到打开文件相关的逻辑。当accept成功返回时,会返回一个新的文件描述符。开始怀疑accept在还没有接收到请求时就预先占用了一个文件描述符。通过一个简单程序来验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/un.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <pthread.h>
#include <assert.h>

void *start_accept(void *arg) {
    int                 fd = (int) arg;
    int                 newfd;
    struct sockaddr_un  addr;
    socklen_t           addrlen;

    newfd = accept(fd, (struct sockaddr *)&addr, &addrlen);

    if (newfd < 0) {
        fprintf(stderr, "accept failed: %u: %s\n",
                pthread_self(), strerror(errno));
    }
}

int main() {
    int                 i, fd;
    struct sockaddr_un  addr;
    socklen_t           addrlen = sizeof(addr);
    pthread_t           pt;

    assert((fd = socket(PF_LOCAL, SOCK_STREAM, 0)) > 0);

    for (i = 0; i < 1020; i++) {
        assert(open("emfile.c", O_RDONLY) > 0);
    }

    addr.sun_family = AF_LOCAL;
    strncpy(addr.sun_path, "emfile.socket", sizeof("emfile.socket") - 1);
    addr.sun_path[sizeof("emfile.socket") - 1] = '\0';

    assert(bind(fd, (const struct sockaddr *)&addr, sizeof(addr)) == 0);

    assert(listen(fd, 512) == 0);

    pthread_create(&pt, NULL, start_accept, (void *) fd);

    pthread_join(pt, NULL);

    exit(0);
}

使用ulimit将shell文件打开数上限修改为1024:

1
$ ulimit -n 1024

编译验证程序,并执行

1
2
$ gcc emfile.c -lpthread
$ ./a.out

得到结果:

1
accept failed: 591611648: Too many open files

确实如些,那再来看一下accept的实现。accept系统调用的内核实现是sys_accept,而sys_accept是对sys_accept4的简单封装。

1
2
3
4
5
SYSCALL_DEFINE3(accept, int, fd, struct sockaddr __user *, upeer_sockaddr,
        int __user *, upeer_addrlen)
{
    return sys_accept4(fd, upeer_sockaddr, upeer_addrlen, 0);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
SYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,
        int __user *, upeer_addrlen, int, flags)
{
    ...

    sock = sockfd_lookup_light(fd, &err, &fput_needed);
    if (!sock)
        goto out;

    err = -ENFILE;
    if (!(newsock = sock_alloc()))
        goto out_put;

    newsock->type = sock->type;
    newsock->ops = sock->ops;

    ...

    newfd = sock_alloc_file(newsock, &newfile, flags);

    ...

    err = sock->ops->accept(sock, newsock, sock->file->f_flags);
    if (err < 0)
        goto out_fd;

    ...

    fd_install(newfd, newfile);
    err = newfd;

out_put:
    fput_light(sock->file, fput_needed);
out:
    return err;
out_fd:
    fput(newfile);
    put_unused_fd(newfd);
    goto out_put;
}

sys_accept4中在调用sock->ops->accept去接收网络请求前就调用sock_alloc_file来分配文件描述符。再来看sock_alloc_file这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
static int sock_alloc_file(struct socket *sock, struct file **f, int flags)
{
    struct qstr name = { .name = "" };
    struct path path;
    struct file *file;
    int fd;

    fd = get_unused_fd_flags(flags);
    if (unlikely(fd < 0))
        return fd;
    ...
}

sock_alloc_file会调用get_unused_fd_flags,这是一个宏,实际会调用函数alloc_fd, 而alloc_fd又会调用函数expand_files:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int expand_files(struct files_struct *files, int nr)
{
    struct fdtable *fdt;

    fdt = files_fdtable(files);

    /*
     * N.B. For clone tasks sharing a files structure, this test
     * will limit the total number of files that can be opened.
     */
    if (nr >= current->signal->rlim[RLIMIT_NOFILE].rlim_cur)
        return -EMFILE;

    /* Do we need to expand? */
    if (nr < fdt->max_fds)
        return 0;

    /* Can we expand? */
    if (nr >= sysctl_nr_open)
        return -EMFILE;

    /* All good, so we try */
    return expand_fdtable(files, nr);
}

可以看到expand_files进行文件描述符限制的检查,当超过限制时返回”EMFILE”。”EMFILE”错误的提示就是”Too many open files”。

1
#define EMFILE      24  /* Too many open files */

结合上面的测试程序,使用一个systemtap脚本可以捕获到上述调用路径。

1
2
3
4
5
6
7
8
9
10
11
12
probe  kernel.function("expand_files").return {
    if ($return == -24) {
        println("expand_files");
        printf("%d\n", $nr);
        print_backtrace();
        exit();
    }
}

probe begin {
    println("start\n");
}

执行stap:

1
$sudo stap emfile.stap

捕获结果为:

1
2
3
4
5
6
7
8
9
10
start

expand_files
1024
Returning from:  0xffffffff811931d0 : expand_files+0x0/0x220 [kernel]
Returning to  :  0xffffffff81193443 : alloc_fd+0x53/0x160 [kernel]
 0xffffffff814187f3 : sock_alloc_file+0x43/0x150 [kernel]
 0xffffffff8141b3dd : sys_accept4+0x11d/0x2b0 [kernel]
 0xffffffff8141b580 : sys_accept+0x10/0x20 [kernel]
 0xffffffff8100b0f2 : system_call_fastpath+0x16/0x1b [kernel]

最终确定异常原因:

开机自启时,进程的打开文件数限制为1024,而创建1024个线程执行accept()时。每个accept会占用一个文件描述符, 达到了进程的文件描述符上限而异常。而从shell启动时,我们shell进程的文件描述符限制是32768,因而不会出现问题。

解决方法: 创建一个启动脚本,在执行/usr/local/nginx/sbin/nginx前执行ulimit修改文件描述符限制。

1
2
ulimit -SHn 65535
/usr/local/nginx/sbin/nginx &

注意:

  • /etc/security/limits.conf中的设置只针对登录动作发生时才生效,因而对于开机自动启动进程这种情况,这种修改该文件的方式不生效。
  • Passenger版本为3.0.11
  • kernel版本为CentOS 6.2内核,kernel-2.6.32-220.4.2.el6

NGINX变量机制分析

NGINX实现了一套变量机制,使得配置动态内容非常灵活。比如我们可以使用“proxy_cache_key”指令根据我们的需求灵活地设置Cache的KEY。变量机制也为各模块合作提供了一个桥梁,使得模块间配合完成功能更加方便。比如”proxy_pass”指令支持将回源地址设置为变量,我们可以在另一个模块中依据条件对该变量赋值,从而非常简便地完成基于各种策略选择上游地址的功能。

变量机制相关的内部数据结构主要有两种, ngx_http_variable_t和ngx_http_variable_value_t,分别代表变量本身和变量值。

ngx_http_variable_t结构如下:

1
2
3
4
5
6
7
8
struct ngx_http_variable_s {
    ngx_str_t                     name;   /* must be first to build the hash */
    ngx_http_set_variable_pt      set_handler;
    ngx_http_get_variable_pt      get_handler;
    uintptr_t                     data;
    ngx_uint_t                    flags;
    ngx_uint_t                    index;
};

各成员意义如下:

  • name: 变量名称
  • set_handler: 赋值函数,主要用于”set”指令,处理请求时执行set指令时调用
  • get_handler: 取值函数,当读取该变量时调用该函数得到变量值
  • data: 传递给set_handler和get_handler的参数
  • flags: 变量属性标志
  • index: 变量在cmcf->variables数组中的索引

其中flags取值及意义如下:

  • NGX_HTTP_VAR_CHANGEABLE: 变量被添加时如果已有同名变量,则返回该变量,否则会报错认为变量名冲突。
  • NGX_HTTP_VAR_NOCACHEABLE: 变量的值不应该被缓存。变量被取值后,变量值的no_cacheable被置为1。
  • NGX_HTTP_VAR_INDEXED:表示变量被索引,存储在cmcf->variables数组,这样的变量可以通过索引值直接找到。
  • NGX_HTTP_VAR_NOHASH: 不会将该变量存储在cmcf->variables_hash哈希表。

ngx_http_variable_value_t结构如下:

1
typedef ngx_variable_value_t  ngx_http_variable_value_t;
1
2
3
4
5
6
7
8
9
10
typedef struct {
    unsigned    len:28;

    unsigned    valid:1;
    unsigned    no_cacheable:1;
    unsigned    not_found:1;
    unsigned    escape:1;

    u_char     *data;
} ngx_variable_value_t;

各成员意义如下:

  • len: 变量值数据长度
  • valid: 该变量值是否可用
  • no_cacheable: 该变量值是否不能缓存
  • not_found: 对应变量不存在
  • escape: 变量值内容中的特殊字符是否进行了转义
  • data:变量值的数据

因为NGINX所有变量存储在ngx_http_core_module的main级结构ngx_http_core_main_conf_t中(以下简写cmcf),所以变量的作用范围是整个http{}配置。在某个server{}中添加的变量,在另一个server{}同样可以使用。

1
2
3
4
5
6
7
8
typedef struct {
    ......
    ngx_hash_t                 variables_hash;
    ngx_array_t                variables;       /* ngx_http_variable_t */
    ......
    ngx_hash_keys_arrays_t    *variables_keys;
    ......
} ngx_http_core_main_conf_t;

NGINX变量有以下3种类型:

  • 模块内置变量
  • 根据配置动态添加的变量
  • 内置规则变量

模块内置变量主要在ngx_http_module_t的preconfiguration阶段中添加。如upstream模块的preconfiguration回调为ngx_http_upstream_add_variables().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static ngx_int_t
ngx_http_upstream_add_variables(ngx_conf_t *cf)
{
    ngx_http_variable_t  *var, *v;

    for (v = ngx_http_upstream_vars; v->name.len; v++) {
        var = ngx_http_add_variable(cf, &v->name, v->flags);
        if (var == NULL) {
            return NGX_ERROR;
        }

        var->get_handler = v->get_handler;
        var->data = v->data;
    }

    return NGX_OK;
}

根据配置动态添加的变量一般当解析相应指令时,在指令的解析函数中添加。比如rewrite模块中的set指令可以添加用户定义名称的变量。

1
2
3
4
v = ngx_http_add_variable(cf, &value[1], NGX_HTTP_VAR_CHANGEABLE);
if (v == NULL) {
    return NGX_CONF_ERROR;
}

内置规则变量不需要添加,而是按特定规则来解析。如”http”, “upstream_http”, “arg”, “cookie”等等一系列变量。 前两种方式都是调用ngx_http_add_variable()来添加变量。ngx_http_add_variable()向cmcf->variable_keys数组中添加变量,并将该变量结构返回。如果指定了NGX_HTTP_VAR_CHANGEABLE标志,那么当检查到同名的变量时,则直接返回该变量。否则报错返回NULL。 如果某一指令需要用到一个变量,则一般在解析该指令配置时会调用ngx_http_get_variable_index(),并将该索引值保存,当处理请求时直接通过索引找到变量。如geo模块中geo指令的解析函数。

1
2
3
4
geo->index = ngx_http_get_variable_index(cf, &name);
if (geo->index == NGX_ERROR) {
    return NGX_CONF_ERROR;
}

ngx_http_get_variable_index()会从cmcf->variables数组中查找变量,查找到则返回该变量的索引。否则在cmcf->variables添加该变量并返回数组索引。当解析完HTTP{}配置后,NGINX会将cmcf->variables_keys中的变量组织到cmcf->variables_hash这个HASH表中。如果变量指令了NGX_HTTP_VAR_NOHASH标志,则该变量不会被添加到cmcf->variables_hash中。如果一个变量既没有添加到cmcf->variables中,也没有添加到cmcf->variables_hash中,那么这个变量就不能被找到,因而会被认为不存在。

NGINX开始处理请求时会在请求结构体ngx_http_request_中创建被索引的变量值的缓存空间。

1
2
r->variables = ngx_pcalloc(r->pool, cmcf->variables.nelts
                                    * sizeof(ngx_http_variable_value_t));

对于被索引的变量,可以使用ngx_http_get_indexed_variable()或者ngx_http_get_flushed_variable()来求值。这样省去了查找哈希表的消耗。ngx_http_get_indexed_variable()首先检查r->variables[index]变量缓存是否可用。可用则直接返回,否则调用v->get_handler对变量求值,并将结果存储在r->variables[index]中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ngx_http_variable_value_t *
ngx_http_get_indexed_variable(ngx_http_request_t *r, ngx_uint_t index)
{
    ngx_http_variable_t        *v;
    ngx_http_core_main_conf_t  *cmcf;

    cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);

    if (cmcf->variables.nelts <= index) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "unknown variable index: %d", index);
        return NULL;
    }

    if (r->variables[index].not_found || r->variables[index].valid) {
        return &r->variables[index];
    }

    v = cmcf->variables.elts;

    if (v[index].get_handler(r, &r->variables[index], v[index].data)
        == NGX_OK)
    {
        if (v[index].flags & NGX_HTTP_VAR_NOCACHEABLE) {
            r->variables[index].no_cacheable = 1;
        }

        return &r->variables[index];
    }

    r->variables[index].valid = 0;
    r->variables[index].not_found = 1;

    return NULL;
}

ngx_http_get_flushed_variable()还会对变量值的no_cacheable标志进行检查。如果为0,表示变量值可以cache, 则直接返回已缓存的变量值。否则,将变量值置为不可用,调用ngx_http_get_indexed_variable()对变量求值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ngx_http_variable_value_t *
ngx_http_get_flushed_variable(ngx_http_request_t *r, ngx_uint_t index)
{
    ngx_http_variable_value_t  *v;

    v = &r->variables[index];

    if (v->valid || v->not_found) {
        if (!v->no_cacheable) {
            return v;
        }

        v->valid = 0;
        v->not_found = 0;
    }

    return ngx_http_get_indexed_variable(r, index);
}

没有索引的变量,可以调用ngx_http_get_variable()完成取值。它会查找cmcf->variables_hash哈希表,找到变量,从相应变量缓存中取值或调用变量的get_handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cmcf = ngx_http_get_module_main_conf(r, ngx_http_core_module);

v = ngx_hash_find(&cmcf->variables_hash, key, name->data, name->len);

if (v) {

    if (v->flags & NGX_HTTP_VAR_INDEXED) {
        return ngx_http_get_flushed_variable(r, v->index);

    } else {

        vv = ngx_palloc(r->pool, sizeof(ngx_http_variable_value_t));

        if (vv && v->get_handler(r, vv, v->data) == NGX_OK) {
            return vv;
        }

        return NULL;
    }
}

NGINX中rewrite模块实现了set指令,可以给变量赋值。如果另一模块的变量也可以使用set来赋值,则多模块配合完成功能会更加灵活。 set指令的解析函数ngx_http_rewrite_set()代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static char *
ngx_http_rewrite_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_rewrite_loc_conf_t  *lcf = conf;

    ngx_int_t                            index;
    ngx_str_t                           *value;
    ngx_http_variable_t                 *v;
    ngx_http_script_var_code_t          *vcode;
    ngx_http_script_var_handler_code_t  *vhcode;

    value = cf->args->elts;

    if (value[1].data[0] != '$') {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "invalid variable name "%V"", &value[1]);
        return NGX_CONF_ERROR;
    }

    value[1].len--;
    value[1].data++;

    v = ngx_http_add_variable(cf, &value[1], NGX_HTTP_VAR_CHANGEABLE);
    if (v == NULL) {
        return NGX_CONF_ERROR;
    }

    index = ngx_http_get_variable_index(cf, &value[1]);
    if (index == NGX_ERROR) {
        return NGX_CONF_ERROR;
    }

    if (v->get_handler == NULL
        && ngx_strncasecmp(value[1].data, (u_char *) "http_", 5) != 0
        && ngx_strncasecmp(value[1].data, (u_char *) "sent_http_", 10) != 0
        && ngx_strncasecmp(value[1].data, (u_char *) "upstream_http_", 14) != 0)
    {
        v->get_handler = ngx_http_rewrite_var;
        v->data = index;
    }

    if (ngx_http_rewrite_value(cf, lcf, &value[2]) != NGX_CONF_OK) {
        return NGX_CONF_ERROR;
    }

    if (v->set_handler) {
        vhcode = ngx_http_script_start_code(cf->pool, &lcf->codes,
                                   sizeof(ngx_http_script_var_handler_code_t));
        if (vhcode == NULL) {
            return NGX_CONF_ERROR;
        }

        vhcode->code = ngx_http_script_var_set_handler_code;
        vhcode->handler = v->set_handler;
        vhcode->data = v->data;

        return NGX_CONF_OK;
    }

    vcode = ngx_http_script_start_code(cf->pool, &lcf->codes,
                                       sizeof(ngx_http_script_var_code_t));
    if (vcode == NULL) {
        return NGX_CONF_ERROR;
    }

    vcode->code = ngx_http_script_set_var_code;
    vcode->index = (uintptr_t) index;

    return NGX_CONF_OK;
}

它向lcf->codes函数引擎添加rewrite阶段需要执行的函数。如果检测到变量的set_handler存在,则添加ngx_http_script_var_set_handler_code函数,它会调用set_handler。而如果没有set_handler, 则添加ngx_http_script_set_var_code函数,它不会调用set_handler。由于内置变量添加一般是在preconfiguration中完成,因而解析set指令时,变量的set_handler存在,可以正常处理。而根据配置动态添加的变量如果解析出现在set指令后,set指令先被解析,此时变量的set_handler为空,此时添加的函数为ngx_http_script_set_var_code,v->set_handler不会得到调用。因此个人感觉添加函数引擎的逻辑应该放到postconfiguration中处理。此时和变量的各成员值都已被正常赋值。因而可以更方便地让根据配置动态添加的变量也可以和set指令轻松结合。

修改NGINX实现灵活设置文件缓存时间

业务要求Cache服务器能够随时增删允许访问的HOST。而每个HOST有单独的配置,这些配置随时都可能更改。如果单纯采用静态配置文件(nginx.conf)的方式,每次修改都要reload NGINX。如果更改很频繁,会造成服务器上存在大量的NGINX进程,导致服务器负载很高。因而我们将需要随时更改的配置存储于一个独立的配置服务器中。请求处理时,先去配置服务器中获取该请求需要使用的配置,再根据这些配置进行相应的处理。因而,我们可以随时更改配置服务器中的相应内容。 其中一个配置就是文件缓存时间。NGINX中设置文件缓存时间有两种方法:

  • 设置proxy_cache_valid指令
  • 在上游响应中的添加”Cache-Control” header和”Expires” header

其中上游响应header的优先级更高。当不想使用上游响应header中所设置的缓存时间时,可以使用以下指令来禁用。

1
proxy_ignore_headers X-Accel-Expires;

这两种方法都无法满足我们根据动态配置来设置缓存时间的需求。因而我给NGINX添加了一个内置变量”cache_time”来支持灵活地设置缓存时间,并且该种方式具有最高的优先级。这样,可以非常方便地在ngx_lua等第三方模块中根据条件设置不同的缓存时间。

在ngx_http_request_t添加一个cache_time成员,在ngx_http_core_variables数组中添加内置变量”cache_time”,”cache_time”在被赋值时会将值存储在r->cache_time中。

1
2
3
4
{ ngx_string("cache_time"), ngx_http_variable_request_set_time,
  ngx_http_variable_request_get_time,
  offsetof(ngx_http_request_t, cache_time),
  NGX_HTTP_VAR_CHANGEABLE|NGX_HTTP_VAR_NOCACHEABLE, 0 },
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void
ngx_http_variable_request_set_time(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    ngx_str_t  val;
    time_t     valid, *vp;

    val.len = v->len;
    val.data = v->data;

    valid = ngx_parse_time(&val, 1);
    if (valid == (time_t) NGX_ERROR) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "invalid time value "%V"", &val);
        return;
    }

    vp = (time_t *) ((char *) r + data);

    *vp = valid;

    return;
}

因为”cache_time”变量需要比上游响应header具有更高的优先级,因而要在上游header处理之后再处理”cache_time”变量。上游响应的header在ngx_http_upstream_process_headers()中进行处理。因而我在upstream模块中添加了一个hook, 该hook在调用完ngx_http_upstream_process_headers()后,开始处理body前被调用。

1
2
3
4
5
6
7
8
9
10
11
if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
    return;
}

if (u->post_headers) {
    rc = u->post_headers(r);
    if (rc != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, rc);
        return;
    }
}

proxy模块在该hook上注册一个函数,这个函数执行时,首先检查上游响应的状态码判断是否需要处理”cache_time”变量。检查通过后,读取”cache_time”变量的值,依据值来进行各种操作。当值为0时,禁用cache.为正值,则将缓存时间修改为该值。当修改cache缓存时间后,将上游响应中的”Cache-Control”和”Expires” header去除,不再发送给下游。

1
u->post_headers = ngx_http_proxy_post_headers;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static ngx_int_t
ngx_http_proxy_post_headers(ngx_http_request_t *r)
{
    ngx_uint_t         i;
    ngx_table_elt_t  **ph;

    if (ngx_http_upstream_check_status(r->upstream->conf->cache_time_valid,
                                       r->upstream->headers_in.status_n)
        == NGX_DECLINED)
    {
        return NGX_OK;
    }

    if (r->cache_time == (time_t) -1) {
        return NGX_OK;
    }

    if (r->cache_time == (time_t) 0) {
        r->upstream->cacheable = 0;
        return NGX_OK;
    }

    r->cache->valid_sec = ngx_time() + r->cache_time;

    r->headers_out.expires->hash = 0;

    ph = r->headers_out.cache_control.elts;
    for (i = 0; i < r->headers_out.cache_control.nelts; i++) {
        ph[i]->hash = 0;
    }

    return NGX_OK;
}

ejabberd中Jabber组件协议实现

XEP-0114中定义了Jabber组件协议(Jabber Componet Protocol)。XMPP网络外的可信组件可以使用这个协议和XMPP网络内实体进行通信。

组件协议定义了两种模式:

  • accept:外部组件向XMPP服务器发起连接
  • connect:XMPP服务器向外部组件发起连接

其中, accept方式使用比较广泛,ejabberd中只实现了accept方式。

组件协议像XMPP一样,也是基于XML流,使用的XMLNS为jabber:componet:accept或者jabber:component:connect

accept方式的协议流程:

  • 外部组件建立到XMPP服务器的TCP连接,发送流头。
1
2
3
4
<stream:stream
    xmlns='jabber:component:accept'
    xmlns:stream='http://etherx.jabber.org/streams'
    to='plays.shakespeare.lit'>
  • XMPP服务器回应,也发送流头,其中必须包括流ID属性:
1
2
3
4
5
<stream:stream
    xmlns:stream='http://etherx.jabber.org/streams'
    xmlns='jabber:component:accept'
    from='plays.shakespeare.lit'
    id='3BF96D32'>
  • 外部组件发送身份验证摘要信息。
1
<handshake>aaee83c26aeeafcbabeabfcbcd50df997e0a2a1e</handshake>

组件协议身份验证不使用SASL,也不使用已废弃的XEP-0078。它使用双方共享密钥计算摘要信息来验证身份。计算方法如下:

1. 将服务器流头中的流ID属性和共享密钥拼接成字符串
2. 计算该字符串的SHA1哈希值,并转换成小写16进制字符串
  • XMPP服务器用同样方法计算进行校验。通过后,返回一个空的handshake元素。
1
<handshake/>

至此,外部组件和XMPP服务器就可以交换XMPP消息了。

我们来看ejabberd中组件协议实现,位于ejabberd_service.erl模块中。

ejabberd中的ejabberd_service的默认配置为:

1
2
3
4
5
6
7
8
{8888, ejabberd_service, [
                          {access, all},
                          {shaper_rule, fast},
                          {ip, {127, 0, 0, 1}},
                          {hosts, ["icq.example.org", "sms.example.org"],
                                  [{password, "secret"}]
                          }
                         ]},

ejabberd_service是端口8888的处理模块。当有ejabberd接收端口上的TCP连接后,ejabberd_socket:start/4调用处理模块的socket_type/0, 根据返回值进行不同处理。ejabberd_service:socket_type/0返回xml_stream。它的处理流程和ejabberd_c2s模块相同。ejabberd为每个TCP连接分别创建一个receiver进程和一个处理进程(这里是ejabberd_service进程)。receiver进程接收消息并解析,然后发送相应的消息给处理进程。具体不再详述,请参考:ejabberd消息处理流程分析

service进程为gen_fsm进程,初始状态为wait_for_stream。receiver进程接收到XML流头后发送xmlstreamstart消息给service进程。service进程调用wait_for_stream函数进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
    case xml:get_attr_s("xmlns", Attrs) of
    "jabber:component:accept" ->
        %% Note: XEP-0114 requires to check that destination is a Jabber
        %% component served by this Jabber server.
        %% However several transports don't respect that,
        %% so ejabberd doesn't check 'to' attribute (EJAB-717)
        To = xml:get_attr_s("to", Attrs),
        Header = io_lib:format(?STREAM_HEADER,
                   [StateData#state.streamid, xml:crypt(To)]),
        send_text(StateData, Header),
        {next_state, wait_for_handshake, StateData};
    _ ->
        send_text(StateData, ?INVALID_HEADER_ERR),
        {stop, normal, StateData}
    end;

wait_for_stream检测到XML流头中XMLNS为”jabber:component:accept”后,向组件发送流头,状态变更为wait_for_handshake。

receiver进程收到handshake消息后,发送xmlstreamelement消息给service进程,service调用wait_for_handshake处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
wait_for_handshake({xmlstreamelement, El}, StateData) ->
    {xmlelement, Name, _Attrs, Els} = El,
    case {Name, xml:get_cdata(Els)} of
    {"handshake", Digest} ->
        case sha:sha(StateData#state.streamid ++
             StateData#state.password) of
        Digest ->
            send_text(StateData, "<handshake/>"),
            lists:foreach(
              fun(H) ->
                  ejabberd_router:register_route(H),
                  ?INFO_MSG("Route registered for service ~p~n", [H])
              end, StateData#state.hosts),
            {next_state, stream_established, StateData};
        _ ->
            send_text(StateData, ?INVALID_HANDSHAKE_ERR),
            {stop, normal, StateData}
        end;
    _ ->
        {next_state, wait_for_handshake, StateData}
    end;

wait_for_handshake使用XML流ID和密码计算身份验证摘要,和组件所发的摘要信息进行对比判断是否通过。检查通过后,发送空的handshake元素。然后调用ejabberd_router:register_route/1依次注册配置的所有service域名。这样,XMPP实体发往这些域名的消息都将被ejabberd_router路由给该service进程。service进程状态变更为stream_established。

至此,外部组件就可以和XMPP服务器交换XMPP消息了。

组件向XMPP服务器发送消息后,receiver进程解析后向service进程发送xmlstreamelement消息,service进程调用stream_established处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
stream_established({xmlstreamelement, El}, StateData) ->
    NewEl = jlib:remove_attr("xmlns", El),
    {xmlelement, Name, Attrs, _Els} = NewEl,
    From = xml:get_attr_s("from", Attrs),
    ...
    To = xml:get_attr_s("to", Attrs),
    ToJID = case To of
        "" -> error;
        _ -> jlib:string_to_jid(To)
        end,
    if
    ((Name == "iq") or
     (Name == "message") or
     (Name == "presence")) and
    (ToJID /= error) and (FromJID /= error) ->
        ejabberd_router:route(FromJID, ToJID, NewEl);
    true ->
        Err = jlib:make_error_reply(NewEl, ?ERR_BAD_REQUEST),
        send_element(StateData, Err),
        error
    end,
    {next_state, stream_established, StateData};

stream_established进行一系列检查后,调用ejabberd_router:route转发消息。

XMPP实体发送给service域名的消息会由ejabberd_router以route消息的格式发给service进程。service进程调用handle_info处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
handle_info({route, From, To, Packet}, StateName, StateData) ->
    case acl:match_rule(global, StateData#state.access, From) of
    allow ->
        {xmlelement, Name, Attrs, Els} = Packet,
        Attrs2 = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
                        jlib:jid_to_string(To),
                        Attrs),
        Text = xml:element_to_binary({xmlelement, Name, Attrs2, Els}),
        send_text(StateData, Text);
    deny ->
        Err = jlib:make_error_reply(Packet, ?ERR_NOT_ALLOWED),
        ejabberd_router:route_error(To, From, Err, Packet)
    end,
    {next_state, StateName, StateData};

handle_info首先进行ACL检查,通过后,修改From和To属性,将消息发送给组件。

使用telnet演示简单登录过程:

1
2
3
4
5
6
7
8
9
10
[root@flygoast flygoast]# telnet 127.0.0.1 8888
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
<stream:stream
    xmlns='jabber:component:accept'
    xmlns:stream='http://etherx.jabber.org/streams'
    to='sms.example.com'>
<?xml version='1.0'?><stream:stream xmlns:stream='http://etherx.jabber.org/streams' xmlns='jabber:component:accept' id='2744762983' from='sms.example.com'><handshake>cffc7fab4feae018a325ea834d2dca8c3b707a51</handshake>
<handshake/>

身份校验信息计算:

1
2
[root@flygoast flygoast]# echo -n "2744762983secret" | sha1sum
cffc7fab4feae018a325ea834d2dca8c3b707a51  -

ejabberd中XMPP协议PING实现

根据XEP-0199, XMPP客户端和服务器都可以在XML流上发送应用层PING请求。因为XMPP依赖底层的TCP连接,有可能TCP连接意外中断,而上层的XMPP并不知晓,从而影响消息传递。通过发送应用层PING请求可以来确认对端的连接可用性。

以服务器发给客户端为例,协议如下:

发送的PING请求:

1
2
3
<iq from='capulet.lit' to='juliet@capulet.lit/balcony' id='s2c1' type='get'>
  <ping xmlns='urn:xmpp:ping'/>
</iq>

如果对端支持PING请求,则返回对应的”PONG”回应。

1
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='result'/>

如果对端不支持则返回错误。

1
2
3
4
5
6
<iq from='juliet@capulet.lit/balcony' to='capulet.lit' id='s2c1' type='error'>
  <ping xmlns='urn:xmpp:ping'/>
  <error type='cancel'>
    <service-unavailable xmlns='urn:ietf:params:xml:ns:xmpp-stanzas'/>
  </error>
</iq>

ejabberd中PING功能实现位于mod_ping.erl。它主要支持3个配置:

  • send_pings: true|false

如果这个选项设置为true, 当客户端在给定时间间隔内没有活动,则向客户端发送一个ping请求。

  • ping_interval: Seconds

设置上述send_pings选项中客户端没有活动的时间间隔。

  • timeout_action: none|kill

表示当PING请求发出32秒后,ejabberd依然没有收到PING响应,服务端如何处理。none表示什么也不做,kill表示关闭客户端连接。

当ejabberd启动时会调用mod_ping:start/2。

1
2
3
4
5
start(Host, Opts) ->
    Proc = gen_mod:get_module_proc(Host, ?MODULE),
    PingSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
                transient, 2000, worker, [?MODULE]},
    supervisor:start_child(?SUPERVISOR, PingSpec).

start函数调用supervisor:start_child/2为每个支持的host创建一个负责该host的worker进程。

进程树模型如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
+------------+
|ejabberd_sup|
+-----+------+
      |
      |    +------------------+
      +--->|Other processes...|
      |    +------------------+
      |
      |    +------------------------+
      +--->|ping(im.just4coding.com)|
      |    +------------------------+
      |
      |    +------------------------+
      +--->|ping(localhost)         |
      |    +------------------------+
      |
      |    +------------------------+
      +--->|ping(Other host)        |
           +------------------------+

每个worker是一个gen_server进程,进程调用init函数进行初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
init([Host, Opts]) ->
    SendPings = gen_mod:get_opt(send_pings, Opts, ?DEFAULT_SEND_PINGS),
    PingInterval = gen_mod:get_opt(ping_interval, Opts, ?DEFAULT_PING_INTERVAL),
    TimeoutAction = gen_mod:get_opt(timeout_action, Opts, none),
    IQDisc = gen_mod:get_opt(iqdisc, Opts, no_queue),
    mod_disco:register_feature(Host, ?NS_PING),
    gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PING,
                                  ?MODULE, iq_ping, IQDisc),
    gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_PING,
                                  ?MODULE, iq_ping, IQDisc),
    case SendPings of
    true ->
        %% Ping requests are sent to all entities, whether they
        %% announce 'urn:xmpp:ping' in their caps or not
        ejabberd_hooks:add(sm_register_connection_hook, Host,
                           ?MODULE, user_online, 100),
        ejabberd_hooks:add(sm_remove_connection_hook, Host,
                           ?MODULE, user_offline, 100),
        ejabberd_hooks:add(user_send_packet, Host,
                           ?MODULE, user_send, 100);
    _ ->
        ok
    end,
    {ok, #state{host = Host,
                send_pings = SendPings,
                ping_interval = PingInterval,
                timeout_action = TimeoutAction,
                timers = ?DICT:new()}}.
  • 首先获取相关配置

  • 接着调用mod_disco:register_feature注册PING功能的XMLNS。这样当客户端请求”Service Discovery”信息时,ejabberd返回的特征中会包括”urn:xmpp:ping”。

ServiceDiscovery请求:

1
2
3
4
5
6
<iq type='get'
    from='juliet@capulet.lit/balcony'
    to='capulet.lit'
    id='disco1'>
  <query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>

ServiceDiscovery响应:

1
2
3
4
5
6
7
8
9
10
<iq type='result'
    from='capulet.lit'
    to='juliet@capulet.lit/balcony'
    id='disco1'>
  <query xmlns='http://jabber.org/protocol/disco#info'>
    ...
    <feature var='urn:xmpp:ping'/>
    ...
  </query>
</iq>

ServiceDiscovery相关信息参考XEP-0030

  • 接下来,注册IQ处理器,令XMLNS为”urn:xmpp:ping”的IQ请求由函数iq_ping处理。iq_ping简单地返回相应响应或者错误。
1
2
3
4
5
6
7
iq_ping(_From, _To, #iq{type = Type, sub_el = SubEl} = IQ) ->
    case {Type, SubEl} of
        {get, {xmlelement, "ping", _, _}} ->
            IQ#iq{type = result, sub_el = []};
        _ ->
            IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]}
    end.

如果send_pings配置为true, mod_ping在ejabberd中注册n以下3个hook函数:

  • sm_register_connection_hook: 它在客户端完成登录验证,建立session信息时调用。
1
2
3
4
5
6
7
8
open_session(SID, User, Server, Resource, Info) ->
    set_session(SID, User, Server, Resource, undefined, Info),
    mnesia:dirty_update_counter(session_counter,
                jlib:nameprep(Server), 1),
    check_for_sessions_to_replace(User, Server, Resource),
    JID = jlib:make_jid(User, Server, Resource),
    ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver,
               [SID, JID, Info]).
  • sm_remove_connection_hook: 在用户退出,关闭session时调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
close_session(SID, User, Server, Resource) ->
    Info = case mnesia:dirty_read({session, SID}) of
    [] -> [];
    [#session{info=I}] -> I
    end,
    F = fun() ->
        mnesia:delete({session, SID}),
        mnesia:dirty_update_counter(session_counter,
                        jlib:nameprep(Server), -1)
    end,
    mnesia:sync_dirty(F),
    JID = jlib:make_jid(User, Server, Resource),
    ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver,
               [SID, JID, Info]).
  • user_send_packet: 在C2S进程收到客户端发送的消息时被调用。

sm_register_connection_hook的hook函数user_onlineuser_send_packet的hook函数user_send都会调用start_ping函数。

1
2
3
start_ping(Host, JID) ->
    Proc = gen_mod:get_module_proc(Host, ?MODULE),
    gen_server:cast(Proc, {start_ping, JID}).

start_ping向该HOST的worker进程发送一个{start_ping, JID}消息。worker进程调用handle_cast进行处理:

1
2
3
handle_cast({start_ping, JID}, State) ->
    Timers = add_timer(JID, State#state.ping_interval, State#state.timers),
    {noreply, State#state{timers = Timers}};

handle_cast调用add_timer为该客户端创建一个timer。

1
2
3
4
5
6
7
8
9
10
11
add_timer(JID, Interval, Timers) ->
    LJID = jlib:jid_tolower(JID),
    NewTimers = case ?DICT:find(LJID, Timers) of
            {ok, OldTRef} ->
            cancel_timer(OldTRef),
            ?DICT:erase(LJID, Timers);
            _ ->
            Timers
        end,
    TRef = erlang:start_timer(Interval * 1000, self(), {ping, JID}),
    ?DICT:store(LJID, TRef, NewTimers).

由于用户每次发送消息时都会调用add_timer函数,因而add_timer中需要检查之前是否已经存在timer。如果存在,则先取消旧的timer, 再创建新的Timer。

当timer超时后,即客户若干时间内没有活动,进程收到{ping, JID}消息,此时ejabberd应向客户端发送PING消息。进程调用handle_info处理。

1
2
3
4
5
6
7
8
9
10
11
handle_info({timeout, _TRef, {ping, JID}}, State) ->
    IQ = #iq{type = get,
             sub_el = [{xmlelement, "ping", [{"xmlns", ?NS_PING}], []}]},
    Pid = self(),
    F = fun(Response) ->
        gen_server:cast(Pid, {iq_pong, JID, Response})
    end,
    From = jlib:make_jid("", State#state.host, ""),
    ejabberd_local:route_iq(From, JID, IQ, F),
    Timers = add_timer(JID, State#state.ping_interval, State#state.timers),
    {noreply, State#state{timers = Timers}};

handle_info创建IQ消息后,设置回调函数F,调用ejabberd_local:route_iq/4消息IQ消息发送给客户端。当收到该IQ消息的响应或者超过32秒依然没有收到客户端的响应,回调函数F将会被调用。如果响应超时,Response为timeout,F将向进程发送{iq_pong, JID, timeout}消息。进程调用handle_cast处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
handle_cast({iq_pong, JID, timeout}, State) ->
    Timers = del_timer(JID, State#state.timers),
    ejabberd_hooks:run(user_ping_timeout, State#state.host, [JID]),
    case State#state.timeout_action of
    kill ->
        #jid{user = User, server = Server, resource = Resource} = JID,
        case ejabberd_sm:get_session_pid(User, Server, Resource) of
        Pid when is_pid(Pid) ->
            ejabberd_c2s:stop(Pid);
        _ ->
            ok
        end;
    _ ->
        ok
    end,
    {noreply, State#state{timers = Timers}};

如果timeout_action设置为kill, 则调用ejabberd_c2s:stop关闭相应的客户端连接。

因为在sm_remove_connection_hook注册了hook函数user_offline, 当用户退出时会调用stop_ping函数,向worker进程发送{stop_ping, JID}消息。

1
2
3
stop_ping(Host, JID) ->
    Proc = gen_mod:get_module_proc(Host, ?MODULE),
    gen_server:cast(Proc, {stop_ping, JID}).

worker进程调用del_timer函数将该客户端的timer删除。

1
2
3
handle_cast({stop_ping, JID}, State) ->
    Timers = del_timer(JID, State#state.timers),
    {noreply, State#state{timers = Timers}};
1
2
3
4
5
6
7
8
9
del_timer(JID, Timers) ->
    LJID = jlib:jid_tolower(JID),
    case ?DICT:find(LJID, Timers) of
        {ok, TRef} ->
        cancel_timer(TRef),
        ?DICT:erase(LJID, Timers);
        _ ->
        Timers
    end.

模块及进程停止的逻辑与模块和进程初始化的逻辑相反,本文略过。

NGINX发送响应分析

NGINX中使用ngx_http_output_filter()向一个请求发送响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ngx_int_t
ngx_http_output_filter(ngx_http_request_t *r, ngx_chain_t *in)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = r->connection;

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http output filter "%V?%V"", &r->uri, &r->args);

    rc = ngx_http_top_body_filter(r, in);

    if (rc == NGX_ERROR) {
        /* NGX_ERROR may be returned by any filter */
        c->error = 1;
    }

    return rc;
}

其中,r是请求结构体,in为以ngx_chain_t结构链接起来的需要发送的内容。ngx_http_output_filter()会调用ngx_http_top_body_filter()。NGINX采用filter机制对响应进行处理。filter分为header filterbody filter。NGINX分别将两种filter各构建成一个链表。全局变量ngx_http_top_header_filter指向header filter链表的头结点,而全局变量ngx_http_top_body_filter指向body filter链表的头结点。每个filter中用一个变量记录下一个filter,依据情况决定是调用下一个filter,还是直接返回。

body filter链表顺序如下图:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
  +--------------------------+
  |ngx_http_range_body_filter|
  +----------+---------------+
             |
             v
  +----------+---------+
  |ngx_http_copy_filter|
  +----------+---------+
             |
             v
  +----------+-----------------+
  |ngx_http_charset_body_filter|
  +----------+-----------------+
             |
             v
  +----------+-------------+
  |ngx_http_ssi_body_filter|
  +----------+-------------+
             |
             v
  +----------+-------------+
  |ngx_http_postpone_filter|
  +----------+-------------+
             |
             v
  +----------+--------------+
  |ngx_http_gzip_body_filter|
  +----------+--------------+
             |
             v
  +----------+-----------------+
  |ngx_http_chunked_body_filter|
  +----------+-----------------+
             |
             v
  +---------------------+
  |ngx_http_write_filter|
  +---------------------+

header filter链表顺序如图:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  +----------------------------+
  |ngx_http_not_modified_filter|
  +----------+-----------------+
             |
             v
  +----------+------------+
  |ngx_http_headers_filter|
  +----------+------------+
             |
             v
  +----------+-----------+
  |ngx_http_userid_filter|
  +----------+-----------+
             |
             v
  +----------+-------------------+
  |ngx_http_charset_header_filter|
  +----------+-------------------+
             |
             v
  +----------+---------------+
  |ngx_http_ssi_header_filter|
  +----------+---------------+
             |
             v
  +----------+----------------+
  |ngx_http_gzip_header_filter|
  +----------+----------------+
             |
             v
  +----------+-----------------+
  |ngx_http_range_header_filter|
  +----------+-----------------+
             |
             v
  +----------+-------------------+
  |ngx_http_chunked_header_filter|
  +----------+-------------------+
             |
             v
  +----------+-----------+
  |ngx_http_header_filter|
  +----------------------+

ngx_http_write_filter()是最后一个被调用的body filter,它进行真正的网络I/O操作,将响应发送给客户端。实际上, ngx_http_header_filter()也是调用ngx_http_write_filter()来发送响应中的headers。

ngx_http_write_filter()的简化流程如下:

  • 检查之前是否有错误发生(c->error被置位)。如果有错误发生,则没有必要再进行网络I/O操作,直接返回NGX_ERROR。
1
2
3
if (c->error) {
    return NGX_ERROR;
}
  • 计算之前没有发送完成的内容大小并检查是否存在特殊标志。为了优化性能,当没有必要立即发送响应且响应内容大小没有达到设置的阀值时,NGINX可以暂时推迟发送该部分响应。参看:postpone_output指令。flush标志表示需要立即发送响应。recycled表示该buffer需要循环使用,因而需要立即发送以释放该buffer被重新使用。last标志表示该buffer是响应的最后一部分内容,因而也需要立即发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (cl = r->out; cl; cl = cl->next) {
    ll = &cl->next;

    ......

    size += ngx_buf_size(cl->buf);

    if (cl->buf->flush || cl->buf->recycled) {
        flush = 1;
    }

    if (cl->buf->last_buf) {
        last = 1;
    }
}
  • 计算本次将发送的内容大小,检查是否存在特殊标志,并将内容链接到r->out上。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for (ln = in; ln; ln = ln->next) {
    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf = ln->buf;
    *ll = cl;
    ll = &cl->next;

    ...

    size += ngx_buf_size(cl->buf);

    if (cl->buf->flush || cl->buf->recycled) {
        flush = 1;
    }

    if (cl->buf->last_buf) {
        last = 1;
    }
}
  • 根据情况决定是需要真正进行网络I/O操作, 还是直接返回。
1
2
3
if (!last && !flush && in && size < (off_t) clcf->postpone_output) {
    return NGX_OK;
}
  • 真正进行网络I/O操作,发送内容。
1
chain = c->send_chain(c, r->out, limit);
  • 回收发送完成内容的buffer和chain结构, 将没有发送完成的内容存入r->out
1
2
3
4
5
6
7
for (cl = r->out; cl && cl != chain; /* void */) {
    ln = cl;
    cl = cl->next;
    ngx_free_chain(r->pool, ln);
}

r->out = chain;
  • 根据发送是否完成,返回NGX_OKNGX_AGAIN
1
2
3
4
5
6
7
8
9
10
11
12
if (chain) {
    c->buffered |= NGX_HTTP_WRITE_BUFFERED;
    return NGX_AGAIN;
}

c->buffered &= ~NGX_HTTP_WRITE_BUFFERED;

if ((c->buffered & NGX_LOWLEVEL_BUFFERED) && r->postponed == NULL) {
    return NGX_AGAIN;
}

return NGX_OK;

此外,ngx_http_write_filter()中也处理了限速发送的逻辑,本文不详述。

Ejabberd登录验证流程分析

XMPP使用SASL进行登录验证。ejabberd中模仿CyrusSASL库自己实现了SASL协议,API用法与CyrusSASL类似。

ejabberd启动时,ejabberd_app:start/0会调用cyrsasl:start/0。它创建了一个ets表sasl_mechanism,然后调用各个SASL机制模块的start函数。

1
2
3
4
5
6
7
8
9
start() ->
    ets:new(sasl_mechanism, [named_table,
                             public,
                             {keypos, #sasl_mechanism.mechanism}]),
    cyrsasl_plain:start([]),
    cyrsasl_digest:start([]),
    cyrsasl_scram:start([]),
    cyrsasl_anonymous:start([]),
    ok.

各个SASL机制模块的start/1函数会调用cyrsasl:register_mechanism/3, 参数分别为SASL机制名称,机制处理模块,机制支持的密码存储类型。register_mechanism会将机制信息添加到sasl_mechanism表中。

1
2
3
start(_Opts) ->
    cyrsasl:register_mechanism("PLAIN", ?MODULE, plain),
    ok.
1
2
3
4
5
register_mechanism(Mechanism, Module, PasswordType) ->
    ets:insert(sasl_mechanism,
               #sasl_mechanism{mechanism = Mechanism,
                               module = Module,
                               password_type = PasswordType}).

客户端连接到ejabberd后,ejabberd会创建两个进程。一个ejabberd_c2s进程来处理连接状态并向客户端发送数据,另一个ejabberd_receiver进程接收客户端数据。ejabberd_c2s进程实现了gen_fsm行为,有限状态机的初始状态为wait_for_stream。ejabberd_c2s:init/1的简化代码逻辑为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
init([{SockMod, Socket}, Opts]) ->
    ...
    IP = peerip(SockMod, Socket),
    %% Check if IP is blacklisted:
    case is_ip_blacklisted(IP) of
    true ->
        ?INFO_MSG("Connection attempt from blacklisted IP: ~s (~w)",
                  [jlib:ip_to_list(IP), IP]),
        {stop, normal};
    false ->
        ...
        {ok, wait_for_stream, #state{socket         = Socket1,
                                     sockmod        = SockMod,
                                     socket_monitor = SocketMonitor,
                                     xml_socket     = XMLSocket,
                                     zlib           = Zlib,
                                     tls            = TLS,
                                     tls_required   = StartTLSRequired,
                                     tls_enabled    = TLSEnabled,
                                     tls_options    = TLSOpts,
                                     streamid       = new_id(),
                                     access         = Access,
                                     shaper         = Shaper,
                                     ip             = IP},
         ?C2S_OPEN_TIMEOUT}
    end.

ejabberd_receiver进程解析到XML流头之后会调用gen_fsm:send_event向ejabberd_c2s进程发送消息。

1
2
3
4
5
6
7
8
9
10
11
12
process_data([Element|Els], #state{c2s_pid = C2SPid} = State)
  when element(1, Element) == xmlelement;
       element(1, Element) == xmlstreamstart;
       element(1, Element) == xmlstreamelement;
       element(1, Element) == xmlstreamend ->
    if
    C2SPid == undefined ->
        State;
    true ->
        catch gen_fsm:send_event(C2SPid, element_wrapper(Element)),
        process_data(Els, State)
    end;

ejabberd_c2s进程接收到{xmlstreamstart, _Name, Attrs}消息后,调用状态函数wait_for_stream/2来处理。wait_for_stream在一系列正确性校验通过之后,回应给客户端XML流头。如果这个XML流之前还没有通过登录验证,则进行登录验证过程。简化的代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
case StateData#state.authenticated of
false ->
    SASLState = cyrsasl:server_new(
        "jabber", Server, "", [],
        fun(U) ->
            ejabberd_auth:get_password_with_authmodule(U, Server)
        end,
        fun(U, P) ->
            ejabberd_auth:check_password_with_authmodule(U, Server, P)
        end,
        fun(U, P, D, DG) ->
            ejabberd_auth:check_password_with_authmodule(U, Server, P, D, DG)
        end),
    Mechs = lists:map(
            fun(S) ->
                {xmlelement, "mechanism", [], [{xmlcdata, S}]}
            end, cyrsasl:listmech(Server)),
    ...

    send_element(StateData,
            {xmlelement, "stream:features", [],
            TLSFeature ++ CompressFeature ++
            [{xmlelement, "mechanisms",
            [{"xmlns", ?NS_SASL}],
            Mechs}] ++
            ejabberd_hooks:run_fold(
                c2s_stream_features,
                Server,
                [], [Server])}),
    fsm_next_state(wait_for_feature_request,
            StateData#state{
            server = Server,
            sasl_state = SASLState,
            lang = Lang});
_ ->
    ...
end

首先调用cyrsasl:server_new/7创建一个SASL验证状态, 其中存储了3个用于密码校验的回调函数。

1
2
3
4
5
6
7
8
server_new(Service, ServerFQDN, UserRealm, _SecFlags,
       GetPassword, CheckPassword, CheckPasswordDigest) ->
    #sasl_state{service = Service,
                myname = ServerFQDN,
                realm = UserRealm,
                get_password = GetPassword,
                check_password = CheckPassword,
                check_password_digest= CheckPasswordDigest}.

wait_for_stream函数接着调用cyrsasl:listmech/1获取当前域名所支持的SASL验证机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
listmech(Host) ->
    Mechs = ets:select(sasl_mechanism,
       [{#sasl_mechanism{mechanism = '$1',
                         password_type = '$2',
                         _ = '_'},
        case catch ejabberd_auth:store_type(Host) of
        external ->
             [{'==', '$2', plain}];
        scram ->
             [{'/=', '$2', digest}];
        {'EXIT',{undef,[{Module,store_type,[]} | _]}} ->
             ?WARNING_MSG("~p doesn't implement the function store_type/0", [Module]),
             [];
        _Else ->
             []
        end,
        ['$1']}]),
    filter_anonymous(Host, Mechs).

listmech函数调用ejabberd_auth:store_type/1从ejabberd.cfg文件获取密码存储格式(auth_password_format)的配置。从sasl_mechanism表中查询出支持该密码存储格式的SASL机制。wait_for_stream函数将这些机制组织成XMPP协议格式发送回客户端,将当前进程状态改为wait_for_feature_request。比如,发送的机制列表为:

1
2
3
4
5
6
7
8
9
<stream:features>
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
<mechanism>DIGEST-MD5</mechanism>
<mechanism>SCRAM-SHA-1</mechanism>
</mechanisms>
<c xmlns="http://jabber.org/protocol/caps" node="http://www.process-one.net/en/ejabberd/" ver="yy7di5kE0syuCXOQTXNBTclpNTo=" hash="sha-1"/>
<register xmlns="http://jabber.org/features/iq-register"/>
</stream:features>

客户端从其中选择一个机制并发送给ejabberd服务器。如:

1
<auth xmlns="urn:ietf:params:xml:ns:xmpp-sasl" mechanism="PLAIN">AGFhYQAxMjM=</auth>

ejabberd_receiver进程解析完这个XML元素后,发送消息{xmlstreamelement, El}给ejabberd_c2s进程。ejabberd_c2s进程调用wait_for_feature_request函数进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Mech = xml:get_attr_s("mechanism", Attrs),
ClientIn = jlib:decode_base64(xml:get_cdata(Els)),
case cyrsasl:server_start(StateData#state.sasl_state,
                          Mech,
                          ClientIn) of
{ok, Props} ->
    (StateData#state.sockmod):reset_stream(StateData#state.socket),
    send_element(StateData, {xmlelement, "success", [{"xmlns", ?NS_SASL}], []}),
    U = xml:get_attr_s(username, Props),
    AuthModule = xml:get_attr_s(auth_module, Props),
    ...
    fsm_next_state(wait_for_stream,
                   StateData#state{
                       streamid = new_id(),
                       authenticated = true,
                       auth_module = AuthModule,
                       user = U });
{continue, ServerOut, NewSASLState} ->
    send_element(StateData,
                 {xmlelement, "challenge",
                 [{"xmlns", ?NS_SASL}],
                 [{xmlcdata,
                 jlib:encode_base64(ServerOut)}]}),
    fsm_next_state(wait_for_sasl_response,
                   StateData#state{
                       sasl_state = NewSASLState});
{error, Error, Username} ->
    IP = peerip(StateData#state.sockmod, StateData#state.socket),
    ...
    send_element(StateData,
                 {xmlelement, "failure",
                 [{"xmlns", ?NS_SASL}],
                 [{xmlelement, Error, [], []}]}),
    {next_state, wait_for_feature_request, StateData, ?C2S_OPEN_TIMEOUT};
{error, Error} ->
    send_element(StateData,
                 {xmlelement, "failure",
                 [{"xmlns", ?NS_SASL}],
                 [{xmlelement, Error, [], []}]}),
    fsm_next_state(wait_for_feature_request, StateData)
end;

ejabberd_c2s进程判断收到的XML元素是auth请求后,从请求中获取客户端选择的机制mechanism,读取客户端发送的信息,然后调用cyrsasl:server_start/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
server_start(State, Mech, ClientIn) ->
    case lists:member(Mech, listmech(State#sasl_state.myname)) of
    true ->
        case ets:lookup(sasl_mechanism, Mech) of
        [#sasl_mechanism{module = Module}] ->
            {ok, MechState} = Module:mech_new(
                    State#sasl_state.myname,
                    State#sasl_state.get_password,
                    State#sasl_state.check_password,
                    State#sasl_state.check_password_digest),
            server_step(State#sasl_state{mech_mod = Module,
                         mech_state = MechState},
                ClientIn);
        _ ->
            {error, "no-mechanism"}
        end;
    false ->
        {error, "no-mechanism"}
    end.

server_start从sasl_mechanism表中查询出机制模块,并调用机制模块的mech_new/4。这个函数会创建一个机制本身的状态结构。如,PLAIN机制模块的mech_new/4:

1
2
mech_new(_Host, _GetPassword, CheckPassword, _CheckPasswordDigest) ->
    {ok, #state{check_password = CheckPassword}}.

server_start函数将这个机制状态保存到SASL状态结构里的mech_state字段,调用server_step。而server_step则调用机制模块的mech_step。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
server_step(State, ClientIn) ->
    Module = State#sasl_state.mech_mod,
    MechState = State#sasl_state.mech_state,
    case Module:mech_step(MechState, ClientIn) of
    {ok, Props} ->
        case check_credentials(State, Props) of
        ok ->
            {ok, Props};
        {error, Error} ->
            {error, Error}
        end;
    {ok, Props, ServerOut} ->
        case check_credentials(State, Props) of
        ok ->
            {ok, Props, ServerOut};
        {error, Error} ->
            {error, Error}
        end;
    {continue, ServerOut, NewMechState} ->
        {continue, ServerOut,
         State#sasl_state{mech_state = NewMechState}};
    {error, Error, Username} ->
        {error, Error, Username};
    {error, Error} ->
        {error, Error}
    end.

Module:mech_step根据自身机制状态,返回不同的值。当验证通过时,返回ok信息。如若还需要其他信息继续验证,则返回continue信息。验证出错时,返回error信息。wait_for_feature_request函数根据不同的返回值,进行不同的处理。

  • 当返回ok信息时,ejabberd_c2s进程向客户端发送验证成功的消息,登录验证流程结束。
  • 当返回continue信息时,表示验证流程需要继续,因而向客户端返回服务端的chelange信息,ejabberd_c2s进程状态变为wait_for_sasl_response。当客户端再回应验证信息后,wait_for_sasl_response函数再次调用server_step进行验证处理。
  • 当返回error信息时,ejabberd_c2s进程向客户端发送验证失败的消息,登录验证流程结束。

PLAIN机制只需要用户名和密码,这些信息附在客户端选择机制的AUTH请求中。因而只需要调用一次mech_step函数,mech_step也因而不会返回continue。cyrsasl_plain:mech_step调用mech_new中传入的check_password回调函数(不同机制会调用不同的回调函数)来检查用户名和密码是否正确。这部分逻辑由ejabberd_auth模块封装不同的模块完成,本文略过。

1
2
3
4
5
6
7
8
9
10
11
12
13
mech_step(State, ClientIn) ->
    case prepare(ClientIn) of
    [AuthzId, User, Password] ->
        case (State#state.check_password)(User, Password) of
        {true, AuthModule} ->
            {ok, [{username, User}, {authzid, AuthzId},
              {auth_module, AuthModule}]};
        _ ->
            {error, "not-authorized", User}
        end;
    _ ->
        {error, "bad-protocol"}
    end.

SCRAM机制需要多次Chelange/Response交互,需要多次调用它的mech_step。因而它在机制状态内部来分步骤完成,具体可以参考cyrsasl_scram.erl,本文不详述。

当机制mech_step返回ok或error时,ejabberd_c2s进程返回给客户端相应的回应,登录验证的流程结束。

注: ejabberd代码版本为2.1.13。

SASL与身份验证

身份验证是很多C/S模式应用协议的通用需求,为了避免每个协议都单独实现一套验证逻辑,SASL(Simple Authentication and Secure Layer)被提出了, 它定位成为基于可靠连接的应用协议提供身份验证和数据安全服务的通用框架。SASL定义了通用的身份验证信息交换的流程, 并且包含一系列验证机制。这些验证机制完成具体的身份验证逻辑。这样,SASL就成为了一个将应用协议和验证机制相连接的抽象层,如下图所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-------------------------------------------------------------

      +----+   +----+    +----+   +-------------------+
      |SMTP|   |LDAP|    |XMPP|   |Other protocols ...|
      +--+-+   +--+-+    +--+-+   +--+----------------+
         |        |         |        |
         |        |         |        |
   ------+--------+---------+--------+------------------
                 SASL abstraction layer
   ------+--------+---------+--------+------------------
         |        |         |        |
         |        |         |        |
   +-----+--+  +--+---+  +--+--+  +--+-----------------+
   |EXTERNAL|  |GSSAPI|  |PLAIN|  |Other machanisms ...|
   +--------+  +------+  +-----+  +--------------------+

-------------------------------------------------------------

任何应用协议都可以使用任何验证机制,而具体使用哪个机制则由应用协议的客户端和服务器进行协商。

分别以”C:”和”S:”代表客户端和服务端,SASL规定的验证信息交换的基本流程为:

1
2
3
4
5
C: 请求验证交换
S: 最初的挑战码
C: 最初的响应消息
<额外的挑战码/响应消息>
S: 身份验证结果

根据机制不同,流程略有差异。

具体应用哪个机制进行身份验证由使用SASL的应用协议来协商。服务器向客户端通告服务器所支持的机制, 客户端从中选择一个它支持且最合适的机制并通知服务器,请求开始身份验证。接下来便是上述的一系列Chalenges/Responses信息交换。这些信息载体的形式由应用协议指定。最终服务器发送回身份验证的结果。

SASL机制注册由IANA维护: http://www.iana.org/assignments/sasl-mechanisms/sasl-mechanisms.xhtml

下面说明几个具体的SASL机制:

  • EXTERNAL:

EXTERNAL机制允许客户端请求服务器使用其他途径获取的验证信息来验证该客户端。如通过TLS获取的验证信息。以ACAP(Application Configuration Access Protocol)协议来举例:

1
2
3
4
5
6
7
8
9
S: * ACAP (SASL "DIGEST-MD5")
C: a001 STARTTLS
S: a001 OK "Begin TLS negotiation now"
<TLS negotiation, further commands are under TLS layer>
S: * ACAP (SASL "DIGEST-MD5" "EXTERNAL")
C: a002 AUTHENTICATE "EXTERNAL"
S: + ""
C: + ""
S: a002 OK "Authenticated"

在TLS安全层建立后,服务端通告它支持DIGEST-MD5和EXTERNAL机制,客户端选择使用EXTERNAL机制,并且不使用其他授权实体。服务器使用外部信息验证通过后,返回成功的响应。

  • PLAIN

PLAIN机制只需要传递一条消息,这个消息由授权实体,验证实体和密码三部分组成。如下图所示:

1
authzid<NUL>authcid<NUL>passwd

授权实体authzid为可选的。如果提供了它,身份验证通过后,如果权限允许,将以authzid身份进行操作。如果权限不允许,则服务器返回授权失败。由于PLAIN机制直接传递密码本身,因而不应该在没有私密性保护的连接上使用。 同样以ACAP协议举例:

1
2
3
4
5
6
7
8
S: * ACAP (SASL "CRAM-MD5") (STARTTLS)
C: a001 STARTTLS
S: a001 OK "Begin TLS negotiation now"
<TLS negotiation, further commands are under TLS layer>
S: * ACAP (SASL "CRAM-MD5" "PLAIN")
C: a002 AUTHENTICATE "PLAIN" {20+}
C: Ursel<NUL>Kurt<NUL>xipj3plmq
S: a002 NO "Not authorized to requested authorization identity"

TLS安全层建立后,服务器通告它支持CRAM-MD5和PLAIN机制,客户端选择PLAIN机制,并发送身份验证消息,服务器返回授权失败,即Kurt身份验证通过,但不能以Ursel的身份进行操作。

  • SCRAM-SHA-1

SCRAM是一系统机制的统称,具体机制名称后缀上算法所使用的HASH函数。我们以SCRAM-SHA-1举例,它使用SHA-1哈希函数。PLAIN机制在网络上传输的是密码本身,因而只应该用在TLS等安全层之上。SCRAM机制则没有这个限制。

下面的例子略去机制协商的过程, 用户名为”user”, 密码为”pencil”:

1
2
3
4
C: n,,n=user,r=fyko+d2lbbFgONRv9qkxdawL
S: r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92, i=4096
C: c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=
S: v=rmF9pqV8S7suAoZWja4dJRkFsKQ=

SCRAM机制的消息由多个属性构成,每个属性为”a=xxx”的形式,而且属性有顺序要求。

客户端发送的首条消息包括了以下内容:

  1. 一个GS2头,它包括一个字符,只能为”n”, “y”, “p”,是通道绑定的标识,和一个授权实体(例子中没有提供,因而为”,,“,当需要指定时使用属性a, 如”a=dummy”)。
  2. 属性n, 表示身份验证的用户名。
  3. 属性r, 表示客户端nonce, 一个随机的可打印字符串(不能包括”,“)。

我们把后两部分称为client_first_message_bare, 后面算法中要使用它。

然后服务器回应首条消息。属性r为客户端NONCE拼接上服务器的随机NONCE值,属性s为使用BASE64编码后的用户密码的salt值,属性i为迭代次数。在后面介绍的算法中可以看到具体用途。这条消息我们称为server_first_message, 后面算法需要使用它。

接着,客户端发送末条消息。属性c为使用BASE64编码的GS2头及通道绑定数据。例子中的”biws”解码后为”n,,“, 即客户端首条消息的第一部分,属性r与服务器回应的属性r必须相同。属性p为使用BASE64编码的客户端证明信息(ClientProof)。它由客户端使用后面介绍的算法计算得到。我们把前两个属性称为client_final_message_without_proof, 后面算法要使用它。

服务端验证客户端发送的NONCE值和证明信息(ClientProof),如果提供了授权实体,则也需要验证是否可以授权给该实体,然后发送服务端末条消息。属性v为服务器签名(ServerSignature)。客户端通过比较计算得到的和从服务端所收到的签名是否相同来对服务器进行身份验证。如果服务器验证失败,将回应属性e, 它可以用来诊断错误原因。

下面介绍客户端和服务器签名的具体算法:

1
2
3
4
5
6
7
8
SaltedPassword := Hi(Normalize(password), salt, i)
ClientKey := HMAC(SaltedPassword, "Client Key")
StoredKey := H(ClientKey)
AuthMessage := client-first-message-bare + "," + server-first-message + "," + client-final-message-without-proof
ClientSignature := HMAC(StoredKey, AuthMessage)
ClientProof := ClientKey XOR ClientSignature
ServerKey := HMAC(SaltedPassword, "Server Key")
ServerSignature := HMAC(ServerKey, AuthMessage)

其中HMAC原型为HMAC(key, str), Hi函数算法为:

1
2
3
4
5
6
7
8
Hi(str, salt, i):

U1 := HMAC(str, salt + INT(1))
U2 := HMAC(str, U1)
...
Ui-1 := HMAC(str, Ui-2)
Ui := HMAC(str, Ui-1)
Hi := U1 XOR U2 XOR ... XOR Ui

用PHP实现该算法来验证上述例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?php
function hi($str, $salt, $i) {
    $int1 = "\0\0\0\1";
    $ui = hash_hmac("sha1", $salt . $int1, $str, true);
    $result = $ui;

    for ($k = 1; $k < $i; $k++) {
        $ui = hash_hmac("sha1", $ui, $str, true);
        $result = $result ^ $ui;
    }

    return $result;
}

$password = "pencil";
$salt = base64_decode('QSXCR+Q6sek8bf92');
$i = 4096;
$client_first_message_bare = 'n=user,r=fyko+d2lbbFgONRv9qkxdawL';
$server_first_message = 'r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j,s=QSXCR+Q6sek8bf92,i=4096';
$client_final_message_without_proof = 'c=biws,r=fyko+d2lbbFgONRv9qkxdawL3rfcNHYJY1ZVvWVs7j';

$salted_password = hi($password, $salt, $i);
$client_key = hash_hmac("sha1", "Client Key", $salted_password, true);
$stored_key = sha1($client_key, true);
$auth_message = $client_first_message_bare . ","
                . $server_first_message . ","
                . $client_final_message_without_proof;
$client_signature = hash_hmac("sha1", $auth_message, $stored_key, true);
$client_proof = $client_key ^ $client_signature;

$server_key = hash_hmac("sha1", "Server Key", $salted_password, true);
$server_signature = hash_hmac("sha1", $auth_message, $server_key, true);

echo "p=" . base64_encode($client_proof) . "\n";
echo "v=" . base64_encode($server_signature) . "\n";
?>

输出结果为:

1
2
p=v0X8v3Bz2T0CJGbJQyF0X+HI4Ts=
v=rmF9pqV8S7suAoZWja4dJRkFsKQ=

与上述例子中值相符。

在实际项目中,一般不需要自己来实现这些验证算法。C语言可直接使用CyrusSASL库或GNU的libgsasl。

Ejabberd消息处理流程分析

ejabberd的程序入口为ejabberd_app.erl中的start/2,简化后逻辑如下:

1
2
3
4
5
6
7
8
9
start(normal, _Args) ->
    ...
    Sup = ejabberd_sup:start_link(),
    ...
    ejabberd_listener:start_listeners(),
    ?INFO_MSG("ejabberd ~s is started in the node ~p", [?VERSION, node()]),
    Sup;
start(_, _) ->
    {error, badarg}.

ejabberd_sup是一个supervisor,调用start_link时,它会生成各功能组件进程,如下图所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
------------------------------------------------

                +------------+
                |ejabberd_sup|
                +-----+------+
                      |
+---------------------+------------------------+
|                                              |
|  +-----+              +-------+              |
|  |hooks|              |c2s_sup|              |
|  +-----+              +-------+              |
|                                              |
|  +-----------+        +---------+            |
|  |node_groups|        |s2sin_sup|            |
|  +-----------+        +---------+            |
|                                              |
|  +--------------+     +----------+           |
|  |system_monitor|     |s2sout_sup|           |
|  +--------------+     +----------+           |
|                                              |
|  +------+             +-----------+          |
|  |router|             |service_sup|          |
|  +------+             +-----------+          |
|                                              |
|  +--+                 +--------+             |
|  |sm|                 |http_sup|             |
|  +--+                 +--------+             |
|                                              |
|  +---+                +-------------+        |
|  |s2s|                |http_poll_sup|        |
|  +---+                +-------------+        |
|                                              |
|  +-----+              +-------------------+  |
|  |local|              |frontend_socket_sup|  |
|  +-----+              +-------------------+  |
|                                              |
|  +-------+            +------+               |
|  |captcha|            |iq_sup|               |
|  +-------+            +------+               |
|                                              |
|  +--------+           +--------+             |
|  |listener|           |stun_sup|             |
|  +--------+           +--------+             |
|                                              |
|  +------------+       +-------------+        |
|  |receiver_sup|       |cache_tab_sup|        |
|  +------------+       +-------------+        |
|                                              |
+----------------------------------------------+

部分关键组件功能:

  • hooks: 执行各模块注册的HOOK函数
  • router: 分发用户发送的消息
  • sm: session manager, 处理用户到用户的消息分发
  • local: 处理发送给Server本身的消息。
  • listener: supervisor进程,创建子进程监听网络端口
  • receiver_sup: supervisor进程, 它为每一个TCP连接创建一个进程来接收该TCP连接的数据
  • c2s_sup: supervisor进程, 它为每一个TCP连接创建一个进程来处理协议状态,并负责向TCP连接发送数据

ejabberd_listener:start_listerners/0会从配置文件(ejabberd.cfg)中获取“listen”选项。 listen配置的一般形式为:

1
2
3
4
5
6
7
8
9
10
{listen,
 [
  {5222, ejabberd_c2s, [
            {access, c2s},
            {shaper, c2s_shaper},
            {max_stanza_size, 65536}
               ]},
  ...
 ]
}

每一个端口需要指定一个处理模块。

start_listeners/0遍历listen配置中指定的所有端口,为每个端口调用start_listener/3。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
start_listeners() ->
    case ejabberd_config:get_local_option(listen) of
    undefined ->
        ignore;
    Ls ->
        Ls2 = lists:map(
            fun({Port, Module, Opts}) ->
                case start_listener(Port, Module, Opts) of
                {ok, _Pid} = R -> R;
                {error, Error} ->
                throw(Error)
            end
        end, Ls),
        report_duplicated_portips(Ls),
        {ok, {{one_for_one, 10, 1}, Ls2}}
    end.

start_listerner最终会调用start_listener_sup/3。它通过supervisor:start_child令ejabberd_listeners进程创建子进程执行ejabberd_listener:start/3。

1
2
3
4
5
6
7
8
start_listener_sup(Port, Module, Opts) ->
    ChildSpec = {Port,
         {?MODULE, start, [Port, Module, Opts]},
         transient,
         brutal_kill,
         worker,
         [?MODULE]},
    supervisor:start_child(ejabberd_listeners, ChildSpec).

ejabberd_listener:start/3依据该端口处理模块的socket_type/0函数的返回值进行相应处理。如果返回值为independent时,则表示该处理模块自行处理监听端口a。否则,调用start_dependent/3。

1
2
3
4
5
6
7
start(Port, Module, Opts) ->
    %% Check if the module is an ejabberd listener or an independent listener
    ModuleRaw = strip_frontend(Module),
    case ModuleRaw:socket_type() of
    independent -> ModuleRaw:start_listener(Port, Opts);
    _ -> start_dependent(Port, Module, Opts)
    end.

start_dependent/3中会创建子进程执行init/3, init/3函数根据配置参数调用init_tcp/6或init_udp/6。 init_tcp/6开始监听相应端口,然后调用accept/0等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
    ...
    Res = gen_tcp:listen(Port, [binary,
                {packet, 0},
                {active, false},
                {reuseaddr, true},
                {nodelay, true},
                {send_timeout, ?TCP_SEND_TIMEOUT},
                {keepalive, true} |
                SockOpts2]),
    case Res of
    {ok, ListenSocket} ->
        %% Inform my parent that this port was opened succesfully
        proc_lib:init_ack({ok, self()}),
        %% And now start accepting connection attempts
        accept(ListenSocket, Module, Opts);
    {error, Reason} ->
        socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
    end.

如果监听UDP端口,则init/3调用init_udp/6。init/6打开一个UDP端口,调用udp_recv/3等待接收UDP数据。

1
2
3
4
5
6
7
8
9
10
11
12
init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
    case gen_udp:open(Port, [binary,
                 {active, false},
                 {reuseaddr, true} |
                 SockOpts]) of
    {ok, Socket} ->
        %% Inform my parent that this port was opened succesfully
        proc_lib:init_ack({ok, self()}),
        udp_recv(Socket, Module, Opts);
    {error, Reason} ->
        socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
    end.

至此ejabberd启动完成。

当UDP数据到来后,udp_recv/3调用该端口处理模块的udp_recv/5处理,接着递归调用udp_recv继续等待接收UDP数据。处理模块的udp_recv/5函数中需要处理返回UDP响应等逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
udp_recv(Socket, Module, Opts) ->
    case gen_udp:recv(Socket, 0) of
    {ok, {Addr, Port, Packet}} ->
        case catch Module:udp_recv(Socket, Addr, Port, Packet, Opts) of
        {'EXIT', Reason} ->
            ?ERROR_MSG("failed to process UDP packet:~n"
                   "** Source: {~p, ~p}~n"
                   "** Reason: ~p~n** Packet: ~p",
                   [Addr, Port, Reason, Packet]);
        _ ->
            ok
        end,
        udp_recv(Socket, Module, Opts);
    {error, Reason} ->
        ?ERROR_MSG("unexpected UDP error: ~s", [format_error(Reason)]),
        throw({error, Reason})
    end.

当有TCP连接成功后,gen_tcp:accept/1返回,accept/3根据配置中处理模块是否指定了frontend, 选择调用ejabberd_frontend_socket:start/4或ejabberd_socket:start/4, 然后递归调用accept再次等待TCP连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
accept(ListenSocket, Module, Opts) ->
    case gen_tcp:accept(ListenSocket) of
    {ok, Socket} ->
        case {inet:sockname(Socket), inet:peername(Socket)} of
        {{ok, Addr}, {ok, PAddr}} ->
            ?INFO_MSG("(~w) Accepted connection ~w -> ~w",
                  [Socket, PAddr, Addr]);
        _ ->
            ok
        end,
        CallMod = case is_frontend(Module) of
              true -> ejabberd_frontend_socket;
              false -> ejabberd_socket
              end,
        CallMod:start(strip_frontend(Module), gen_tcp, Socket, Opts),
        accept(ListenSocket, Module, Opts);
    {error, Reason} ->
        ?INFO_MSG("(~w) Failed TCP accept: ~w",
              [ListenSocket, Reason]),
        accept(ListenSocket, Module, Opts)
    end.

ejabberd_socket:start/4函数根据处理模块的socket_type/0的返回值进行不同处理。5222端口的处理模块是ejabberd_c2s,该模块socket_type/0返回xml_stream。

ejabberd_socket:start/4对于xml_stream的处理的简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
RecPid = ejabberd_receiver:start(
       Socket, SockMod, none, MaxStanzaSize),

{ReceiverMod, Receiver, RecRef} =
    {ejabberd_receiver, RecPid, RecPid}

SocketData = #socket_state{sockmod = SockMod,
               socket = Socket,
               receiver = RecRef},

case Module:start({?MODULE, SocketData}, Opts) of
{ok, Pid} ->
    case SockMod:controlling_process(Socket, Receiver) of
    ok ->
        ok;
    {error, _Reason} ->
        SockMod:close(Socket)
    end,
    ReceiverMod:become_controller(Receiver, Pid);
{error, _Reason} ->
    SockMod:close(Socket)
end;

首先,它调用ejabberd_receiver:start/4, 它创建一个receiver进程,接着调用处理模块的start/2创建一个处理模块子进程。然后将新的receiver进程设为该TCP Socket的控制进程,以后从该Socket收到数据将以消息形式发送给该receiver进程。最后调用ejabberd_receiver:become_controller/2, 向该receiver进程发送become_controller消息。receiver进程处理该消息,生成一个XML解析状态并将处理进程Pid保存在状态中。

1
2
3
4
5
6
7
handle_call({become_controller, C2SPid}, _From, State) ->
    XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size),
    NewState = State#state{c2s_pid = C2SPid,
               xml_stream_state = XMLStreamState},
    activate_socket(NewState),
    Reply = ok,
    {reply, Reply, NewState, ?HIBERNATE_TIMEOUT};

当该Socket有数据可读时,receiver进程将收到TCP消息,receiver进程调用process_data/1函数来处理收到的数据。它调用xml_stream:parse进行解析,当解析出XML元素后,它会通过gen_fsm:send_event向该TCP的处理进程发送消息。处理进程根据消息进行协议状态的转换。

当XMPP协商完成后,处理进程状态为session_established。此时收到XMPP消息,处理进程解析出From和To属性, 调用ejabberd_router:route/3分发消息。ejabberd_router:route/3调用do_route进行分发。它查询route表中是否已经注册了JID所在域名。ejabberd_local进程启动时会在route表中注册配置中所添加的域名。如果已经注册,该消息则应该由当前服务器处理,否则路由至其他Server。

处理本地域名时,do_route首先获取ejabbred_local进程的Pid,如果只有一个进程,并且该进程位于当前节点,则直接调用ejabberd_local:route/3进行处理,否则发送route消息至相应的Pid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
LDstDomain = To#jid.lserver,
case mnesia:dirty_read(route, LDstDomain) of
[] ->
    ejabberd_s2s:route(From, To, Packet);
[R] ->
    Pid = R#route.pid,
    if
    node(Pid) == node() ->
        case R#route.local_hint of
        {apply, Module, Function} ->
            Module:Function(From, To, Packet);
        _ ->
            Pid ! {route, From, To, Packet}
        end;
    is_pid(Pid) ->
        Pid ! {route, From, To, Packet};
    true ->
        drop
    end;
...
end

这两种方式都会调用ejabberd_local:do_route来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
do_route(From, To, Packet) ->
    ?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
       [From, To, Packet, 8]),
    if
    To#jid.luser /= "" ->
        ejabberd_sm:route(From, To, Packet);
    To#jid.lresource == "" ->
        {xmlelement, Name, _Attrs, _Els} = Packet,
        case Name of
        "iq" ->
            process_iq(From, To, Packet);
        "message" ->
            ok;
        "presence" ->
            ok;
        _ ->
            ok
        end;
    true ->
        {xmlelement, _Name, Attrs, _Els} = Packet,
        case xml:get_attr_s("type", Attrs) of
        "error" -> ok;
        "result" -> ok;
        _ ->
            ejabberd_hooks:run(local_send_to_resource_hook,
                       To#jid.lserver,
                       [From, To, Packet])
        end
    end.
end

如果”To”属性的JID指定了user, 则该消息应该分发至用户,调用ejabberd_sm:route/3进行分发。ejabberd根据JID的Resource是否为空进行了不同处理。JID的Resource为空的情况本文略过。JID的Resource不为空时,ejabberd_sm:route/3的处理逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr) of
[] ->
    case Name of
    "message" ->
        route_message(From, To, Packet);
    "iq" ->
        case xml:get_attr_s("type", Attrs) of
        "error" -> ok;
        "result" -> ok;
        _ ->
            Err =
            jlib:make_error_reply(
              Packet, ?ERR_SERVICE_UNAVAILABLE),
            ejabberd_router:route(To, From, Err)
        end;
    _ ->
        ?DEBUG("packet droped~n", [])
    end;
Ss ->
    Session = lists:max(Ss),
    Pid = element(2, Session#session.sid),
    ?DEBUG("sending to process ~p~n", [Pid]),
    Pid ! {route, From, To, Packet}
end

它从session表中读取出该用户的信息,这些信息是用户连接上由该TCP的处理进程添加到session表中的。从中获得为该用户TCP连接的处理进程,向该进程发送route消息。处理进程处理该route消息,向该用户Socket发送消息。这便完成了一个用户到另一个用户的消息传递。

本文中ejabberd代码版本为2.1.3。