Just For Coding

Keep learning, keep living …

Twemproxy实现分析

twemproxy(https://github.com/twitter/twemproxy)是Twitter开源的Redis和Memcached代理程序,它可以将多个后端server组织成一个ServerPool, 基于请求的Key从Pool中选取一个server实例进行操作,从而实现分片存储。

twemproxy采用事件驱动处理网络数据收发。程序启动后会单独创建一个线程来处理stats请求,而主线程进入事件循环处理访问所有ServerPool的Redis或Memcached请求。

扩展Redis命令支持CIDR查询

我们的NGINX的IP封禁功能基于Redis实现。当只支持单IP封禁时,直接以IP作为KEY,调用”GET”命令,根据Value判断是否需要封禁该IP。若要支持网段封禁,需要取出所有的CIDR段,然后判断IP是否在CIDR范围内。随着CIDR越来越多,从Redis中取出的数据则越来越多,性能消耗越来越大。为了减少数据传输量,则可以将判断逻辑改由Redis来完成。

Redis本身支持Lua脚本的执行,可以由Lua来实现相应逻辑。不过Lua语言本身不支持位运算(5.2之后支持),需要第三方库支持。所以,我们直接通过修改Redis代码扩展Redis命令来实现该功能。

LVS FULLNAT模式下客户端真实地址的传递

在LVS的FULLNAT转发模式下, LVS对数据包同时做SNAT和DNAT,将数据包的源IP、源端口更换为LVS本地的IP和端口,将数据包的目的IP和目的端口修改为RS的IP和端口,从而不再依赖特定网络拓朴转发数据包。

这种方式存在一个问题: RealServer中接收到数据包中源IP和源端口为LVS机器的IP和端口,这样应用层程序获取到的TCP连接的客户端地址为LVS的IP地址,很多依赖客户端地址的功能就不能正常工作了。

为了解决这问题,FULLNAT模式在转发包的时候,在TCP包中添加一个OPTION,来传递客户端的真实地址。RealServer中通过内核模块toa令应用层程序获取真实的客户端地址。

使用Lua定制Varnish处理逻辑

Varnish使用状态机机制处理请求,在各个状态中,用户通过使用Varnish自己实现的VCL(Varnish Configuration Language)定制处理逻辑。

比如,Varnish接收并解析完请求就进入vcl_recv状态。在这个阶段,我们可以使用VCL来决定是否要服务该请求,怎么服务,以及使用哪个Backend来服务等。

VCL实现了状态的流程控制,请求信息的读取和修改等,但很多业务层面需要的逻辑没办法由VCL简单完成。比如,对某些信息进行BASE64编码等等。由于Varnish支持外部模块,用户可以使用C语言开发自己的模块,由VCL来调用这些模块来完成这些处理。但每种功能都由C模块来开发成本较大。

Lua是一种优秀的脚本语言,可以非常轻松地嵌入C语言中,而Lua语言本身有大量的库来实现各种各样的功能。因而我开发了VMOD_LUA这个Varnish模块,使用它来执行Lua脚本,由Lua代码来定制各种处理逻辑。

Ngx_http_limit_conn_module模块分析

ngx_http_limit_conn_module模块用来限制某个KEY的并发连接数。它的实现与ngx_http_limit_req_module模块类似,整体逻辑和实现更为简单。LimitConn模块也将某个KEY的信息存储在共享内存RBTREE中的节点中, 但不需要QUEUE结构。LimitConn模块只需要在节点中记录当前的连接数信息:

1
2
3
4
5
6
typedef struct {
    u_char                     color;
    u_char                     len;
    u_short                    conn;
    u_char                     data[1];
} ngx_http_limit_conn_node_t;

Keepalived Libipvs分析

LVS包转发功能由内核模块IPVS实现。Keepalived的Check进程周期性地对后端RealServer进行健康检测,根据检测结果摘除或恢复。摘除和恢复RealServer等操作本质上为Keepalived这个用户态进程与IPVS内核模块的通信操作。

libipvs封装了用户态程序对内核模块IPVS可以进行的操作,如:

  • 创建LVS服务
  • 删除LVS服务
  • 添加RealServer
  • 删除RealServer
  • 获取相关信息

ejabberd中ACL实现

ejbberd中多个模块或组件都允许管理员在配置文件中基于用户JID进行访问限制, 如在ejabberd_c2s模块中禁用某用户.

ejabberd实现了一套通用的ACL机制来满足各模块的需求.

配置方法如下:

  • 添加acl规则, 如:
1
{acl, blocked, {user, "test"}}.

acl元组第2个元素为该条acl规则的名称, 第3个元素为JID的过滤规则. 示例中的过滤规则表示用户JID的User部分为”test”.

  • 添加access规则, 如:
1
{access, c2s, [{deny, blocked}, {allow, all}]}.

access元组第2个元素为access规则的名称, 第3个元素中的每个元素为一个指定值和一个acl规则名字. 当JID满足某条acl规则时, 该条access规则的值则为该acl规则的对应值. 示例中, 当用户JID中的User部分为”test”时, access规则c2s值为deny, 否则为allow.

  • 为模块或组件指定access规则(不同的模块或组件所用的配置指令可能不同) 如:
1
2
3
4
5
{5222, ejabberd_c2s, [
                        {access, c2s},
                        {shaper, c2s_shaper},
                        {max_stanza_size, 65536}
                       ]}.

按示例配置后, ejabberd_c2s会根据access规则c2s的值禁用相应用户, 如JID中User为”test”的用户全部被禁用.

PowerDNS中PacketCache实现

PowerDNS中DNS解析由各类Backend模块处理。如果解析相关的数据存储于MySQL, Postgres等数据库,Backend需要在这些数据库中查询相应的记录是否存在。查询数据库的性能很低。因而PowerDNS中实现了PacketCache来提高性能。PowerDNS接收到请求后,先在PacketCache中查询是否已经有相应的DNS响应。如果有则直接返回该缓存。否则交由backend处理,处理后再添加到PacketCache中。

PacketCache实现主要位于packetcache.hh和packetcache.cc中。

Flashcache简介及模块初始化

flashcache是Facebook开源的用SSD来缓存机械磁盘数据的Linux内核模块。

简单来说,Linux的I/O栈层次结构可以表示为:

token data
1
2
3
4
5
6
7
+----------------------------+
| VFS(Virtual File System)   |
+----------------------------+
| Block I/O Layer            |
+----------------------------+
| Devices                    |
+----------------------------+

文件系统I/O操作传递给块设备,抽象块设备最终传递给真实的设备驱动,完成I/O操作。Linux在块设备层实现了Device Mapper(DM)的映射机制。DM机制可以将一个或多个块设备再映射成一个虚拟块设备。具体的映射规则由mapping table来实现。到达虚拟块设备的I/O请求,DM机制会根据映射表找到正确的目标设备,将请求放到目标设备的请求队列中,目标设备根据目标类型进行处理。结构图如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
      +---------------+
      | Mapped Device |
      +-------+-------+
              |
              v
      +---------------+
      | mapping table |
      +--+---------+--+
         |         |
         |         |
         v         v
 +---------------+  +-----------------+
 |Physical Device|  | Physical Device |
 +---------------+  +-----------------+

DM机制的映射目标类型可以通过内核模块进行扩展,flashcache就是通过定义一种名称为”flashcache”的映射目标类型来实现SSD做为机械磁盘的缓存。结构示意图如下:

token data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+---------------+
| Mapped Device |
+-------+-------+
        |
        v
+---------------+
| mapper table  |
+-------+-------+
        |
        | flashcache
        |
    +---+---+
    |       |
    v       v
+-----+  +------+
| SSD |  | DISK |
+-----+  +------+

flashcache做为缓存层,提供了三种写入模式:

  • writeback
  • writethrough
  • writearound

三种模式的区别如下图所示:

token data
1
2
3
4
5
6
7
 writethrough  writearound  writeback
      |            |           |
 -----|------------|-----------|------------
      v            |           v       SSD
 -----|------------|------------------------
      v            v                   DISK
 -------------------------------------------
  • writethrough:进行写操作时,会同时写入SSD和磁盘
  • writearound: 进行写操作时,只对磁盘进行写操作
  • writeback: 进行写操作时,只写入SSD,flashcache异步地将内容写入磁盘

下面分析flashcache模块的初始化。

flashcache内核模块的入口函数为flashcache_init。

1
module_init(flashcache_init);

flashcache_init首先会对job相关的变量及结构进行初始化:

1
2
3
4
5
r = flashcache_jobs_init();
if (r)
    return r;
atomic_set(&nr_cache_jobs, 0);
atomic_set(&nr_pending_jobs, 0);

flashcache_jobs_init会分配job操作所需的内存池,简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static int
flashcache_jobs_init(void)
{
    _job_cache = kmem_cache_create("kcached-jobs",
                                   sizeof(struct kcached_job),
                                   __alignof__(struct kcached_job),
                                   0, NULL);
    ...

    _job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
                               mempool_free_slab, _job_cache);

    _pending_job_cache = kmem_cache_create("pending-jobs",
                           sizeof(struct pending_job),
                           __alignof__(struct pending_job),
                           0, NULL);

    ...

    _pending_job_pool = mempool_create(MIN_JOBS, mempool_alloc_slab,
                       mempool_free_slab, _pending_job_cache);
    ...

    return 0;
}

然后,flashcache_init调用dm_io_client_create创建一个dm_io_client结构,它为块设备I/O请求执行过程提供内存池。

1
flashcache_io_client = dm_io_client_create();

接着,flashcache_init调用dm_kcopyd_client_create创建了一个dm_kcopyd_client结构。kcopyd是一个内核进程,它用于异步地copy一个块设备的区域到一个或多个其他的块设备。dm_kcopyd_client用于向kcopyd提交任务。

1
flashcache_kcp_client = dm_kcopyd_client_create(NULL);

然后,初始化了一个内核工作队列,执行do_work去处理各种job, 本文略过。

1
INIT_WORK(&_kcached_wq, do_work);

接着,注册flashcache的DM机制目标类型, 目标类型中具体回调本文略过。

1
r = dm_register_target(&flashcache_target);

再来看flashcache_init的最后部分:

1
2
3
4
5
6
flashcache_module_procfs_init();
flashcache_control = (struct flashcache_control_s *)
    kmalloc(sizeof(struct flashcache_control_s), GFP_KERNEL);
flashcache_control->synch_flags = 0;
register_reboot_notifier(&flashcache_notifier);
return 0;

首先,调用flashcache_module_procfs_init创建”/proc/flashcache”目录及文件”/proc/flashcache/flashcache_version”。 flachcache_module_procfs_init简化逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void
flashcache_module_procfs_init(void)
{
#ifdef CONFIG_PROC_FS
    struct proc_dir_entry *entry;

    if (proc_mkdir("flashcache", NULL)) {
        entry = create_proc_entry("flashcache/flashcache_version", 0, NULL);
        if (entry)
            entry->proc_fops =  &flashcache_version_operations;
    }
#endif /* CONFIG_PROC_FS */
}

最后,通过register_reboot_notifier注册函数flashcache_notify_reboot。这函数会在机器重启或关闭时被调用。

1
2
3
4
5
static struct notifier_block flashcache_notifier = {
    .notifier_call  = flashcache_notify_reboot,
    .next       = NULL,
    .priority   = INT_MAX, /* should be > ssd pri's and disk dev pri's */
};

至此,flashcache内核模块初始化完成,可以使用dmsetup指定flashcache目标类型创建虚拟块设备了。