File: async.rs

package info (click to toggle)
rust-combine 4.6.6-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, sid, trixie
  • size: 988 kB
  • sloc: sh: 21; makefile: 2
file content (189 lines) | stat: -rw-r--r-- 6,338 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
#![cfg(feature = "std")]
#![cfg(feature = "tokio")]

use std::{cell::Cell, io::Cursor, rc::Rc, str};

use {futures_03_dep as futures, tokio_dep as tokio};

use {
    bytes::{Buf, BytesMut},
    combine::{
        error::{ParseError, StreamError},
        parser::{
            byte::digit,
            combinator::{any_partial_state, AnyPartialState},
            range::{range, recognize, take},
        },
        skip_many, skip_many1,
        stream::{easy, PartialStream, RangeStream, StreamErrorFor},
        Parser,
    },
    futures::prelude::*,
    partial_io::PartialOp,
    tokio_util::codec::{Decoder, FramedRead},
};

// Work around `partial_io` not working with tokio 0.2 by pulling helpers in from the
// shared test-support module instead.
#[path = "../tests/support/mod.rs"]
mod support;
use support::*;

/// `tokio_util` decoder for `Content-Length`-framed messages.
///
/// The derived `Default` starts with an empty partial-parse state and a
/// `content_length_parses` counter of zero.
#[derive(Default)]
pub struct LanguageServerDecoder {
    /// Boxed partial state of the parser, handed to `combine::stream::decode` on every
    /// `decode` call so an interrupted parse can be resumed when more input arrives.
    state: AnyPartialState,
    /// Number of times a `Content-Length` header has been fully parsed. Kept behind an `Rc`
    /// so a clone of the handle can still be inspected after the decoder has been moved.
    content_length_parses: Rc<Cell<i32>>,
}

/// Parses blocks of data with length headers
///
/// ```text
/// Content-Length: 18
///
/// { "some": "data" }
/// ```
///
/// Any number of leading `\r\n` pairs is skipped, then the `Content-Length: <digits>` header
/// is parsed, followed by a blank line (`\r\n\r\n`). Exactly `<digits>` bytes are then taken
/// and returned as an owned `Vec<u8>`.
///
/// The parser's state is boxed through `any_partial_state`, which hides its concrete type so
/// it can be stored in a plain struct field and resumed across calls on partial input.
// The `content_length_parses` parameter only exists to demonstrate that `content_length` only
// gets parsed once per message
fn decode_parser<'a, Input>(
    content_length_parses: Rc<Cell<i32>>,
) -> impl Parser<Input, Output = Vec<u8>, PartialState = AnyPartialState> + 'a
where
    Input: RangeStream<Token = u8, Range = &'a [u8]> + 'a,
    // Necessary due to rust-lang/rust#24159
    Input::Error: ParseError<Input::Token, Input::Range, Input::Position>,
{
    let content_length = range(&b"Content-Length: "[..])
        .with(recognize(skip_many1(digit())).and_then(|digits: &[u8]| {
            str::from_utf8(digits)
                // `digit()` only accepts ASCII digits, so this conversion cannot fail.
                .expect("`digit` only accepts ASCII digits")
                .parse::<usize>()
                // Convert the error from `.parse` into an error combine understands.
                // With a non-empty run of digits, the only possible failure is a length
                // that does not fit in `usize`.
                .map_err(StreamErrorFor::<Input>::other)
        }))
        .map(move |x| {
            // Count every completed header parse so callers can check it never repeats.
            content_length_parses.set(content_length_parses.get() + 1);
            x
        });

    // `any_partial_state` boxes the state which hides the type and lets us store it in
    // `self`
    any_partial_state(
        (
            skip_many(range(&b"\r\n"[..])),
            content_length,
            range(&b"\r\n\r\n"[..]).map(|_| ()),
        )
            .then_partial(|&mut (_, message_length, _)| {
                // The body length is only known after the header has been parsed.
                take(message_length).map(|bytes: &[u8]| bytes.to_owned())
            }),
    )
}

impl Decoder for LanguageServerDecoder {
    type Item = String;
    type Error = Box<dyn std::error::Error + Send + Sync>;

    /// Decodes at most one `Content-Length`-framed message from the front of `src`.
    ///
    /// Possible results:
    /// - `Ok(Some(message))` once a complete message has been parsed.
    /// - `Ok(None)` when more input is needed. The parser's partial state is kept in
    ///   `self.state` and passed back to the parser on the next call.
    /// - `Err` if the input fails to parse or the message body is not valid UTF-8.
    ///
    /// When parsing does not fail, the input the parser committed is removed from `src`
    /// (via `advance`) before the result is inspected. On a parse error `src` is left
    /// untouched.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        println!("Decoding `{:?}`", str::from_utf8(src).unwrap_or("NOT UTF8"));

        let (opt, removed_len) = combine::stream::decode(
            decode_parser(self.content_length_parses.clone()),
            // easy::Stream gives us nice error messages
            // (the same error messages that combine has had since its inception)
            // PartialStream lets the parser know that more input should be
            // expected if end of input is unexpectedly reached
            &mut easy::Stream(PartialStream(&src[..])),
            &mut self.state,
        )
        .map_err(|err| {
            // Since err contains references into `src` we must replace these before
            // we can return an error or call `advance` to remove the input we
            // just committed
            let err = err
                .map_range(|r| {
                    str::from_utf8(r)
                        .ok()
                        .map_or_else(|| format!("{:?}", r), |s| s.to_string())
                })
                .map_position(|p| p.translate_position(&src[..]));
            // The input that failed to parse may not be valid UTF-8. Fall back to a
            // placeholder, as the logging in this function does, instead of panicking
            // while building the error.
            format!(
                "{}\nIn input: `{}`",
                err,
                str::from_utf8(&src[..]).unwrap_or("NOT UTF8")
            )
        })?;

        println!(
            "Accepted {} bytes: `{:?}`",
            removed_len,
            str::from_utf8(&src[..removed_len]).unwrap_or("NOT UTF8")
        );

        // Remove the input we just committed.
        // Ideally this would be done automatically by the call to
        // `stream::decode` but it does unfortunately not work due
        // to lifetime issues (Non lexical lifetimes might fix it!)
        src.advance(removed_len);

        match opt {
            // `None` means we did not have enough input and we require that the
            // caller of `decode` supply more before calling us again
            None => {
                println!("Requesting more input!");
                Ok(None)
            }

            // `Some` means that a message was successfully decoded
            // (and that we are ready to start decoding the next message)
            Some(output) => {
                let value = String::from_utf8(output)?;
                println!("Decoded `{}`", value);
                Ok(Some(value))
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let input = "Content-Length: 6\r\n\
                 \r\n\
                 123456\r\n\
                 Content-Length: 4\r\n\
                 \r\n\
                 true";

    let seq = vec![
        PartialOp::Limited(20),
        PartialOp::Limited(1),
        PartialOp::Limited(2),
        PartialOp::Limited(3),
    ];
    let reader = &mut Cursor::new(input.as_bytes());
    // Using the `partial_io` crate we emulate the partial reads that would happen when reading
    // asynchronously from an io device.
    let partial_reader = PartialAsyncRead::new(reader, seq);

    let decoder = LanguageServerDecoder::default();
    let content_length_parses = decoder.content_length_parses.clone();

    let result = FramedRead::new(partial_reader, decoder).try_collect().await;

    assert!(result.as_ref().is_ok(), "{}", result.unwrap_err());
    let values: Vec<_> = result.unwrap();

    let expected_values = ["123456", "true"];
    assert_eq!(values, expected_values);

    assert_eq!(content_length_parses.get(), expected_values.len() as i32);

    println!("Successfully parsed: `{}`", input);
    println!(
        "Found {} items and never repeated a completed parse!",
        values.len(),
    );
    println!("Result: {:?}", values);
}