File: support.rs

package info (click to toggle)
rust-tower 0.5.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 988 kB
  • sloc: makefile: 2
file content (112 lines) | stat: -rw-r--r-- 2,724 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#![allow(dead_code)]

use futures::future;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tower::Service;

/// Builds a `tracing_subscriber` fmt subscriber for tests and installs it via
/// `tracing::subscriber::set_default`.
///
/// The subscriber is configured with the test writer, a maximum level of
/// `TRACE`, and thread names enabled.
///
/// Returns the `DefaultGuard` produced by `set_default`; callers should bind
/// it to a local so that it is not dropped immediately.
pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
    let builder = tracing_subscriber::fmt()
        .with_test_writer()
        .with_max_level(tracing::Level::TRACE)
        .with_thread_names(true);

    tracing::subscriber::set_default(builder.finish())
}

pin_project_lite::pin_project! {
    /// Thin wrapper around a value `S` so that a [`Stream`] implementation
    /// can be attached to it.
    ///
    /// This file implements `Stream` for `IntoStream<mpsc::Receiver<I>>` and
    /// `IntoStream<mpsc::UnboundedReceiver<I>>`.
    #[derive(Clone, Debug)]
    pub struct IntoStream<S> {
        // The wrapped value; marked `#[pin]` so `project()` hands it out as a
        // pinned reference.
        #[pin]
        inner: S
    }
}

impl<S> IntoStream<S> {
    pub fn new(inner: S) -> Self {
        Self { inner }
    }
}

/// Exposes a wrapped `mpsc::Receiver` as a [`Stream`] of its messages.
impl<I> Stream for IntoStream<mpsc::Receiver<I>> {
    type Item = I;

    /// Forwards to the wrapped receiver's `poll_recv` and returns its result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `poll_recv` takes `&mut self`, so unwrap the pinned projection into
        // a plain mutable reference before calling it.
        let receiver: &mut mpsc::Receiver<I> = self.project().inner.get_mut();
        receiver.poll_recv(cx)
    }
}

/// Exposes a wrapped `mpsc::UnboundedReceiver` as a [`Stream`] of its messages.
impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
    type Item = I;

    /// Forwards to the wrapped receiver's `poll_recv` and returns its result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Take the pinned projection of the receiver, then poll through it.
        let mut receiver = self.project().inner;
        receiver.poll_recv(cx)
    }
}

/// Test service that checks it is being driven from inside a specific
/// `tracing` span.
///
/// Both `poll_ready` and `call` compare `tracing::Span::current()` against
/// the stored span and report an [`AssertSpanError`] on mismatch.
#[derive(Clone, Debug)]
pub struct AssertSpanSvc {
    // The span that is expected to be current whenever the service is used.
    span: tracing::Span,
    // Set to `true` after the first `poll_ready` call, which always returns
    // `Poll::Pending`; later calls perform the span check instead.
    polled: bool,
}

/// Error carrying a human-readable description of a failed span assertion.
pub struct AssertSpanError(String);

/// `Debug` intentionally prints the bare message (identical to `Display`)
/// rather than a derived `AssertSpanError("...")` representation.
impl fmt::Debug for AssertSpanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

/// Writes the stored message, honouring any width/precision/alignment flags
/// on the formatter.
impl fmt::Display for AssertSpanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(message) = self;
        f.pad(message)
    }
}

impl std::error::Error for AssertSpanError {}

impl AssertSpanSvc {
    pub fn new(span: tracing::Span) -> Self {
        Self {
            span,
            polled: false,
        }
    }

    fn check(&self, func: &str) -> Result<(), AssertSpanError> {
        let current_span = tracing::Span::current();
        tracing::debug!(?current_span, ?self.span, %func);
        if current_span == self.span {
            return Ok(());
        }

        Err(AssertSpanError(format!(
            "{} called outside expected span\n expected: {:?}\n  current: {:?}",
            func, self.span, current_span
        )))
    }
}

/// `Service` implementation whose readiness check and call both assert that
/// they run inside the expected span.
impl Service<()> for AssertSpanSvc {
    type Response = ();
    type Error = AssertSpanError;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;

    /// Returns `Poll::Pending` exactly once (after waking the task so it is
    /// polled again), then on every later call returns the result of the
    /// span check.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if !self.polled {
            // First poll: request an immediate re-poll and report "not ready"
            // so that the caller has to poll this service a second time.
            cx.waker().wake_by_ref();
            self.polled = true;
            return Poll::Pending;
        }

        let outcome = self.check("poll_ready");
        Poll::Ready(outcome)
    }

    /// Performs the span check and returns it as an already-completed future.
    fn call(&mut self, _request: ()) -> Self::Future {
        let outcome = self.check("call");
        future::ready(outcome)
    }
}