本篇博客将介绍线程用来同步彼此行为的两个工具:互斥量(mutex)和条件变量(condition variable)。

互斥量

线程的一个主要优势是,能够通过具有全局属性的变量快速的交换数据。但是这个优势同样带来了弊端:必须确保多个线程不会同时访问同一个变量。术语临界区(critical section)是指一段访问共享资源(即具有全局属性的变量)的代码块,我们希望临界区代码的执行必须具有原子性(atomic),即同一时间只有一个线程能在临界区内运行。

下面来看一个不以原子性访问临界区的代码,看看会出现什么情况。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

int glob = 0;

void* startFunc(void* arg) {
    int loops = *(int *)arg;
    int loc;

    for(int i = 0; i < loops; i++) {
        loc = glob;
        loc++;
        glob = loc;
    }

    return NULL;
}

int main(int argc, char* argv[]) {
    if(argc <= 1) {
        printf("need to specify the loops\n");
        exit(-1);
    }

    int* loops = (int *)malloc(sizeof(int));
    *loops = atoi(argv[1]);

    pthread_t t1, t2;
    int ret = pthread_create(&t1, NULL, startFunc, loops);
    ret = pthread_create(&t2, NULL, startFunc, loops);

    ret = pthread_join(t1, NULL);
    ret = pthread_join(t2, NULL);

    printf("glob: [%d]\n", glob);

    return 0;
}                               

为了减少代码的篇幅,我删去了对系统调用以及库函数返回值的判错,平常学习的时候最好还是都加上。

代码的逻辑很简单,有一个全局变量glob,我们创建两个线程,每个线程都循环增加glob的值,只不过这里用了一个中间变量loc来暂存glob,相当于在一个线程增加glob的值的时候制造了一个空档以便另一个线程有可能在此时获得CPU调度,如下图所示:

可以看到线程2将glob增加了一定次数后,线程1重新获得CPU调度,此时再把loc重新赋值给glob,导致线程2对glob的改动丢失。

在我的测试中,随着loops的上升,glob终于出现不等于loops的两倍的情况。

可能有人觉得把glob加1分成三步是画蛇添足,没事找事。但其实换成glob++这个问题同样存在,因为C语言中自增操作同样不具有原子性,哪怕这只是一条C语句,但CPU仍有可能在运行这条语句的时候将时间片分给其他的线程。

这应当归咎于CPU调度的不确定性,正是因为这种不确定性的存在,才需要各种同步技术,也就是下面要说的互斥量。

基本概念

为了避免更新共享资源的时候出现上面这种问题,就需要使用互斥量(mutex)来保证对共享资源的原子访问。

现在可以把互斥量想象成一个锁,所以互斥量有两种状态:已锁定(locked)未锁定(unlocked)。而线程只需要在访问共享资源之前通过互斥量将其“锁住”,然后就可以放心的对共享资源进行访问了,最后访问完毕当然要记得“解锁”。在此期间,如果有其他线程跑来尝试访问同样的共享资源,他必须同样先对共享资源进行加锁,而这时候就会发现共享资源已经被互斥量“锁住”了,这代表着现在有其他线程正在访问这个资源,于是该线程就会等待锁解开再继续加锁访问,或者直接报错返回,这取决于加锁的方式。

互斥量最重要的特性就是一旦某个线程锁定互斥量,随即成为该互斥量的所有者。只有所有者才能对互斥量解锁。而在此期间其他锁定互斥量的操作都会被阻塞,这就是互斥量可以实现线程同步的基础。

下图展示了两个线程试图访问同一个资源时可能出现的场景:

由上图可知,所谓的锁住共享资源是一种形象的说法,其实真实锁住的是互斥量自己,而这个操作保证了互斥量加锁与互斥量解锁这两条语句中间的所有内容执行的原子性。

最后,要理解互斥量是一种建议而非强制。假如上面的线程B在访问共享资源之前根本不考虑什么加锁解锁,那么他仍旧可以不被阻塞的访问共享资源,所以这一切其实都只是取决于程序员对程序的设计。

互斥量初始化与销毁

操作互斥量的前提是你得有一个互斥量,互斥量的类型为pthread_mutex_t。可以将互斥量申请成静态变量(全局变量以及静态局部变量),也可以申请成动态变量(在栈上分配的普通局部变量以及堆上分配的变量)。这两种情况下对互斥量的初始化稍有不同。互斥量在使用之前必须进行初始化。

首先,对于动态分配的互斥量,必须使用pthread_mutex_init()函数来进行初始化。

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr);
// Returns 0 on success, or a positive error number on error

参数mutex指定函数要初始化哪个互斥量。参数attr是指向pthread_mutexattr_t类型的指针,这个类型的变量用于指定创建的互斥量的属性,如果该参数置为NULL则表示使用默认参数。互斥量的属性并不常用,本博客最后会讨论一个相对重要的属性:互斥量类型。

当经由这种方式初始化的互斥量不再需要的时候,应当使用pthread_mutex_destroy()函数将其销毁。

#include <pthread.h>

int pthread_mutex_destroy(pthread_mutex_t* mutex);
// Returns 0 on success, or a positive error number on error

只有当互斥量处于未锁定状态,且后续也无任何线程企图锁定它时,将其销毁才是安全的。若互斥量驻留于动态分配的一片内存区域中,应在释放(free)此内存区域前将其销毁。对于自动分配的互斥量,也应在宿主函数返回前将其销毁。

经由pthread_mutex_destroy()销毁的互斥量,可调用pthread_mutex_init()对其重新初始化。

静态分配的互斥量既可以通过pthread_mutex_init()函数初始化,也可以使用PTHREAD_MUTEX_INITIALIZER初始化。如下所示:

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

对于使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量,不需要进行销毁。

加锁与解锁

初始化之后,互斥量处于未锁定状态。函数pthread_mutex_lock()函数用于锁定某一互斥量,而函数pthread_mutex_unlock()用于将互斥量解锁。

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);
// Both return 0 on success, or a positive number on error

这两个函数的含义十分清晰。需要注意的是,调用加锁解锁函数时由于互斥量状态的不确定而会产生一些不同的结果。如下表所示。

加锁时互斥量的状态 结果
互斥量当前未锁定 锁定互斥量并立刻返回
互斥量已由其他线程锁定 阻塞直到解锁
互斥量已由自己锁定 死锁(Linux) / 返回EDEADLK错误
解锁时互斥量的状态 结果
互斥量由自己锁定 解锁互斥量并立刻返回
互斥量当前未锁定 Linux下函数调用成功
互斥量由其他线程锁定 Linux下函数调用成功

另外,如果有多个线程阻塞在一个互斥量上,当这个互斥量解锁后,哪一个线程成功得到这个互斥量是不确定的。

现在修改一下博客开头的那个例子,通过互斥量实现对glob的原子访问。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

int glob = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void* startFunc(void* arg) {
    int loops = *(int *)arg;
    int loc;

    for(int i = 0; i < loops; i++) {
        pthread_mutex_lock(&mutex);  // lock
        loc = glob;
        loc++;
        glob = loc;
        pthread_mutex_unlock(&mutex);  // unlock
    }

    return NULL;
}

int main(int argc, char* argv[]) {
    if(argc <= 1) {
        printf("need to specify the loops\n");
        exit(-1);
    }

    int* loops = (int *)malloc(sizeof(int));
    *loops = atoi(argv[1]);

    pthread_t t1, t2;
    int ret = pthread_create(&t1, NULL, startFunc, loops);
    ret = pthread_create(&t2, NULL, startFunc, loops);

    ret = pthread_join(t1, NULL);
    ret = pthread_join(t2, NULL);

    printf("glob: [%d]\n", glob);

    return 0;
}     

pthread_mutex_trylock()pthread_mutex_timedlock()

Pthreads API 提供了pthread_mutex_lock()函数的两个变体:pthread_mutex_trylock()pthread_mutex_timedlock()。可以通过手册查看这两个函数的原型。

pthread_mutex_trylock()相当于pthread_mutex_lock()的非阻塞版,只是尝试获取互斥量一次,如果互斥量有锁则返回EBUSY错误。

pthread_mutex_timedlock()只会阻塞固定的时间,如果经过该时间后仍未获得互斥量,则返回ETIMEDOUT错误。

互斥量死锁

有时,一个线程需要同时访问两个或更多不同的共享资源,而每个资源又都由不同的互斥量管理。当超过一个线程加锁同一组互斥量时,就有可能发生死锁。

如上图,线程A线程B各成功锁住了一个互斥量,但又同时尝试去锁另一个互斥量,这就造成了死锁

要避免此类死锁问题,最简单的方法是定义互斥量的层级关系。当多个线程对一组互斥量操作时,总是应该以相同顺序对该组互斥量进行锁定。例如上面的例子中,如果两个线程总是先锁定mutex1再锁定mutex2,死锁就不会出现。有时,互斥量间的层级关系逻辑清晰。不过,即便没有,依然可以设计出所有线程都必须遵循的强制层级顺序。

互斥量类型

下面介绍一个相对常见的互斥量属性:互斥量类型。

PTHREAD_MUTEX_NORMAL

该类型的互斥量不具有死锁检测(自检)功能。如果线程试图对已经由自己锁定的互斥量加锁,则会发生死锁。互斥量处于未锁定状态,或者已由其他线程锁定,对其解锁会导致不确定的结果。(在Linux 上,对这类互斥量的上述两种操作都会成功。)

PTHREAD_MUTEX_ERRORCHECK

对此类互斥量的所有操作都会执行错误检查。所以上面所说的三种情况都会导致相关Pthreads 函数返回错误。这类互斥量运行起来比一般类型要慢,不过可将其作为调试工具,以发现程序在哪里违反了互斥量使用的基本原则。

PTHREAD_MUTEX_RECURSIVE

递归互斥量维护有一个锁计数器。当线程第1 次取得互斥量时,会将锁计数器置1。后续由同一线程执行的每次加锁操作会递增锁计数器的数值,而解锁操作则递减计数器计数。只有当锁计数器值降至0 时,才会释放(release,亦即可为其他线程所用)该互斥量。解锁时如目标互斥量处于未锁定状态,或是已由其他线程锁定,操作都会失败。

PTHREAD_MUTEX_DEFAULT

这是互斥量的默认类型。标准并未定义这个类型的互斥量在上面三种情况下的动作,意在为互斥量的高效实现保留最大的灵活性。Linux 上,PTHREAD_MUTEX_DEFAULT类型互斥量的行为与PTHREAD_MUTEX_NORMAL 类型相仿。

对比同步与等待

互斥量可以用于同步而不能用于等待。什么意思呢?就是说,互斥量只能保证同一时间只有一个线程在访问共享资源,但做不到当某个线程所需的特殊条件达成后再让这个线程访问共享资源,也就是在条件还没达成的时候让这个线程等待着。

那什么场景下才需要这种等待呢?思考下面著名的生产者消费者问题(Producer-Consumer Problem)

生产者消费者是这样一种问题,一个或多个生产者(线程或进程)创建者一个个的数据条目,然后这些条目由一个或多个消费者(线程或进程)处理。如下图:

在这个例子中,单个进程内有多个生产者线程和单个消费者线程。生产者生产的条目在这里就是一个整型数据,然后将这些数据存在buff数组中。为了简单起见,生产者为buff[0]生产数据0,为buff[1]生产数据1。而消费者对这些数据的处理只是验证每个数据元素都是正确的。

#include <stdio.h>                                 
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define MAXITEM 10000000  // 最大条目数
#define MAXTHREADS 10  // 最大线程数

int nitems;  // 要生产的条目的个数
struct {
    pthread_mutex_t mutex;
    int buff[MAXITEM];  // 存放生产者生产的条目
    int nput;  // 生产者下一个要生产条目的下标
    int nval;  // 生产者下一个要生产条目的值
} shared;

void* produce(void* arg) {
    int* count = (int *)arg;

    while(1) {
        pthread_mutex_lock(&(shared.mutex));
        if(shared.nput == nitems) {  // 如果生产数目够了
            pthread_mutex_unlock(&(shared.mutex));
            return NULL;
        }
        shared.buff[shared.nput++] = shared.nval++;
        pthread_mutex_unlock(&(shared.mutex));
        (*count)++;  // 改变count不需要同步,所以放在锁外面
    }
}       

void* consume(void* arg) {
    int count = 0;  // 消费者已经处理的条目个数

    while(count < nitems) {
        pthread_mutex_lock(&(shared.mutex));
        if(count < shared.nput) {  // 如果还有未处理的条目
            if(shared.buff[count] != count) {  // 如果条目的数据不正确
                printf("Buff error: buff[%d] = %d\n", count, shared.buff[count]);
            }
            count++;  // 处理该条目
        }
        pthread_mutex_unlock(&(shared.mutex));
    }

    return NULL;
}

int main(int argc, char* argv[]) {
    int nthreads, count[MAXTHREADS];
    pthread_t tid_producer[MAXTHREADS], tid_consumer;

    pthread_mutex_init(&(shared.mutex), NULL);  // 互斥量赋初值

    if(argc != 3) {
        printf("argc != 3 !!!!\n");
        exit(-1);
    }
    nitems = atoi(argv[1]);
    nthreads = atoi(argv[2]);

    for(int i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_producer[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consumer, NULL, consume, NULL);                                                                                 

    for(int i = 0; i < nthreads; i++) {
        pthread_join(tid_producer[i], NULL);
    }
    pthread_join(tid_consumer, NULL);

    int total = 0;
    for(int i = 0; i < nthreads; i++) {
        printf("Thread%d produced %d items\n", i, count[i]);
        total += count[i];
    }
    printf("Total produced %d items\n", total);

    return 0;
}                                                      

可以看到,运用了同步技术,每个线程各司其职,没有谁生产了多余的条目,也没有谁生产了错误的条目。

唯一的瑕疵出在消费者的逻辑上,可以看到,消费者如果处理完了所有已生产的条目,仍然会循环加锁解锁,并在此期间重复检查是否有新的条目被生产出来。这称为轮转(spinning)轮询(polling),是一种对CPU时间的浪费。

一种可能的替代方案是,让消费者睡眠一会儿,但没人知道要睡多久。所以这里所需的是另一种类型的同步,它允许一个线程或进程睡眠到某个事件发生为止。

条件变量

互斥锁防止多个线程同时访问同一共享变量。条件变量允许一个线程就某个共享变量(或其他共享资源)的状态变化通知其他线程,并让这些线程在没有收到通知的时候阻塞起来。

条件变量的类型为pthread_cond_t,使用之前同样需要初始化,初始化方式和互斥量类似。动态分配的条件变量只能用pthread_cond_init()初始化,使用完成后必须用pthread_cond_destroy()销毁。而静态分配的条件变量还可以用PTHREAD_COND_INITIALIZER初始化,并且无需摧毁。

通知和等待条件变量

#include <pthread.h>

int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);
int pthread_cond_signal(pthread_cond_t* cond);
int pthread_cond_broadcast(pthread_cond_t* cond);
// All returns 0 on success, or a positive error number on error

pthread_cond_wait()用于等待,pthread_cond_signal()pthread_cond_broadcast()用于通知正在等待的线程。

每个条件变量总是有一个互斥锁与之关联。这一点非常重要,可以看到在pthread_cond_wait()函数的参数中不仅有一个条件变量,还有一个互斥量。这个互斥量用于同步对所等待的那个共享资源的访问,我们下面讨论为什么这个互斥量要传到函数内部而不是由用户在函数外部完成加锁解锁的操作。

现在先介绍pthread_cond_wait()这个函数内部的真正执行流程。首先,这个函数会假设用户已经在外部对mutex参数加过锁了,所以这个函数的第一步是将mutex参数解锁,然后使调用线程陷入沉睡等待其他线程通过pthread_cond_signal()或类似的接口发来一个信号,然后将其唤醒。唤醒后,函数返回前的最后一步是再次将mutex参数加上锁,最后返回。

那么,为什么要这么设计呢?首先,我们一直说条件变量用于通知某个共享资源的状态变化,而这个状态变化的判断都是由用户完成的。也就是说,通常是用户先判断共享资源的状态是否符合要求,如果不符合要求则调用pthread_cond_wait()使线程陷入沉睡,而当另一个线程发现共享资源变化成符合要求的状态时,会调用pthread_cond_signal()或类似的函数唤醒之前沉睡的线程。这就解释了为什么要把一个互斥量当作参数传入pthread_cond_wait()函数,因为必须让判断条件不满足和陷入沉睡这两个动作具有原子性,所以在函数外部需要先对互斥量加锁,然后判断条件是否不满足,然后若不满足则调用pthread_cond_wait()陷入沉睡。试想如果不这么设计会发生什么?即不给条件变量设置这么一个互斥量或者将对互斥量的加锁操作放在pthread_cond_wait()内部会发生什么?首先用户判断条件是否满足,发现不满足然后调用pthread_cond_wait()让线程陷入沉睡,但假如条件满足恰好出现在这两步之间呢?由于没有互斥量的保护,所以条件可能在用户判断完之后变成满足,就导致条件满足的信号在调用pthread_cond_wait()之前发了过来,而线程忽略了这个信号,之后再调用pthread_cond_wait()陷入沉睡,这可能导致线程永远沉睡!!!因为已经把条件满足的信号漏掉了。

下面看一下两个用于通知状态变化的函数pthread_cond_signal()pthread_cond_broadcast()。两者都是用来唤醒阻塞在cond参数上的线程,不同的是pthread_cond_signal()只保证唤醒至少一条阻塞在cond上的线程,而pthread_cond_broadcast()保证唤醒所有阻塞在cond上的线程。

这就是应用场景的不同了,pthread_cond_signal()应用在仅需唤醒任意一条线程的场景下,这通常意味着所有线程都在执行相同的任务。而pthread_cond_signal()应用在所有睡眠的线程处理的任务不同,需要把它们都唤醒的场景下。

pthread_cond_signal()可能比pthread_cond_broadcast()更具效率,因为唤醒多条线程可能导致多余的线程醒来后发现任务已经被别的线程做了,也就是条件仍不满足,这时候该线程会继续陷入沉睡。

应用

下面将条件变量引入上面的生产者消费者问题。

#include <stdio.h>                                 
#include <stdlib.h> 
#include <unistd.h>
#include <pthread.h>

#define MAXITEM 10000000
#define MAXTHREADS 10

int nitems;  // 要生产的条目的个数
struct {
    pthread_mutex_t mutex;
    int buff[MAXITEM];  // 存放生产者生产的条目
    int nput;  // 生产者下一个要生产条目的下标
    int nval;  // 生产者下一个要生产条目的值
} shared;

struct {
    pthread_mutex_t mutex;  // 条件变量用到的互斥量
    pthread_cond_t cond;  // 条件变量
    int nready;  // 条件变量对应的条件,即现在有几个消费者未处理的条目
} nready;

void* produce(void* arg) {
    int* count = (int *)arg;

    while(1) {
        pthread_mutex_lock(&(shared.mutex));
        if(shared.nput == nitems) {  // 如果生产数目够了
            pthread_mutex_unlock(&(shared.mutex));
            return NULL;
        }
        shared.buff[shared.nput++] = shared.nval++;
        pthread_mutex_unlock(&(shared.mutex));

        pthread_mutex_lock(&(nready.mutex));
        pthread_mutex_lock(&(nready.mutex));
        if(nready.nready == 0) {  // 只有当nready从0变到1的时候,才发送信号,防止发送过多的重复信号
            pthread_cond_signal(&(nready.cond));
        }
        nready.nready++;
        pthread_mutex_unlock(&(nready.mutex));

        (*count)++;  // 改变count不需要同步,所以放在锁外面
    }
}

void* consume(void* arg) {
    arg = NULL;
    int count = 0;  // 消费者已经处理的条目个数

    while(count < nitems) {
        pthread_mutex_lock(&(nready.mutex));
        while(nready.nready <= 0) {
            pthread_cond_wait(&(nready.cond), &(nready.mutex));
        }
        count++;  // 处理这个条目
        nready.nready--;  // 减小未处理的条目数
        pthread_mutex_unlock(&(nready.mutex));
    }

    return NULL;
}

int main(int argc, char* argv[]) {
    int nthreads, count[MAXTHREADS];                                                                                                    
    pthread_t tid_producer[MAXTHREADS], tid_consumer;

    pthread_mutex_init(&(shared.mutex), NULL);

    if(argc != 3) {
        printf("argc != 3 !!!!\n");
        exit(-1);
    }
    nitems = atoi(argv[1]);
    nthreads = atoi(argv[2]);

    for(int i = 0; i < nthreads; i++) {
        count[i] = 0;
        pthread_create(&tid_producer[i], NULL, produce, &count[i]);
    }
    pthread_create(&tid_consumer, NULL, consume, NULL);

    for(int i = 0; i < nthreads; i++) {
        pthread_join(tid_producer[i], NULL);
    }
    pthread_join(tid_consumer, NULL);

    int total = 0;
    for(int i = 0; i < nthreads; i++) {
        printf("Thread%d produced %d items\n", i, count[i]);
        total += count[i];
    }
    printf("Total produced %d items\n", total);

    return 0;
}                                         

在应用过程中,一个需要注意的点是,pthread_cond_wait()通常由一个while循环包裹起来,while循环的判断条件就是条件变量对应的条件,这里用while循环而不用if判断是刻意为之的。当第一次执行到这里,whileif判断一次所需条件是否满足,不满足则进入whileif陷入沉睡,而当其被唤醒的时候,其实是无法知道上面的条件是否仍然满足,有可能已经有其他线程率先被唤醒然后捷足先登完成了任务,导致条件再一次不满足,所以必须用while循环,使得线程被唤醒后再一次判断条件。这也是为什么pthread_cond_wait()里最后一步是再次给mutex加上锁的原因,方便函数返回后立刻要再一次进行的条件判断。

Last modification:December 5th, 2019 at 07:26 pm