File: impls.rs

package info (click to toggle)
rust-async-compression 0.4.13-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 928 kB
  • sloc: makefile: 2
file content (153 lines) | stat: -rw-r--r-- 5,358 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
pub mod sync {
    use std::io::Read;

    /// Reads `read` until end-of-file and returns every byte it produced.
    ///
    /// # Panics
    ///
    /// Panics if the underlying `read_to_end` call returns an error.
    pub fn to_vec(mut read: impl Read) -> Vec<u8> {
        let mut bytes = Vec::new();
        Read::read_to_end(&mut read, &mut bytes).unwrap();
        bytes
    }
}

#[cfg(feature = "futures-io")]
pub mod futures {
    pub mod bufread {
        pub use futures::io::AsyncBufRead;

        use crate::utils::{InputStream, TrackEof};
        use futures::stream::{StreamExt as _, TryStreamExt as _};

        /// Builds an `AsyncBufRead` over the chunks yielded by `input.stream()`, wrapped
        /// in a `TrackEof`.
        pub fn from(input: &InputStream) -> impl AsyncBufRead {
            // Going through the stream (rather than one contiguous buffer) means every
            // chunk needs its own read/poll_fill_buf call, which helps exercise reading
            // across multiple chunks.
            let chunks = input.stream().map(Ok);
            let reader = chunks.into_async_read();
            TrackEof::new(reader)
        }
    }

    pub mod read {
        use crate::utils::{block_on, pin_mut};
        use futures::io::{copy_buf, AsyncRead, AsyncReadExt, BufReader, Cursor};

        /// Copies everything `read` yields into an in-memory buffer and returns exactly
        /// the bytes that were copied.
        ///
        /// The reader is wrapped in a `BufReader` with a capacity of 2 bytes before
        /// being handed to `copy_buf`.
        ///
        /// # Panics
        ///
        /// Panics if the copy fails.
        pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
            // TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510
            // All current test cases are < 100kB
            let mut sink = Cursor::new(vec![0; 102_400]);
            pin_mut!(read);
            let source = BufReader::with_capacity(2, read);
            let copied = block_on(copy_buf(source, &mut sink)).unwrap();
            // The buffer starts out zero-filled; keep only the prefix that was written.
            let mut bytes = sink.into_inner();
            bytes.truncate(copied as usize);
            bytes
        }

        /// Performs a single `read` call on `reader` into `output`, blocking until it
        /// completes, and returns that call's result.
        pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
            pin_mut!(reader);
            let single_read = reader.read(output);
            block_on(single_read)
        }
    }

    pub mod write {
        use crate::utils::{block_on, Pin, TrackClosed};
        use futures::io::{AsyncWrite, AsyncWriteExt as _};
        use futures_test::io::AsyncWriteTestExt as _;

        /// Drives a writer produced by `create_writer` over an in-memory sink and
        /// returns the bytes that ended up in that sink.
        ///
        /// The sink is a `Vec<u8>` wrapped with `limited_write(limit)` and
        /// `interleave_pending_write()`, then in a `TrackClosed`. Every chunk of `input`
        /// is written with `write_all` followed by a `flush`; afterwards the writer is
        /// closed.
        ///
        /// # Panics
        ///
        /// Panics if any write, flush or close fails, or if the `TrackClosed` wrapper
        /// does not report being closed once the writer has been closed.
        pub fn to_vec(
            input: &[Vec<u8>],
            create_writer: impl for<'a> FnOnce(
                &'a mut (dyn AsyncWrite + Unpin),
            ) -> Pin<Box<dyn AsyncWrite + 'a>>,
            limit: usize,
        ) -> Vec<u8> {
            let mut sink = Vec::new();
            {
                let throttled = (&mut sink).limited_write(limit).interleave_pending_write();
                let mut tracked = TrackClosed::new(throttled);

                let mut writer = create_writer(&mut tracked);
                for chunk in input {
                    block_on(writer.write_all(chunk)).unwrap();
                    block_on(writer.flush()).unwrap();
                }
                block_on(writer.close()).unwrap();
                // End the writer's exclusive borrow of `tracked` before inspecting it.
                drop(writer);

                assert!(tracked.is_closed());
            }
            sink
        }
    }
}

#[cfg(feature = "tokio")]
pub mod tokio {
    pub mod bufread {
        use crate::utils::{InputStream, TrackEof};
        use bytes::Bytes;
        use futures::stream::StreamExt;
        pub use tokio::io::AsyncBufRead;
        use tokio_util::io::StreamReader;

        /// Builds an `AsyncBufRead` over the chunks yielded by `input.stream()`, using a
        /// `StreamReader` wrapped in a `TrackEof`.
        pub fn from(input: &InputStream) -> impl AsyncBufRead {
            // Going through the stream (rather than one contiguous buffer) means every
            // chunk needs its own read/poll_fill_buf call, which helps exercise reading
            // across multiple chunks.
            let chunks = input
                .stream()
                .map(Bytes::from)
                .map(std::io::Result::Ok);
            let reader = StreamReader::new(chunks);
            TrackEof::new(reader)
        }
    }

    pub mod read {
        use crate::utils::{block_on, pin_mut, tokio_ext::copy_buf};
        use std::io::Cursor;
        use tokio::io::{AsyncRead, AsyncReadExt, BufReader};

        /// Copies everything `read` yields into an in-memory buffer and returns exactly
        /// the bytes that were copied.
        ///
        /// The reader is wrapped in a `BufReader` with a capacity of 2 bytes before
        /// being handed to `copy_buf`.
        ///
        /// # Panics
        ///
        /// Panics if the copy fails.
        pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
            let mut sink = Cursor::new(vec![0; 102_400]);
            pin_mut!(read);
            let source = BufReader::with_capacity(2, read);
            let copied = block_on(copy_buf(source, &mut sink)).unwrap();
            // The buffer starts out zero-filled; keep only the prefix that was written.
            let mut bytes = sink.into_inner();
            bytes.truncate(copied as usize);
            bytes
        }

        /// Performs a single `read` call on `reader` into `output`, blocking until it
        /// completes, and returns that call's result.
        pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
            pin_mut!(reader);
            let single_read = reader.read(output);
            block_on(single_read)
        }
    }

    pub mod write {
        use crate::utils::{
            block_on, tokio_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
        };
        use std::io::Cursor;
        use tokio::io::{AsyncWrite, AsyncWriteExt as _};

        /// Drives a writer produced by `create_writer` over an in-memory sink and
        /// returns the bytes that ended up in that sink.
        ///
        /// The sink is a `Cursor<Vec<u8>>` wrapped with `limited_write(limit)` and
        /// `interleave_pending_write()`, then in a `TrackClosed`. Every chunk of `input`
        /// is written with `write_all` followed by a `flush`; afterwards the writer is
        /// shut down.
        ///
        /// # Panics
        ///
        /// Panics if any write, flush or shutdown fails, or if the `TrackClosed` wrapper
        /// does not report being closed once the writer has been shut down.
        pub fn to_vec(
            input: &[Vec<u8>],
            create_writer: impl for<'a> FnOnce(
                &'a mut (dyn AsyncWrite + Unpin),
            ) -> Pin<Box<dyn AsyncWrite + 'a>>,
            limit: usize,
        ) -> Vec<u8> {
            let mut sink = Cursor::new(Vec::new());
            {
                let throttled = (&mut sink).limited_write(limit).interleave_pending_write();
                let mut tracked = TrackClosed::new(throttled);

                let mut writer = create_writer(&mut tracked);
                for chunk in input {
                    block_on(writer.write_all(chunk)).unwrap();
                    block_on(writer.flush()).unwrap();
                }
                block_on(writer.shutdown()).unwrap();
                // End the writer's exclusive borrow of `tracked` before inspecting it.
                drop(writer);

                assert!(tracked.is_closed());
            }
            sink.into_inner()
        }
    }
}