Simple task-thread-pool

Simple task-thread-pool
Photo by davide ragusa / Unsplash

This simple task-thread-pool for Rust is meant for tasks like shearing sheep.
It uses a limited number of threads from the pool to run every task that implements the trait TaskRunner.
Threads will be reused if there are more tasks available.

It uses the log crate to log error, debug and trace messages.

//! Simple thread pool for running tasks and reusing existing threads

use std::{
    collections::VecDeque,
    sync::{
        atomic::{AtomicBool, AtomicU8, Ordering},
        Arc, Mutex, RwLock,
    },
    thread,
    time::Duration,
};

use log::{debug, error, trace};

/// Sleep interval the manager thread uses when it pauses between polls of the task queue.
const MANAGER_THREAD_SLEEP: Duration = Duration::from_millis(25);
/// Sleep interval of a task thread that found the queue empty, before it polls the queue again.
const TASK_THREAD_SLEEP: Duration = Duration::from_millis(10);

/// The [`Pool`] with a maximum thread count of `N`,
/// running tasks of type `T` implementing [`TaskRunner<R>`],
/// with `R` being the return type of [`TaskRunner<R>::run`].
///
/// Tasks are queued with `add_task`, picked up by a background manager
/// thread after `start` was called, and everything is shut down and joined
/// by `stop`.
#[derive(Debug)]
pub struct Pool<T, R, const N: u8>
where
    R: std::fmt::Debug + Send + Sync + 'static,
    T: TaskRunner<R>,
{
    // Number of task threads currently alive; the manager increments it
    // before spawning a task thread and the task thread decrements it when it exits.
    active_threads: Arc<AtomicU8>,
    // Join handle of the manager thread; set by `start`, taken and joined by `stop`.
    manager: Mutex<Option<thread::JoinHandle<()>>>,
    // Stop flag set by `stop`; polled by the manager and by every task thread.
    stop_manager: Arc<AtomicBool>,
    // FIFO queue of pending tasks (filled with `push_back`, drained with `pop_front`).
    tasks: Arc<RwLock<VecDeque<T>>>,
    // Join handles of all spawned task threads; joined by `stop`.
    tasks_threads: Arc<Mutex<Vec<thread::JoinHandle<R>>>>,
}

impl<T, R, const N: u8> Pool<T, R, N>
where
    R: std::fmt::Debug + Send + Sync + 'static,
    T: TaskRunner<R>,
{
    /// Create new [`Pool`]
    #[must_use]
    pub fn new() -> Self {
        Self {
            active_threads: Arc::new(AtomicU8::default()),
            manager: Mutex::new(None),
            stop_manager: Arc::new(AtomicBool::default()),
            tasks: Arc::new(RwLock::new(VecDeque::new())),
            tasks_threads: Arc::new(Mutex::new(Vec::with_capacity(N as usize))),
        }
    }

    /// Add new task with type [`T`] to pool
    pub fn add_task(&self, task: T) {
        match self.tasks.write() {
            Ok(mut tasks) => tasks.push_back(task),
            Err(err) => error!("{err}"),
        }
    }

    /// Start working on pooled tasks or wait for tasks in background thread
    pub fn start(&self) {
        let active_threads = Arc::clone(&self.active_threads);
        let stop_manager = Arc::clone(&self.stop_manager);
        let tasks = Arc::clone(&self.tasks);
        let tasks_threads = Arc::clone(&self.tasks_threads);

        if let Ok(mut manager) = self.manager.lock() {
            *manager = Some(thread::spawn(move || {
                trace!("manager start");

                let mut is_sleep = false;

                while !stop_manager.load(Ordering::Relaxed) {
                    if let Ok(mut tasks_vec) = tasks.write() {
                        while active_threads.load(Ordering::Relaxed) < N {
                            if stop_manager.load(Ordering::Relaxed) {
                                break;
                            } else if let Some(task) = tasks_vec.pop_front() {
                                debug!("create task thread");
                                if let Ok(mut tasks_threads) = tasks_threads.lock() {
                                    let _ = active_threads.fetch_add(1, Ordering::Relaxed);
                                    let active_threads = Arc::clone(&active_threads);
                                    let stop_manager = Arc::clone(&stop_manager);
                                    let tasks = Arc::clone(&tasks);

                                    tasks_threads.push(thread::spawn(move || {
                                        let mut is_sleep = false;

                                        debug!("task run");
                                        let mut res = task.run();
                                        debug!("task finish");
                                        trace!("task: {res:?}");

                                        // continue taking and running available tasks
                                        while !stop_manager.load(Ordering::Relaxed) {
                                            if is_sleep {
                                                thread::sleep(TASK_THREAD_SLEEP);
                                            }
                                            if let Ok(mut tasks_vec) = tasks.write() {
                                                if let Some(task) = tasks_vec.pop_front() {
                                                    debug!("task run");
                                                    res = task.run();
                                                    debug!("task finish");
                                                    trace!("task: {res:?}");
                                                } else {
                                                    debug!("no task");
                                                    if is_sleep {
                                                        break;
                                                    }
                                                    is_sleep = true;
                                                }
                                            } else {
                                                error!("tasks lock fail");
                                            }
                                        }

                                        let _ = active_threads.fetch_sub(1, Ordering::Relaxed);

                                        debug!("finish task thread");

                                        res
                                    }));
                                } else {
                                    error!("tasks_threads lock fail");
                                }
                                is_sleep = false;
                            } else {
                                debug!("no task");
                                is_sleep = true;
                                break;
                            }
                        }
                    } else {
                        error!("tasks lock fail");
                    }

                    if is_sleep {
                        thread::sleep(MANAGER_THREAD_SLEEP);
                    }
                }

                trace!("manager finish");
            }));
        }
    }

    /// Stop handling tasks in [`Pool`]
    pub fn stop(&self) {
        self.stop_manager.store(true, Ordering::Relaxed);

        if let Ok(mut manager) = self.manager.lock() {
            if let Some(manager) = manager.take() {
                if let Err(err) = manager.join() {
                    error!("{err:?}");
                }
            }
        } else {
            error!("manager lock fail");
        }

        trace!("manager stop");

        if let Ok(mut task_threads) = self.tasks_threads.lock() {
            while let Some(task_thread) = task_threads.pop() {
                if let Err(err) = task_thread.join() {
                    error!("{err:?}");
                }
            }
        }

        trace!("task threads joined");
    }
}

impl<T, R, const N: u8> Default for Pool<T, R, N>
where
    R: std::fmt::Debug + Send + Sync + 'static,
    T: TaskRunner<R>,
{
    fn default() -> Self {
        Self::new()
    }
}

/// Contract every task has to fulfil before it can be added to a [`Pool`].
///
/// `R` is the value produced by [`TaskRunner::run`]. Implementors must be
/// `Send + Sync + 'static`, as required by the supertrait bounds.
pub trait TaskRunner<R: std::fmt::Debug>: 'static + Send + Sync {
    /// Execute the task and return its result; the pool calls this in a thread.
    fn run(&self) -> R;
}

The file also contains unit tests.