summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Kreutz <mail@skreutz.com>2022-12-21 10:25:18 +0100
committerStefan Kreutz <mail@skreutz.com>2022-12-21 10:25:18 +0100
commitc143a1c9bb3129f15764e5ab2ddc80e027fe5cf4 (patch)
treec2aa3d36a42fafa135e041cf37ad84dbcaac38aa
parent0669533dca1cb41ea67f9b4cac7727cf600412d1 (diff)
downloadparseq-c143a1c9bb3129f15764e5ab2ddc80e027fe5cf4.tar
Test parallel map drop
-rw-r--r--src/lib.rs32
1 files changed, 31 insertions, 1 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 60d3546..7cff65e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -321,7 +321,10 @@ impl<F: FnMut()> Drop for Canary<F> {
#[cfg(test)]
mod tests {
- use std::time::Duration;
+ use std::{
+ sync::{Arc, Mutex},
+ time::Duration,
+ };
use super::*;
@@ -476,4 +479,31 @@ mod tests {
assert_eq!(inflight(17, 13), 4);
assert_eq!(inflight(13, usize::MAX - 17), 31);
}
+
+ /// ParallelMap must stop feeding workers when dropped.
+ #[test]
+ fn drop_parallel_map() {
+ let threads = 5;
+ let buffer_size = 20;
+ let consume = 7;
+
+ let counter = Arc::new(Mutex::new(0));
+ let count = counter.clone();
+
+ let mut iter = (0..)
+ .into_iter()
+ .map_parallel_limit(threads, buffer_size, move |i| {
+ if i < consume {
+ std::thread::sleep(Duration::from_millis(100));
+ }
+ let mut counter = counter.lock().unwrap();
+ *counter += 1;
+ 2 * i
+ });
+ for _ in 0..consume {
+ iter.next();
+ }
+ drop(iter);
+ assert!(*count.lock().unwrap() < consume + buffer_size);
+ }
}
Generated by cgit. See skreutz.com for my tech blog and contact information.