1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
|
use std::{panic::catch_unwind, thread, time::Duration};
use futures_timer::Delay;
use threadfin::ThreadPool;
fn single_thread() -> ThreadPool {
ThreadPool::builder().size(0..1).build()
}
#[test]
#[should_panic(expected = "thread pool name must not contain null bytes")]
fn name_with_null_bytes_panics() {
ThreadPool::builder().name("uh\0oh").build();
}
#[test]
#[allow(clippy::reversed_empty_ranges)]
#[should_panic(expected = "thread pool minimum size cannot be larger than maximum size")]
fn invalid_size_panics() {
ThreadPool::builder().size(2..1);
}
#[test]
#[should_panic(expected = "thread pool maximum size must be non-zero")]
fn invalid_size_zero_panics() {
ThreadPool::builder().size(0);
}
#[test]
fn execute() {
let pool = single_thread();
let result = pool.execute(|| 2 + 2).join();
assert_eq!(result, 4);
}
#[test]
fn execute_future() {
let pool = single_thread();
let result = pool.execute_future(async { 2 + 2 }).join();
assert_eq!(result, 4);
}
#[test]
fn task_join_timeout() {
let pool = single_thread();
let result = pool
.execute(|| thread::sleep(Duration::from_secs(5)))
.join_timeout(Duration::from_millis(10));
assert!(result.is_err());
}
#[test]
fn futures_that_yield_are_run_concurrently() {
let pool = single_thread();
assert_eq!(pool.running_tasks(), 0);
let first = pool
.try_execute_future(Delay::new(Duration::from_millis(400)))
.unwrap();
// Even though there's only one worker thread, it should become idle quickly
// and start polling for more work, because a delay future yields
// immediately and doesn't wake for a while.
thread::sleep(Duration::from_millis(100));
assert_eq!(pool.running_tasks(), 1);
let second = pool
.try_execute_future(Delay::new(Duration::from_millis(200)))
.unwrap();
thread::sleep(Duration::from_millis(100));
// Now both tasks are running, but there's still only 1 worker thread!
assert_eq!(pool.running_tasks(), 2);
assert_eq!(pool.threads(), 1);
first.join();
second.join();
// Both tasks completed.
assert_eq!(pool.completed_tasks(), 2);
}
#[test]
fn try_execute_under_core_count() {
let pool = ThreadPool::builder().size(1).build();
// Give some time for thread to start...
thread::sleep(Duration::from_millis(100));
assert_eq!(pool.threads(), 1);
assert!(pool.try_execute(|| 2 + 2).is_ok());
}
#[test]
fn try_execute_over_core_count() {
let pool = ThreadPool::builder().size(0..1).build();
assert!(pool.try_execute(|| 2 + 2).is_ok());
}
#[test]
fn try_execute_over_limit() {
let pool = ThreadPool::builder().size(0..1).queue_limit(0).build();
assert!(pool.try_execute(|| 2 + 2).is_ok());
assert!(pool.try_execute(|| 2 + 2).is_err());
fn task() -> usize {
2 + 2
}
// The returned function in the error is identical to the function given.
let error = pool.try_execute(task).unwrap_err();
assert_eq!(error.into_inner() as usize, task as usize);
}
#[test]
fn name() {
let pool = ThreadPool::builder().name("foo").build();
let name = pool
.execute(|| thread::current().name().unwrap().to_owned())
.join();
assert_eq!(name, "foo");
}
#[test]
#[should_panic(expected = "oh no!")]
fn panic_propagates_to_task() {
let pool = single_thread();
pool.execute(|| panic!("oh no!")).join();
}
#[test]
fn panic_count() {
let pool = single_thread();
assert_eq!(pool.panicked_tasks(), 0);
let task = pool.execute(|| panic!("oh no!"));
let _ = catch_unwind(move || {
task.join();
});
assert_eq!(pool.panicked_tasks(), 1);
}
#[test]
fn thread_count() {
let pool = ThreadPool::builder().size(0..1).build();
assert_eq!(pool.threads(), 0);
pool.execute(|| 2 + 2).join();
assert_eq!(pool.threads(), 1);
let pool_with_starting_threads = ThreadPool::builder().size(1).build();
// Give some time for thread to start...
thread::sleep(Duration::from_millis(50));
assert_eq!(pool_with_starting_threads.threads(), 1);
}
#[test]
fn idle_shutdown() {
let pool = ThreadPool::builder()
.size(0..1)
.keep_alive(Duration::from_millis(100))
.build();
assert_eq!(pool.threads(), 0, "pool starts out empty");
pool.execute(|| 2 + 2).join();
assert_eq!(pool.threads(), 1, "one thread was added");
thread::sleep(Duration::from_millis(200));
assert_eq!(
pool.threads(),
0,
"thread became idle and terminated after timeout"
);
}
#[test]
fn join() {
// Just a dumb test to make sure join doesn't do anything strange.
ThreadPool::default().join();
}
#[test]
fn join_timeout_expiring() {
let pool = ThreadPool::builder().size(1).build();
assert_eq!(pool.threads(), 1);
// Schedule a slow task on the only thread. We have to keep the task
// around, because dropping it could cancel the task.
let _task = pool.execute(|| thread::sleep(Duration::from_millis(500)));
// Joining should time out since there's one task still running longer
// than our join timeout.
assert!(!pool.join_timeout(Duration::from_millis(10)));
}
#[test]
fn configure_common_after_init_returns_error() {
threadfin::common(); // init
assert!(threadfin::configure_common(|b| b).is_err());
}
|