Uthread: switching between threads (moderate)

这个lab的目的是在给定的代码上补充 以和xv6结合 实现线程的上下文切换

假如对代码机理足够熟悉 就可以发现进程上下文切换和线程上下文切换是差不多的

这句话可能有点不严谨 但是其的本质确实是类似的 在xv6中的操作方式也类似

于是这里 可以模仿kernel/swtch.S中的赋值取值 写在uthread_switch.S方法当中

uthread_switch.S

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
	.text

/*
* save the old thread's registers,
* restore the new thread's registers.
*/

.globl thread_switch
thread_switch:

sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)

ret /* return to ra */

但是线程切换的时候没有上下文资源啊 所以就给他造一个上下文资源 线程的context -> tcontext

当然了 并且在thread的结构体中加入 —— 实际上就是一个构建pcb的过程

uthread.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct tcontext {
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};

struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct tcontext context; // 每个线程的独立上下文
};

以上 补充了线程的上下文 切换时的逻辑 还剩下何时切换、 切换时机的问题

进程中是在sched() & schedule() 两者结合调度器线程实现的 这里的话是在thread_schedule() 类似的 由于thread_switch()是针对两个线程之间的上下文切换

所以这里需要传入的形参就是新旧线程的上下文 ps:需要转换成uint64

代码有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

extern void thread_switch(uint64, uint64);

void thread_schedule(void) {
....

if (current_thread != next_thread) { /* switch threads? */
next_thread->state = RUNNING;
t = current_thread;
current_thread = next_thread;
// 在这里完成进程内线程的上下文切换 保存寄存器的状态
thread_switch((uint64)&t->context, (uint64)&current_thread->context);
} else
next_thread = 0;
}

以上切换逻辑已经基本实现了 但是还需要进行初始化 是为了初始化对应的ra和sp

在第一次进行线程切换的时候 给一个初始的位置 以防走丢~

进程的相关内容是在allocproc()当中

而线程在这里的设置也是类似的

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建初始化线程 这里可以类比进程创建时的操作
// 创建新进程的时候也同样需要初始化对应的寄存器 可见allocproc()
void thread_create(void (*func)()) {
struct thread* t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE)
break;
}
t->state = RUNNABLE;
t->context.ra = (uint64)func;
t->context.sp = (uint64)t->stack + STACK_SIZE;
}

运行结果及脚本结果有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
$ uthread
thread_a started
thread_b started
thread_c started
thread_c 0
thread_a 0
thread_b 0
thread_c 1
thread_a 1
thread_b 1
thread_c 2
thread_a 2
thread_b 2
...
thread_c 95
thread_a 95
thread_b 95
thread_c 96
thread_a 96
thread_b 96
thread_c 97
thread_a 97
thread_b 97
thread_c 98
thread_a 98
thread_b 98
thread_c 99
thread_a 99
thread_b 99
thread_c: exit after 100
thread_a: exit after 100
thread_b: exit after 100
thread_schedule: no runnable threads
1
2
3
4
5
6
7
8
9
root@hcss-ecs-bbb1:/home/czc/lab/lab# make grade
......

make[1]: Leaving directory '/home/czc/lab/lab'
== Test uthread ==
$ make qemu-gdb
uthread: OK (7.5s)

......

Using threads

这个实验对于有一定并发编程经验的来说比较简单

这里主要是看出来 他为什么会产生并发错误就完事了

这里是多个线程同时put put完就扫表

put完之后到读前这个过程是同步的

而在读的时候 由于没有修改数据 所以这一部分不会造成竞态问题

就可以聚焦到多线程并发put的时候了:

在进行散列后 拉链法如果查询不到当前有对应的key 就会进行insert操作

而这个操作就是线程不安全的 假如在insert的时候 一个线程在insert 还没有返回

此时切换到了别的线程 就会导致重新创建一个insert 即导致前者的数据直接missing

所以修改只需要将这个地方加上锁即可

ph.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pthread_mutex_t lock;

static void* put_thread(void* xa) {
int n = (int)(long)xa; // thread number
int b = NKEYS / nthread;

for (int i = 0; i < b; i++) {
// 锁住put操作 所有的更新都直接同步
pthread_mutex_lock(&lock);
put(keys[b * n + i], n);
pthread_mutex_unlock(&lock);
}

return NULL;
}

int main(int argc, char* argv[]) {
pthread_mutex_init(&lock, NULL);

pthread_t* tha;
....

但是这样逢put就锁的操作 相当于就是将多线程的put简化成了同步 甚至还要加上线程切换时的成本

所以有没有比一把大锁更好的做法? 有的

这里使用的是哈希表 通过原key的值对桶的值 进行散列计算之后 决定塞到哪个桶里

而每个桶之间是隔离的 所以就可以利用这个特点 put的时候只对当前的这个桶上锁

ph.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 直接初始化 以桶为颗粒度
pthread_mutex_t lock[NBUCKET] = {PTHREAD_MUTEX_INITIALIZER};

static void put(int key, int value) {
// 哈希计算 决定放到哪个桶内
int i = key % NBUCKET;

// is the key already present?
struct entry* e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key)
break;
}
if (e) {
// update the existing key.
e->value = value;
} else {
// the new is new.
// 这里哈希计算之后 就知道锁到哪个桶里了
// 每个桶之间的数据是隔离的 所以就可以直接以这个桶为颗粒度上锁
pthread_mutex_lock(&lock[i]);
insert(key, value, &table[i], table[i]);
pthread_mutex_unlock(&lock[i]);
}
}

印象中java jdk8以前的concurrentHashMap也是利用这个思路实现的 —— 分段锁

到后面就变成了链表和红黑树的延伸 颗粒度更小 只以当前链表的节点 或者当前子树的父节点为颗粒度 上锁

这样就大大的提高了效率 此处不赘述~

两者的测试效果和脚本结果有:

大锁

1
2
3
4
5
6
7
root@hcss-ecs-bbb1:/home/czc/lab/lab# make ph
gcc -o ph -g -O2 -DSOL_THREAD -DLAB_THREAD notxv6/ph.c -pthread
root@hcss-ecs-bbb1:/home/czc/lab/lab# ./ph 2
100000 puts, 8.356 seconds, 11968 puts/second
0: 0 keys missing
1: 0 keys missing
200000 gets, 8.194 seconds, 24410 gets/second

分段锁

1
2
3
4
5
root@hcss-ecs-bbb1:/home/czc/lab/lab# ./ph 2
100000 puts, 3.769 seconds, 26534 puts/second
1: 0 keys missing
0: 0 keys missing
200000 gets, 7.518 seconds, 26602 gets/second

从这里也能看到 修改这个颗粒度对效果的提升非常明显~

1
2
3
4
5
6
7
8
9
10
11
root@hcss-ecs-bbb1:/home/czc/lab/lab# make grade
......

== Test ph_safe == make[1]: Entering directory '/home/czc/lab/lab'
gcc -o ph -g -O2 -DSOL_THREAD -DLAB_THREAD notxv6/ph.c -pthread
make[1]: Leaving directory '/home/czc/lab/lab'
ph_safe: OK (10.6s)
== Test ph_fast == make[1]: Entering directory '/home/czc/lab/lab'
make[1]: 'ph' is up to date.
make[1]: Leaving directory '/home/czc/lab/lab'
ph_fast: OK (25.9s)

Barrier

利用现有的pthread的api实现一个屏障

有点类似join 每个线程都需要在这个barrier处 等待所有线程到达了之后 才能继续运行

达成一个类似线程同步的效果

大概的思路就是保证每一轮之间的独立性

而在每一轮之后 需要重置并重新初始化线程当前的状态(唤醒)

在没结束单轮前 已经到达的线程需要sleep并且wait

所以按照这个思路能得到

barrier.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void barrier() {

// 上锁防止被插入
pthread_mutex_lock(&bstate.barrier_mutex);

bstate.nthread++;
// 当此轮全部遍历完 唤醒所有线程 进入下一轮并且初始化状态
if (bstate.nthread == nthread) {
bstate.nthread = 0;
bstate.round++;
pthread_cond_broadcast(&bstate.barrier_cond);
} else {
// 这个线程完事了 但是没遍历完 所以此时需要等待其他线程
// 放下书中的锁 把资源给别的人用 (虽然这里没的资源)
// 注意 调用wait的时候 手中必须持有锁
// 所以这里的分支就可以 怎么都能放下所
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
}

pthread_mutex_unlock(&bstate.barrier_mutex);
}

ps 除此之外在调用屏障本身的时候就需要加上锁 1是因为调用wait是需要持有锁 2是因为bstate.nthread++这个操作本身是线程不安全的

对应的 运行效果和测试脚本有:

1
2
root@hcss-ecs-bbb1:/home/czc/lab/lab# ./barrier 2
OK; passed
1
2
3
4
5
6
root@hcss-ecs-bbb1:/home/czc/lab/lab# make grade
...
== Test barrier == make[1]: Entering directory '/home/czc/lab/lab'
gcc -o barrier -g -O2 -DSOL_THREAD -DLAB_THREAD notxv6/barrier.c -pthread
make[1]: Leaving directory '/home/czc/lab/lab'
barrier: OK (2.9s)