Simple task-thread-pool
This simple task-thread-pool for Rust is for tasks like shearing sheep.
It uses a limited number of threads from the pool to handle every task implementing the trait `TaskRunner`.
Threads will be reused if there are more tasks available.
It uses the `log` crate to log error, debug, and trace messages.
//! Simple thread pool for running tasks and reusing existing threads
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc, Mutex, RwLock,
},
thread,
time::Duration,
};
use log::{debug, error, trace};
/// How long the manager thread sleeps between two passes over the task queue
/// when it decides to idle instead of looping again immediately.
const MANAGER_THREAD_SLEEP: Duration = Duration::from_millis(25);
/// How long a task thread sleeps after finding the task queue empty before it
/// checks the queue one more time.
const TASK_THREAD_SLEEP: Duration = Duration::from_millis(10);
/// A thread pool that runs queued tasks on at most `N` task threads.
///
/// `T` is the task type and must implement [`TaskRunner<R>`];
/// `R` is the value returned by [`TaskRunner::run`].
#[derive(Debug)]
pub struct Pool<T, R, const N: u8>
where
    T: TaskRunner<R>,
    R: std::fmt::Debug + Send + Sync + 'static,
{
    /// Number of task threads currently counted as running; compared against `N`
    /// before another task thread is spawned.
    active_threads: Arc<AtomicU8>,
    /// Join handle of the background manager thread, present while one has been
    /// started and not yet joined.
    manager: Mutex<Option<thread::JoinHandle<()>>>,
    /// Flag polled by the manager and the task threads; once `true` they wind down.
    stop_manager: Arc<AtomicBool>,
    /// Queue of pending tasks: new tasks are pushed to the back, threads pop from the front.
    tasks: Arc<RwLock<VecDeque<T>>>,
    /// Join handles of every task thread spawned so far.
    tasks_threads: Arc<Mutex<Vec<thread::JoinHandle<R>>>>,
}
impl<T, R, const N: u8> Pool<T, R, N>
where
R: std::fmt::Debug + Send + Sync + 'static,
T: TaskRunner<R>,
{
/// Create new [`Pool`]
#[must_use]
pub fn new() -> Self {
Self {
active_threads: Arc::new(AtomicU8::default()),
manager: Mutex::new(None),
stop_manager: Arc::new(AtomicBool::default()),
tasks: Arc::new(RwLock::new(VecDeque::new())),
tasks_threads: Arc::new(Mutex::new(Vec::with_capacity(N as usize))),
}
}
/// Add new task with type [`T`] to pool
pub fn add_task(&self, task: T) {
match self.tasks.write() {
Ok(mut tasks) => tasks.push_back(task),
Err(err) => error!("{err}"),
}
}
/// Start working on pooled tasks or wait for tasks in background thread
pub fn start(&self) {
let active_threads = Arc::clone(&self.active_threads);
let stop_manager = Arc::clone(&self.stop_manager);
let tasks = Arc::clone(&self.tasks);
let tasks_threads = Arc::clone(&self.tasks_threads);
if let Ok(mut manager) = self.manager.lock() {
*manager = Some(thread::spawn(move || {
trace!("manager start");
let mut is_sleep = false;
while !stop_manager.load(Ordering::Relaxed) {
if let Ok(mut tasks_vec) = tasks.write() {
while active_threads.load(Ordering::Relaxed) < N {
if stop_manager.load(Ordering::Relaxed) {
break;
} else if let Some(task) = tasks_vec.pop_front() {
debug!("create task thread");
if let Ok(mut tasks_threads) = tasks_threads.lock() {
let _ = active_threads.fetch_add(1, Ordering::Relaxed);
let active_threads = Arc::clone(&active_threads);
let stop_manager = Arc::clone(&stop_manager);
let tasks = Arc::clone(&tasks);
tasks_threads.push(thread::spawn(move || {
let mut is_sleep = false;
debug!("task run");
let mut res = task.run();
debug!("task finish");
trace!("task: {res:?}");
// continue taking and running available tasks
while !stop_manager.load(Ordering::Relaxed) {
if is_sleep {
thread::sleep(TASK_THREAD_SLEEP);
}
if let Ok(mut tasks_vec) = tasks.write() {
if let Some(task) = tasks_vec.pop_front() {
debug!("task run");
res = task.run();
debug!("task finish");
trace!("task: {res:?}");
} else {
debug!("no task");
if is_sleep {
break;
}
is_sleep = true;
}
} else {
error!("tasks lock fail");
}
}
let _ = active_threads.fetch_sub(1, Ordering::Relaxed);
debug!("finish task thread");
res
}));
} else {
error!("tasks_threads lock fail");
}
is_sleep = false;
} else {
debug!("no task");
is_sleep = true;
break;
}
}
} else {
error!("tasks lock fail");
}
if is_sleep {
thread::sleep(MANAGER_THREAD_SLEEP);
}
}
trace!("manager finish");
}));
}
}
/// Stop handling tasks in [`Pool`]
pub fn stop(&self) {
self.stop_manager.store(true, Ordering::Relaxed);
if let Ok(mut manager) = self.manager.lock() {
if let Some(manager) = manager.take() {
if let Err(err) = manager.join() {
error!("{err:?}");
}
}
} else {
error!("manager lock fail");
}
trace!("manager stop");
if let Ok(mut task_threads) = self.tasks_threads.lock() {
while let Some(task_thread) = task_threads.pop() {
if let Err(err) = task_thread.join() {
error!("{err:?}");
}
}
}
trace!("task threads joined");
}
}
impl<T, R, const N: u8> Default for Pool<T, R, N>
where
R: std::fmt::Debug + Send + Sync + 'static,
T: TaskRunner<R>,
{
fn default() -> Self {
Self::new()
}
}
/// A unit of work that can be added to a [`Pool`].
///
/// Implementors must be `Send + Sync + 'static` because tasks are moved into and
/// executed on spawned threads. `R` is the result type and must implement
/// [`std::fmt::Debug`].
pub trait TaskRunner<R: std::fmt::Debug>: Send + Sync + 'static {
    /// Execute the task and return its result; invoked from a pool thread.
    fn run(&self) -> R;
}
The file also contains unit tests.