File: bench.rs

package info (click to toggle)
rust-concurrent-queue 2.5.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 248 kB
  • sloc: makefile: 2
file content (93 lines) | stat: -rw-r--r-- 2,810 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::{any::type_name, fmt::Debug};

use concurrent_queue::{ConcurrentQueue, PopError};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use easy_parallel::Parallel;

const COUNT: usize = 100_000;
const THREADS: usize = 7;

fn spsc<T: Default + std::fmt::Debug + Send>(recv: &ConcurrentQueue<T>, send: &ConcurrentQueue<T>) {
    Parallel::new()
        .add(|| loop {
            match recv.pop() {
                Ok(_) => (),
                Err(PopError::Empty) => (),
                Err(PopError::Closed) => break,
            }
        })
        .add(|| {
            for _ in 0..COUNT {
                send.push(T::default()).unwrap();
            }
            send.close();
        })
        .run();
}

fn mpsc<T: Default + std::fmt::Debug + Send>(recv: &ConcurrentQueue<T>, send: &ConcurrentQueue<T>) {
    Parallel::new()
        .each(0..THREADS, |_| {
            for _ in 0..COUNT {
                send.push(T::default()).unwrap();
            }
        })
        .add(|| {
            let mut recieved = 0;
            while recieved < THREADS * COUNT {
                match recv.pop() {
                    Ok(_) => recieved += 1,
                    Err(PopError::Empty) => (),
                    Err(PopError::Closed) => unreachable!(),
                }
            }
        })
        .run();
}

fn single_thread<T: Default + std::fmt::Debug>(
    recv: &ConcurrentQueue<T>,
    send: &ConcurrentQueue<T>,
) {
    for _ in 0..COUNT {
        send.push(T::default()).unwrap();
    }
    for _ in 0..COUNT {
        recv.pop().unwrap();
    }
}

// Because we can't pass generic functions as const parameters.
macro_rules! bench_all(
    ($name:ident, $f:ident) => {
        fn $name(c: &mut Criterion) {
            fn helper<T: Default + Debug + Send>(c: &mut Criterion) {
                let name = format!("unbounded_{}_{}", stringify!($f), type_name::<T>());

                c.bench_function(&name, |b| b.iter(|| {
                    let q = ConcurrentQueue::unbounded();
                    $f::<T>(black_box(&q), black_box(&q));
                }));

                let name = format!("bounded_{}_{}", stringify!($f), type_name::<T>());

                c.bench_function(&name, |b| b.iter(|| {
                    let q = ConcurrentQueue::bounded(THREADS * COUNT);
                    $f::<T>(black_box(&q), black_box(&q));
                }));
            }
            helper::<u8>(c);
            helper::<u16>(c);
            helper::<u32>(c);
            helper::<u64>(c);
            helper::<u128>(c);
        }
    }
);

bench_all!(bench_spsc, spsc);
bench_all!(bench_mpsc, mpsc);
bench_all!(bench_single_thread, single_thread);

criterion_group!(generic_group, bench_single_thread, bench_spsc, bench_mpsc);
criterion_main!(generic_group);