pub mod sync {
    use std::io::Read;

    // Reads `read` to the end and returns the collected bytes; panics on any I/O error.
    pub fn to_vec(mut read: impl Read) -> Vec<u8> {
        let mut output = vec![];
        read.read_to_end(&mut output).unwrap();
        output
    }
}
#[cfg(feature = "futures-io")]
pub mod futures {
    pub mod bufread {
        pub use futures::io::AsyncBufRead;

        use crate::utils::{InputStream, TrackEof};
        use futures::stream::{StreamExt as _, TryStreamExt as _};

        pub fn from(input: &InputStream) -> impl AsyncBufRead {
            // Feeding the input through a stream ensures that each chunk needs its own
            // read/poll_fill_buf call, which exercises reading across multiple chunks.
            TrackEof::new(input.stream().map(Ok).into_async_read())
        }
    }

    pub mod read {
        use crate::utils::{block_on, pin_mut};
        use futures::io::{copy_buf, AsyncRead, AsyncReadExt, BufReader, Cursor};

        // Drains `read` into a `Vec<u8>` through a 2-byte `BufReader`, so the data
        // arrives in many small reads.
        pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
            // TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510
            // The output buffer is preallocated at 100 KiB; all current test cases
            // produce less than that.
            let mut output = Cursor::new(vec![0; 102_400]);
            pin_mut!(read);
            let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
            let mut output = output.into_inner();
            // Drop the unused tail of the preallocated buffer.
            output.truncate(len as usize);
            output
        }

        // Performs a single `read` into `output` and returns the number of bytes read.
        pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
            pin_mut!(reader);
            block_on(reader.read(output))
        }
    }

    pub mod write {
        use crate::utils::{block_on, Pin, TrackClosed};
        use futures::io::{AsyncWrite, AsyncWriteExt as _};
        use futures_test::io::AsyncWriteTestExt as _;

        // Writes every chunk of `input` through the writer built by `create_writer`,
        // flushing after each chunk, then closes the writer and returns the bytes that
        // reached the underlying sink. The sink accepts at most `limit` bytes per write
        // and interleaves `Poll::Pending` between operations.
        pub fn to_vec(
            input: &[Vec<u8>],
            create_writer: impl for<'a> FnOnce(
                &'a mut (dyn AsyncWrite + Unpin),
            ) -> Pin<Box<dyn AsyncWrite + 'a>>,
            limit: usize,
        ) -> Vec<u8> {
            let mut output = Vec::new();
            {
                let mut test_writer = TrackClosed::new(
                    (&mut output)
                        .limited_write(limit)
                        .interleave_pending_write(),
                );
                {
                    let mut writer = create_writer(&mut test_writer);
                    for chunk in input {
                        block_on(writer.write_all(chunk)).unwrap();
                        block_on(writer.flush()).unwrap();
                    }
                    block_on(writer.close()).unwrap();
                }
                // Closing the outer writer must also close the underlying sink.
                assert!(test_writer.is_closed());
            }
            output
        }
    }
}
#[cfg(feature = "tokio")]
pub mod tokio {
    pub mod bufread {
        use crate::utils::{InputStream, TrackEof};
        use bytes::Bytes;
        use futures::stream::StreamExt;
        pub use tokio::io::AsyncBufRead;
        use tokio_util::io::StreamReader;

        pub fn from(input: &InputStream) -> impl AsyncBufRead {
            // Feeding the input through a stream ensures that each chunk needs its own
            // read/poll_fill_buf call, which exercises reading across multiple chunks.
            TrackEof::new(StreamReader::new(
                input.stream().map(Bytes::from).map(std::io::Result::Ok),
            ))
        }
    }

    pub mod read {
        use crate::utils::{block_on, pin_mut, tokio_ext::copy_buf};
        use std::io::Cursor;
        use tokio::io::{AsyncRead, AsyncReadExt, BufReader};

        // Drains `read` into a `Vec<u8>` through a 2-byte `BufReader`, so the data
        // arrives in many small reads.
        pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
            // The output buffer is preallocated at 100 KiB.
            let mut output = Cursor::new(vec![0; 102_400]);
            pin_mut!(read);
            let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
            let mut output = output.into_inner();
            // Drop the unused tail of the preallocated buffer.
            output.truncate(len as usize);
            output
        }

        // Performs a single `read` into `output` and returns the number of bytes read.
        pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
            pin_mut!(reader);
            block_on(reader.read(output))
        }
    }

    pub mod write {
        use crate::utils::{
            block_on, tokio_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
        };
        use std::io::Cursor;
        use tokio::io::{AsyncWrite, AsyncWriteExt as _};

        // Writes every chunk of `input` through the writer built by `create_writer`,
        // flushing after each chunk, then shuts the writer down and returns the bytes
        // collected in the underlying `Cursor<Vec<u8>>`.
        pub fn to_vec(
            input: &[Vec<u8>],
            create_writer: impl for<'a> FnOnce(
                &'a mut (dyn AsyncWrite + Unpin),
            ) -> Pin<Box<dyn AsyncWrite + 'a>>,
            limit: usize,
        ) -> Vec<u8> {
            let mut output = Cursor::new(Vec::new());
            {
                let mut test_writer = TrackClosed::new(
                    (&mut output)
                        .limited_write(limit)
                        .interleave_pending_write(),
                );
                {
                    let mut writer = create_writer(&mut test_writer);
                    for chunk in input {
                        block_on(writer.write_all(chunk)).unwrap();
                        block_on(writer.flush()).unwrap();
                    }
                    block_on(writer.shutdown()).unwrap();
                }
                // Shutting down the outer writer must also close the underlying sink.
                assert!(test_writer.is_closed());
            }
            output.into_inner()
        }
    }
}