File: framed_read.rs

package info (click to toggle)
rust-futures-codec 0.4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 208 kB
  • sloc: makefile: 4
file content (87 lines) | stat: -rw-r--r-- 2,359 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use futures::executor;
use futures::stream::StreamExt;
use futures::AsyncRead;
use futures_codec::{Decoder, FramedRead, LinesCodec, BytesMut};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

// Sends two lines at once, then nothing else forever
struct MockBurstySender {
    sent: bool,
}
impl AsyncRead for MockBurstySender {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        const MESSAGES: &'static [u8] = b"one\ntwo\n";
        if !self.sent && buf.len() >= MESSAGES.len() {
            self.sent = true;
            buf[0..MESSAGES.len()].clone_from_slice(MESSAGES);
            Poll::Ready(Ok(MESSAGES.len()))
        } else {
            Poll::Pending
        }
    }
}

#[test]
fn line_read_multi() {
    let io = MockBurstySender { sent: false };
    let mut framed = FramedRead::new(io, LinesCodec {});
    let one = executor::block_on(framed.next()).unwrap().unwrap();
    assert_eq!(one, "one\n");
    let two = executor::block_on(framed.next()).unwrap().unwrap();
    assert_eq!(two, "two\n");
}

struct OneByteAtATime<'a> {
    input: &'a [u8],
}
impl AsyncRead for OneByteAtATime<'_> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        if self.input.is_empty() {
            Poll::Ready(Ok(0))
        } else {
            buf[0] = self.input[0];
            self.input = &self.input[1..];
            Poll::Ready(Ok(1))
        }
    }
}

/// A decoder that only returns `a` characters from the input.
struct AllTheAs;

impl Decoder for AllTheAs {
    type Item = char;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        while !src.is_empty() {
            let buf = src.split_to(1);
            let c = char::from(buf[0]);
            if c == 'a' {
                return Ok(Some(c));
            }
        }
        Ok(None)
    }
}

#[test]
fn read_few_messages() {
    let string: &[u8] = b"aabbbabbbabbbabb";
    let input = OneByteAtATime { input: string };
    let mut framed = FramedRead::new(input, AllTheAs);
    for _ in 0..5 {
        let item = executor::block_on(framed.next()).unwrap().unwrap();
        assert_eq!(item, 'a');
    }
}