°

rcu-bp关键代码解读

1      什么是TLS

原理在网上资料很多,这里不展开。

简单点说,动态申请的每线程变量。有一类比较熟悉的每线程变量是一个带__thread的每线程变量,两者的区别在于,TLS这类每线程变量是动态申请的。有以下一系列接口:

#include <pthread.h>

int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));

int pthread_key_delete(pthread_key_t key);

int pthread_setspecific(pthread_key_t key, const void *value);

void *pthread_getspecific(pthread_key_t key);

 

一般使用方法是

  1. 创建一个关键字key,多线程仅需创建一次,销毁函数可以为null;
  2. 将需要保存的上下文设置到key-value对中
  3. 需要访问时,根据key取出value
  4. 销毁key
  5. 与__thread类型的变量就在于需要使用全局变量保存key。

2      BP锁结构体

struct rcu_reader {

/* Data used by both reader and synchronize_rcu() */

unsigned long ctr;

/* Data used for registry */

struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));

pthread_t tid;

int alloc;  /* registry entry allocated */

};

其中的ctr代表当前线程的锁状态,tid表示当前线程号,alloc是一个标记位,代表该锁状态结构体有没有被使用;因为要保存每线程的当前锁状态,bp代码中是以数组的形式访问的,第一次申请8个,第二次申请16个,第三次申请32个,以成倍增加的方式扩大。

锁的状态有以下三种,分别为,

enum rcu_state {

RCU_READER_ACTIVE_CURRENT, 当前读者,写者在第一步抓取这些读者

RCU_READER_ACTIVE_OLD, 老读者,写者在第二步检测这些读者

RCU_READER_INACTIVE, 不在读操作中

};

3      全局gp记录

struct rcu_gp {

/*

* Global grace period counter.

* Contains the current RCU_GP_CTR_PHASE.

* Also has a RCU_GP_COUNT of 1, to accelerate the reader fast path.

* Written to only by writer with mutex taken.

* Read by both writer and readers.

*/

unsigned long ctr;

} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));

struct rcu_gp rcu_gp = { .ctr = RCU_GP_COUNT };

记录了全局的ctr,默认值为1.

4      几个宏

#define RCU_GP_COUNT        (1UL << 0)  ————-1

/* Use the amount of bits equal to half of the architecture long size */

#define RCU_GP_CTR_PHASE        (1UL << (sizeof(long) << 2))———–64位系统为2的32次,32位系统为2的16次

#define RCU_GP_CTR_NEST_MASK    (RCU_GP_CTR_PHASE – 1)———-64位系统0xffffffff,32位系统0xffff

5      初始化

void rcu_bp_register(void);

每个需要使用的bprcu的线程只要调用上面的函数进行注册即可,这个函数的工作是:

  1. 如果是第一个线程注册,则调用pthread_key_create创建每线程变量
  2. 将该线程加入registry链表
  3. 初始化时锁ctr的值为0
  4. 将该rcu_reader设置进key-value对(pthread_setspecific)

6      加读锁

static inline void _rcu_read_lock_update(unsigned long tmp)/* tmp是当前线程的ctr */

{

/* 这段表示,当前的线程中ctr中的低位为全0时,取全局的ctr值 */

if (caa_likely(!(tmp & RCU_GP_CTR_NEST_MASK))) {

_CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, _CMM_LOAD_SHARED(rcu_gp.ctr));

urcu_bp_smp_mb_slave();

} else

/* 这段表示如果当前线程中的ctr中的低位不为0,ctr值加1, */

_CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, tmp + RCU_GP_COUNT);

}

7      解读锁

static inline void _rcu_read_unlock(void)

{

unsigned long tmp;

 

tmp = URCU_TLS(rcu_reader)->ctr;

urcu_assert(tmp & RCU_GP_CTR_NEST_MASK);

/* Finish using rcu before decrementing the pointer. */

urcu_bp_smp_mb_slave();

_CMM_STORE_SHARED(URCU_TLS(rcu_reader)->ctr, tmp – RCU_GP_COUNT);

cmm_barrier();  /* Ensure the compiler does not reorder us with mutex */

}

仅仅是在当前的ctr基础上减1.

8      加解锁总结

加锁时,如果当前ctr值为0,也就是初始值,即第一次上锁时,需要将ctr的值更新为全局的ctr值。

解锁时,无条件将该当前ctr值减1,所以加解锁必须是对称的。

参与rcu,上锁状态ctr低位值大于0,空闲状态等于0.

9      锁状态的转化

由下面这个函数解析。

static inline enum rcu_state rcu_reader_state(unsigned long *ctr)

{

unsigned long v;

 

/* 没有使用过,返回未进入临界区 */

if (ctr == NULL)

return RCU_READER_INACTIVE;

/*

* Make sure both tests below are done on the same version of *value

* to insure consistency.

*/

v = CMM_LOAD_SHARED(*ctr);

/* RCU_GP_CTR_NEST_MASK这个值为0xffffffff,即为ctr的低位值为0时,返回未进入临界区 */

if (!(v & RCU_GP_CTR_NEST_MASK))

return RCU_READER_INACTIVE;

/* 先看第三步的非运算,即里面的值为0,则是读状态,否则是老的读状态

再看第二部的与运算,检测第16的值是否为1,为1则是老的读状态,为0则是读状态

最后第一步的异或运算,相等则为0,则肯定是读状态,不相等,则要看第十六位是否相同,相同则为读状态,不相等则为老的读状态

总之,只要ctr的值不为0,就是读状态

*/

if (!((v ^ rcu_gp.ctr) & RCU_GP_CTR_PHASE))

return RCU_READER_ACTIVE_CURRENT;

/* 不识别状态 */

return RCU_READER_ACTIVE_OLD;

}

10  等待读者结束

static void wait_for_readers(struct cds_list_head *input_readers,

struct cds_list_head *cur_snap_readers,

struct cds_list_head *qsreaders)

{

unsigned int wait_loops = 0;

struct rcu_reader *index, *tmp;

 

/*

* Wait for each thread URCU_TLS(rcu_reader).ctr to either

* indicate quiescence (not nested), or observe the current

* rcu_gp.ctr value.

*/

for (;;) {

if (wait_loops < RCU_QS_ACTIVE_ATTEMPTS)

wait_loops++;

 

cds_list_for_each_entry_safe(index, tmp, input_readers, node) {

switch (rcu_reader_state(&index->ctr)) {

case RCU_READER_ACTIVE_CURRENT:

if (cur_snap_readers) {

cds_list_move(&index->node,

cur_snap_readers);

break;

}

/* Fall-through */

case RCU_READER_INACTIVE:

cds_list_move(&index->node, qsreaders);

break;

case RCU_READER_ACTIVE_OLD:

/*

* Old snapshot. Leaving node in

* input_readers will make us busy-loop

* until the snapshot becomes current or

* the reader becomes inactive.

*/

break;

}

}

 

if (cds_list_empty(input_readers)) {

break;

} else {

/* Temporarily unlock the registry lock. */

mutex_unlock(&rcu_registry_lock);

if (wait_loops >= RCU_QS_ACTIVE_ATTEMPTS)

(void) poll(NULL, 0, RCU_SLEEP_DELAY_MS);

else

caa_cpu_relax();

/* Re-lock the registry lock before the next loop. */

mutex_lock(&rcu_registry_lock);

}

}

}

这个是snap可能是snapshot的缩写,快照的意思,也就是代表当前正在读的线程合影。

RCU_READER_ACTIVE_CURRENT将处与读状态的线程移入正在读链表,不在读状态的线程移入qs链表,其他状态等待其变成正在读或者不读。

11  Rcu写者的同步等待

void synchronize_rcu(void)

{

CDS_LIST_HEAD(cur_snap_readers);

CDS_LIST_HEAD(qsreaders);

sigset_t newmask, oldmask;

int ret;

 

ret = sigfillset(&newmask);

assert(!ret);

ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);

assert(!ret);

 

mutex_lock(&rcu_gp_lock);

 

mutex_lock(&rcu_registry_lock);

 

if (cds_list_empty(&registry))

goto out;

 

/* All threads should read qparity before accessing data structure

* where new ptr points to. */

/* Write new ptr before changing the qparity */

smp_mb_master();

 

/*

* Wait for readers to observe original parity or be quiescent.

* wait_for_readers() can release and grab again rcu_registry_lock

* interally.

*/

wait_for_readers(&registry, &cur_snap_readers, &qsreaders);

 

/*

* Adding a cmm_smp_mb() which is _not_ formally required, but makes the

* model easier to understand. It does not have a big performance impact

* anyway, given this is the write-side.

*/

cmm_smp_mb();

 

/* Switch parity: 0 -> 1, 1 -> 0 */

CMM_STORE_SHARED(rcu_gp.ctr, rcu_gp.ctr ^ RCU_GP_CTR_PHASE);

 

/*

* Must commit qparity update to memory before waiting for other parity

* quiescent state. Failure to do so could result in the writer waiting

* forever while new readers are always accessing data (no progress).

* Ensured by CMM_STORE_SHARED and CMM_LOAD_SHARED.

*/

 

/*

* Adding a cmm_smp_mb() which is _not_ formally required, but makes the

* model easier to understand. It does not have a big performance impact

* anyway, given this is the write-side.

*/

cmm_smp_mb();

 

/*

* Wait for readers to observe new parity or be quiescent.

* wait_for_readers() can release and grab again rcu_registry_lock

* interally.

*/

wait_for_readers(&cur_snap_readers, NULL, &qsreaders);

 

/*

* Put quiescent reader list back into registry.

*/

cds_list_splice(&qsreaders, &registry);

 

/*

* Finish waiting for reader threads before letting the old ptr being

* freed.

*/

smp_mb_master();

out:

mutex_unlock(&rcu_registry_lock);

mutex_unlock(&rcu_gp_lock);

ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);

assert(!ret);

}

第一步,将参与本次rcu活动的线程挑选出来。

第二步,将全局ctr的值翻转,各位0->1, 1->0。全局ctr只有两种取值情况,

1和0x1 0000 0001。

第三步,等待所有读者退出。

第四步,同步等待结束。

12  Call_rcu写者的异步等待

异步与同步的区别在于写者是等待所有读者退出,本线程执行写操作,还是由call_rcu线程等待所有写者退出,由call_rcu线程执行写操作。因为没有实际研究这段代码,直接看调用栈

#0  0x000000fff066ab90 in syscall () from /lib/libc.so.6

(gdb) bt

#0  0x000000fff066ab90 in syscall () from /lib/libc.so.6

#1  0x000000fff08f7ea8 in smp_mb_master ()

at ../userspace-rcu-0.9.3/urcu.c:165

#2  0x000000fff08f817c in wait_for_readers (

input_readers=0xfff090e5f0 <registry>, cur_snap_readers=0xff60dfe4b8,

qsreaders=0xff60dfe4c8) at ../userspace-rcu-0.9.3/urcu.c:290

#3  0x000000fff08f84ec in synchronize_rcu_memb ()

at ../userspace-rcu-0.9.3/urcu.c:426

#4  0x000000fff08f9b50 in call_rcu_thread (arg=0xff640030b0)

at ../userspace-rcu-0.9.3/urcu-call-rcu-impl.h:362

#5  0x000000fff0a22040 in start_thread () from /lib/libpthread.so.0

#6  0x000000fff066f5c4 in __thread_start () from /lib/libc.so.6

与同步的操作类型,也是通过wait_for_readers的调用等待读者退出

13  为什么要使用异或操作

为了实现读者从不阻塞,而写者应该区别出这位读者是否是本次的读者。

假设存在两个读者群,一个写者。读者群1正在读操作中,写者进入等待读者状态,读者群2也进入了临界区,但是读者群2不应该被列入本次的探测中。

再看一次锁状态的解析

if (!(v & RCU_GP_CTR_NEST_MASK))

return RCU_READER_INACTIVE;

if (!((v ^ rcu_gp.ctr) & RCU_GP_CTR_PHASE))

return RCU_READER_ACTIVE_CURRENT;

return RCU_READER_ACTIVE_OLD;

第一段还是原来的逻辑,没参与本地读写操作。

第二段的操作,具体分析,

Ctr值 全局ctr值 结果
1 1 RCU_READER_ACTIVE_CURRENT
1 0x1 0000 0001 RCU_READER_ACTIVE_OLD
2 1 RCU_READER_ACTIVE_CURRENT
0x1 0000 0001 1 RCU_READER_ACTIVE_OLD
0x1 0000 0001 0x1 0000 0001 RCU_READER_ACTIVE_CURRENT

所以只要非0,都是读状态,只是区别出是写者开始等待前的读者还是写者开始等待后的读者。

14  举例

有2个线程,一个写,一个读。全局ctr为1

  1. 读者1进入临界区,ctr值设置为1
  2. 写者1尝试写,选中了读者1,将其加入本次需要等待的链表,更新全局ctr为0x1 0000 0001,发生调度。
  3. 读者1离开临界区,但是又再次进入临界区,ctr值设置为0x1 0000 0001
  4. 写者开始检测读者1的当前状态,发现是当前读者,而不是老读者,移出该读者。所以这个相位的作用就是区别出该读者是否已经离开上一次的读操作。写者退出等待,继续往下执行。
打赏
  喜欢