0%

TCP 连接状态的维护是在内核协议栈上的,它保存了一个 TCP 中的序列号、四元组、绑定进程等信息,并且负责维护进程的收发缓冲区,TCP 异常也是根据这些信息来进行处理的

报文处理原则

任何非 RST 报文都是按照次序进行处理的,若出现包延迟或丢包,seq 不连续的报文将会被放入等待队列中等待处理。

异常原则

在 TCP 状态机中,只要有任何一个位置出现异常,都会以一个 RST 包来结束本次 TCP 连接来标识本次连接发生了异常。当内核中的 TCP 协议栈收到 RST 包后,会检查 RST 包的序列号,如果序列号正确,将会立刻丢弃掉缓冲区内的所有数据并且告知本端进程错误,即 Connection Reset By Peer。

本端进程状态

内核协议栈会检查 Socket 所绑定进程的状态,以及该进程对 Socket 的读写权限;并根据这些信息来处理相关异常情况。

  1. Socket 直接由协议栈回收时,将会释放协议栈相关资源,并且向对方发送 RST 包关闭连接;

  2. 若进程关闭了读权限,对端继续写入将会收到 RST 包,因为进程永远不会读到新发送的数据;

  3. 若进程在有读缓冲区的情况下关闭 Socket,将会给对方发送 RST 包;

情况一对应了一个进程崩溃,或者进程没有正确关闭 Socket 的情况。在这种情况下,Socket 并不是由进程主动释放的,而是由协议栈来进行回收的,这时协议栈会判断为进程因为某些原因而异常退出,协议栈所能采取的更安全的做法就是立刻丢弃掉缓冲区内的数据,并且向对端发送 RST。因此,只有在进程退出时主动释放 Socket 相关资源才是安全的,否则会导致写入缓冲区的数据并不会完全被对方收到。

情况二对应了 TCP 四次挥手时进程不正常关闭而导致出现孤儿 Socket。当 TCP 主动关闭的端直接使用 close 来关闭 Socket 的读写时,对端继续写入数据是无法被对端收到的。但协议栈并不能判断对方是否仍然有数据来写入,因此协议栈中仍然会保留该 Socket,如果对端在四次挥手中没有需要写入的数据,那么就会被视为一次正常关闭,否则将会被视为异常关闭。

情况三则代表了虽然协议栈中收到了 TCP 数据包,但是在进程层面来看则并没有收到,因此内核协议栈是有责任来告知这一次连接并不是被安全关闭的。

TCP 状态机

内核协议栈还会根据当前维护的 TCP 状态机来决定如何对收到的 TCP 报文做出应答。虽然 TCP 已经采取了多种机制来避免收到非本次连接中的 TCP 包,但是仍有一定概率会收到异常包。根据收到的报文种类,可以分为以下几个异常情景:

  1. 收到非窗口内的非 SYN 报文,不回复;
  2. 已建立连接收到了一个 SYN 包,这时会回复 Challenge ACK;
  3. 收到一个指向不存在的或不存活的 Socket 的请求,回复 RST;
  4. Time Wait 状态收到 SYN 包。

情况一比较容易理解,即收到了延迟较高的数据包,或者收到了非本次连接的包。

情况二区别于情况一,收到一个乱序的 SYN 包并不会丢弃,而是要采取更为复杂的处理措施。因为该乱序的 SYN 包可能是本次连接的超时包,也可能是来自一个新的三次握手的请求包。即对端崩溃后,本端并没有及时感知到,对端又再一次发送了连接请求。显然,后一种情况下,TCP 连接已经不安全了,因此内核会发送一个带有当前 TCP 序列号的 ACK 包,称为 Challenge ACK,用于确认对端的状态。如果这是一个过期的 SYN 包,那么对端 TCP 仍然是存活的,这时该 ACK 包会在窗口内,对端不会进行回复。如果对端是崩溃后重连,则该 ACK 序列号并不符合期望,对端协议栈将会发送一个 RST 包来重置连接。

情况三也比较容易理解,即当前 host 没有绑定进程,或者 host 正处于 timewait 等不可用状态。

而第四种情况较为特殊,当 Time Wait 状态下收到一个序列号大于当前序列号的 SYN 包,并且四元组相同,该 socket 将会被复用。

序论

并发的一些基本概念

  • 并发:在单个系统中可以同时执行多个独立的任务,这些任务可是不是在同一时刻开始的。并发的关注点为任务分离和任务响应

  • 并行:是指系统在同一时刻创建出多个并发的线程,这些任务的开始间隔很小,近似为同一时刻。并行的关注点再数据批量处理

  • C++中,同一进程的线程之间是共享同一片内存地址的;而进程之间的内存地址之间是隔离开的。因此,进程之间的通信十分复杂,或通信速度较慢;而线程之间的通信简单,且速度很快。

  • 并发的缺点:并发的设计和维护的代价非常大,也更容易引起错误。过多的线程也会造成系统的运行效率下降。

  • C++标准库对系统底层工具进行了封装,这代表其会产生一定的抽象代价

并行的常用方式

  • 任务并行:将一个大任务分为几部分,各自单独运行。数据分批次,算法是整体的。任务并行常常用来提升任务运行速度
  • 数据并行:将一个算法分为几部分,形成线程之间的流水线工作。数据并行常常用来提升数据吞吐量

并发编程常用思想

​ 下面介绍的是在并发编程中经常会用到的一些模式和思想。

函数化编程

​ 函数化编程是指函数的结果只依赖于传入函数的参数,而不依赖于外部的状态。通常所说的外部状态有:全局变量,外部域变量、系统参数等,注意不要忽略系统参数。函数化编程具有以下的优点:

  • 可再现性:函数化编程的结果与输入一一对应,可以避免一些预料之外的错误。
  • 低条件竞争:函数化编程不依赖外部参数,因而很难会发生条件竞争。
  • 高并发:采用函数化编程,可以在同一时刻并发多个线程,而不会造成阻塞。

通信顺序处理

​ 通信顺序处理是指线程之间的消息只通过消息队列传递,而不使用共享地址空间。由于 C++线程共享一块地址空间,所以 C++只能够模拟而非真正实现通信顺序处理。

​ 基于通信顺序处理,可以实现多线程下的有限状态机

线程安全

​ 指数据结构在多线程环境下,无数据的丢失和损毁,所有的数据需要维持原样且无条件竞争。 单纯使用锁会导致序列化,即线程只能轮流访问被保护的数据;在数据结构这个节点上并不能实现真正的并发;线程安全的含义:

  • 确保无线程能看到修改够数据结构的“不变量”时的状态,即数据修改对任何线程是原子的
  • 小心会引起条件竞争的接口,提供完整操作的函数,而非操作步骤
  • 注意数据结构的行为是否会产生异常,从而确保“不变量”的状态,即实现“异常-安全”
  • 降低死锁概率,限制锁的范围,降低锁的粒度,避免嵌套锁的存在。

线程管理

简单地定义一个模板的初始化

C++线程是一个模板类,提供的函数对象和数据(除 std::ref()外)都会被复制到新的线程中去,如果拷贝后主线程的函数或数据发生变化不会造成子线程的变化,这可能会导致一些错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include<thread>

class SomeClass{
public:
void someFunction(int & num){ }

void createFunction(){
//调用类的成员函数时要加上 &类型::
//默认参数需要拷贝到新线程中去,除非显示使用 std::ref
std::thread thread1(&SomeClass::someFunction,this,std::ref(num));
}
};

//一个线程可以被 Join 一次,主线程运行至 join 处
//会等待其他线程运行完毕
if(thread1.joinable)
thread1.join();

//detach 通常用来分离守护线程,被分离后的线程不能 Join
//守护线程:没有显显式的用户接口,在后台持续运行,可能会贯穿整个应用生命周期
thread1.detach();

//yield 用来主动让出时间片,即若有其他线程在等待,则优先让其他线程运行
thread1.yield();
  • std::thread 变量是一个右值类型,不能够拷贝和赋值,只能够使用 std::move 进行移动操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
//假设有两个线程 thread1、thread2 

thread1 = thread2 //这是错误的
thread1.swap(thread2);//thread1和 thread2 相互交换

thread1 = std::move(thread2) // thread2交给 thread1 管理,自身变为空值;thread1原本的会直接被终止,不抛出异常

//函数可以返回一个 std::thread 类型的对象
std::thread createThread(){
//返回的对象必须被初始化
std::thread t();
return t;
}
  • 有关 thread 的一些操作
1
2
3
4
5
6
7
8
9
//线程的标识符
std::this_thread::get_id(); //获得当前运行线程的 id
thread1.get_id(); //获得 thread1 的 id

//获得允许运行的最大线程数量
//注意,这个数量并不是安全的,因为同一时间在多个线程中调用会返回同一个值
//如果按照这个数量来创建线程,往往会超出系统负载
int max_thread_num = std::thread::hardware_concurrency();

  • 多线程编程中的“异常-安全”

线程间的共享数据——mutex

​ 问题来源:当多个线程同时访问一个共享的数据时,它只保留刚刚读取时的信息,若读取后数据被更新,线程并不会保证其所使用的数据为最新版本。当多个线程同时要修改一个数据,就会引发一些问题,这叫作条件竞争。条件竞争是时间敏感的,通常难以在 debug 模式下进行复现

互斥量

​ c++中的基本互斥量有三种:std::mutexstd::shared_mutexstd::recursive_mutex。互斥量本身不绑定任何一个数据,它相当于标志出一个临界区,使用同一个互斥量的线程不能够进入同一个临界区,而非禁止访问资源。

std::shared_mutex 是一个读者-写者锁。用它来保护一个数组的时候,允许在不同的元素上进行读写操作;即他只会保护被写的那一个元素,而非是保护整个数组。

std::recursive_mutex是一个嵌套锁。嵌套锁用于并发访问的类上,用于解决成员函数之间的嵌套使用。如一个类中有 A 和 B 两个函数,两个函数都具有独立功能,但 B 依赖于 A 实现,且 A 和 B 都使用了锁。使用嵌套锁可以解决这个问题,但不推荐。推荐——将公共部分提取出来作为 private 函数,再设计两套接口来分别调用这个函数。

使用互斥量

1
2
3
4
5
6
7
8
#include <mutex>

std::mutex mtx;

void someFunction(){
std::lock_guard<std::mutex> guard(mtx); //不推荐直接对 mutex 进行操作
//临界区
}//离开作用域时退出临界区

避免指针指向临界区资源

使用互斥量时,防止有指针指向临界区内的数据,互斥量向底层传递,不向顶层传递

互斥量可以锁住一个对象本身以及其所用到的所有资源,但是不能够锁住指向这个对象的指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::mutex mtx;

int i=1;
int *_ptr=&i; //通过_ptr 可以指向i

void someFunction(){
std::lock_guard<std::mutex> guard(mtx); //不推荐直接对 mutex 进行操作
//临界区
i++;
}//离开作用域时退出临界区

void otherFunction(){
*_ptr--; //虽然 someFunction 中对 i 上锁,但是通过指针仍然可以访问该数据
}

接口间的条件竞争

​ 接口间的竞争来源:考虑一个 STL 标准容器,其中的 empty 在调用时测试的可能是一个不空的数组,但是这个值返回时可能会有其他线程更改了数组,导致数组变为空的。

​ 解决方法有几种:

  1. 在多线程程序中,尽量避免定义一些描述数据结构“状态”的函数,如 size()、empty()等。

  2. 将状态函数使用时对整个数据结构加锁(会导致其他线程阻塞,并发度下降)

  3. 将状态函数与其他功能函数集成在一起,如使用 try_pop,若失败返回一个空变量。

使用互斥量可能造成死锁问题

​ 死锁来源:不同线程尝试进入临界区的顺序不同,锁住互斥量的顺序不同。

  • 按照一定的优先级顺序来声明信号量,确保锁住互斥量的顺序是一致的。

    优先锁住高频率使用的互斥量,从而来降低线程之间的竞争

  • 使用 std::lock()来一次性锁住多个互斥量

    1
    2
    3
    4
    5
    6
    7
    8
    #include <mutex>
    std::mutex mtx1,mtx2;

    //在声明lock 时,相当于向系统一次性申请多个互斥量
    std::lock(mtx1,mtx2);
    std::lock_guard<std::mutex> lock1(mtx1,std::adopt_lock);
    std::lock_guard<std::mutex> lock2(mtx2,std::adopt_lock);
    //如果 lock1 或 lock2 发生异常,那么将会自动释放所有互斥量
  • 使用std::scoped_lock<>来一次性锁住多个互斥量,C++17标准

    1
    2
    3
    4
    5
    #include <mutex>
    std::mutex mtx1,mtx2;

    //在声明lock 时,相当于向系统一次性申请多个互斥量
    std::scoped_lock guard(mtx1,mtx2);
  • 当线程已经进入临界区后,尽量避免调用其他用户提供的代码

    因为其他用户可能在底层使用了锁,可能会造成死锁

  • 在多线程环境下,访问有回路的数据结构,如图、双向列表等要规定访问顺序。

    1
    2
    3
    4
    5
    6
    7
    //一个可能会造成死锁的例子
    //假设有一个列表 A-B-C

    ////thread1 执行功能从 A 结点删除 B
    ////thread2 执行功能从 B 结点删除 A

    ////规定互斥量必须从 A 开始按照顺序获取,才不会造成死锁

使用层级互斥锁来避免死锁

​ 层次锁:高层次的锁将无法锁住低层次的锁,从而可以按照层级来设计代码,防止层级之间的死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
hierarchical_mutex high_level_mutex(10000); // 1
hierarchical_mutex low_level_mutex(5000); // 2
hierarchical_mutex other_mutex(6000); // 3

int do_low_level_stuff();

int low_level_func(){
std::lock_guard<hierarchical_mutex> lk(low_level_mutex); // 4
return do_low_level_stuff();
}

void high_level_stuff(int some_param);

void high_level_func(){
std::lock_guard<hierarchical_mutex> lk(high_level_mutex); // 6

high_level_stuff(low_level_func()); // 5
}

void thread_a(){
high_level_func();
}

void do_other_stuff();

void other_stuff(){

high_level_func();

do_other_stuff();

}

void thread_b() {
std::lock_guard<hierarchical_mutex> lk(other_mutex); // 9

other_stuff();
}

std::unique_lock()

这是一个“延边锁”,通常运用在提前声明互斥量,但不直接加锁的情况下(如一个循环中)。std::unique_lock 开销较大

将锁声明在外部作用域中,在内部作用域需要不停地进行加锁和解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <mutex>
std::mutex mtx;

std::unique_lock<std::mutex> lk(mtx,std::defer_lock);//这里声明了互斥量,但是并没有真正加锁
while(true){
std::lock(lk); //在这里才真正被加锁
doSomething();
}

///////////////std::unique_lock 也可以被传递//////////////////

std::unique_lock<std::mutex> getLock(){
std::unique_lock<std::mutex> lk(mtx,std::adopt_lock);
////这里锁住一些变量
return lk;
}


锁的粒度与线程阻塞

​ 锁的粒度是一个摆手术语,用来描述通过一个锁保护的数据量的大小。

  • 若锁的粒度过大,可能会造成线程的竞争激烈,许多线程因为无法进入临界区而被阻塞,从而大大降低系统的并发性。

  • 但过小粒度的锁,会造成锁本身占用系统的资源过大,而且也会造成程序的设计维护问题。

  • 只有当一个程序准备好所有资源后,再去请求一个锁,可以明显地降低线程的阻塞

资源初始化——std::call_once

​ 资源初始化是一个看似简单,但其实十分复杂的问题,使用std::call_once可以显著降低多线程中资源初始化的风险。

1
2
3
4
5
6
7
8
std::once_flag resource_flag;

void initCamera();

void createCamera(){
//注意,这里的使用和 std::thread 类似,函数不能加(),如果是类内函数需要标注
std::call_once(resource_flag,initCamera);
}

避免使用双重锁

并发同步操作

​ 期望值解决的是线程之间应当如何解决通信问题。使用共享内存区域进行线程通信的方法虽然简单,但是在无形之中阻塞了程序,造成工作效率的下降;并且,使用std::thread是无法进行函数的右值返回的,因而需要条件变量期望值来解决线程之间的简单通信问题。

条件变量

​ STL 中条件变量主要有两种:std::condition_variable以及std::condition_variable_any。后者可以接受任何锁类型,但是开销较大,一般只采用前一种类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::mutex mtx;
std::condition_variable cond

void someFunction(){
std::lock_guard<std::mutex> lk(mtx);
/////////一系列功能///////////
cond.notify_one();
cond.notify_all();
}

/////////等待的地方要和通知的地方实用相同的锁
int main(){
std::unique_lock<std::mutex> lk(mtx); //注意,等待这里建议使用 unique_lock,因为解锁更加方便
cond.wait(lk,[](){ // 如果不满足后续条件,那么将会解锁 lk
return true;
}) //只有在接收了通知并且 lambda 表达式返回 true 时才能进入临界区
}

期望值std::future

​ 期望值用于仅仅等待一次的情况;使用std::condition_varivable时,每次尝试进入临界区都会进行等待,这对于重复调用的函数会造成较差的性能。

​ 期望值分为唯一期望值std::future<> 以及std::shared_future<>。后者可以被多个线程使用,前者只能被使用一次。两者都是只能被移动而不能够被拷贝

std::async

1
2
3
4
5
6
7
8
9
10
#include <future>

int someFunction();

//这里函数名同样也是不能带括号,其他规则同 thread
std::future<int> future1 = std::async(someFuncion,std::launch::async); //函数必须在该线程上运行
std::future<int> future2 = std::async(someFuncion,std::launch::defered); //需要结果时才会运行
std::future<int> future3 = std::async(someFuncion,std::launch::defered|std::launch::async); //由系统选择

int result = future2.get(); //使用 get()函数来调取结果,future2 在这里才会被运行

std::packaged_task<>

std::packaged_task<函数类型(函数参数类型)>这是用来打包一个函数任务的,通常用来在主线程将任务打包并将结果期望绑定,再将运行放入子线程中。

当打包的任务出现异常时,该异常会被自动传播到主线程去,需要在主线程解决

1
2
3
4
5
6
7
8
9
10
11
////packaged_task仅仅是将一个可执行任务打包,而并没有执行这个任务
////future 相当于一个指针,指向一个 packaged_task 的返回值。
////当使用 future 的时候,线程会等待返回结果(如果没有运行就会一直等待)
std::packaged_task<int(int)> tsk(test);
std::future<int> fut = tsk.get_future();

////使用 std::move将 packaged_task 放入一个线程中
std::thread th(std::move(tsk), 1);
int val = fut.get();
std::cout << "Main thread gets value " << val << std::endl;
th.join();

std::promise

std::promise允许用户自由地在子线程和主线程之间合适的位置来处理异常。可以实现子线程不抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <future>

std::promise<int> promise;

//假设这是一个将要单独在一个线程中运行的函数
int someFunction(){
try{
promise.set_value(1);// 正常运行,将结果送入期望中
}catch(){
promise.set_exception(std::current_exception());//promise 会直接在子线程中对异常进行处理,而非送回到主线程去
}
}

共享期望 std::shared_future

​ 共享期望允许多个线程共同获取一个结果,如何初始化一个共享期望:

1
2
3
4
5
6
////////////////直接绑定//////////////////
std::packaged_task<int(int)> tsk(test);
std::shared_future<int> fut = tsk.get_future();

///////////////绑定已有期望///////////////
std::shared_future<int> fut = future.share();

限定等待时间

​ 除了按照条件等待,线程之间还可以设置等待时间,用于防止过长的等待而造成的线程阻塞。

C++中的时钟

​ C++的时钟库是<chrono>,该库可以调用系统时间(是一个不稳定的时钟:稳定时钟的节拍均匀且不可调整;系统时钟通常可以调整节拍)。

​ C++时钟量,两个时间点相减表示一个时间段;时间点加减时间段等于时间点,会发生隐式的类型转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//指定时钟节拍:
std::ratio<1,25> //代表一秒钟有 25 个节拍
std::ratio<5,2> //代表五秒钟有两个节拍

//获取系统时钟当前的时间
auto now = std::chrono::system_clock::now()

//如何表示一段时延,第一个参数是数据类型,第二个参数是单位
std::chrono::duration<,>
std::chrono::duration<int,std::ratio<60,1>>
std::chrono::duration<long long, std::milli>(100)

//使用 std::chrono::时间量来表示时间段
std::chrono::nanoseconds(15); //表示15纳秒

//时间变量之间的相互转化
std::chrono::milliseconds ms(54802);
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms);

std::wait_for

std::wait_for 用来限制等待线程期望值的时间,但实际等待的时间一般会比指定的时间要长,因为等待期间处理机可能会将线程调出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::future<int> future1 = std::async(someFuncion,std::launch::async);	
//////////////////////////限定线程只等待期望值35ms/////////////////////////////////
if(future.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)
future.get();

//调用 wait_for 可能会返回的结果:
std::future_status::ready //代表等待的线程已经完成,期望值可以被 get
std::future_status::deferred //代表等待的线程被阻滞(没有在运行)
std::future_status::timeout //代表等待的线程运行超时了


//////////对 condition_variable 使用 wait_for()/////////////
std::condition_variable cond;
std::unique_lock<std::mutex> lk(mtx);
cond.wait(lk,std::chrono::milliseconds(35));

std::wait_until

​ 相比于std::wait_forstd::wait_until等待的时间由绝对时间决定;因此搭配时间点加时间段,可以实现等待准确的指定时间。

1
2
3
4
5
std::future<int> future1 = std::async(someFuncion,std::launch::async);	
//////////////////////////限定线程只等待期望值35ms/////////////////////////////////
if(future.wait_for(std::chrono::system_clock::now() + std::chrono::milliseconds(35)
== std::future_status::ready)
future.get();

互斥时间锁

​ 互斥时间锁是一种与时间相关联的互斥信号量,其在尝试进入临界区时,会限定一段时间,当限定的时间结束后,仍未进入临界区,将会放弃进入临界区的行为。

1
2
3
4
5
std::timed_mutex	tmtx;
std::recursive_timed_mutex rtmtx;
//尝试在 35ms 的时间内获进入临界区,返回返回值为 bool 类型
tmtx.try_lock_for(std::chrono::milliseconds(35));
rtmtx.try_lock_for(std::chrono::milliseconds(35));

std::experimental 额外功能

1
2
3
std::experimental::when_all	//用来收集多个线程的期望

std::experimental::when_any //用来收集多个线程中任意一个的期望

原子操作

​ 原子操作是指不可被分割的操作;即对于处理机来说,原子操作只有两个状态:未开始、已完成。

​ 原子操作经常被用于指定线程访问的顺序从而实现无锁编程,相较于互斥信号量,占用的系统资源更少;但是相应地编程难度也大大提升。

原子操作的用法

  • 作为一个通信的标识符,在线程间传递完成状态等信息。
  • 作为一个模板,在数据结构修改时作为间接容器来存储要 I/O 的数据。当获得原子变量的权限后,才能进行修改

C++中的原子类型

​ C++的标准原子类型定义在头文件<atomic>中,这些所有的操作都是原子的。但其中的大部分都是使用互斥锁来模拟的(性能比用户使用互斥锁要好很多),只有 std::atomic_flag 是保证无锁的。

C++中的标准原子类型不能进行拷贝和赋值,没有相关的操作符,但可以隐式转化成对应的内置类型。

​ C++中的原子类型可以分为 std::atomic_flag 和 std::atomic<>。前者保证无锁,后者通常是有锁的。

原子类型 相关特化类
atomic_bool std::atomic
atomic_char std::atomic
atomic_schar std::atomic
atomic_uchar std::atomic
atomic_int std::atomic
atomic_uint std::atomic
atomic_short std::atomic
atomic_ushort std::atomic
atomic_long std::atomic
atomic_ulong std::atomic
atomic_llong std::atomic
atomic_ullong std::atomic
atomic_char16_t std::atomic
atomic_char32_t std::atomic
atomic_wchar_t std::atomic

std::atomic_flag

std::atomic_flag代表了一个bool 标志,可以在“设置”“清除”两个状态之间转换。其初始化的标志位是“清除”状态。它具有两个个成员函数clear()test_and_set(),前者是一个存储操作,后者是一个“读-改-写”操作。test_and_set()操作意味着,若读取出的值和要设置的值相同则无操作,若不同则设置为新值。

std::atomic_flag对象不能被拷贝和赋值,因为这两种操作会破坏其原子性。经常被用于实现自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//使用 std::atomic_flag 实现一个自旋锁
class SpinLockMutex{
private:
std::atomic_flag flag;
public:
SpinLockMutex() : flag(ATOMIC_FLAG_INIT){} //std::atomic_flag的唯一构造方式

void lock(){
while(flag.test_and_set(std::memory_order_acquire)); //尝试对std::atomic_flag上锁
}

void unlock(){
flag.clear(std::memory_order_release); //清除锁
}
};

std::atomic<>

std::atomic<bool>能够自由选择初始化的值,并且可以使用非原子的 bool 类型进行构造,但其具有抽象代价,不保证无锁。 std::atomic<bool>具有三个基本的成员函数:存储操作store()、加载操作load()、“读-改-写”操作exchange();还具备两个特殊的“比较/交换”操作:compare_exchange_weak()compare_exchange_strong()

  • “比较/交换”操作:是原子类型编程的基石,它比较原子变量的当前值和一个期望值。当两值相等时,存储所提供的值;当两值不等时,期望值会被更新为原子变量中的值。compare_exchange_weak()不保证存储操作成功、compare_exchange_strong()保证存储操作成功。

std::atomic<T*>继承了std::atomic<bool>的全部操作,只是返回类型不同;此外还具有“读-改-写”操作fetch_add()fetch_sub(),用于在存储地址上进行原子加法和原子减法

  • 除了基本类型外,用户可以用 class来自定义一个原子变量,前提是必须有拷贝赋值运算符,即不能有虚函数和基类,并且必须使用编译器默认创建的拷贝赋值操作

​ 原子类型的操作函数具有两种调用方式,一种是作为原子变量的成员函数使用,另外一种是直接采用非成员函数的调用方式。非成员函数的调用方式通常是在成员函数名前加上“atomic_”前缀。

1
2
3
4
5
6
7
//成员函数版调用
std::atomic<int> aint;
aint.load();

//非成员函数版本调用
std::atomic_load(&atomic_var,new_value);
std::atomic_load_explicit(&atomic_var,new_value,std::memory_order_release);//需要指定内存序

操作排序

​ 利用原子操作可以实现对不同操作的排序,从而实现线程之间的有序,这是依赖于原子操作的同步发生与先行发生的。

  • 同步发生:同步发生只能在原子类型之间进行,即同一原子变量的原子操作具有同步传递性,改变后的状态会立刻传递给任何需要用到该原子变量的线程中去。

  • 先行发生:先行发生发生在非原子操作上,且依赖于同步发生。即同步发生的原子操作可以视为一个时间点,在此时间点基础上,若一个操作先于该原子操作,则具有先行性。(A-原子解锁,原子锁-B,那么 A 会先于 B 发生)。

    C++一共具有三种内存模型和六种内存操作,其对应关系如下:

    • 排序一致性序列,对应memory_order_seq_cst

    • 获取-释放序列,对应 memory_order_consume,memory_order_acquire,memory_order_release,memory_order_acq_rel

    • 松散序列,对应 memory_order_relaxed

      对几种内存操作的详细介绍

    1
    2
    3
    4
    5
    6
    7
    8
    //一个简单的原子操作排序,①永远在②前面
    std::atomic<int> count;
    ///////////thread1/////////////
    int signal; //操作顺序①
    count.store(signal,std::memory::release);
    ///////////thread2/////////////
    int received_signal = count.load(std::memory::acquire);
    std::cout << received_singal; //操作顺序②

栅栏

​ 栅栏对内存序列进行约束,使其无法对任何数据进行修改。栅栏属于全局操作,执行栅栏操作可以影响到线程中的其他原子操作。利用栅栏操作可以在不依赖原子变量的情况下实现排序

栅栏既可以和栅栏进行同步,也可以和原子操作进行同步

​ 栅栏一共分为六种,分别具有不同的效果:

内存序列类型 栅栏类型 栅栏效果
memory_order_relaxed 没有任何效果 无效果
memory_order_acquire/memory_order_consume acquire fence 阻止loadstore重排和storestore重排
memory_order_release release fence 阻止loadload重排和loadstore重排
memory_order_acq_rel full fence 防止loadload、loadstore、storestore重排
memory_order_seq_cst 保证有单独全序的full fence 防止loadload、loadstore、storestore重排
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//利用栅栏进行同步的实例
std::string computation(int);
void print( std::string );

std::atomic<int> arr[3] = { -1, -1, -1 }; //use atomic data to communicate
std::string data[1000] //non-atomic data

// Thread A, compute 3 values
void ThreadA( int v0, int v1, int v2 )
{
//assert( 0 <= v0, v1, v2 < 1000 );
data[v0] = computation(v0);
data[v1] = computation(v1);
data[v2] = computation(v2);
std::atomic_thread_fence(std::memory_order_release);
std::atomic_store_explicit(&arr[0], v0, std::memory_order_relaxed);
std::atomic_store_explicit(&arr[1], v1, std::memory_order_relaxed);
std::atomic_store_explicit(&arr[2], v2, std::memory_order_relaxed);
}

// Thread B, prints between 0 and 3 values already computed.
void ThreadB()
{
int v0 = std::atomic_load_explicit(&arr[0], std::memory_order_relaxed);
int v1 = std::atomic_load_explicit(&arr[1], std::memory_order_relaxed);
int v2 = std::atomic_load_explicit(&arr[2], std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_acquire);
// v0, v1, v2 might turn out to be -1, some or all of them.
// otherwise it is safe to read the non-atomic data because of the fences:
if( v0 != -1 ) { print( data[v0] ); }
if( v1 != -1 ) { print( data[v1] ); }
if( v2 != -1 ) { print( data[v2] ); }
}

基于锁的并发数据结构

​ 被用于共享的数据结构需要进行专门设计,否则可能会造成死锁、线程阻塞等一系列问题。以下是并发数据结构的设计目标:

  • 确保无线程能看到修改够数据结构的“不变量”时的状态,即数据修改对任何线程是原子的

  • 小心会引起条件竞争的接口,提供完整操作的函数,而非操作步骤

  • 注意数据结构的行为是否会产生异常,从而确保“不变量”的状态,即实现“异常-安全”

  • 降低死锁概率,限制锁的范围,降低锁的粒度,避免嵌套锁的存在。

无锁并发数据结构

​ 无锁并发结构基于原子操作,通常是无阻塞的自旋的;即处于等待状态下的线程会处于一个空循环load()状态,而不会进入阻塞状态。通常情况下,无锁结构的性能要优于有锁结构。

  • 并发最大化:无锁结构可以允许多个不同功能的线程并发地访问数据结构,但不允许具有相同功能的线程同时访问。
  • 增加鲁棒性:有锁结构中,若一个线程在获取锁状态下被终止,共享数据结构将被破坏。
  • 访问线程无限制:通过操作顺序的约束,理论上无访问线程的限制。
  • 无死锁问题:因为没有使用互斥量,故不会发生死锁。
  • 不正确使用可能会降低整体性能:乒乓缓存等效应存在,对设计要求很高。不当设计会降低性能。

需要注意的一些问题

  • 尽可能使用std::memory_order_seq_cst,该内存序列是总序的,实现比较简单。只有在该内存序列上完成了所要设计的相关功能后,才考虑使用其他序列进行性能优化
  • 无锁并发数据结构不能在任何一个线程中删除某一节点,因此使用智能指针来进行管理
  • “ABA 问题”:数据结构中的值被交换出后,进行了一些修改导致不合法(尤其是),再被交换回数据结构会造成一系列问题。
  • 识别忙等待和帮助其他线程:当线程处于等待过程中,可以让其帮助正在工作的线程完成工作。

并发代码设计

​ 本章主要讲如何从系统的角度来设计并发代码,即如何分配工作使得代码的效率更高。

​ “可扩展”代码:随着系统的核数增加,代码的性能增加,理想情况下是线性增长的。

数据划分

​ 将一个大任务分为几部分,各自单独运行。数据分批次,算法是整体的。常常用来提升任务运行速度

  • 递归划分:每个任务线程都有开辟子线程的能力,会短时间内产生大量的线程,每个线程负责完成一小部分工作。

任务划分

​ 将任务看成几个部分,每个部分分别用一个线程独立完成工作。

当通过任务类型进行划分时,不应该让各个线程处于完全隔离的状态,而是采用流水线的方式进行并发

避免共享内存

​ 在多线程环境下使用共享内存可能会造成许多性能问题,尽量避免线程访问外部数据尽量使用指针访问外部数据

  • 乒乓缓存:因为每个线程的内部变量是独立的,若有多个线程同时访问一个共享数据结构,会导致该数据结构不停地在每个处理机缓存中拷贝(每个线程都要保留最新状态),大大降低运行效率。

    解决方法:使用互斥量对多行缓存进行保护,提高锁的粒度

  • 伪共享:因为处理器缓存是以行来存储的,若一个内存行的数据被多个线程所使用(数据粒度太小),会大大降低数据访问的速度(因为一个内存行一次只能被一个线程访问)。

    解决方法:内存对齐、数据紧凑

  • 休眠造成缓存转移:当一个使用缓存的线程休眠后,可能被调至其他处理机运行,从而导致缓存的复制。

    解决方法:已经使用缓存区的线程,防止其休眠或被调度

异常-安全

​ 多线程环境下,要求在发生异常的情况下不抛出异常,保持共享数据结构的安全性,显然会带来额外的开销。每个线程必须在退出前处理好所有异常,保证异常不被抛出和传播(特殊情况,如果一个工作线程(不与主线程外交互),可以在其主线程上统一处理异常)

​ 通常,可能出现异常的情况有:

  • 分配资源:创建对象,创建指针,开辟内存区域等。
  • 内存异常:内存溢出,指针越界等。
  • 指针异常:更改指针绑定时失败。
  • 函数调用:函数调用时,底层可能会抛出一些异常

高级线程管理

​ 仅仅采用创建线程对象的方式是难以在线程的整个生命周期对其进行管理的,如果需要对线程进行更加灵活和精细的管理,常常需要引入“线程池”和“中断线程”的方式来进行管理。

线程池

线程池集中管理相似度较高的一组任务。这些任务被提交到线程池中的任务将并发执行,提交的任务将会挂在任务队列 上。队列中的每一个任务都会被池中的工作线程所获取,当任务执行完成后,再回到线程池中获取下一个任务。

线程池中的线程并不是直接运行任务函数,而是运行一个从任务队列中调用任务并加以运行的函数,这是为了方便线程池的初始化。

​ 高级线程池通常具备以下几个模块:

  • 共享任务队列:用户将需要被运行的任务投入到共享任务队列中等待被调用。

  • 期望队列:期望队列与共享任务队列中的任务绑定,用户从此处获取运行结果。

  • 线程池:集中构造、析构和管理的一组线程,在线程空闲时使用 yield 或者尝试任务窃取。

  • 本地任务队列:线程池中的线程将共享任务队列拷贝到本地,避免因任务量过小导致乒乓缓存。

  • 任务窃取:当线程无任务可以运行时,可以尝试从其他线程的本地任务队列中窃取。

中断线程

中断线程是对std::thread的一层封装,允许用户主动地去中断线程,被中断的线程会完成一些必要工作后退出。

并行算法

​ C++17版本引入了一些并行算法(保证线程安全),建议直接使用。这些并行算法是对之前标准库的重载,用法与函数名相同,只是引入了一个额外的策略参数(第一个),策略参数一共有三种,被定义在<execution>头文件中:

  • std::execution::seq:要求函数的所有操作都在当前线程下单线程完成,不并行且不并发
  • std::execution::par:在多个线程中执行,并且线程各自具有自己的顺序任务。即并行但不并发。
  • std::execution::par_unseq:算法在多个线程中执行,并且线程可以具有并发的多个任务。即并行和并发。

​ 执行并发算法时,如果算法运行触发了异常机制,且该异常未在调用算法的线程中被捕获,那么算法会自动调用std::terminate来中断。

​ 标准库中的大多数被执行策略重载的算法都在 <algorithm><numeric>头文件中,目前已经包括:

​ all_of,any_of, none_of,for_each,for_each_n,find,find_if,find_end,find_first_of,adjacent_find, count,count_if,mismatch,equal,search,search_n,copy,copy_n,copy_if,move, swap_ranges,transform,replace,replace_if,replace_copy,replace_copy_if,fill, fill_n,generate,generate_n,remove,remove_if,remove_copy,remove_copy_if,unique, unique_copy,reverse,reverse_copy,rotate,rotate_copy,is_partitioned,partition, stable_partition,partition_copy,sort,stable_sort,partial_sort,partial_sort_copy, is_sorted,is_sorted_until,nth_element,merge,inplace_merge,includes,set_union, set_intersection,set_difference,set_symmetric_difference,is_heap,is_heap_until, min_element,max_element,minmax_element,lexicographical_compare,reduce, transform_reduce,exclusive_scan,inclusive_scan,transform_exclusive_scan, transform_inclusive_scan和adjacent_difference