diff options
author | Stefan Kreutz <mail@skreutz.com> | 2022-12-21 10:25:18 +0100 |
---|---|---|
committer | Stefan Kreutz <mail@skreutz.com> | 2022-12-21 10:25:18 +0100 |
commit | c143a1c9bb3129f15764e5ab2ddc80e027fe5cf4 (patch) | |
tree | c2aa3d36a42fafa135e041cf37ad84dbcaac38aa /src | |
parent | 0669533dca1cb41ea67f9b4cac7727cf600412d1 (diff) | |
download | parseq-c143a1c9bb3129f15764e5ab2ddc80e027fe5cf4.tar |
Test parallel map drop
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 32 |
1 files changed, 31 insertions, 1 deletions
@@ -321,7 +321,10 @@ impl<F: FnMut()> Drop for Canary<F> { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{ + sync::{Arc, Mutex}, + time::Duration, + }; use super::*; @@ -476,4 +479,31 @@ mod tests { assert_eq!(inflight(17, 13), 4); assert_eq!(inflight(13, usize::MAX - 17), 31); } + + /// ParallelMap must stop feeding workers when dropped. + #[test] + fn drop_parallel_map() { + let threads = 5; + let buffer_size = 20; + let consume = 7; + + let counter = Arc::new(Mutex::new(0)); + let count = counter.clone(); + + let mut iter = (0..) + .into_iter() + .map_parallel_limit(threads, buffer_size, move |i| { + if i < consume { + std::thread::sleep(Duration::from_millis(100)); + } + let mut counter = counter.lock().unwrap(); + *counter += 1; + 2 * i + }); + for _ in 0..consume { + iter.next(); + } + drop(iter); + assert!(*count.lock().unwrap() < consume + buffer_size); + } } |