scuffle_http/backend/h3/
body.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use bytes::{Buf, Bytes};
6use h3::server::RequestStream;
7
/// Progress of a [`QuicIncomingBody`] through its request stream.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    /// Data frames are still being read. The payload, when present, is the
    /// number of body bytes that may still arrive before the declared size
    /// is exceeded.
    Data(Option<u64>),
    /// The data section has ended; trailers have not been read yet.
    Trailers,
    /// The body is finished and will not produce any further frames.
    Done,
}
14
15pub struct QuicIncomingBody<S> {
19 stream: RequestStream<S, Bytes>,
20 state: State,
21}
22
23impl<S> QuicIncomingBody<S> {
24 pub fn new(stream: RequestStream<S, Bytes>, size_hint: Option<u64>) -> Self {
26 Self {
27 stream,
28 state: State::Data(size_hint),
29 }
30 }
31}
32
33impl<S: h3::quic::RecvStream> http_body::Body for QuicIncomingBody<S> {
34 type Data = Bytes;
35 type Error = h3::Error;
36
37 fn poll_frame(
38 mut self: Pin<&mut Self>,
39 cx: &mut Context<'_>,
40 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
41 let QuicIncomingBody { stream, state } = self.as_mut().get_mut();
42
43 if *state == State::Done {
44 return Poll::Ready(None);
45 }
46
47 if let State::Data(remaining) = state {
48 match stream.poll_recv_data(cx) {
49 Poll::Ready(Ok(Some(mut buf))) => {
50 let buf_size = buf.remaining() as u64;
51
52 if let Some(remaining) = remaining {
53 if buf_size > *remaining {
54 *state = State::Done;
55 return Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into())));
56 }
57
58 *remaining -= buf_size;
59 }
60
61 return Poll::Ready(Some(Ok(http_body::Frame::data(buf.copy_to_bytes(buf_size as usize)))));
62 }
63 Poll::Ready(Ok(None)) => {
64 *state = State::Trailers;
65 }
66 Poll::Ready(Err(err)) => {
67 *state = State::Done;
68 return Poll::Ready(Some(Err(err)));
69 }
70 Poll::Pending => {
71 return Poll::Pending;
72 }
73 }
74 }
75
76 let resp = match stream.poll_recv_data(cx) {
82 Poll::Ready(Ok(None)) => match std::pin::pin!(stream.recv_trailers()).poll(cx) {
83 Poll::Ready(Ok(Some(trailers))) => Poll::Ready(Some(Ok(http_body::Frame::trailers(trailers)))),
84 Poll::Pending => {
86 #[cfg(feature = "tracing")]
87 tracing::warn!("recv_trailers is pending");
88 Poll::Ready(None)
89 }
90 Poll::Ready(Ok(None)) => Poll::Ready(None),
91 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
92 },
93 Poll::Ready(Ok(Some(_))) => Poll::Ready(Some(Err(h3::error::Code::H3_FRAME_UNEXPECTED.into()))),
95 Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
96 Poll::Pending => return Poll::Pending,
97 };
98
99 *state = State::Done;
100
101 resp
102 }
103
104 fn size_hint(&self) -> http_body::SizeHint {
105 match self.state {
106 State::Data(Some(remaining)) => http_body::SizeHint::with_exact(remaining),
107 State::Data(None) => http_body::SizeHint::default(),
108 State::Trailers | State::Done => http_body::SizeHint::with_exact(0),
109 }
110 }
111
112 fn is_end_stream(&self) -> bool {
113 match self.state {
114 State::Data(Some(0)) | State::Trailers | State::Done => true,
115 State::Data(_) => false,
116 }
117 }
118}