Description: use crate atomic-waker (instead of piper, which is not in Debian)
 This essentially reverts upstream git commits 75cb224 f6fea36.
Author: Jonas Smedegaard <dr@jones.dk>
Forwarded: not-needed
Last-Update: 2025-02-06
---
This patch header follows DEP-3: http://dep.debian.net/deps/dep3/
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,9 +17,10 @@
 [dependencies]
 async-channel = "2.0.0"
 async-task = "4.4.0"
+atomic-waker = "1.0.0"
+fastrand = "2.0.0"
 futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
 futures-lite = { version = "2.0.0", default-features = false }
-piper = "0.2.0"
 tracing = { version = "0.1.37", default-features = false, optional = true }
 
 [dev-dependencies]
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -76,7 +76,6 @@
 //! ```
 
 #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
-#![forbid(unsafe_code)]
 #![doc(
     html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
 )]
@@ -88,11 +87,13 @@
 use std::collections::VecDeque;
 use std::fmt;
 use std::io::{self, Read, Seek, SeekFrom, Write};
+use std::mem;
 use std::num::NonZeroUsize;
 use std::panic;
 use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Condvar, Mutex, MutexGuard};
+use std::slice;
+use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering};
+use std::sync::{Arc, Condvar, Mutex, MutexGuard};
 use std::task::{Context, Poll};
 use std::thread;
 use std::time::Duration;
@@ -102,13 +103,13 @@
 
 use async_channel::{bounded, Receiver};
 use async_task::Runnable;
+use atomic_waker::AtomicWaker;
 use futures_io::{AsyncRead, AsyncSeek, AsyncWrite};
 use futures_lite::{
     future::{self, Future},
     ready,
     stream::Stream,
 };
-use piper::{pipe, Reader, Writer};
 
 #[doc(no_inline)]
 pub use async_task::Task;
@@ -833,7 +834,7 @@
                         // Copy bytes from the I/O handle into the pipe until the pipe is closed or
                         // an error occurs.
                         loop {
-                            match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await {
+                            match future::poll_fn(|cx| writer.fill(cx, &mut io)).await {
                                 Ok(0) => return (Ok(()), io),
                                 Ok(_) => {}
                                 Err(err) => return (Err(err), io),
@@ -848,7 +849,7 @@
                 // If reading, read bytes from the pipe.
                 State::Reading(Some(reader), task) => {
                     // Poll the pipe.
-                    let n = ready!(reader.poll_drain(cx, buf))?;
+                    let n = ready!(reader.drain(cx, buf))?;
 
                     // If the pipe is closed, retrieve the I/O handle back from the blocking task.
                     // This is not really a required step, but it's cleaner to drop the handle on
@@ -901,7 +902,7 @@
                         // Copy bytes from the pipe into the I/O handle until the pipe is closed or an
                         // error occurs. Flush the I/O handle at the end.
                         loop {
-                            match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await {
+                            match future::poll_fn(|cx| reader.drain(cx, &mut io)).await {
                                 Ok(0) => return (io.flush(), io),
                                 Ok(_) => {}
                                 Err(err) => {
@@ -917,7 +918,7 @@
                 }
 
                 // If writing, write more bytes into the pipe.
-                State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf),
+                State::Writing(Some(writer), _) => return writer.fill(cx, buf),
             }
         }
     }
@@ -996,6 +997,365 @@
     }
 }
 
+/// Creates a bounded single-producer single-consumer pipe.
+///
+/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to.
+///
+/// When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts
+/// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes.
+///
+/// When the receiver is dropped, the pipe is closed and no more bytes can be written into it.
+/// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes.
+fn pipe(cap: usize) -> (Reader, Writer) {
+    assert!(cap > 0, "capacity must be positive");
+    assert!(cap.checked_mul(2).is_some(), "capacity is too large");
+
+    // Allocate the ring buffer.
+    let mut v = Vec::with_capacity(cap);
+    let buffer = v.as_mut_ptr();
+    mem::forget(v);
+
+    let inner = Arc::new(Pipe {
+        head: AtomicUsize::new(0),
+        tail: AtomicUsize::new(0),
+        reader: AtomicWaker::new(),
+        writer: AtomicWaker::new(),
+        closed: AtomicBool::new(false),
+        buffer,
+        cap,
+    });
+
+    let r = Reader {
+        inner: inner.clone(),
+        head: 0,
+        tail: 0,
+    };
+
+    let w = Writer {
+        inner,
+        head: 0,
+        tail: 0,
+        zeroed_until: 0,
+    };
+
+    (r, w)
+}
+
+/// The reading side of a pipe.
+struct Reader {
+    /// The inner ring buffer.
+    inner: Arc<Pipe>,
+
+    /// The head index, moved by the reader, in the range `0..2*cap`.
+    ///
+    /// This index always matches `inner.head`.
+    head: usize,
+
+    /// The tail index, moved by the writer, in the range `0..2*cap`.
+    ///
+    /// This index is a snapshot of `inner.tail` that might become stale at any point.
+    tail: usize,
+}
+
+/// The writing side of a pipe.
+struct Writer {
+    /// The inner ring buffer.
+    inner: Arc<Pipe>,
+
+    /// The head index, moved by the reader, in the range `0..2*cap`.
+    ///
+    /// This index is a snapshot of `inner.head` that might become stale at any point.
+    head: usize,
+
+    /// The tail index, moved by the writer, in the range `0..2*cap`.
+    ///
+    /// This index always matches `inner.tail`.
+    tail: usize,
+
+    /// How many bytes at the beginning of the buffer have been zeroed.
+    ///
+    /// The pipe allocates an uninitialized buffer, and we must be careful about passing
+    /// uninitialized data to user code. Zeroing the buffer right after allocation would be too
+    /// expensive, so we zero it in smaller chunks as the writer makes progress.
+    zeroed_until: usize,
+}
+
+unsafe impl Send for Reader {}
+unsafe impl Send for Writer {}
+
+/// The inner ring buffer.
+///
+/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the
+/// `0..cap` range. The distance between head and tail indices is never more than `cap`.
+///
+/// The reason why indices are not in the range `0..cap` is because we need to distinguish between
+/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail`
+/// could mean the pipe is either empty or full, but we don't know which!
+struct Pipe {
+    /// The head index, moved by the reader, in the range `0..2*cap`.
+    head: AtomicUsize,
+
+    /// The tail index, moved by the writer, in the range `0..2*cap`.
+    tail: AtomicUsize,
+
+    /// A waker representing the blocked reader.
+    reader: AtomicWaker,
+
+    /// A waker representing the blocked writer.
+    writer: AtomicWaker,
+
+    /// Set to `true` if the reader or writer was dropped.
+    closed: AtomicBool,
+
+    /// The byte buffer.
+    buffer: *mut u8,
+
+    /// The buffer capacity.
+    cap: usize,
+}
+
+unsafe impl Sync for Pipe {}
+unsafe impl Send for Pipe {}
+
+impl Drop for Pipe {
+    fn drop(&mut self) {
+        // Deallocate the byte buffer.
+        unsafe {
+            Vec::from_raw_parts(self.buffer, 0, self.cap);
+        }
+    }
+}
+
+impl Drop for Reader {
+    fn drop(&mut self) {
+        // Dropping closes the pipe and then wakes the writer.
+        self.inner.closed.store(true, Ordering::SeqCst);
+        self.inner.writer.wake();
+    }
+}
+
+impl Drop for Writer {
+    fn drop(&mut self) {
+        // Dropping closes the pipe and then wakes the reader.
+        self.inner.closed.store(true, Ordering::SeqCst);
+        self.inner.reader.wake();
+    }
+}
+
+impl Reader {
+    /// Reads bytes from this reader and writes into blocking `dest`.
+    fn drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll<io::Result<usize>> {
+        let cap = self.inner.cap;
+
+        // Calculates the distance between two indices.
+        let distance = |a: usize, b: usize| {
+            if a <= b {
+                b - a
+            } else {
+                2 * cap - (a - b)
+            }
+        };
+
+        // If the pipe appears to be empty...
+        if distance(self.head, self.tail) == 0 {
+            // Reload the tail in case it's become stale.
+            self.tail = self.inner.tail.load(Ordering::Acquire);
+
+            // If the pipe is now really empty...
+            if distance(self.head, self.tail) == 0 {
+                // Register the waker.
+                self.inner.reader.register(cx.waker());
+                atomic::fence(Ordering::SeqCst);
+
+                // Reload the tail after registering the waker.
+                self.tail = self.inner.tail.load(Ordering::Acquire);
+
+                // If the pipe is still empty...
+                if distance(self.head, self.tail) == 0 {
+                    // Check whether the pipe is closed or just empty.
+                    if self.inner.closed.load(Ordering::Relaxed) {
+                        return Poll::Ready(Ok(0));
+                    } else {
+                        return Poll::Pending;
+                    }
+                }
+            }
+        }
+
+        // The pipe is not empty so remove the waker.
+        self.inner.reader.take();
+
+        // Yield with some small probability - this improves fairness.
+        ready!(maybe_yield(cx));
+
+        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
+        let real_index = |i: usize| {
+            if i < cap {
+                i
+            } else {
+                i - cap
+            }
+        };
+
+        // Number of bytes read so far.
+        let mut count = 0;
+
+        loop {
+            // Calculate how many bytes to read in this iteration.
+            let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon!
+                .min(distance(self.head, self.tail)) // No more than bytes in the pipe.
+                .min(cap - real_index(self.head)); // Don't go past the buffer boundary.
+
+            // Create a slice of data in the pipe buffer.
+            let pipe_slice =
+                unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) };
+
+            // Copy bytes from the pipe buffer into `dest`.
+            let n = dest.write(pipe_slice)?;
+            count += n;
+
+            // If pipe is empty or `dest` is full, return.
+            if n == 0 {
+                return Poll::Ready(Ok(count));
+            }
+
+            // Move the head forward.
+            if self.head + n < 2 * cap {
+                self.head += n;
+            } else {
+                self.head = 0;
+            }
+
+            // Store the current head index.
+            self.inner.head.store(self.head, Ordering::Release);
+
+            // Wake the writer because the pipe is not full.
+            self.inner.writer.wake();
+        }
+    }
+}
+
+impl Writer {
+    /// Reads bytes from blocking `src` and writes into this writer.
+    fn fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll<io::Result<usize>> {
+        // Just a quick check if the pipe is closed, which is why a relaxed load is okay.
+        if self.inner.closed.load(Ordering::Relaxed) {
+            return Poll::Ready(Ok(0));
+        }
+
+        // Calculates the distance between two indices.
+        let cap = self.inner.cap;
+        let distance = |a: usize, b: usize| {
+            if a <= b {
+                b - a
+            } else {
+                2 * cap - (a - b)
+            }
+        };
+
+        // If the pipe appears to be full...
+        if distance(self.head, self.tail) == cap {
+            // Reload the head in case it's become stale.
+            self.head = self.inner.head.load(Ordering::Acquire);
+
+            // If the pipe is now really full...
+            if distance(self.head, self.tail) == cap {
+                // Register the waker.
+                self.inner.writer.register(cx.waker());
+                atomic::fence(Ordering::SeqCst);
+
+                // Reload the head after registering the waker.
+                self.head = self.inner.head.load(Ordering::Acquire);
+
+                // If the pipe is still full...
+                if distance(self.head, self.tail) == cap {
+                    // Check whether the pipe is closed or just full.
+                    if self.inner.closed.load(Ordering::Relaxed) {
+                        return Poll::Ready(Ok(0));
+                    } else {
+                        return Poll::Pending;
+                    }
+                }
+            }
+        }
+
+        // The pipe is not full so remove the waker.
+        self.inner.writer.take();
+
+        // Yield with some small probability - this improves fairness.
+        ready!(maybe_yield(cx));
+
+        // Given an index in `0..2*cap`, returns the real index in `0..cap`.
+        let real_index = |i: usize| {
+            if i < cap {
+                i
+            } else {
+                i - cap
+            }
+        };
+
+        // Number of bytes written so far.
+        let mut count = 0;
+
+        loop {
+            // Calculate how many bytes to write in this iteration.
+            let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon!
+                .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting.
+                .min(cap - distance(self.head, self.tail)) // No more than space in the pipe.
+                .min(cap - real_index(self.tail)); // Don't go past the buffer boundary.
+
+            // Create a slice of available space in the pipe buffer.
+            let pipe_slice_mut = unsafe {
+                let from = real_index(self.tail);
+                let to = from + n;
+
+                // Make sure all bytes in the slice are initialized.
+                if self.zeroed_until < to {
+                    self.inner
+                        .buffer
+                        .add(self.zeroed_until)
+                        .write_bytes(0u8, to - self.zeroed_until);
+                    self.zeroed_until = to;
+                }
+
+                slice::from_raw_parts_mut(self.inner.buffer.add(from), n)
+            };
+
+            // Copy bytes from `src` into the pipe buffer.
+            let n = src.read(pipe_slice_mut)?;
+            count += n;
+
+            // If the pipe is full or closed, or `src` is empty, return.
+            if n == 0 || self.inner.closed.load(Ordering::Relaxed) {
+                return Poll::Ready(Ok(count));
+            }
+
+            // Move the tail forward.
+            if self.tail + n < 2 * cap {
+                self.tail += n;
+            } else {
+                self.tail = 0;
+            }
+
+            // Store the current tail index.
+            self.inner.tail.store(self.tail, Ordering::Release);
+
+            // Wake the reader because the pipe is not empty.
+            self.inner.reader.wake();
+        }
+    }
+}
+
+/// Yield with some small probability.
+fn maybe_yield(cx: &mut Context<'_>) -> Poll<()> {
+    if fastrand::usize(..100) == 0 {
+        cx.waker().wake_by_ref();
+        Poll::Pending
+    } else {
+        Poll::Ready(())
+    }
+}
+
 #[cfg(all(test, not(target_family = "wasm")))]
 mod tests {
     use super::*;
