File: driver.rs

package info (click to toggle)
rust-tokio-uring 0.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 828 kB
  • sloc: makefile: 2
file content (148 lines) | stat: -rw-r--r-- 3,439 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use tempfile::NamedTempFile;

use tokio_uring::{buf::IoBuf, fs::File};

#[path = "../src/future.rs"]
#[allow(warnings)]
mod future;

/// Checks that the buffer handed to an abandoned read has been released by
/// the time `tokio_uring::start` returns.
///
/// A read is submitted, polled exactly once (it must still be pending) and
/// then dropped. The buffer carries a clone of `ref_cnt`, so a strong count
/// of 1 after `start` returns proves that the buffer itself was dropped.
#[test]
#[ignore]
fn complete_ops_on_drop() {
    use std::sync::Arc;

    /// Byte buffer that also holds a reference-counted token, letting the
    /// test observe when the buffer is dropped.
    struct MyBuf {
        data: Vec<u8>,
        _ref_cnt: Arc<()>,
    }

    // SAFETY: every method forwards to the inner `Vec<u8>`'s own `IoBuf`
    // implementation; `MyBuf` adds no pointer logic of its own.
    unsafe impl IoBuf for MyBuf {
        fn stable_ptr(&self) -> *const u8 {
            self.data.stable_ptr()
        }

        fn bytes_init(&self) -> usize {
            self.data.bytes_init()
        }

        fn bytes_total(&self) -> usize {
            self.data.bytes_total()
        }
    }

    // SAFETY: as above, this only delegates to the inner `Vec<u8>`.
    unsafe impl tokio_uring::buf::IoBufMut for MyBuf {
        fn stable_mut_ptr(&mut self) -> *mut u8 {
            self.data.stable_mut_ptr()
        }

        unsafe fn set_init(&mut self, pos: usize) {
            // The caller's obligations are passed straight through to the
            // `Vec<u8>` implementation.
            self.data.set_init(pos);
        }
    }

    // Used to test if the buffer dropped.
    let ref_cnt = Arc::new(());

    let tempfile = tempfile();

    // Fill the file with 50 MiB of zeroes so the read at the 25 MiB offset
    // below lands inside real data.
    let vec = vec![0; 50 * 1024 * 1024];
    let mut file = std::fs::File::create(tempfile.path()).unwrap();
    std::io::Write::write_all(&mut file, &vec).unwrap();

    let file = tokio_uring::start(async {
        // Open read-only. `File::create` would truncate the data written
        // above and return a write-only handle, so the read could never
        // succeed.
        let file = File::open(tempfile.path()).await.unwrap();
        poll_once(async {
            file.read_at(
                MyBuf {
                    data: vec![0; 64 * 1024],
                    _ref_cnt: ref_cnt.clone(),
                },
                25 * 1024 * 1024,
            )
            .await
            .0
            .unwrap();
        })
        .await;

        file
    });

    // Only the local `ref_cnt` may remain; the clone inside `MyBuf` must be gone.
    assert_eq!(Arc::strong_count(&ref_cnt), 1);

    // Brief pause before releasing the file handle, kept from the original test.
    std::thread::sleep(std::time::Duration::from_millis(100));

    drop(file);
}

/// Starts many writes in a row, polling each one a single time and then
/// abandoning it, so that submissions pile up without being awaited to
/// completion.
#[test]
#[ignore]
fn too_many_submissions() {
    // How many writes are started and then abandoned.
    const SUBMISSIONS: i32 = 600;

    let scratch = tempfile();

    tokio_uring::start(async {
        let target = File::create(scratch.path()).await.unwrap();

        for _round in 0..SUBMISSIONS {
            // The write is polled once by `poll_once` (which asserts it is
            // still pending) and is dropped right afterwards.
            let abandoned_write = async {
                let submitted = target.write_at(b"hello world".to_vec(), 0).submit();
                submitted.await.0.unwrap();
            };

            poll_once(abandoned_write).await;
        }
    });
}

/// Spawns far more no-op operations than the deliberately tiny submission
/// and completion queues can hold, then waits for every one of them.
///
/// A watchdog thread terminates the whole process with exit code 1 if the
/// test has not finished within 8 seconds.
#[test]
#[ignore]
fn completion_overflow() {
    use std::process;
    use std::thread;
    use std::time::Duration;
    use tokio::task::JoinSet;

    let spawn_cnt = 50;
    let squeue_entries = 2;
    let cqueue_entries = 2 * squeue_entries;

    // Watchdog: 1000 times longer than it takes on a slow machine.
    thread::spawn(|| {
        thread::sleep(Duration::from_secs(8));
        eprintln!("Timeout reached. The uring completions are hung.");
        process::exit(1);
    });

    // Built up front; nothing runs until the runtime below polls it.
    let workload = async move {
        let mut tasks = JoinSet::new();

        for _ in 0..spawn_cnt {
            tasks.spawn_local(tokio_uring::no_op());
        }

        // Drain the set; both the join result and the no-op result must be `Ok`.
        loop {
            match tasks.join_next().await {
                Some(joined) => {
                    joined.unwrap().unwrap();
                }
                None => break,
            }
        }
    };

    tokio_uring::builder()
        .entries(squeue_entries)
        .uring_builder(tokio_uring::uring_builder().setup_cqsize(cqueue_entries))
        .start(workload);
}

/// Creates a new named temporary file for a test.
///
/// # Panics
///
/// Panics if the temporary file cannot be created.
fn tempfile() -> NamedTempFile {
    let created = NamedTempFile::new();
    created.unwrap()
}

async fn poll_once(future: impl std::future::Future) {
    // use std::future::Future;
    use std::task::Poll;
    use tokio::pin;

    pin!(future);

    std::future::poll_fn(|cx| {
        assert!(future.as_mut().poll(cx).is_pending());
        Poll::Ready(())
    })
    .await;
}