共享狀態並行

訊息傳遞是個很好的並行處理方式,但它不是唯一。回憶一下之前 Go 語言技術文件中的口號:「別透過共享記憶體來溝通。」

透過共享記憶體來溝通會是什麼樣子呢?除此之外,為何訊息傳遞愛好者不喜歡這種方式並反其道而行呢?

任何程式語言的通道某方面來說類似於單一所有權,因為一旦你轉移數值給通道,你就不該使用該數值。共享記憶體並行則像多重所有權,數個執行緒可以同時存取同個記憶體位置。如同你在第十五章所見到的,智慧指標讓多重所有權成為可能,但多重所有權會增加複雜度,因為我們會需要管理這些不同的擁有者。Rust 的型別系統與所有權規則大幅地協助了正確管理這些所有權。作為範例就讓我們看看互斥鎖(mutexes),這是共享記憶體中常見的並行原始元件之一。

使用互斥鎖在同時間只允許一條執行緒存取資料

互斥鎖(Mutex)mutual exclusion 的縮寫,顧名思義互斥鎖在任意時刻只允許一條執行緒可以存取一些資料。要取得互斥鎖中的資料,執行緒必須先透過獲取互斥鎖的鎖(lock)來表示它想要進行存取。鎖是互斥鎖其中一部分的資料結構,用來追蹤當前誰擁有資料的獨佔存取權。因此互斥鎖被描述為會透過鎖定系統守護(guarding) 其所持有的資料。

互斥鎖以難以使用著名,因為你必須記住兩個規則:

  • 你必須在使用資料前獲取鎖。
  • 當你用完互斥鎖守護的資料,你必須解鎖資料,所以其他的執行緒才能獲取鎖。

要用真實世界來比喻互斥鎖的話,想像在會議中有個座談會只有一支麥克風。如果有講者想要發言時,他們需要請求或示意他們想要使用麥克風。當他們取得麥克風時,他們想講多久都沒問題,直到將麥克風遞給下個要求發言的講者。如果講者講完後忘記將麥克風遞給其他人的話,就沒有人有辦法發言。如果麥克風的分享出狀況的話,座談會就無法如期進行!

互斥鎖的管理要正確處理是極為困難的,這也是為何這麼多人傾向於使用通道。然而有了 Rust 的型別系統與所有權規則,你就不會在鎖定與解鎖之間出錯了。

Mutex<T> 的 API

作為使用互斥鎖的範例,讓我們先在單執行緒使用互斥鎖,如範例 16-12 所示:

檔案名稱:src/main.rs

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

範例 16-12:基於簡便考量先用單一執行緒探討 Mutex<T> 的 API

就像許多型別一樣,我們使用關聯函式 new 建立 Mutex<T>。要取得互斥鎖內的資料,我們使用 lock 方法來獲取鎖。此呼叫會阻擋當前執行緒做任何事,直到輪到它取得鎖。

如果其他持有鎖的執行緒恐慌的話 lock 的呼叫就會失敗。在這樣的情況下,就沒有任何人可以獲得鎖,因此當我們遇到這種情況時,我們選擇 unwrap 並讓此執行緒恐慌。

在我們獲取鎖之後,我們在此例可以將回傳的數值取作 num,作為內部資料的可變引用。型別系統能確保我們在使用數值 m 之前有獲取鎖,Mutex<i32> 並不是 i32,所以我們必須取得鎖才能使用 i32 數值。我們不可能會忘記這麼做,不然型別系統不會讓我們存取內部的 i32

如同你所想像的,Mutex<T> 就是個智慧指標。更精確的來說,lock 的呼叫會回傳一個智慧指標叫做 MutexGuard,這是我們從 LockResult 呼叫 unwrap 取得的型別。MutexGuard 智慧指標有實作 Deref 特徵來指向我們的內部資料。此智慧指標也有 Drop 的實作,這會在 MutexGuard 離開作用域時自動釋放鎖,在範例 16-12 的內部作用域結尾就會執行此動作。這樣一來,我們就不會忘記釋放鎖,怕互斥鎖會阻擋其他執行緒,因為鎖會自動被釋放。

在釋放鎖之後,我們就能印出互斥鎖的數值並觀察到我們能夠變更內部的 i32 為 6。

在數個執行緒間共享 Mutex<T>

現在讓我們來透過 Mutex<T> 來在數個執行緒間分享數值。我們會建立 10 個執行緒並讓它們都會對一個計數增加 1,讓計數能從 0 加到 10。作為下個例子的範例 16-13 會出現一個編譯錯誤,我們會用此錯誤瞭解如何使用 Mutex<T> 以及 Rust 如何協助我們來正確使用它。

檔案名稱:src/main.rs

use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("結果:{}", *counter.lock().unwrap());
}

範例 16-13:十個執行緒都會對 Mutex<T> 守護的計數增加 1

我們建立個變數 counter 並在 Mutex<T> 內存有 i32,就像我們在範例 16-12 所做的一樣。接著我們透過指定的範圍建立 10 個執行緒。我們使用 thread::spawn 讓所有的執行緒都有相同的閉包,此閉包會將計數移入執行緒、呼叫 lock 以獲取 Mutex<T> 的鎖,然後將互斥鎖內的數值加 1。當有執行緒執行完它的閉包時,num 會離開作用域並釋放鎖,讓其他的執行緒可以獲取它。

在主執行緒中,我們要收集所有的執行緒。然後如同我們在範例 16-2 所做的,我們呼叫每個執行緒的 join 來確保所有執行緒都有完成。在這時候,主執行緒就能獲取鎖並印出此程式的結果。

我們曾暗示範例不會編譯過,讓我們看看是為何吧!

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: use of moved value: `counter`
  --> src/main.rs:9:36
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `std::sync::Mutex<i32>`, which does not implement the `Copy` trait
...
9  |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^ value moved into closure here, in previous iteration of loop
10 |             let mut num = counter.lock().unwrap();
   |                           ------- use occurs due to use in closure

error: aborting due to previous error

For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state`.

To learn more, run the command again with --verbose.

錯誤訊息表示 counter 數值在之前的迴圈循環中被移動了,所以 Rust 告訴我們我們無法將 counter 鎖的所有權移至數個執行緒中。讓我們用第十五章提到的多重所有權方法來修正此編譯錯誤吧。

多重執行緒中的多重所有權

在第十五章中,我們透過智慧指標 Rc<T> 來建立引用計數數值讓該資料可以擁有數個擁有者。讓我們在此也做同樣的動作來看看會發生什麼事。我們會在範例 16-14 將 Mutex<T> 封裝進 Rc<T> 並在將所有權移至執行緒前克隆 Rc<T>。現在既然我們已經知道到錯誤的原因,讓我們也改回使用 for 迴圈,並保留閉包的 move 關鍵字。

檔案名稱:src/main.rs

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("結果:{}", *counter.lock().unwrap());
}

範例 16-14:嘗試使用 Rc<T> 來允許數個執行緒擁有 Mutex<T>

再編譯一次的話我們會得到... 不同的錯誤!編譯器真的是教了我們很多事。

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
   --> src/main.rs:11:22
    |
11  |         let handle = thread::spawn(move || {
    |                      ^^^^^^^^^^^^^ `std::rc::Rc<std::sync::Mutex<i32>>` cannot be sent between threads safely
    |
    = help: within `[[email protected]/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::sync::Mutex<i32>>`
    = note: required because it appears within the type `[[email protected]/main.rs:11:36: 15:10 counter:std::rc::Rc<std::sync::Mutex<i32>>]`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state`.

To learn more, run the command again with --verbose.

哇,這個錯誤訊息的內容真多!這是我們需要注意到的部分:`Rc<Mutex<i32>>` cannot be sent between threads safely。編譯器也告訴了我們原因:the trait `Send` is not implemented for `Rc<Mutex<i32>>` 。我們會在下一個段落討論 Send,這是其中一種確保我們在執行緒中所使用的型別可以用於並行場合的特徵。

不幸的是 Rc<T> 無法安全地跨執行緒分享。當 Rc<T> 管理引用計數時,它會在每個 clone 的呼叫增加計數,並在每個克隆釋放時減少計數。但是它沒有使用任何並行原始元件來確保計數的改變不會被其他執行緒中斷。這樣的計數可能會導致微妙的程式錯誤,像是記憶體泄漏或是在數值釋放時嘗試使用其值。我們需要一個型別和 Rc<T> 一模一樣,但是其引用計數在執行緒間是安全的。

原子引用計數 Arc<T>

幸運的是 Arc<T> 正是一個類似 Rc<T> 且能安全用在並行場合的型別。字母 A 指的是原子性(atomic) 代表這是個原子性引用的計數型別。原子型別是另一種我們不會在此討論的並行原始元件,你可以查閱標準函式庫的 std::sync::atomic 技術文件以瞭解更多詳情。在此你只需要知道原子型別和原始型別類似,但它們可以安全在執行緒間共享。

你可能會好奇為何原始型別不是原子性的,以及為何標準函式庫的型別預設不使用 Arc<T> 來實作。原因是因為執行緒安全意味著效能開銷,你會希望在你真的需要時才買單。如果你只是在單一執行緒對數值做運算的話,你的程式碼就不必強制具有原子性的保障並能執行地更快。

讓我們回到我們的範例:Arc<T>Rc<T> 具有相同的 API,所以我們透過更改 use 這行、new 的呼叫以及 clone 的呼叫來修正我們程式,。範例 16-15 的程式碼最終將能夠編譯並執行:

檔案名稱:src/main.rs

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("結果:{}", *counter.lock().unwrap());
}

範例 16-15:使用 Arc<T> 封裝 Mutex<T> 來在數個執行緒間分享所有權

此程式碼會印出以下結果:

結果:10

我們辦到了!我們從 0 數到了 10,雖然看起來不是很令人印象深刻,但這的確教會了我們很多有關 Mutex<T> 與執行緒安全的知識。你也可以使用此程式結構來做更多複雜的運算,而不只是數數而已。使用此策略,你可以將運算拆成數個獨立部分,將它們分配給執行緒,然後使用 Mutex<T> 來讓每個執行緒更新該部分的結果。

RefCell<T>/Rc<T>Mutex<T>/Arc<T> 之間的相似度

你可能已經注意到 counter 是不可變的,但我們卻可以取得數值其內部的可變引用,這代表 Mutex<T> 有提供內部可變性,就像 Cell 家族一樣。我們在第十五章也以相同的方式使用 RefCell<T> 來讓我們能改變 Rc<T> 內部的數值,而在此我們使用 Mutex<T> 改變 Arc<T> 內部的內容。

另一個值得注意的細節是當你使用 Mutex<T> 時,Rust 無法避免所有種類的邏輯錯誤。 回憶一下第十五章使用 Rc<T> 時會有可能產生引用循環的風險,兩個 Rc<T> 數值可能會彼此引用,造成記憶體泄漏。同樣地,Mutex<T> 有產生死結(deadlocks) 的風險。這會發生在當有個動作需要鎖定兩個資源,而有兩個執行緒分別擁有其中一個鎖,導致它們永遠都在等待彼此。如果你對此有興趣的話,歡迎嘗試建立一個有死結的 Rust 程式,然後研究看看任何語言中避免的互斥鎖死結的策略,並嘗試實作它們在 Rust 中。標準函式庫中 Mutex<T>MutexGuard 的 API 技術文件可以提供些實用資訊。

接下來在本章結尾我們會來討論 SendSync 特徵以及我們如何在自訂型別中使用它們。