Channels

Channel adalah cara termudah untuk menggunakan banyak thread dan mengirimkannya ke satu tempat. Ia cukup popular karena ia sangatlah mudah untuk digunakan. Anda bisa membuat channel di Rust menggunakan std::sync::mpsc. mpsc adalah singkatan dari "multiple producer, single consumer", jadi "banyak thread mengirim ke satu tempat". Untuk menggunakan channel, Anda bisa menuliskan channel(). Ia akan membuat Sender dan Receiver yang mana terhubung satu sama lain. Anda bisa melihat ini pada function signaturenya:


#![allow(unused)]
fn main() {
// 🚧
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
}

Jadi Anda harus menentukan satu nama untuk sender dan satu lagi untuk receiver. Biasanya Anda menuliskannya dengan format let (sender, receiver) = channel(); untuk memulainya. Namun, karena ini adalah generic, Rust tidak mengetahui typenya jika hanya itu yang Anda tulis:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel(); // ⚠️
}

Compiler mengatakan:

error[E0282]: type annotations needed for `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`
  --> src\main.rs:30:30
   |
30 |     let (sender, receiver) = channel();
   |         ------------------   ^^^^^^^ cannot infer type for type parameter `T` declared on the function `channel`
   |         |
   |         consider giving this pattern the explicit type `(std::sync::mpsc::Sender<T>, std::sync::mpsc::Receiver<T>)`, where
the type parameter `T` is specified

Ia menyarankan untuk menambahkan type untuk Sender dan Receiver. Anda bisa menuliskannya seperti ini jika Anda mau:

use std::sync::mpsc::{channel, Sender, Receiver}; // tambahkan Sender dan Receiver disini

fn main() {
    let (sender, receiver): (Sender<i32>, Receiver<i32>) = channel();
}

Tapi Anda tidak perlu menuliskannya seperti itu juga. Di saat kita mulai menggunakan Sender dan Receiver, Rust dapat menebak typenya.

Jadi, mari kita lihat cara termudah untuk menggunakan channel.

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5);
    receiver.recv(); // recv = receive, bukan "rec v"
}

Sekarang compiler mengetahui typenya. sender adalah Result<(), SendError<i32>> dan receiver adalah Result<i32, RecvError>. Sehingga Anda bisa menggunakan .unwrap() untuk melihat apakah pengirimannya bekerja, atau menggunakan error handling yang lebih baik. Mari tambahkan .unwrap() dan juga println! untuk melihat hasilnya:

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();

    sender.send(5).unwrap();
    println!("{}", receiver.recv().unwrap());
}

Hasilnya adalah 5.

channel mirip seperti Arc karena Anda bisa melakukan clone dan mengirimkan clonenya ke thread yang lain. Mari kita membuat dua buah thread dan mengirim valuenya ke receiver. Code di bawah ini akan berjalan, tapi bukanlah seperti yang kita inginkan.

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();

    std::thread::spawn(move|| { // move sender ke dalam thread
        sender.send("Send a &str this time").unwrap();
    });

    std::thread::spawn(move|| { // move sender_clone ke dalam thread
        sender_clone.send("And here is another &str").unwrap();
    });

    println!("{}", receiver.recv().unwrap());
}

Dua thread mulai mengirim, dan kemudian kita lakukan println!. Hasilnya adalah Send a &str this time atau And here is another &str, tergantung dari thread mana yang terlebih dahulu selesai. Mari kita buat join handle Untuk membuatnya menunggu.

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![]; // Letakkan handlenya disini

    handle_vec.push(std::thread::spawn(move|| {  // push thread ke dalam vec
        sender.send("Send a &str this time").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {  // dan push thread yang ini juga ke dalam vec
        sender_clone.send("And here is another &str").unwrap();
    }));

    for _ in handle_vec { // sekarang handle_vec memiliki 2 item. Mari kita print hasilnya
        println!("{:?}", receiver.recv().unwrap());
    }
}

Hasil printnya adalah:

"Send a &str this time"
"And here is another &str"

Sekarang, mari kita buat results_vec alih-alih langsung melakukan print.

use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let sender_clone = sender.clone();
    let mut handle_vec = vec![];
    let mut results_vec = vec![];

    handle_vec.push(std::thread::spawn(move|| {
        sender.send("Send a &str this time").unwrap();
    }));

    handle_vec.push(std::thread::spawn(move|| {
        sender_clone.send("And here is another &str").unwrap();
    }));

    for _ in handle_vec {
        results_vec.push(receiver.recv().unwrap());
    }

    println!("{:?}", results_vec);
}

Sekarang, hasilnya berada di dalam results_vec: ["Send a &str this time", "And here is another &str"].

Sekarang, anggap saja bahwa kita memiliki banyak task untuk dilakukan, dan kita ingin menggunakan thread. Kita memiliki vec yang besar yang berisi 1 juta item/element, yang semua elementnya itu berisi angka 0. Kita ingin mengganti setiap 0 dengan 1. Kita ingin menggunakan 10 thread, dan setiap thread akan melakukan sepersepuluh (porsi 1/10) dari keseluruhan task tersebut. Kita akan membuat vec baru dan menggunakan .extend() untuk membagi tugasnya.

use std::sync::mpsc::channel;
use std::thread::spawn;

fn main() {
    let (sender, receiver) = channel();
    let hugevec = vec![0; 1_000_000];
    let mut newvec = vec![];
    let mut handle_vec = vec![];

    for i in 0..10 {
        let sender_clone = sender.clone();
        let mut work: Vec<u8> = Vec::with_capacity(hugevec.len() / 10); // vec baru untuk membagi-bagi tugasnya. Ukurannya adalah 1/10 dari ukuran hugevec
        work.extend(&hugevec[i*100_000..(i+1)*100_000]); // bagian pertama mengambil 0..100_000, selanjutnya mengambil 100_000..200_000, dst.
        let handle =spawn(move || { // membuatnya handlenya

            for number in work.iter_mut() { // lakukan tugasnya, yaitu mengubah 0 menjadi 1
                *number += 1;
            };
            sender_clone.send(work).unwrap(); // gunakan sender_clone untuk mengirim `work` ke receiver
        });
        handle_vec.push(handle);
    }
    
    for handle in handle_vec { // menunggu sampai semua threadnya selesai
        handle.join().unwrap();
    }
    
    while let Ok(results) = receiver.try_recv() {
        newvec.push(results); // push result dari receiver.recv() ke dalam newvec
    }

    // Sekarang kita menggunakan Vec<Vec<u8>>. Untuk menggabungkannya menjadi satu, kita bisa menggunakan .flatten()
    let newvec = newvec.into_iter().flatten().collect::<Vec<u8>>(); // sudah menjadi sebuah vec dengan 1_000_000 element yang bertype u8
    
    println!("{:?}, {:?}, total length: {}", // cetak beberapa angka untuk memastikan bahwa semuanya berubah menjadi 1
        &newvec[0..10], &newvec[newvec.len()-10..newvec.len()], newvec.len() // Dan tunjukkan pula bahwa panjangnya adalah 1_000_000 element
    );
    
    for number in newvec { // Dan beritahukan Rust bahwa ia akan panic jika ada satu angka yang bernilai 1
        if number != 1 {
            panic!();
        }
    }
}