C++的MemoryOrder以及Fence | Blurred code

C++的MemoryOrder以及Fence

2022/08/16

LastMod:2022/08/21

Categories: cpp

Memory Order

借Wiki的定义,

Memory ordering describes the order of accesses to computer memory by a CPU. The term can refer either to the memory ordering generated by the compiler during compile time, or to the memory ordering generated by a CPU during runtime.

内存顺序可以指编译器乱序或者运行期乱序。

编译期乱序

编译器在编译的时候会尝试重排指令以执行优化。 比如gcc在这段的代码:https://godbolt.org/z/enPGxeMjf

int g_a = 0;
int g_b = 0;
void foo() {
    g_a = g_b + 1;
    g_b = 1;
}
void bar() {
    while (g_b == 0) continue;
    assert(g_a == 1);// maybe fail. 
}

以上代码可能会被优化成如下汇编

foo():
        mov     eax, DWORD PTR g_b[rip] // int tmp = g_b;
        mov     DWORD PTR g_b[rip], 1    // g_b = 1;
        add     eax, 1                   // tmp += 1
        // g_a = tmp;
        //(存在race condition),当g_b被赋值时,g_a可能还等于0
        mov     DWORD PTR g_a[rip], eax  
        ret

手动加上编译期fence后,cppreference指出不同于std::atomic_thread_fencestd::atomic_signal_fence不会生成运行期fence指令

void foo() {
g_a = g_b + 1;
std::atomic_signal_fence(std::memory_order_seq_cst);
g_b = 1;
}
//  编译出的汇编
foo():
        mov     eax, DWORD PTR g_b[rip] //int tmp = g_b;
        add     eax, 1 // tmp += 1;
        mov     DWORD PTR g_a[rip], eax //g_a = tmp;
        mov     DWORD PTR g_b[rip], 1 //g_b = 1;
        ret

以上race condition是从汇编推断的,实际运行了半个小时我也没触发。。可能因为指令太短了吧 再找一个必定触发的编期间重排的例子,大概来自Is it legal for a C++ optimizer to reorder calls to clock()?

例子的链接可以见: https://godbolt.org/z/9re9YTb9z

MSVC2015以后的cl都可以复现,在/O2

auto start = std::chrono::steady_clock::now();
int result = LongComputation(1000);
auto stop = std::chrono::steady_clock::now();

会被优化成

auto start = std::chrono::steady_clock::now();
auto stop = std::chrono::steady_clock::now();
int result = LongComputation(1000);

并且这个问题对于MSVC还更复杂一点。 如果LongComputation()是一个编译期能看见其完全实现,且能确定其不会有副作用的情况下,加MemoryBarrier不会阻挡MSVC对其进行重排序。

加入MemoryBarrier()的例子可以见: https://godbolt.org/z/e5zP71vn1

int fib(int x)
{
    if (x < 2) return 1;
    int res = fib(x - 1) + fib(x - 2);
    return res;
}

int main()
{
    auto start = std::chrono::steady_clock::now();
    MemoryBarrier(); // 即使加了屏障,编译器仍然会进行重排
    std::atomic_signal_fence(std::memory_order_seq_cst);
    int result = fib(42);
    auto stop = std::chrono::steady_clock::now();
			...
}

编译出来的汇编为

        call    _Query_perf_frequency
        mov     rbp, rax
        call    _Query_perf_counter
        mov     rdi, rax
        lock or DWORD PTR [rsp], 0      ; MemoryBarrier()生成了运行期fence,然而没能阻挡编译期重排
        npad    1
        call    _Query_perf_frequency
        mov     r14, rax
        call    _Query_perf_counter		; 连续执行两次Now后再执行fib
        mov     ecx, 41                             ; 00000029H
        mov     rsi, rax
        call    int fib(int)                            ; fib
        mov     ecx, 40                             ; 00000028H
        mov     ebx, eax
        call    int fib(int)                            ; fib

要完全阻挡MSVC的重排序可以考虑写作,对volatile变量的读写能阻挡编译器做编译期重排序,但是不是运行期(所以volatile变量不能在多线程里做同步,MSVC的扩展实现volatile带有acl-rel语义)

int tmp = fib(x - 1) + fib(x - 2);
int res = *(volatile int*)&(tmp);

运行期乱序

在现代CPU中,CPU可以乱序执行指令以充分发挥其计算能力,这个过程对编程者是透明的。 然而在多线程程序下,不同线程之间的变量读写往往具有一定的假定顺序,而CPU的乱序执行会导致错误。

Intel的资料里 Intel_Reordering_318147.pdf 第7页描述了4种内存读写情况(内存读写只有Load/Store两种操作,其组合只有4种):

X86是强一致性内存模型,其只允许一种情况下出现运行期重排: Store-Load。

edit-54f6b85f3d154f5ca39de3cf90ef7e0e-2022-08-16-00-28-08

一个用于观察Store-Load的程序可以见饶萌大佬的文章(https://zhuanlan.zhihu.com/p/41872203),以下例子修改自其文章

#include <atomic>
#include <bits/stdc++.h>
using namespace std;

inline void nocpufence(std::atomic<int>& x, std::atomic<int>& y, std::atomic<int>& r) {
    x.store(1,std::memory_order_release);
    r.store(y.load(std::memory_order_acquire),std::memory_order_release);
}

inline void cpufence(std::atomic<int>& x, std::atomic<int>& y, std::atomic<int>& r) {
    x.store(1,std::memory_order_release);
    std::atomic_thread_fence(std::memory_order_seq_cst);
    r.store(y.load(std::memory_order_acquire),std::memory_order_release);
}



volatile int g_cnt = 0;
void threadfun(volatile int& loop_cnt, std::atomic<int>& x, std::atomic<int>& y, std::atomic<int>& r) {
    while(true) {
        while(g_cnt == loop_cnt) ;
        asm volatile("" ::: "memory");

        cpufence(x,y,r);
        // nocpufence(x, y, r);

        asm volatile("" ::: "memory");
        loop_cnt++;
    }
}

int main() {
    alignas(64) volatile int cnt1 = 0;
    alignas(64) volatile int cnt2 = 0;
    alignas(64) std::atomic<int> x{0};
    alignas(64) std::atomic<int> y{0};
    alignas(64) std::atomic<int> r1{0};
    alignas(64) std::atomic<int> r2{0};
    thread thr1(threadfun, ref(cnt1), ref(x), ref(y), ref(r1));
    thread thr2(threadfun, ref(cnt2), ref(y), ref(x), ref(r2));

    int detected = 0;
    while(true) {
        // x = y = 0;
        x.store(0,memory_order_release);
        y.store(0,memory_order_release);
        asm volatile("" ::: "memory");
        g_cnt++;
        while(cnt1 != g_cnt || cnt2 != g_cnt)
            ;

        asm volatile("" ::: "memory");
        if(r1 == 0 && r2 == 0) {
            detected++;
            cout << "bad, g_cnt: " << g_cnt << " detected: " << detected << endl;
        }
    }
    return 0;
}

Seq_Cst

为了控制运行期的重排,需要编写对应平台的汇编指令以实现内存屏障阻止重排,其中x86下一种可行的汇编是mfence指令。 为了跨平台,C++11抽象出了std::memory_order概念,其主要包含relaxed,release-consume/acquireseq_cst几种概念。

enum memory_order {
    memory_order_relaxed,
    memory_order_consume,
    memory_order_acquire,
    memory_order_release,
    memory_order_acq_rel,
    memory_order_seq_cst
};

其中memory_order_acq_rel,memory_order_seq_cst一般用于原子变量。 比如以下场景: (https://godbolt.org/z/zbTKaWjYv)

std::atomic<uint32_t> g_counter{0};
void foo()
{
    g_counter.fetch_add(1, std::memory_order_acq_rel); 
}

在RISC-V下的汇编为,可以明显看到fence指令,

 lui     a5,%hi(g_counter)
        li      a4,1
        addi    a5,a5,%lo(g_counter)
        fence iorw,ow; amoadd.w.aq zero,a4,0(a5)
        lui     a4,%hi(j)
        lw      a5,%lo(j)(a4)
        addiw   a5,a5,1
        sw      a5,%lo(j)(a4)

其在x86下为

foo():
        lock add        DWORD PTR g_counter[rip], 1
        add     DWORD PTR j[rip], 1

其中lock add会锁总线,所以也会起到内存屏障的作用。

当把其内存序改成relaxed以后,RISCV的gcc放弃了fence指令

void foo2()
{
    g_counter.fetch_add(1, std::memory_order_relaxed); 
    j++;
}

foo2():
        lui     a5,%hi(g_counter) //把g_counter的高20位放到a5寄存器
        li      a4,1            // a4寄存器放1
        addi    a5,a5,%lo(g_counter) // addi只能加12位,把g_counter低12位补上
        amoadd.w zero,a4,0(a5) // fence 指令不见了
        lui     a4,%hi(j)
        lw      a5,%lo(j)(a4)
        addiw   a5,a5,1
        sw      a5,%lo(j)(a4)
        ret

Release-Acquire

此外,ReleaseAcquire需要成对使用,以在不同线程之间同步变量以及构成Memory-Order。Release-Consume语义似乎没有编译器实现,忽略吧。

对x86而言只有seq_cst必须加fence,其他的除了relaxed以外主要起提示编译器禁止重排的作用。

C++中不同MemoryOrder到对应架构的Fence指令可以见(https://www.cl.cam.ac.uk/~pes20/cpp/cpp0xmappings.html)

可以看到只有在Store Seq-Cst下必须有mfence/XCHG/Lock相关的指令。 edit-54f6b85f3d154f5ca39de3cf90ef7e0e-2022-08-16-01-14-04

Release-Acquire语义中,同一个原子变量在一个线程A里通过Store(Release)操作进行store,而在另一个线程B通过Load(Acquire)操作读取,构成Release-Acquire屏障。

Release-Acquire屏障中,任何在线程A Store(Release)之前的Store操作,无论是原子的还是非原子的,都在线程B Load(Acquire)操作后确定可见。

对于X86说这是默认实现的,因此从汇编中看不出什么东西,然而在弱内存序的RISC-V/ARM中,会生产对应的屏障指令以构建符合期望的内存顺序。 一个例子可以见: https://godbolt.org/z/GW3bos5WW

std::atomic<std::string*> ptr;
int data;
void producer()
{
    std::string* p  = new std::string("Hello");
    data = 42;
    ptr.store(p, std::memory_order_release);
}
 
void consumer()
{
    std::string* p2;
    while (!(p2 = ptr.load(std::memory_order_acquire)))
        ;
    assert(*p2 == "Hello"); // never fires
    assert(data == 42); // never fires
}

可以看见在RV的汇编中

        sw      a4,%lo(data)(a5)
        lui     a5,%hi(ptr[abi:cxx11])
        addi    a5,a5,%lo(ptr[abi:cxx11])
        fence iorw,ow; amoswap.d zero,a0,0(a5) //屏障
        ld      ra,8(sp)
        addi    sp,sp,16
        jr      ra

以及在ARM的汇编中

        movs    r1, #42
        str     r5, [r3, #4]
        str     r1, [r2, #4]
        dmb     ish // Data Memory Barrier

在生成了对应的运行期屏障指令后,我们通过原子变量ptr的读写操作建立了两个线程之间的同步关系,在consumer()线程中当p2读取成功时候,我们能够断定data变量已经被写入。

此外,C++的原子变量的读写都会插入编译期屏障,编译器不会试图重排原子变量前后的读写操作(待实验)。

Lockfree SPSC-Queue

我们可以用Acquire-Release语义构建无锁固定大小的SPSC队列。

分析MengRao大佬的代码: (https://github.com/MengRao/SPSC_Queue.git)

SPSCQueue为固定大小的单入单出队列,消息大小以128字节对齐。

固定大小的SPSCQueue可以被实现为RingBuffer的形式,由read_idxwrite_idx在环上一直走。其中push/front只关心write_idx走到哪里,因此可以构成一个同步。pop/alloc操作与read_idx有关,构成另外一对同步。

其用法可以在multhread_q.cc中看到。

当写入消息时,其代码约为

struct Msg
{
  int val_len;
  uint64_t ts;
  long val[4];
};


  Msg* msg = q->alloc();
  msg->val = ..
  q->push();

读取消息时为

 Msg* msg = q->front();
 ...
 q->pop();

其中包含两个Acquire-Release同步关系,一个是pushfront之间

void push() {
    ((std::atomic<uint32_t>*)&write_idx)->store(write_idx + 1, std::memory_order_release);
}
T* front() {
    if (read_idx == ((std::atomic<uint32_t>*)&write_idx)->load(std::memory_order_acquire)) {
        return nullptr;
    }
    return &data[read_idx % CNT];
}

在消息写入线程中,调用push会更新write_idx的数值,并且获得release语义,而在读取线程中会调用load + acquire语义,构成读写屏障。 在q->push()之前,也就是填充msg结构体的相关store操作,将会在读取线程load以后立刻可见。

另外一对同步发生在allocpop之间。

T* alloc() {
    if (write_idx - read_idx_cach == CNT) {
    read_idx_cach = ((std::atomic<uint32_t>*)&read_idx)->load(std::memory_order_consume);
    if (__builtin_expect(write_idx - read_idx_cach == CNT, 0)) { // no enough space
        return nullptr;
    }
    }
    return &data[write_idx % CNT];
}
void pop() {
    ((std::atomic<uint32_t>*)&read_idx)->store(read_idx + 1, std::memory_order_release);
}

其中alloc是判断队列是否还有空的slot,而pop是移动read_idx而让出新的slot。 在这里作者做了点优化,即缓存了read_idx的值。

popallocstore-load构成release-acquire语义(没有编译器实现了consume语义,因此consume就等于acquire)。在写的线程read_idx->load时,req_acq语义确保其能看到最新的read_idx的值。