1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,3 +25,3 @@ repository = "https://github.com/rayon-r
[dependencies.crossbeam-deque]
-version = "0.2.0"
+version = "0.6.1"
--- a/src/iter/par_bridge.rs
+++ b/src/iter/par_bridge.rs
@@ -1,4 +1,4 @@
-use crossbeam_deque::{Deque, Stealer, Steal};
+use crossbeam_deque::{self as deque, Worker, Stealer, Steal};
use std::thread::yield_now;
use std::sync::{Mutex, TryLockError};
@@ -78,10 +78,9 @@ impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
where C: UnindexedConsumer<Self::Item>
{
let split_count = AtomicUsize::new(current_num_threads());
- let deque = Deque::new();
- let stealer = deque.stealer();
+ let (worker, stealer) = deque::fifo();
let done = AtomicBool::new(false);
- let iter = Mutex::new((self.iter, deque));
+ let iter = Mutex::new((self.iter, worker));
bridge_unindexed(IterParallelProducer {
split_count: &split_count,
@@ -95,7 +94,7 @@ impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
struct IterParallelProducer<'a, Iter: Iterator + 'a> {
split_count: &'a AtomicUsize,
done: &'a AtomicBool,
- iter: &'a Mutex<(Iter, Deque<Iter::Item>)>,
+ iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
items: Stealer<Iter::Item>,
}
@@ -159,11 +158,15 @@ impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<
let count = current_num_threads();
let count = (count * count) * 2;
- let (ref mut iter, ref deque) = *guard;
+ let (ref mut iter, ref worker) = *guard;
- while deque.len() < count {
+ // while worker.len() < count {
+ // TODO the new deque doesn't let us count items. We can just
+ // push a number of items, but that doesn't consider active
+ // stealers elsewhere.
+ for _ in 0..count {
if let Some(it) = iter.next() {
- deque.push(it);
+ worker.push(it);
} else {
self.done.store(true, Ordering::SeqCst);
break;
|