// Test support helpers: tracing initialisation, `Stream` adapters for tokio
// mpsc receivers, and a `Service` that asserts which tracing span it runs in.
#![allow(dead_code)]
use futures::future;
use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio_stream::Stream;
use tower::Service;
/// Installs a verbose `tracing` subscriber for the duration of a test.
///
/// The subscriber writes through the test writer, records everything up to
/// `TRACE` level, and includes thread names in its output. It is registered
/// with `tracing::subscriber::set_default`; the returned guard must be kept
/// alive for as long as the subscriber should stay installed.
pub(crate) fn trace_init() -> tracing::subscriber::DefaultGuard {
    let builder = tracing_subscriber::fmt()
        .with_test_writer()
        .with_max_level(tracing::Level::TRACE)
        .with_thread_names(true);
    tracing::subscriber::set_default(builder.finish())
}
pin_project_lite::pin_project! {
/// Thin wrapper around a value `S` so that it can be given a `Stream` impl.
///
/// In this file `Stream` is implemented for the wrapper around tokio's
/// `mpsc::Receiver` and `mpsc::UnboundedReceiver`, forwarding `poll_next`
/// to the receiver's `poll_recv`.
#[derive(Clone, Debug)]
pub struct IntoStream<S> {
// Structurally pinned: `project()` yields `Pin<&mut S>` for this field.
#[pin]
inner: S
}
}
impl<S> IntoStream<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
}
/// Exposes a bounded tokio mpsc receiver as a `Stream` of its messages.
impl<I> Stream for IntoStream<mpsc::Receiver<I>> {
    type Item = I;

    /// Forwards to the receiver's `poll_recv` and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut receiver = self.project().inner;
        receiver.poll_recv(cx)
    }
}
/// Exposes an unbounded tokio mpsc receiver as a `Stream` of its messages.
impl<I> Stream for IntoStream<mpsc::UnboundedReceiver<I>> {
    type Item = I;

    /// Forwards to the receiver's `poll_recv` and returns its result unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut receiver = self.project().inner;
        receiver.poll_recv(cx)
    }
}
/// Test service that verifies it is being driven from inside a specific
/// `tracing` span.
///
/// Both `poll_ready` and `call` compare `tracing::Span::current()` with the
/// stored span and produce an `AssertSpanError` when they differ. The first
/// `poll_ready` call always returns `Poll::Pending` (after waking the task),
/// so the check only happens from the second readiness poll onwards.
#[derive(Clone, Debug)]
pub struct AssertSpanSvc {
/// The span that is expected to be current whenever the service is used.
span: tracing::Span,
/// `true` once `poll_ready` has been called at least once.
polled: bool,
}
/// Error carrying a human-readable description of a span mismatch.
pub struct AssertSpanError(String);

/// `Debug` intentionally prints the raw message (no quotes, no struct name),
/// exactly like `Display` does.
impl fmt::Debug for AssertSpanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}

/// Writes the wrapped message, honouring any formatter flags (width, fill, ...).
impl fmt::Display for AssertSpanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(message) = self;
        fmt::Display::fmt(message, f)
    }
}

impl std::error::Error for AssertSpanError {}
impl AssertSpanSvc {
    /// Creates a service that expects to be used inside `span`.
    ///
    /// The service starts in the "not yet polled" state.
    pub fn new(span: tracing::Span) -> Self {
        let polled = false;
        Self { span, polled }
    }

    /// Compares the currently entered span with the expected one.
    ///
    /// `func` names the calling method and is only used for logging and for
    /// the error message.
    ///
    /// # Errors
    ///
    /// Returns an [`AssertSpanError`] describing both spans when the current
    /// span is not equal to the expected span.
    fn check(&self, func: &str) -> Result<(), AssertSpanError> {
        let current_span = tracing::Span::current();
        tracing::debug!(?current_span, ?self.span, %func);

        if current_span != self.span {
            let message = format!(
                "{} called outside expected span\n expected: {:?}\n current: {:?}",
                func, self.span, current_span
            );
            return Err(AssertSpanError(message));
        }

        Ok(())
    }
}
/// `Service` implementation whose only job is to run the span check.
impl Service<()> for AssertSpanSvc {
    type Response = ();
    type Error = AssertSpanError;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;

    /// Returns `Pending` exactly once, then reports the span check result.
    ///
    /// On the first call the task is woken immediately so it gets polled
    /// again; every later call resolves to the outcome of the span check.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        if !self.polled {
            // First poll: request another poll and remember that we did.
            cx.waker().wake_by_ref();
            self.polled = true;
            return Poll::Pending;
        }

        Poll::Ready(self.check("poll_ready"))
    }

    /// Runs the span check synchronously and returns it as a ready future.
    fn call(&mut self, _: ()) -> Self::Future {
        let outcome = self.check("call");
        future::ready(outcome)
    }
}
|