scuffle_http/backend/hyper/
handler.rs

1use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
2use hyper_util::server::conn::auto;
3use scuffle_context::ContextFutExt;
4use tokio::io::{AsyncRead, AsyncWrite};
5
6use crate::error::HttpError;
7use crate::service::{HttpService, HttpServiceFactory};
8
9/// Helper function used by hyper server to handle incoming connections.
10pub(crate) async fn handle_connection<F, S, I>(
11    ctx: scuffle_context::Context,
12    service: S,
13    io: I,
14    http1: bool,
15    http2: bool,
16) -> Result<(), HttpError<F>>
17where
18    F: HttpServiceFactory<Service = S>,
19    F::Error: std::error::Error,
20    S: HttpService + Clone + Send + 'static,
21    S::Error: std::error::Error + Send + Sync,
22    S::ResBody: Send,
23    <S::ResBody as http_body::Body>::Data: Send,
24    <S::ResBody as http_body::Body>::Error: std::error::Error + Send + Sync,
25    I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
26{
27    let io = TokioIo::new(io);
28
29    let hyper_proxy_service = hyper::service::service_fn(move |req: http::Request<hyper::body::Incoming>| {
30        let mut service = service.clone();
31        async move {
32            let (parts, body) = req.into_parts();
33            let body = crate::body::IncomingBody::from(body);
34            let req = http::Request::from_parts(parts, body);
35            service.call(req).await
36        }
37    });
38
39    let mut builder = auto::Builder::new(TokioExecutor::new());
40
41    if http1 && http2 {
42        #[cfg(feature = "http1")]
43        builder.http1().timer(TokioTimer::new());
44
45        #[cfg(feature = "http2")]
46        builder.http2().timer(TokioTimer::new());
47
48        builder
49            .serve_connection_with_upgrades(io, hyper_proxy_service)
50            .with_context(ctx)
51            .await
52            .transpose()
53            .map_err(HttpError::HyperConnection)?;
54    } else if http1 {
55        #[cfg(not(feature = "http1"))]
56        unreachable!("http1 enabled but http1 feature disabled");
57
58        #[cfg(feature = "http1")]
59        builder
60            .http1_only()
61            .serve_connection_with_upgrades(io, hyper_proxy_service)
62            .with_context(ctx)
63            .await
64            .transpose()
65            .map_err(HttpError::HyperConnection)?;
66    } else if http2 {
67        #[cfg(not(feature = "http2"))]
68        unreachable!("http2 enabled but http2 feature disabled");
69
70        #[cfg(feature = "http2")]
71        builder
72            .http2_only()
73            .serve_connection_with_upgrades(io, hyper_proxy_service)
74            .with_context(ctx)
75            .await
76            .transpose()
77            .map_err(HttpError::HyperConnection)?;
78    } else {
79        #[cfg(feature = "tracing")]
80        tracing::warn!("both http1 and http2 are disabled, closing connection");
81    }
82
83    Ok(())
84}