1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
use tempfile::NamedTempFile;
use tokio_uring::{buf::IoBuf, fs::File};
// Compile the file at `../src/future.rs` into this test binary as a private
// module named `future`.
// NOTE(review): nothing visible in this file refers to `future::...`; confirm
// whether this include is still needed.
#[path = "../src/future.rs"]
// Silence every warning coming from the included module — presumably because
// most of its items are unused in this test binary (TODO confirm).
#[allow(warnings)]
mod future;
/// Checks that the buffer handed to an in-flight read is released even though
/// the future driving the read is dropped while still pending.
///
/// The read is polled exactly once through `poll_once` (which asserts that it
/// is still pending) and then abandoned. The buffer carries a clone of an
/// `Arc`, so once `tokio_uring::start` has returned, the strong count reveals
/// whether the buffer has been dropped.
#[test]
#[ignore]
fn complete_ops_on_drop() {
    use std::sync::Arc;

    /// I/O buffer that forwards to its inner `Vec<u8>` and additionally holds
    /// an `Arc` clone so the test can observe when the buffer is dropped.
    struct MyBuf {
        data: Vec<u8>,
        _ref_cnt: Arc<()>,
    }

    // SAFETY: every method delegates to the `Vec<u8>` implementation; `MyBuf`
    // adds no pointer or length logic of its own.
    unsafe impl IoBuf for MyBuf {
        fn stable_ptr(&self) -> *const u8 {
            self.data.stable_ptr()
        }

        fn bytes_init(&self) -> usize {
            self.data.bytes_init()
        }

        fn bytes_total(&self) -> usize {
            self.data.bytes_total()
        }
    }

    // SAFETY: as above, all methods delegate to the `Vec<u8>` implementation.
    unsafe impl tokio_uring::buf::IoBufMut for MyBuf {
        fn stable_mut_ptr(&mut self) -> *mut u8 {
            self.data.stable_mut_ptr()
        }

        unsafe fn set_init(&mut self, pos: usize) {
            self.data.set_init(pos);
        }
    }

    // Used to test if the buffer dropped.
    let ref_cnt = Arc::new(());

    let tempfile = tempfile();

    // Fill the file with 50 MiB of zeroes so the read below (at offset 25 MiB)
    // lands inside real data.
    let fixture = vec![0u8; 50 * 1024 * 1024];
    std::fs::write(tempfile.path(), &fixture).unwrap();

    let file = tokio_uring::start(async {
        // Open read-only. `File::create` would truncate the fixture written
        // above and open it write-only, so the read would never touch the
        // data this test prepared.
        let file = File::open(tempfile.path()).await.unwrap();

        // Start the read, poll it once, then drop the future while pending.
        poll_once(async {
            file.read_at(
                MyBuf {
                    data: vec![0; 64 * 1024],
                    _ref_cnt: Arc::clone(&ref_cnt),
                },
                25 * 1024 * 1024,
            )
            .await
            .0
            .unwrap();
        })
        .await;

        // Hand the file back so it outlives the runtime.
        file
    });

    // The runtime has returned: the buffer (and its `Arc` clone) is expected
    // to be gone, leaving only the local `ref_cnt`.
    assert_eq!(Arc::strong_count(&ref_cnt), 1);

    // Brief pause before releasing the file.
    // NOTE(review): the reason for this delay is not evident from this file.
    std::thread::sleep(std::time::Duration::from_millis(100));

    drop(file);
}
/// Starts 600 writes against one file, polling each write future a single
/// time (`poll_once` asserts it is still pending) and then dropping it.
///
/// The test has no explicit assertions of its own; it succeeds when none of
/// the steps panic.
#[test]
#[ignore]
fn too_many_submissions() {
    // How many writes are started and then abandoned.
    const ABANDONED_WRITES: usize = 600;

    let scratch = tempfile();

    tokio_uring::start(async {
        let target = File::create(scratch.path()).await.unwrap();

        for _ in 0..ABANDONED_WRITES {
            // Built lazily: nothing runs until `poll_once` polls it.
            let write = async {
                let (outcome, _buf) = target
                    .write_at(b"hello world".to_vec(), 0)
                    .submit()
                    .await;
                outcome.unwrap();
            };
            poll_once(write).await;
        }
    });
}
/// Runs 50 no-op operations on a ring configured with 2 submission entries
/// and 4 completion entries, and requires all of them to complete.
///
/// A watchdog thread terminates the process with exit code 1 if the
/// operations have not finished within 8 seconds. The watchdog is disarmed as
/// soon as this function is left (normally or by panic), so it cannot kill
/// the test binary later while other tests are still running.
#[test]
#[ignore]
fn completion_overflow() {
    use std::process;
    use std::sync::mpsc;
    use std::{thread, time};
    use tokio::task::JoinSet;

    let spawn_cnt = 50;
    let squeue_entries = 2;
    let cqueue_entries = 2 * squeue_entries;

    // Dropping `done_tx` disconnects the channel, which wakes the watchdog
    // without triggering the timeout branch.
    let (done_tx, done_rx) = mpsc::channel::<()>();

    thread::spawn(move || {
        // 1000 times longer than it takes on a slow machine
        let limit = time::Duration::from_secs(8);
        if let Err(mpsc::RecvTimeoutError::Timeout) = done_rx.recv_timeout(limit) {
            eprintln!("Timeout reached. The uring completions are hung.");
            process::exit(1);
        }
    });

    tokio_uring::builder()
        .entries(squeue_entries)
        .uring_builder(tokio_uring::uring_builder().setup_cqsize(cqueue_entries))
        .start(async move {
            let mut js = JoinSet::new();

            for _ in 0..spawn_cnt {
                js.spawn_local(tokio_uring::no_op());
            }

            // Every task must both join cleanly and report a successful op.
            while let Some(res) = js.join_next().await {
                res.unwrap().unwrap();
            }
        });

    // All operations completed: disarm the watchdog.
    drop(done_tx);
}
/// Creates a fresh `NamedTempFile` for a test.
///
/// # Panics
///
/// Panics if `NamedTempFile::new` returns an error.
fn tempfile() -> NamedTempFile {
    let created = NamedTempFile::new();
    created.unwrap()
}
async fn poll_once(future: impl std::future::Future) {
// use std::future::Future;
use std::task::Poll;
use tokio::pin;
pin!(future);
std::future::poll_fn(|cx| {
assert!(future.as_mut().poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
|