Multithreaded Programming (C Edition)

In C, multithreaded programming is done with the pthread (POSIX threads) library.

Creating threads

  • pthread_t is the type used to declare a thread ID;

/* Create a new thread, starting with execution of START-ROUTINE
   getting passed ARG.  Creation attributed come from ATTR.  The new
   handle is stored in *NEWTHREAD.  */
extern int pthread_create (pthread_t *__restrict __newthread,
                           const pthread_attr_t *__restrict __attr,
                           void *(*__start_routine) (void *),
                           void *__restrict __arg) __THROWNL __nonnull ((1, 3));

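pthread_create creates a new thread and stores its ID in the variable pointed to by the first argument. It takes four parameters: the thread ID, the thread attributes, the start-routine pointer, and the argument pointer.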

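  • The thread-ID parameter has type pthread_t *, so you pass the address of a pthread_t variable (for example &tid);
  • If no special attributes are needed, pass NULL; otherwise an attribute object can customize properties such as the thread's stack size and scheduling priority;
  • The start routine must have the type void *(*)(void *): it takes one void * argument and returns void *. A function name converts to the function's address, so you can pass the name directly;
  • The argument is passed as a void *; pass NULL if the thread needs no argument.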
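As a minimal sketch of the parameter rules above (the names add_args_t, add_worker and add_in_thread are hypothetical, not from the original), several values can be bundled into a struct and passed through the single void * argument; the start routine casts it back to the real type:

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical argument bundle: several values passed through one void *. */
typedef struct {
    int a;
    int b;
    int sum; /* written by the thread, read after pthread_join */
} add_args_t;

/* Start routine with the required type void *(*)(void *). */
static void *add_worker(void *arg) {
    add_args_t *args = (add_args_t *)arg; /* recover the real type */
    args->sum = args->a + args->b;
    return NULL;
}

/* Runs add_worker in a new thread and returns a + b. */
int add_in_thread(int a, int b) {
    add_args_t args = {a, b, 0};
    pthread_t tid;
    /* &tid: pthread_create writes the new thread's ID here.
       add_worker: the function name converts to its address. */
    int rc = pthread_create(&tid, NULL, add_worker, &args);
    assert(rc == 0);
    rc = pthread_join(tid, NULL); /* args must stay alive until the thread is done */
    assert(rc == 0);
    (void)rc; /* avoid an unused-variable warning when NDEBUG is defined */
    return args.sum;
}
```

Passing the address of a local variable is safe here only because add_in_thread joins the thread before its stack frame goes away; if the creating function might return first, the argument should live in static or heap storage instead.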

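A return value of 0 means the thread was created; a nonzero return value is an error number (pthread_create returns it directly rather than setting errno):

  • EAGAIN: insufficient resources to create another thread, or a system limit on the number of threads was reached;
  • EINVAL: invalid settings in attr;
  • EPERM: no permission to set the scheduling policy and parameters specified in attr.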
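Because the error number is the return value, it can be passed straight to strerror(). The sketch below (create_with_stack and report are hypothetical helper names) also shows a non-NULL attribute object: it sets the stack size with pthread_attr_setstacksize, which itself rejects sizes below PTHREAD_STACK_MIN with EINVAL, before calling pthread_create:

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Start routine that does nothing. */
static void *noop(void *arg) { return arg; }

/* pthread functions return the error number instead of setting errno,
   so the return value goes straight to strerror(). */
static void report(const char *what, int rc) {
    if (rc != 0)
        fprintf(stderr, "%s failed: %s (%d)\n", what, strerror(rc), rc);
}

/* Creates and joins a thread whose stack size is stack_size bytes.
   Returns 0 on success or the first nonzero error number. */
int create_with_stack(size_t stack_size) {
    pthread_attr_t attr;
    int rc = pthread_attr_init(&attr);
    if (rc != 0) {
        report("pthread_attr_init", rc);
        return rc;
    }

    /* Fails with EINVAL if stack_size is below PTHREAD_STACK_MIN. */
    rc = pthread_attr_setstacksize(&attr, stack_size);
    if (rc != 0) {
        report("pthread_attr_setstacksize", rc);
    } else {
        pthread_t tid;
        rc = pthread_create(&tid, &attr, noop, NULL); /* EAGAIN, EINVAL or EPERM on failure */
        if (rc != 0) {
            report("pthread_create", rc);
        } else {
            rc = pthread_join(tid, NULL);
            report("pthread_join", rc);
        }
    }

    pthread_attr_destroy(&attr); /* the attribute object is no longer needed */
    return rc;
}
```

For example, create_with_stack(1024 * 1024) should return 0 on a typical Linux system, while create_with_stack(1) fails in pthread_attr_setstacksize with EINVAL and never reaches pthread_create.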

/* Make calling thread wait for termination of the thread TH.  The
   exit status of the thread is stored in *THREAD_RETURN, if THREAD_RETURN
   is not NULL.

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern int pthread_join (pthread_t __th, void **__thread_return);

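pthread_join waits for a thread to terminate and retrieves its return value. It takes two parameters: the ID of the thread to wait for, and a void ** pointer that receives the thread's return value (pass NULL if the value is not needed). It returns 0 on success and a nonzero error number on failure.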

Example code:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

pthread_t sub_thread_id;

/* Child thread: print the string passed in arg, then return a status string. */
void *sub_thread(void *arg) {
    char *info = (char *)arg;
    sleep(1);
    int ret = printf("%s ", info);
    if (ret > 0) {
        printf("Successfully!\n");
        return (void *)"OK";    /* string literals have static storage, safe to return */
    } else {
        printf("Failed!\n");
        return (void *)"ERROR";
    }
}

int main(int argc, char **argv) {
    char *info = "Child Thread~";
    int ret = pthread_create(&sub_thread_id, NULL, sub_thread, (void *)info);
    if (ret) {
        printf("Create child thread failed, return %d.\n", ret);
        return 1;               /* no thread was created, so there is nothing to join */
    }
    printf("Create child thread successfully.\n");

    printf("Continue...\n");

    /* pthread_join copies the thread's return value (a pointer) into result,
       so no malloc is needed here. */
    void *result = NULL;
    ret = pthread_join(sub_thread_id, &result);
    if (ret) {
        printf("Join child thread failed, return %d.\n", ret);
        return 1;
    }
    printf("Child thread end : %s \n", (char *)result);

    return 0;
}
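The example above returns a string literal, which has static storage duration and therefore stays valid after the thread exits. A value computed at run time cannot be returned as the address of a local variable, because that variable disappears with the thread's stack. One common approach, sketched here with hypothetical names (square_worker, square_in_thread), is to return heap memory and let the joining thread free it:

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical worker: computes n * n and returns it in heap memory,
   because a local variable would no longer exist once the thread exits. */
static void *square_worker(void *arg) {
    long n = *(long *)arg;
    long *result = malloc(sizeof *result);
    if (result != NULL)
        *result = n * n;
    return result;              /* handed to whoever calls pthread_join */
}

/* Returns n * n computed in a child thread, or -1 on failure. */
long square_in_thread(long n) {
    pthread_t tid;
    if (pthread_create(&tid, NULL, square_worker, &n) != 0)
        return -1;
    void *ret = NULL;
    if (pthread_join(tid, &ret) != 0 || ret == NULL)
        return -1;
    long value = *(long *)ret;
    free(ret);                  /* the joining thread now owns the memory */
    return value;
}
```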

Example analysis

main-race

#include "mythreads.h"
#include <stdio.h>

int balance = 0;

void *worker(void *arg) {
    balance++; // unprotected access

    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t p;
    Pthread_create(&p, NULL, worker, NULL);
    balance++; // unprotected access
    Pthread_join(p, NULL);

    return 0;
}
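main-race.c and the later examples include mythreads.h, which is not shown in this article. Judging from the helgrind stack traces (for example Pthread_create at mythreads.h:49), it defines capitalized wrapper functions around the pthread calls. The following is only a guess at what such a header might look like, assuming each wrapper simply checks the return value with assert; it is not the actual file:

```c
/* Hypothetical sketch of mythreads.h: wrappers that abort (via assert)
   when a pthread call returns a nonzero error number. */
#ifndef MYTHREADS_H
#define MYTHREADS_H

#include <assert.h>
#include <pthread.h>

static inline void Pthread_create(pthread_t *t, const pthread_attr_t *attr,
                                  void *(*start)(void *), void *arg) {
    int rc = pthread_create(t, attr, start, arg);
    assert(rc == 0);
    (void)rc; /* avoid an unused-variable warning when NDEBUG is defined */
}

static inline void Pthread_join(pthread_t t, void **ret) {
    int rc = pthread_join(t, ret);
    assert(rc == 0);
    (void)rc;
}

static inline void Pthread_mutex_init(pthread_mutex_t *m,
                                      const pthread_mutexattr_t *attr) {
    int rc = pthread_mutex_init(m, attr);
    assert(rc == 0);
    (void)rc;
}

static inline void Pthread_mutex_lock(pthread_mutex_t *m) {
    int rc = pthread_mutex_lock(m);
    assert(rc == 0);
    (void)rc;
}

static inline void Pthread_mutex_unlock(pthread_mutex_t *m) {
    int rc = pthread_mutex_unlock(m);
    assert(rc == 0);
    (void)rc;
}

static inline void Pthread_cond_init(pthread_cond_t *c,
                                     const pthread_condattr_t *attr) {
    int rc = pthread_cond_init(c, attr);
    assert(rc == 0);
    (void)rc;
}

static inline void Pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m) {
    int rc = pthread_cond_wait(c, m);
    assert(rc == 0);
    (void)rc;
}

static inline void Pthread_cond_signal(pthread_cond_t *c) {
    int rc = pthread_cond_signal(c);
    assert(rc == 0);
    (void)rc;
}

#endif /* MYTHREADS_H */
```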
  • helgrind output (irrelevant parts omitted):
❯ valgrind --tool=helgrind ./main-race
==23772== Command: ./main-race
==23772==
==23772== ---Thread-Announcement------------------------------------------
==23772==
==23772== Thread #1 is the program's root thread
==23772==
==23772== ---Thread-Announcement------------------------------------------
==23772==
==23772== Thread #2 was created
==23772== at 0x49979F3: clone (clone.S:76)
==23772== by 0x49988EE: __clone_internal (clone-internal.c:83)
==23772== by 0x49066D8: create_thread (pthread_create.c:295)
==23772== by 0x49071FF: pthread_create@@GLIBC_2.34 (pthread_create.c:828)
==23772== by 0x48572A6: pthread_create_WRK (hg_intercepts.c:445)
==23772== by 0x4858BB2: pthread_create@* (hg_intercepts.c:478)
==23772== by 0x109564: Pthread_create (mythreads.h:49)
==23772== by 0x109654: main (main-race.c:16)

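The output above shows that the program has two threads: the root thread #1 and a child thread #2 created by the program (through Pthread_create at main-race.c line 16).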

==23772== Possible data race during read of size 4 at 0x10C014 by thread #1
==23772== Locks held: none
==23772== at 0x109655: main (main-race.c:17)
==23772==
==23772== This conflicts with a previous write of size 4 by thread #2
==23772== Locks held: none
==23772== at 0x109609: worker (main-race.c:10)
==23772== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==23772== by 0x4906AC2: start_thread (pthread_create.c:442)
==23772== by 0x4997A03: clone (clone.S:100)
==23772== Address 0x10c014 is 0 bytes inside data symbol "balance"

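This part reports a possible data race: thread #1 performed a 4-byte read at address 0x10C014, at line 17 of main-race.c (balance++; // unprotected access in main). It is flagged because this read conflicts with an earlier write to the same address by thread #2, at line 10 of main-race.c (balance++; // unprotected access in worker).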

==23772== Possible data race during write of size 4 at 0x10C014 by thread #1
==23772== Locks held: none
==23772== at 0x10965E: main (main-race.c:17)
==23772==
==23772== This conflicts with a previous write of size 4 by thread #2
==23772== Locks held: none
==23772== at 0x109609: worker (main-race.c:10)
==23772== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==23772== by 0x4906AC2: start_thread (pthread_create.c:442)
==23772== by 0x4997A03: clone (clone.S:100)
==23772== Address 0x10c014 is 0 bytes inside data symbol "balance"

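The balance++ in main both reads and modifies the variable (it is a load, an add, and a store), so helgrind also detects a 4-byte write by thread #1 at address 0x10C014, which is a second possible data race.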

==23772== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)

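In summary, helgrind reports two data races in this program. The shared variable balance is accessed by both threads without any synchronization mechanism (such as a mutex) to protect it, so when the two threads access it at the same time (reading or writing), a data race can occur, leading to corrupted or inconsistent results such as a lost update.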
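To make "inconsistent results" concrete: balance++ is a load, an add and a store, and the race lets those steps of the two threads interleave. The function below is a deterministic, single-threaded replay of one such schedule (the name replay_lost_update and the "register" variables r1/r2 are illustrative), not real concurrent code:

```c
#include <assert.h>

/* Replays one bad interleaving of two balance++ operations.
   r1 and r2 stand for the registers of thread #2 and thread #1. */
int replay_lost_update(void) {
    int balance = 0;
    int r1 = balance;  /* thread #2 (worker) loads 0 */
    int r2 = balance;  /* thread #1 (main) loads 0 before #2 stores */
    r1 = r1 + 1;       /* thread #2 computes 1 */
    r2 = r2 + 1;       /* thread #1 computes 1 */
    balance = r1;      /* thread #2 stores 1 */
    balance = r2;      /* thread #1 stores 1, overwriting #2's update */
    return balance;    /* 1, although two increments ran */
}
```

Two increments ran, but the result is 1 because thread #1 stored a value computed from a stale read. Whether this schedule actually occurs in a given run depends on timing, which is why the race may not show up in the program's output even though helgrind flags it.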


Now let's try modifying the code:

  1. If the balance++ in either the main thread or the child thread is removed, there is no longer a data race; helgrind reports:
==31766== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
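  2. Putting a lock around only one of the two updates to the shared variable balance still leaves a data race, because a mutex only excludes other threads that acquire the same mutex; the unlocked update can still run concurrently. Both updates must be protected by the lock: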
#include "mythreads.h"

int balance = 0;
pthread_mutex_t lock;

void *worker(void *arg) {
    Pthread_mutex_lock(&lock);
    balance++;
    Pthread_mutex_unlock(&lock);

    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_mutex_init(&lock, NULL);
    pthread_t p;
    Pthread_create(&p, NULL, worker, NULL);

    Pthread_mutex_lock(&lock);
    balance++;
    Pthread_mutex_unlock(&lock);

    Pthread_join(p, NULL);
    pthread_mutex_destroy(&lock);

    return 0;
}

helgrind now reports no data race:
==4407== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 7 from 7)
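For a shared counter like balance, where the critical section is a single increment, C11 atomic operations are another way to remove the race. This is an alternative to the mutex shown above, not what the original example uses; the sketch assumes a compiler with <stdatomic.h> support, and counter, atomic_worker and count_atomically are hypothetical names:

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>

static atomic_int counter; /* hypothetical shared counter */

static void *atomic_worker(void *arg) {
    int n = *(int *)arg;
    for (int i = 0; i < n; i++)
        atomic_fetch_add(&counter, 1); /* indivisible read-modify-write */
    return NULL;
}

/* Two threads each add n to counter; returns the final value. */
int count_atomically(int n) {
    atomic_store(&counter, 0);
    pthread_t t1, t2;
    int rc = pthread_create(&t1, NULL, atomic_worker, &n);
    assert(rc == 0);
    rc = pthread_create(&t2, NULL, atomic_worker, &n);
    assert(rc == 0);
    (void)rc;
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    return atomic_load(&counter);
}
```

Each atomic_fetch_add is an indivisible read-modify-write, so no update can be lost and count_atomically(100000) returns 200000. Once a critical section spans more than one variable or operation, a mutex is usually the simpler correct choice.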

main-deadlock

#include "mythreads.h"
#include <stdio.h>

pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;

void *worker(void *arg) {
    if ((long long)arg == 0) {
        Pthread_mutex_lock(&m1);
        Pthread_mutex_lock(&m2);
    } else {
        Pthread_mutex_lock(&m2);
        Pthread_mutex_lock(&m1);
    }
    Pthread_mutex_unlock(&m1);
    Pthread_mutex_unlock(&m2);

    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t p1, p2;
    Pthread_create(&p1, NULL, worker, (void *)(long long)0);
    Pthread_create(&p2, NULL, worker, (void *)(long long)1);
    Pthread_join(p1, NULL);
    Pthread_join(p2, NULL);

    return 0;
}
  • helgrind output (parts repeated from above omitted):
❯ valgrind --tool=helgrind ./main-deadlock
==7873== ---Thread-Announcement------------------------------------------
==7873== Thread #3 was created
==7873== ----------------------------------------------------------------
==7873==
==7873== Thread #3: lock order "0x10C040 before 0x10C080" violated
==7873==
==7873== Observed (incorrect) order is: acquisition of lock at 0x10C080
==7873== at 0x4853D81: mutex_lock_WRK (hg_intercepts.c:944)
==7873== by 0x4858FBF: pthread_mutex_lock (hg_intercepts.c:960)
==7873== by 0x1093A6: Pthread_mutex_lock (mythreads.h:23)
==7873== by 0x109639: worker (main-deadlock.c:16)
==7873== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==7873== by 0x4906AC2: start_thread (pthread_create.c:442)
==7873== by 0x4997A03: clone (clone.S:100)
==7873==
==7873== followed by a later acquisition of lock at 0x10C040
==7873== at 0x4853D81: mutex_lock_WRK (hg_intercepts.c:944)
==7873== by 0x4858FBF: pthread_mutex_lock (hg_intercepts.c:960)
==7873== by 0x1093A6: Pthread_mutex_lock (mythreads.h:23)
==7873== by 0x109648: worker (main-deadlock.c:17)
==7873== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==7873== by 0x4906AC2: start_thread (pthread_create.c:442)
==7873== by 0x4997A03: clone (clone.S:100)
==7873==
==7873== Required order was established by acquisition of lock at 0x10C040
==7873== at 0x4853D81: mutex_lock_WRK (hg_intercepts.c:944)
==7873== by 0x4858FBF: pthread_mutex_lock (hg_intercepts.c:960)
==7873== by 0x1093A6: Pthread_mutex_lock (mythreads.h:23)
==7873== by 0x109619: worker (main-deadlock.c:13)
==7873== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==7873== by 0x4906AC2: start_thread (pthread_create.c:442)
==7873== by 0x4997A03: clone (clone.S:100)
==7873==
==7873== followed by a later acquisition of lock at 0x10C080
==7873== at 0x4853D81: mutex_lock_WRK (hg_intercepts.c:944)
==7873== by 0x4858FBF: pthread_mutex_lock (hg_intercepts.c:960)
==7873== by 0x1093A6: Pthread_mutex_lock (mythreads.h:23)
==7873== by 0x109628: worker (main-deadlock.c:14)
==7873== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==7873== by 0x4906AC2: start_thread (pthread_create.c:442)
==7873== by 0x4997A03: clone (clone.S:100)
==7873==
==7873== Lock at 0x10C040 was first observed
==7873== at 0x4853D81: mutex_lock_WRK (hg_intercepts.c:944)
==7873== by 0x4858FBF: pthread_mutex_lock (hg_intercepts.c:960)
==7873== by 0x1093A6: Pthread_mutex_lock (mythreads.h:23)
==7873== by 0x109619: worker (main-deadlock.c:13)
==7873== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==7873== by 0x4906AC2: start_thread (pthread_create.c:442)
==7873== by 0x4997A03: clone (clone.S:100)
==7873== Address 0x10c040 is 0 bytes inside data symbol "m1"
==7873==
==7873== Lock at 0x10C080 was first observed
==7873== at 0x4853D81: mutex_lock_WRK (hg_intercepts.c:944)
==7873== by 0x4858FBF: pthread_mutex_lock (hg_intercepts.c:960)
==7873== by 0x1093A6: Pthread_mutex_lock (mythreads.h:23)
==7873== by 0x109628: worker (main-deadlock.c:14)
==7873== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==7873== by 0x4906AC2: start_thread (pthread_create.c:442)
==7873== by 0x4997A03: clone (clone.S:100)
==7873== Address 0x10c080 is 0 bytes inside data symbol "m2"
==7873==
==7873==
==7873==
==7873== Use --history-level=approx or =none to gain increased speed, at
==7873== the cost of reduced accuracy of conflicting-access information
==7873== For lists of detected and suppressed errors, rerun with: -s
==7873== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 7 from 7)

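The message says that thread #3 violated the lock order: earlier acquisitions (main-deadlock.c lines 13 and 14) established the rule "take the lock at 0x10C040 (m1) before the lock at 0x10C080 (m2)", but thread #3 took m2 first and then m1 (lines 16 and 17). Reading main-deadlock.c also reveals the deadlock.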

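In worker, the order in which the locks are acquired depends on the argument, which creates a potential deadlock:

Suppose thread p1 (argument 0) acquires m1 and, at the same time, thread p2 (argument 1) acquires m2. Then both threads block on their second lock acquisition: p1 waits for m2 while p2 waits for m1. Each thread holds the lock the other needs and neither releases its own, so neither can proceed, which is a deadlock.

To fix this, make all threads acquire the locks in the same order, or use another synchronization scheme that removes the dependence on lock order.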
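A minimal sketch of the first fix, assuming the work done under the two locks does not care which lock was logically "first" (ordered_worker, run_ordered_workers and shared_steps are hypothetical names): every thread acquires m1 before m2, whatever its argument, so the cyclic wait described above cannot form:

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
static int shared_steps = 0; /* hypothetical data protected by both locks */

/* Whatever arg is, m1 is always taken before m2, so no thread can
   hold m2 while waiting for m1: no cycle, no deadlock. */
static void *ordered_worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&m1);
    pthread_mutex_lock(&m2);
    shared_steps++;
    pthread_mutex_unlock(&m2); /* reverse-order release is conventional, not required */
    pthread_mutex_unlock(&m1);
    return NULL;
}

/* Runs the two workers of main-deadlock with a fixed lock order;
   returns how many workers completed their critical section. */
int run_ordered_workers(void) {
    pthread_t p1, p2;
    shared_steps = 0;
    int rc = pthread_create(&p1, NULL, ordered_worker, (void *)(long long)0);
    assert(rc == 0);
    rc = pthread_create(&p2, NULL, ordered_worker, (void *)(long long)1);
    assert(rc == 0);
    (void)rc;
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);
    return shared_steps;
}
```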

main-deadlock-global

#include "mythreads.h"
#include <stdio.h>

pthread_mutex_t g = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;

void *worker(void *arg) {
    Pthread_mutex_lock(&g);
    if ((long long)arg == 0) {
        Pthread_mutex_lock(&m1);
        Pthread_mutex_lock(&m2);
    } else {
        Pthread_mutex_lock(&m2);
        Pthread_mutex_lock(&m1);
    }
    Pthread_mutex_unlock(&m1);
    Pthread_mutex_unlock(&m2);
    Pthread_mutex_unlock(&g);
    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t p1, p2;
    Pthread_create(&p1, NULL, worker, (void *)(long long)0);
    Pthread_create(&p2, NULL, worker, (void *)(long long)1);
    Pthread_join(p1, NULL);
    Pthread_join(p2, NULL);
    return 0;
}

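main-deadlock-global.c uses a global mutex to avoid the deadlock. In detail:

At the start of worker, every thread must first acquire the global mutex g before it can go on to acquire the other locks. Only one thread can hold g at any given time, so only one thread at a time can be in the section that acquires m1 and m2. Even though the threads acquire m1 and m2 in different orders, they always do so while holding g, so no deadlock can occur: because m1 and m2 are only ever acquired under g, while one thread holds g no other thread can be holding m1 or m2.

However, helgrind still reports: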

Thread #3: lock order "0x10C080 before 0x10C0C0" violated
...
==20204== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 7 from 7)

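This is a false positive. helgrind's lock-order check is conservative: it records the order in which pairs of locks are acquired and reports any inconsistency, without taking into account that a third lock (here g) already serializes those acquisitions. It can therefore warn about code that cannot actually deadlock; such warnings err on the side of caution and have to be checked by hand.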

main-signal

#include "mythreads.h"
#include <stdio.h>

int done = 0;

void *worker(void *arg) {
    printf("this should print first\n");
    done = 1;

    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t p;
    Pthread_create(&p, NULL, worker, NULL);
    while (done == 0)
        ;
    printf("this should print last\n");

    return 0;
}

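main-signal.c creates a child thread #2 that runs worker, which prints a message and sets the global variable done to 1. The main thread spins in a while loop until done becomes 1 and then prints another message.

  • helgrind output (parts repeated from above omitted):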
❯ valgrind --tool=helgrind ./main-signal
==27862== Command: ./main-signal
==27862==
this should print first
==27862== ---Thread-Announcement------------------------------------------
==27862==
==27862== Thread #2 was created
==27862==
==27862== ---Thread-Announcement------------------------------------------
==27862==
==27862== Thread #1 is the program's root thread
==27862==
==27862== ----------------------------------------------------------------
==27862==
==27862== Possible data race during write of size 4 at 0x10C014 by thread #2
==27862== Locks held: none
==27862== at 0x109633: worker (main-signal.c:13)
==27862== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==27862== by 0x4907AC2: start_thread (pthread_create.c:442)
==27862== by 0x4998A03: clone (clone.S:100)
==27862==
==27862== This conflicts with a previous read of size 4 by thread #1
==27862== Locks held: none
==27862== at 0x109684: main (main-signal.c:20)
==27862== Address 0x10c014 is 0 bytes inside data symbol "done"
==27862==
==27862== ----------------------------------------------------------------
==27862==
==27862== Possible data race during read of size 4 at 0x10C014 by thread #1
==27862== Locks held: none
==27862== at 0x109684: main (main-signal.c:20)
==27862==
==27862== This conflicts with a previous write of size 4 by thread #2
==27862== Locks held: none
==27862== at 0x109633: worker (main-signal.c:13)
==27862== by 0x48574A0: mythread_wrapper (hg_intercepts.c:406)
==27862== by 0x4907AC2: start_thread (pthread_create.c:442)
==27862== by 0x4998A03: clone (clone.S:100)
==27862== Address 0x10c014 is 0 bytes inside data symbol "done"
==27862==
this should print last
==27862==
==27862== Use --history-level=approx or =none to gain increased speed, at
==27862== the cost of reduced accuracy of conflicting-access information
==27862== For lists of detected and suppressed errors, rerun with: -s
==27862== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 62 from 35)

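helgrind reports two data races:

  • Write race: child thread #2 writes the variable done (address 0x10C014) without holding any lock;

  • Read race: main thread #1 reads done without holding any lock, which conflicts with thread #2's write.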

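When compiled and run, the program does print "first" before "last", but helgrind still reports data races because both threads access the global variable done without any synchronization mechanism (such as a mutex) protecting it. A correct result in one run does not make the code correct: for example, because done is an ordinary int, an optimizing compiler may assume no other thread changes it and load it only once, turning the while loop into an infinite loop.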


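The code is also inefficient, because the main thread uses a busy-wait loop to check whether the child thread has finished its work.

In the while (done == 0) loop, the main thread keeps checking the value of done until it becomes 1. No useful computation happens during this time, yet the CPU keeps executing the loop, so CPU cycles are wasted on repeatedly reading one variable. On a multitasking system the spinning thread also competes for CPU time with other threads, including the worker it is waiting for; on a single core it can delay the worker until the scheduler preempts it.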

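It can be improved in the following ways:

  • Use a condition variable with a mutex, so the main thread gives up the CPU while waiting, until the child finishes its work and notifies it through the condition variable;

  • Use pthread_join, which blocks the main thread until the child thread exits, so the main thread consumes no CPU while the child runs.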
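A sketch of the pthread_join approach, assuming the main thread needs nothing from the worker before the worker exits (join_worker, run_join_version and log_buf are hypothetical names; log_buf only records the print order so it can be checked):

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

static char log_buf[64]; /* hypothetical: records the print order */

static void *join_worker(void *arg) {
    (void)arg;
    strcat(log_buf, "first ");
    printf("this should print first\n");
    return NULL;
}

/* main-signal rewritten with pthread_join: the caller sleeps in
   pthread_join instead of spinning on done, and the join also
   orders the worker's output before the caller's. */
const char *run_join_version(void) {
    pthread_t p;
    log_buf[0] = '\0';
    int rc = pthread_create(&p, NULL, join_worker, NULL);
    assert(rc == 0);
    rc = pthread_join(p, NULL); /* returns only after join_worker has finished */
    assert(rc == 0);
    (void)rc;
    strcat(log_buf, "last");
    printf("this should print last\n");
    return log_buf;
}
```

pthread_join fits here because the worker's job ends when it returns. If the worker had to keep running after signaling, the condition-variable approach of main-signal-cv would be needed instead.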

main-signal-cv

#include "mythreads.h"
#include <stdio.h>

//
// simple synchronizer: allows one thread to wait for another
// structure "synchronizer_t" has all the needed data
// methods are:
//   init (called by one thread)
//   wait (to wait for a thread)
//   done (to indicate thread is done)
//
typedef struct __synchronizer_t {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    int done;
} synchronizer_t;

synchronizer_t s;

void signal_init(synchronizer_t *s) {
    Pthread_mutex_init(&s->lock, NULL);
    Pthread_cond_init(&s->cond, NULL);
    s->done = 0;
}

void signal_done(synchronizer_t *s) {
    Pthread_mutex_lock(&s->lock);
    s->done = 1;
    Pthread_cond_signal(&s->cond);
    Pthread_mutex_unlock(&s->lock);
}

void signal_wait(synchronizer_t *s) {
    Pthread_mutex_lock(&s->lock);
    while (s->done == 0)
        Pthread_cond_wait(&s->cond, &s->lock);
    Pthread_mutex_unlock(&s->lock);
}

void *worker(void *arg) {
    printf("this should print first\n");
    signal_done(&s);

    return NULL;
}

int main(int argc, char *argv[]) {
    pthread_t p;
    signal_init(&s);
    Pthread_create(&p, NULL, worker, NULL);
    signal_wait(&s);
    printf("this should print last\n");

    return 0;
}
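  • signal_init initializes the mutex and the condition variable and sets the done flag to 0.

  • signal_done sets done to 1 and signals the condition variable cond to wake a waiting thread. The mutex is acquired before done is set and the signal is sent, and released afterwards. Because the waiter checks done while holding the same mutex, it cannot check done and go to sleep in between the update and the signal, so the wakeup cannot be lost.

  • signal_wait calls Pthread_cond_wait, which atomically releases the mutex while the thread waits on the condition variable and reacquires it before returning. Releasing the mutex lets signal_done acquire it, which avoids a deadlock, and reacquiring it ensures the thread holds the mutex whenever it checks done. The check is a while loop rather than an if because pthread_cond_wait may return spuriously, so the condition must be re-checked after every wakeup.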


  • helgrind output (parts repeated from above omitted):
❯ valgrind --tool=helgrind ./main-signal-cv
==36705== Command: ./main-signal-cv
==36705==
this should print first
this should print last
==36705==
==36705== Use --history-level=approx or =none to gain increased speed, at
==36705== the cost of reduced accuracy of conflicting-access information
==36705== For lists of detected and suppressed errors, rerun with: -s
==36705== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 7 from 7)

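This shows that with the condition variable and mutex, the data race is gone. In addition, the main thread gives up the CPU while it waits, until the child finishes its work and notifies it through the condition variable, so the program no longer wastes CPU time spinning as main-signal does.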
