Streaming responses (from an AsyncWrite) #570

Open
Nerixyz opened this issue Aug 18, 2022 · 3 comments
Labels
good first issue Good for newcomers new-example Request for a new example

Comments

@Nerixyz

Nerixyz commented Aug 18, 2022

I think these might be two issues in one:

  1. No example (except maybe the SSE one) demonstrates streaming responses, probably because it's hard to come up with a good one.
  2. I haven't found good documentation on how to "pipe" an AsyncWrite into a Stream<Item=Bytes(Mut)> (maybe something for actix-web-lab).

Note: Also see discussion on actix-web Discord: https://discord.com/channels/771444961383153695/771447545154371646/1009110232473014383

The proposed example streams the files in a directory (files) as a zip file that is created on the fly.

For this I'm using async_zip, whose API requires the caller to pass in an AsyncWrite (ZipFileWriter::new).
To "pipe" the AsyncWrite into a Stream, I'm using a DuplexStream and the BytesCodec.
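
To make the "pipe a writer into a stream of chunks" idea concrete, here is a synchronous, std-only analogue. It is only a sketch and not what the handler below does: `ChunkWriter`, `chunk_pipe` and `collect_demo` are hypothetical names, and a bounded `std::sync::mpsc::sync_channel` stands in for the DuplexStream. The shape is the same, though: one half implements a write trait, the other half yields the written bytes as chunks, and a bounded buffer in between provides backpressure.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Write half (hypothetical): each `write` call becomes one chunk on the channel.
pub struct ChunkWriter(SyncSender<Vec<u8>>);

impl Write for ChunkWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // `send` blocks while the channel is full (backpressure) and fails
        // once the receiving half has been dropped.
        self.0
            .send(buf.to_vec())
            .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Creates a writer and a receiver that iterates over the written chunks.
/// `max_chunks` bounds how many chunks may be queued before `write` blocks.
pub fn chunk_pipe(max_chunks: usize) -> (ChunkWriter, Receiver<Vec<u8>>) {
    let (tx, rx) = sync_channel(max_chunks);
    (ChunkWriter(tx), rx)
}

/// Writes on a second thread and collects everything on the calling thread.
/// Dropping the writer at the end of the closure ends the iteration.
pub fn collect_demo() -> Vec<u8> {
    let (mut writer, chunks) = chunk_pipe(2);
    let producer = std::thread::spawn(move || {
        for part in ["hello, ", "streamed ", "world"] {
            writer.write_all(part.as_bytes()).unwrap();
        }
    });
    let out: Vec<u8> = chunks.into_iter().flatten().collect();
    producer.join().unwrap();
    out
}
```

In the async version the blocking `send` becomes a `Poll::Pending` from `poll_write`, and the receiver becomes a `Stream` instead of an iterator.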

main.rs
use actix_web::{get, http, App, HttpResponse, HttpServer, Responder};
use async_zip::write::{EntryOptions, ZipFileWriter};
use futures::stream::TryStreamExt;
use std::io;
use tokio::io::AsyncWrite;
use tokio_util::codec;

#[get("/")]
async fn index() -> impl Responder {
    let (to_write, to_read) = tokio::io::duplex(2048);
    tokio::spawn(async move {
        let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
        if let Err(e) = read_dir(&mut zipper).await {
            // TODO: do something
            eprintln!("Failed to write files from directory to zip: {e}")
        }
        if let Err(e) = zipper.close().await {
            // TODO: do something
            eprintln!("Failed to close zipper: {e}")
        }
    });

    let stream = codec::FramedRead::new(to_read, codec::BytesCodec::new()).map_ok(|b| b.freeze());
    HttpResponse::Ok()
        .append_header((
            http::header::CONTENT_DISPOSITION,
            r#"attachment; filename="folder.zip""#,
        ))
        // not sure if this is really necessary,
        // but we're already sending compressed data,
        // so make sure other middleware won't compress this again
        .append_header((http::header::CONTENT_ENCODING, "identity"))
        .streaming(stream)
}

async fn read_dir<W>(zipper: &mut ZipFileWriter<W>) -> Result<(), io::Error>
where
    W: AsyncWrite + Unpin,
{
    let mut dir = tokio::fs::read_dir("files").await?;
    // propagate directory read errors instead of silently ending the loop
    while let Some(entry) = dir.next_entry().await? {
        if !entry.metadata().await.map(|m| m.is_file()).unwrap_or(false) {
            continue;
        }
        let mut file = match tokio::fs::OpenOptions::new()
            .read(true)
            .open(entry.path())
            .await
        {
            Ok(f) => f,
            Err(_) => continue, // we can't read the file
        };
        let filename = match entry.file_name().into_string() {
            Ok(s) => s,
            Err(_) => continue, // the file has a non UTF-8 name
        };

        let mut entry = zipper
            .write_entry_stream(EntryOptions::new(filename, async_zip::Compression::Deflate))
            .await
            .map_err(zip_to_io_err)?;
        tokio::io::copy(&mut file, &mut entry).await?;
        entry.close().await.map_err(zip_to_io_err)?;
    }
    Ok(())
}

fn zip_to_io_err(e: async_zip::error::ZipError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

#[actix_web::main]
async fn main() -> io::Result<()> {
    HttpServer::new(move || App::new().service(index))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}

As I explained on Discord, using a DuplexStream is probably overkill, since it's meant to be used bidirectionally (see the example in the tokio docs). So I extracted the internal Pipe used by the tokio implementation and made a pipe specifically for (buffered) piping of an AsyncWrite to a Stream<Item = BytesMut>. I'm not sure if this should be included in actix-web-lab as a utility for dealing with AsyncWrite (or maybe in some other crate?).

async_pipe.rs
use std::{
    pin::Pin,
    sync::{Arc, Mutex, MutexGuard},
    task::{self, Poll, Waker},
};

use bytes::{BytesMut, Buf};
use futures::Stream;
use tokio::io::AsyncWrite;

/// The `AsyncWrite` half of an [`async_pipe`]
pub struct AsyncPipeWriter(Arc<Mutex<Pipe>>);
/// The `Stream` half of an [`async_pipe`]
pub struct AsyncPipeReader(Arc<Mutex<Pipe>>);

/// Creates a buffered pipe that forwards writes from an `AsyncWrite` to a `Stream<Item = BytesMut>`.
///
/// `max_buf_size` is the maximum number of bytes that can be written to the pipe's internal buffer
/// before further writes return `Poll::Pending`.
pub fn async_pipe(max_buf_size: usize) -> (AsyncPipeWriter, AsyncPipeReader) {
    let pipe = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
    (AsyncPipeWriter(pipe.clone()), AsyncPipeReader(pipe))
}

/// A unidirectional in-memory pipe.
///
/// Data written to the pipe is returned by subsequent reads.
///
/// Adapted from [tokio's](https://github.com/tokio-rs/tokio/blob/de81985762a242c77361a6ab9de198372ca85987/tokio/src/io/util/mem.rs#L54-L76)
/// internal representation of a pipe for a `tokio::io::DuplexStream`.
#[derive(Debug)]
struct Pipe {
    /// The buffer storing the bytes written, also read from.
    ///
    /// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
    /// functionality already.
    buffer: BytesMut,
    /// Determines if the write side has been closed.
    is_closed: bool,
    /// The maximum amount of bytes that can be written before returning
    /// `Poll::Pending`.
    max_buf_size: usize,
    /// If the `read` side has been polled and is pending, this is the waker
    /// for that parked task.
    read_waker: Option<Waker>,
    /// If the `write` side has filled the `max_buf_size` and returned
    /// `Poll::Pending`, this is the waker for that parked task.
    write_waker: Option<Waker>,
}

impl Pipe {
    fn new(max_buf_size: usize) -> Self {
        Pipe {
            buffer: BytesMut::new(),
            is_closed: false,
            max_buf_size,
            read_waker: None,
            write_waker: None,
        }
    }

    fn close_write(&mut self) {
        self.is_closed = true;
        // needs to notify any readers that no more data will come
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
    }

    fn close_read(&mut self) {
        self.is_closed = true;
        // needs to notify any writers that they have to abort
        if let Some(waker) = self.write_waker.take() {
            waker.wake();
        }
    }

    fn poll_read_internal(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>
    ) -> Poll<Option<BytesMut>> {
        if self.buffer.has_remaining() {
            let bytes = std::mem::take(&mut self.buffer);
            if let Some(waker) = self.write_waker.take() {
                waker.wake();
            }
            Poll::Ready(Some(bytes))
        } else if self.is_closed {
            Poll::Ready(None)
        } else {
            self.read_waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }

    fn poll_write_internal(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        if self.is_closed {
            return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into()));
        }
        let avail = self.max_buf_size - self.buffer.len();
        if avail == 0 {
            self.write_waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        let len = buf.len().min(avail);
        self.buffer.extend_from_slice(&buf[..len]);
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
        Poll::Ready(Ok(len))
    }
}

impl AsyncWrite for Pipe {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        self.poll_write_internal(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<std::io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        _: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        self.close_write();
        Poll::Ready(Ok(()))
    }
}

impl AsyncWrite for AsyncPipeWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        Pin::new(&mut *always_lock(&self.0)).poll_write(cx, buf)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut *always_lock(&self.0)).poll_flush(cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<std::io::Result<()>> {
        Pin::new(&mut *always_lock(&self.0)).poll_shutdown(cx)
    }
}

impl Stream for Pipe {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        self.poll_read_internal(cx)
    }

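    fn size_hint(&self) -> (usize, Option<usize>) {
        // `size_hint` counts stream items, not bytes: buffered data is
        // yielded as a single chunk, and more chunks may follow.
        (usize::from(self.buffer.has_remaining()), None)
    }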
}

impl Stream for AsyncPipeReader {
    type Item = BytesMut;

    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut *always_lock(&self.0)).poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        always_lock(&self.0).size_hint()
    }
}

impl Drop for AsyncPipeWriter {
    fn drop(&mut self) {
        // notify the other side of the closure
        always_lock(&self.0).close_write();
    }
}

impl Drop for AsyncPipeReader {
    fn drop(&mut self) {
        // notify the other side of the closure
        always_lock(&self.0).close_read();
    }
}

#[inline]
fn always_lock<T>(mtx: &Mutex<T>) -> MutexGuard<T> {
    match mtx.lock() {
        Ok(g) => g,
        Err(e) => e.into_inner(),
    }
}
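
The capacity accounting and the waker handoff in `poll_write_internal` / `poll_read_internal` can be exercised without a runtime. The following is a trimmed-down, std-only sketch and not the pipe above: `MiniPipe`, `WakeFlag` and `waker_handoff_demo` are hypothetical names, a `Vec<u8>` replaces `BytesMut`, the closed flag and write waker are left out, and `None` stands in for `Poll::Pending` on the write side.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, simplified pipe state (no closed flag, no write waker).
pub struct MiniPipe {
    buffer: Vec<u8>,
    max_buf_size: usize,
    read_waker: Option<Waker>,
}

impl MiniPipe {
    pub fn new(max_buf_size: usize) -> Self {
        MiniPipe { buffer: Vec::new(), max_buf_size, read_waker: None }
    }

    /// Accepts at most the free capacity; `None` stands in for `Poll::Pending`.
    pub fn write(&mut self, buf: &[u8]) -> Option<usize> {
        let avail = self.max_buf_size - self.buffer.len();
        if avail == 0 {
            return None;
        }
        let len = buf.len().min(avail);
        self.buffer.extend_from_slice(&buf[..len]);
        // new data is available, so wake a reader that parked earlier
        if let Some(waker) = self.read_waker.take() {
            waker.wake();
        }
        Some(len)
    }

    /// Hands out the whole buffer, or parks the reader if it is empty.
    pub fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
        if self.buffer.is_empty() {
            self.read_waker = Some(cx.waker().clone());
            Poll::Pending
        } else {
            Poll::Ready(std::mem::take(&mut self.buffer))
        }
    }
}

/// Waker that only records whether it was woken.
pub struct WakeFlag(pub AtomicBool);

impl Wake for WakeFlag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Returns (first poll was pending, reader was woken, bytes accepted, bytes read).
pub fn waker_handoff_demo() -> (bool, bool, usize, Vec<u8>) {
    let flag = Arc::new(WakeFlag(AtomicBool::new(false)));
    let waker = Waker::from(flag.clone());
    let mut cx = Context::from_waker(&waker);
    let mut pipe = MiniPipe::new(4);

    let first_poll_pending = pipe.poll_read(&mut cx).is_pending();
    let written = pipe.write(b"abcdef").unwrap_or(0); // only 4 bytes fit
    let woken = flag.0.load(Ordering::SeqCst);
    let data = match pipe.poll_read(&mut cx) {
        Poll::Ready(bytes) => bytes,
        Poll::Pending => Vec::new(),
    };
    (first_poll_pending, woken, written, data)
}
```

A short write like the one in the demo is fine for `AsyncWrite` callers, since helpers such as `write_all` and `tokio::io::copy` retry with the remaining bytes.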
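
index handler with async_pipe
// `StreamExt` provides `stream.map(...)`, used at the end of the handler
use futures::stream::StreamExt;

#[get("/")]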
async fn index() -> impl Responder {
    let (to_write, stream) = async_pipe(2048);
    tokio::spawn(async move {
        let mut zipper = async_zip::write::ZipFileWriter::new(to_write);
        if let Err(e) = read_dir(&mut zipper).await {
            // TODO: do something
            eprintln!("Failed to write files from directory to zip: {e}")
        }
        if let Err(e) = zipper.close().await {
            // TODO: do something
            eprintln!("Failed to close zipper: {e}")
        }
    });
    HttpResponse::Ok()
        .append_header((
            http::header::CONTENT_DISPOSITION,
            r#"attachment; filename="folder.zip""#,
        ))
        // not sure if this is really necessary,
        // but we're already sending compressed data,
        // so make sure other middleware won't compress this again
        .append_header((http::header::CONTENT_ENCODING, "identity"))
        .streaming(stream.map(|b| Ok::<_, io::Error>(b.freeze())))
}

Cargo.toml
[package]
name = "actix-zippy"
version = "0.1.0"
edition = "2021"

[dependencies]
actix-web = "4.1.0"
async_zip = "0.0.8"
bytes = "1.2.1"
futures = "0.3.23"
tokio = { version = "1.20.1", features = ["io-util", "fs"] }
tokio-stream = "0.1.9"
tokio-util = { version = "0.7.3", features = ["codec"] }

@pashinin

pashinin commented Aug 20, 2022

Where does stream.map(...) come from?

44 |         .streaming(stream.map(|b| Ok::<_, io::Error>(b.freeze())))
   |                           ^^^ `AsyncPipeReader` is not an iterator
   |
  ::: async_pipe.rs:9:1
   |
9  | pub struct AsyncPipeReader(Arc<Mutex<Pipe>>);
   | --------------------------------------------- doesn't satisfy `AsyncPipeReader: Iterator`

@Nerixyz
Author

Nerixyz commented Aug 20, 2022

> Where does stream.map(...) come from?

It's from futures::stream::StreamExt.
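
The compiler mentions `Iterator` in the error above because, without the extension trait in scope, `Iterator::map` is the only `map` it can suggest. Here is a std-only sketch of the same mechanism; `Source`, `SourceExt`, `Countdown` and `doubled` are hypothetical stand-ins, not part of futures:

```rust
mod stream {
    /// Hypothetical minimal stand-in for a `Stream`: a pull-based source.
    pub trait Source {
        type Item;
        fn next_item(&mut self) -> Option<Self::Item>;
    }

    /// Extension trait with a blanket impl, in the style of `StreamExt`.
    pub trait SourceExt: Source + Sized {
        /// Drains the source, applying `f` to every item.
        fn map_all<T, F: FnMut(Self::Item) -> T>(mut self, mut f: F) -> Vec<T> {
            let mut out = Vec::new();
            while let Some(item) = self.next_item() {
                out.push(f(item));
            }
            out
        }
    }

    impl<S: Source> SourceExt for S {}

    /// Yields n, n - 1, ..., 1.
    pub struct Countdown(pub u32);

    impl Source for Countdown {
        type Item = u32;
        fn next_item(&mut self) -> Option<u32> {
            if self.0 == 0 {
                None
            } else {
                self.0 -= 1;
                Some(self.0 + 1)
            }
        }
    }
}

// Without this import, the `map_all` call below does not compile,
// even though the blanket impl covers `Countdown`.
use stream::SourceExt;

pub fn doubled() -> Vec<u32> {
    stream::Countdown(3).map_all(|n| n * 2)
}
```

For the handler, the equivalent import is `use futures::stream::StreamExt;` (or `use futures::StreamExt;`).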

@robjtede
Member

> maybe something for actix-web-lab

Had a play with it and came up with something similar to the body::channel ideas:

@robjtede robjtede added new-example Request for a new example good first issue Good for newcomers labels Sep 21, 2022