nginx互斥锁的实现
发表于 2017-06-22 |
nginx 基于原子操作、信号量以及文件锁实现了一个简单高效的互斥锁,当多个 worker 进程之间需要互斥操作时都会用到。下面来看下 nginx 是如何实现它的。
原子操作
在实现互斥锁时用到了原子操作,先来了解一下 nginx 下提供的两个原子操作相关的函数:
1 2 3 4 5
| static ngx_inline ngx_atomic_uint_t ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old, ngx_atomic_uint_t set)
static ngx_inline ngx_atomic_int_t ngx_atomic_fetch_add(ngx_atomic_t *value, ngx_atomic_int_t add)
|
第一个函数是一个 CAS
操作,首先它比较 lock
地址处的变量是否等于 old
, 如果相等,就把 lock
地址处的变量设为 set
变返回成功,否则返回失败。注意上述过程是作为一个原子一起进行的,不会被打断。 用代码可以描述如下:
1 2 3 4 5 6 7 8 9 10
| static ngx_inline ngx_atomic_uint_t ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old, ngx_atomic_uint_t set) { if (*lock == old) { *lock = set; return 1; }
return 0; }
|
第二个函数是读取 value
地址处的变量,并将其与 add
相加的结果再写入 *lock
,然后返回原来 *lock
的值,这些操作也是作为一个整体完成的,不会被打断。用代码可描述如下:
1 2 3 4 5 6 7 8 9 10
| static ngx_inline ngx_atomic_int_t ngx_atomic_fetch_add(ngx_atomic_t *value, ngx_atomic_int_t add) { ngx_atomic_int_t old;
old = *value; *value += add;
return old; }
|
nginx 在实现这两个函数时会首先判断有没有支持原子操作的库,如果有,则直接使用库提供的原子操作实现,如果没有,则会使用汇编语言自己实现。下面以 x86
平台下实现 ngx_atomic_cmp_set 的汇编实现方式,实现主要使用了 cmpxchgq
指令,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| static ngx_inline ngx_atomic_uint_t ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old, ngx_atomic_uint_t set) { u_char res;
__asm__ volatile (
NGX_SMP_LOCK " cmpxchgl %3, %1; " " sete %0; "
: "=a" (res) : "m" (*lock), "a" (old), "r" (set) : "cc", "memory" );
return res; }
|
上面的代码采用 gcc 嵌入汇编方式来进行编写,了解了 cmpxchgq
指令后还是比较容易理解的。
锁结构体
首先 nginx 使用 ngx_shmtx_lock
结构体表示锁,它的各个成员变量如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| typedef struct { #if (NGX_HAVE_ATOMIC_OPS) ngx_atomic_t *lock; #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t *wait; ngx_uint_t semaphore; sem_t sem; #endif #else ngx_fd_t fd; u_char *name; #endif ngx_uint_t spin; } ngx_shmtx_t;
|
上面的结构体定义使用了两个宏:NGX_HAVE_ATOMIC_OPS
与 NGX_HAVE_POSIX_SEM
,分别用来代表操作系统是否支持原子变量操作与信号量。根据这两个宏的取值,可以有3种不同的互斥锁实现:
- 不支持原子操作。
- 支持原子操作,但不支持信号量
- 支持原子操作,也支持信号量
第1种情况最简单,会直接使用文件锁来实现互斥锁,这时该结构体只有 fd
、 name
和 spin
三个字段,但 spin
字段是不起作用的。对于2和3两种情况 nginx 均会使用原子变量操作来实现一个自旋锁,其中 spin
表示自旋次数。它们两个的区别是:在支持信号量的情况下,如果自旋次数达到了上限而进程还未获取到锁,则进程会在信号量上阻塞等待,进入睡眠状态。不支持信号量的情况,则不会有这样的操作,而是通过调度器直接 「让出」cpu。 下面对这三种情况下锁的实现分别进行介绍。
基于文件锁实现的锁
锁的创建
首先通过下面的函数创建一个锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
|
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) {
if (mtx->name) {
if (ngx_strcmp(name, mtx->name) == 0) { mtx->name = name; return NGX_OK; } ngx_shmtx_destroy(mtx); }
mtx->fd = ngx_open_file(name, NGX_FILE_RDWR, NGX_FILE_CREATE_OR_OPEN, NGX_FILE_DEFAULT_ACCESS); if (mtx->fd == NGX_INVALID_FILE) { ngx_log_error(NGX_LOG_EMERG, ngx_cycle->log, ngx_errno, ngx_open_file_n " \"%s\" failed", name); return NGX_ERROR; } if (ngx_delete_file(name) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_delete_file_n " \"%s\" failed", name); }
mtx->name = name;
return NGX_OK; }
|
阻塞锁的获取
当进程需要进行阻塞加锁时,通过下面的函数进行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_err_t err;
err = ngx_lock_fd(mtx->fd);
if (err == 0) { return; }
ngx_log_abort(err, ngx_lock_fd_n " %s failed", mtx->name); }
|
上面函数主要的操作就是通过 ngx_lock_fd
来获取锁,它的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ngx_err_t ngx_lock_fd(ngx_fd_t fd) { struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLKW, &fl) == -1) { return ngx_errno; }
return 0; }
|
它主要是通过 fcntl
函数来获取文件锁。
非阻塞锁的获取
上面获取锁的方式是阻塞式的,在获取不到锁时进程会阻塞,但有时候我们并不希望这样,而是不能获取锁时直接返回,nginx 通过这么函数来非阻塞的获取锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { ngx_err_t err;
err = ngx_trylock_fd(mtx->fd);
if (err == 0) { return 1; }
if (err == NGX_EAGAIN) { return 0; }
#if __osf__
if (err == NGX_EACCES) { return 0; }
#endif
ngx_log_abort(err, ngx_trylock_fd_n " %s failed", mtx->name);
return 0; }
|
可以看到与阻塞版本相比,非阻塞版本最主要的变化 ngx_lock_fd
换成了 ngx_trylock_fd
, 它的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ngx_err_t ngx_trylock_fd(ngx_fd_t fd) { struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_WRLCK; fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; }
return 0; }
|
锁的释放
上面说了如何加锁,接下来看一下如何释放锁,逻辑比较简单,直接放代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { ngx_err_t err;
err = ngx_unlock_fd(mtx->fd);
if (err == 0) { return; }
ngx_log_abort(err, ngx_unlock_fd_n " %s failed", mtx->name); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ngx_err_t ngx_unlock_fd(ngx_fd_t fd) { struct flock fl;
ngx_memzero(&fl, sizeof(struct flock));
fl.l_type = F_UNLCK; fl.l_whence = SEEK_SET;
if (fcntl(fd, F_SETLK, &fl) == -1) { return ngx_errno; }
return 0; }
|
基于原子操作实现锁
上面谈到了在不支持原子操作时,nginx 如何使用文件锁来实现互斥锁。现在操作系统一般都支持原子操作,用它实现互斥锁效率会较文件锁的方式更高,这也是 nginx 默认选用该种方式实现锁的原因,下面看一下它是如何实现的。
锁的创建
与上面一样,我们还是先看是如何创建锁的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock;
if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } mtx->spin = 2048;
#if (NGX_HAVE_POSIX_SEM)
mtx->wait = &addr->wait;
if (sem_init(&mtx->sem, 1, 0) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_init() failed"); } else { mtx->semaphore = 1; }
#endif
return NGX_OK; }
|
该函数的 addr
指针变量指向进行原子操作用到的原子变量,它的类型如下:
1 2 3 4 5 6 7 8
| typedef struct { ngx_atomic_t lock; #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t wait; #endif } ngx_shmtx_sh_t;
|
由于锁是多个进程之间共享的, 所以 addr
指向的内存都是在共享内存进行分配的。
阻塞锁的获取
与文件锁实现的互斥锁一样,依然有阻塞和非阻塞类型,下面首先来看下阻塞锁的实现,相比于文件锁实现的方式要复杂很多:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n;
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
for ( ;; ) {
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; }
if (ngx_ncpu > 1) {
for (n = 1; n < mtx->spin; n <<= 1) {
for (i = 0; i < n; i++) { ngx_cpu_pause(); } if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } } }
#if (NGX_HAVE_POSIX_SEM) if (mtx->semaphore) { (void) ngx_atomic_fetch_add(mtx->wait, 1);
if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { (void) ngx_atomic_fetch_add(mtx->wait, -1); return; }
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wait %uA", *mtx->wait);
while (sem_wait(&mtx->sem) == -1) { ngx_err_t err;
err = ngx_errno;
if (err != NGX_EINTR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err, "sem_wait() failed while waiting on shmtx"); break; } }
ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx awoke"); continue; }
#endif ngx_sched_yield(); } }
|
上面代码的实现流程通过注释已经描述的很清楚了,再强调一点就是,使用信号量与否的区别就在于获取不到锁时进行的操作不同,如果使用信号量,则会在信号量上阻塞,进程进入睡眠状态。而不使用信号量,则是暂时「让出」cpu,进程并不会进入睡眠状态,这会减少内核态与用户态度切换带来的开销,所以往往性能更好,因此在 nginx 中使用锁时一般不使用信号量,比如负载均衡均衡锁的初始化方式如下:
1
| ngx_accept_mutex.spin = (ngx_uint_t) -1;
|
将spin值设为-1,表示不使用信号量。
非阻塞锁的获取
非阻塞锁的代码就比较简单了,因为是非阻塞的,所以在获取不到锁时不需要考虑进程是否需要睡眠,也就不需要使用信号量,实现如下:
1 2 3 4 5
| ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)); }
|
锁的释放
释放锁时,主要操作是将原子变量设为0,如果使用信号量,则可能还需要唤醒在信号量上等候的进程:
1 2 3 4 5 6 7 8 9 10 11 12
| void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); }
if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); } }
|
其中 ngx_shmtx_wakeup
的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| static void ngx_shmtx_wakeup(ngx_shmtx_t *mtx) {
#if (NGX_HAVE_POSIX_SEM) ngx_atomic_uint_t wait;
if (!mtx->semaphore) { return; }
for ( ;; ) {
wait = *mtx->wait;
if ((ngx_atomic_int_t) wait <= 0) { return; }
if (ngx_atomic_cmp_set(mtx->wait, wait, wait - 1)) { break; } }
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx wake %uA", wait);
if (sem_post(&mtx->sem) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_post() failed while wake shmtx"); }
#endif }
|
锁的销毁
因为锁的销毁代码比较简单,就不分开进行说明了。对于基于文件锁实现的互斥锁在销毁时需要关闭打开的文件。对于基于原子变量实现的锁,如果支持信号量,则需要销毁创建的信号量,代码分别入下:
基于文件锁实现的锁的销毁:
1 2 3 4 5 6 7 8
| void ngx_shmtx_destroy(ngx_shmtx_t *mtx) { if (ngx_close_file(mtx->fd) == NGX_FILE_ERROR) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, ngx_close_file_n " \"%s\" failed", mtx->name); } }
|
基于原子操作实现的锁的销毁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void ngx_shmtx_destroy(ngx_shmtx_t *mtx) { #if (NGX_HAVE_POSIX_SEM)
if (mtx->semaphore) { if (sem_destroy(&mtx->sem) == -1) { ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno, "sem_destroy() failed"); } }
#endif }
|