כל הפוסטים

Rust MPSC Channels - צ׳אנלים בראסט

אחת התכונות החזקות ביותר בראסט היא מקביליות ואסינכרוניות, וכשמדברים על תכנות אסינכרוני tokio היא הספרייה הנפוצה ביותר. אחד הפיצ׳רים המרכזיים שטוקיו מאפשרת הוא channels, אשר מאפשרי

Rust MPSC Channels - צ׳אנלים בראסט

אחת התכונות החזקות ביותר בראסט היא מקביליות ואסינכרוניות, וכשמדברים על תכנות אסינכרוני tokio היא הספרייה הנפוצה ביותר. אחד הפיצ׳רים המרכזיים שטוקיו מאפשרת הוא channels, אשר מאפשרים תקשורת בין חלקים שונים בתכונה שלנו.

קיימים בראסט מגוון סוגים של channels שמגיעים ממגוון ספריות (כמו crossbeam, std::mpsc וכו׳). בפוסט אדבר על Tokio Channels, ומתי ואיך נשתמש בהם.

מהם צ׳אנלים?

כשם כן הם - ״ערוצים״ אשר מאפשרים דרך לשלוח נתונים בין חלקים שונים של תוכנה (בדרך כלל בין threads או tasks).

צ׳אנלים מורכבים משני מרכיבים עיקריים: שולח (Sender): החלק ששולח נתונים ומקבל (Receiver): החלק שמקבל נתונים.

MPSC (Multi Producer Singler Consumer)

הספריה tokio מאפשרת לנו שימוש בצ׳אנלים מסוג MPSC - כלומר, מספר tasks יכולים לשלוח מידע אל מקבל יחיד. וזה נותן לנו יכולת נוחה וישירה להעביר מידע מאיזה סוג שנבחר, אל task מקבל שבו נעבד את המידע ונתמודד איתו.

דוגמה פשוטה:

use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    // Spawn a task that sends messages
    task::spawn(async move {
        for i in 0..10 {
            if tx.send(i).await.is_err() {
                println!("receiver dropped");
                return;
            }
        }
    });

    // Receive messages
    while let Some(val) = rx.recv().await {
        println!("Received: {}", val);
    }
}

בדוגמה הזו אנחנו רואים את יצירת השולח והמקבל tx ו rx. לאחר מכן, יצירת task ברקע ששולח בלולאה 10 מספרים באמצעות tx אל ה task הראשי בו אנחנו מאזינים עם rx ומדפיסים את מה שקיבלנו.

במקרים מורכבים יותר (ושכיחים יותר) אנחנו נעביר אובייקט השליחה (tx) למספר tasks/threads ונשלח דרכו מידע במקרים שונים.

use tokio::sync::mpsc;
use tokio::task;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(100);

    // Spawn the tasks
    let api_task = spawn_api_task(tx.clone());
    let file_task = spawn_file_task(tx.clone());
    let sensor_task = spawn_sensor_task(tx.clone());

    // Close the original sender to signal that no more messages will be sent
    drop(tx);

    // Spawn the receiver task to process the data
    let receiver_task = process_data(rx);

    // Wait for all tasks to complete
    tokio::join!(api_task, file_task, sensor_task, receiver_task);
}

async fn spawn_api_task(tx: mpsc::Sender<String>) {
    task::spawn(async move {
        // Simulate a task that fetches data from an API periodically
        loop {
            let data = "API Response".to_string();
            if tx.send(data).await.is_err() {
                println!("Receiver dropped, stopping API task");
                return;
            }
            sleep(Duration::from_secs(3)).await; // Simulate API latency between requests
        }
    }).await.unwrap();
}

async fn spawn_file_task(tx: mpsc::Sender<String>) {
    task::spawn(async move {
        // Simulate reading a file in chunks asynchronously
        // Example: Reading data from a file at certain intervals
        loop {
            let data = "File Data Chunk".to_string();
            if tx.send(data).await.is_err() {
                println!("Receiver dropped, stopping File task");
                return;
            }
            sleep(Duration::from_secs(5)).await; // Simulate file read delay
        }
    }).await.unwrap();
}

async fn spawn_sensor_task(tx: mpsc::Sender<String>) {
    task::spawn(async move {
        // Simulate continuously receiving sensor data
        loop {
            let data = "Sensor Data".to_string();
            if tx.send(data).await.is_err() {
                println!("Receiver dropped, stopping Sensor task");
                return;
            }
            sleep(Duration::from_secs(1)).await; // Simulate sensor data frequency
        }
    }).await.unwrap();
}

async fn process_data(mut rx: mpsc::Receiver<String>) {
    task::spawn(async move {
        // Process incoming data asynchronously
        while let Some(received) = rx.recv().await {
            println!("Processing received: {}", received);
            sleep(Duration::from_millis(50)).await; // Simulate processing time
        }

        println!("All data processed.");
    }).await.unwrap();
}

בדוגמה הזו ראינו יצירה של שלושה tasks, שירוצו ברקע, כל אחד מהם אחראי על עבודה אחרת (במקרה הזה, להביא מידע ממקור מסוים) ושולח את אותו המידע ל task המעבד לצורך המשך עיבוד.

למה Channels?

- מאפשרים תקשורת בטוחה ובעלת מבנה מסודר של העברת הודעות.

- הפרדת תלויות (decoupling) בין מי שמייצר את המידע למי שמעבד אותו.

- הורדת הצורך ב shared state - ברגע שיש הפרדה בין שולח למקבל המידע עובר בניהם וברוב המקרים לא נראה שיתוף state באמצעות Arc Mutex.

- עמידה בסקייל הופכת לפשוטה יותר ברגע שאפשר להגדיל את מספר ה consumers/producers.

סיכום

בפוסט ראינו שימוש בסיסי ב channels תחת הסביבה האסינכרונית של tokio ודיברנו על הסיבות לשימוש בהם. צ׳אנלים הם כלי מאוד חשוב בסט הכלים של מפתח ראסט, ויכולים לתרום הרבה לחלוקה נכונה של לוגיקה עסקית לפי אזורים ולעזור לנו לייצר מערכת שתדע להתמודד עם סקייל.