File: chunks_timeout.rs

package info (click to toggle)
thunderbird 1%3A143.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 4,703,968 kB
  • sloc: cpp: 7,770,492; javascript: 5,943,842; ansic: 3,918,754; python: 1,418,263; xml: 653,354; asm: 474,045; java: 183,079; sh: 111,238; makefile: 20,410; perl: 14,359; objc: 13,059; yacc: 4,583; pascal: 3,405; lex: 1,720; ruby: 999; exp: 762; sql: 715; awk: 580; php: 436; lisp: 430; sed: 69; csh: 10
file content (84 lines) | stat: -rw-r--r-- 2,617 bytes parent folder | download | duplicates (29)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]

use tokio::time;
use tokio_stream::{self as stream, StreamExt};
use tokio_test::assert_pending;
use tokio_test::task;

use futures::FutureExt;
use std::time::Duration;

#[tokio::test(start_paused = true)]
async fn usage() {
    let iter = vec![1, 2, 3].into_iter();
    let stream0 = stream::iter(iter);

    let iter = vec![4].into_iter();
    let stream1 =
        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));

    let chunk_stream = stream0
        .chain(stream1)
        .chunks_timeout(4, Duration::from_secs(2));

    let mut chunk_stream = task::spawn(chunk_stream);

    assert_pending!(chunk_stream.poll_next());
    time::advance(Duration::from_secs(2)).await;
    assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));

    assert_pending!(chunk_stream.poll_next());
    time::advance(Duration::from_secs(2)).await;
    assert_eq!(chunk_stream.next().await, Some(vec![4]));
}

#[tokio::test(start_paused = true)]
async fn full_chunk_with_timeout() {
    let iter = vec![1, 2].into_iter();
    let stream0 = stream::iter(iter);

    let iter = vec![3].into_iter();
    let stream1 =
        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));

    let iter = vec![4].into_iter();
    let stream2 =
        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));

    let chunk_stream = stream0
        .chain(stream1)
        .chain(stream2)
        .chunks_timeout(3, Duration::from_secs(2));

    let mut chunk_stream = task::spawn(chunk_stream);

    assert_pending!(chunk_stream.poll_next());
    time::advance(Duration::from_secs(2)).await;
    assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));

    assert_pending!(chunk_stream.poll_next());
    time::advance(Duration::from_secs(2)).await;
    assert_eq!(chunk_stream.next().await, Some(vec![4]));
}

#[tokio::test]
#[ignore]
async fn real_time() {
    let iter = vec![1, 2, 3, 4].into_iter();
    let stream0 = stream::iter(iter);

    let iter = vec![5].into_iter();
    let stream1 =
        stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));

    let chunk_stream = stream0
        .chain(stream1)
        .chunks_timeout(3, Duration::from_secs(2));

    let mut chunk_stream = task::spawn(chunk_stream);

    assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
    assert_eq!(chunk_stream.next().await, Some(vec![4]));
    assert_eq!(chunk_stream.next().await, Some(vec![5]));
}