scuffle_http/backend/h3/
body.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use bytes::{Buf, Bytes};
6use h3::server::RequestStream;
7
/// Phase of an incoming HTTP/3 body.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    /// Body data is still being received. Holds the number of bytes still
    /// expected, or `None` when no size hint was supplied.
    Data(Option<u64>),
    /// The data section has ended and the trailers have not been read yet.
    Trailers,
    /// Nothing more will be produced, either because the body completed or
    /// because an error was reported.
    Done,
}
14
15/// An incoming HTTP/3 body.
16///
17/// Implements [`http_body::Body`].
18pub struct QuicIncomingBody<S> {
19    stream: RequestStream<S, Bytes>,
20    state: State,
21}
22
23impl<S> QuicIncomingBody<S> {
24    /// Create a new incoming HTTP/3 body.
25    pub fn new(stream: RequestStream<S, Bytes>, size_hint: Option<u64>) -> Self {
26        Self {
27            stream,
28            state: State::Data(size_hint),
29        }
30    }
31}
32
33impl<S: h3::quic::RecvStream> http_body::Body for QuicIncomingBody<S> {
34    type Data = Bytes;
35    type Error = h3::Error;
36
37    fn poll_frame(
38        mut self: Pin<&mut Self>,
39        cx: &mut Context<'_>,
40    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
41        let QuicIncomingBody { stream, state } = self.as_mut().get_mut();
42
43        if *state == State::Done {
44            return Poll::Ready(None);
45        }
46
47        if let State::Data(remaining) = state {
48            match stream.poll_recv_data(cx) {
49                Poll::Ready(Ok(Some(mut buf))) => {
50                    let buf_size = buf.remaining() as u64;
51
52                    if let Some(remaining) = remaining {
53                        if buf_size > *remaining {
54                            *state = State::Done;
55                            return Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into())));
56                        }
57
58                        *remaining -= buf_size;
59                    }
60
61                    return Poll::Ready(Some(Ok(http_body::Frame::data(buf.copy_to_bytes(buf_size as usize)))));
62                }
63                Poll::Ready(Ok(None)) => {
64                    *state = State::Trailers;
65                }
66                Poll::Ready(Err(err)) => {
67                    *state = State::Done;
68                    return Poll::Ready(Some(Err(err)));
69                }
70                Poll::Pending => {
71                    return Poll::Pending;
72                }
73            }
74        }
75
76        // We poll the recv data again even though we already got the None
77        // because we want to make sure there is not a frame after the trailers
78        // This is a workaround because h3 does not allow us to poll the trailer
79        // directly, so we need to make sure the future recv_trailers is going to be
80        // ready after a single poll We avoid pinning to the heap.
81        let resp = match stream.poll_recv_data(cx) {
82            Poll::Ready(Ok(None)) => match std::pin::pin!(stream.recv_trailers()).poll(cx) {
83                Poll::Ready(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers)))),
84                // We will only poll the recv_trailers once so if pending is returned we are done.
85                Poll::Pending => {
86                    #[cfg(feature = "tracing")]
87                    tracing::warn!("recv_trailers is pending");
88                    Poll::Ready(None)
89                }
90                Poll::Ready(Ok(None)) => Poll::Ready(None),
91                Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
92            },
93            // We are not expecting any data after the previous poll returned None
94            Poll::Ready(Ok(Some(_))) => Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into()))),
95            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
96            Poll::Pending => return Poll::Pending,
97        };
98
99        *state = State::Done;
100
101        resp
102    }
103
104    fn size_hint(&self) -> http_body::SizeHint {
105        match self.state {
106            State::Data(Some(remaining)) => http_body::SizeHint::with_exact(remaining),
107            State::Data(None) => http_body::SizeHint::default(),
108            State::Trailers | State::Done => http_body::SizeHint::with_exact(0),
109        }
110    }
111
112    fn is_end_stream(&self) -> bool {
113        match self.state {
114            State::Data(Some(0)) | State::Trailers | State::Done => true,
115            State::Data(_) => false,
116        }
117    }
118}