scuffle_bootstrap_telemetry/
lib.rs1#![cfg_attr(feature = "docs", doc = "\n\nSee the [changelog][changelog] for a full release history.")]
7#![cfg_attr(feature = "docs", doc = "## Feature flags")]
8#![cfg_attr(feature = "docs", doc = document_features::document_features!())]
9#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
105#![cfg_attr(docsrs, feature(doc_auto_cfg))]
106#![deny(missing_docs)]
107#![deny(unsafe_code)]
108#![deny(unreachable_pub)]
109
110use anyhow::Context;
111use bytes::Bytes;
112#[cfg(feature = "opentelemetry-logs")]
113pub use opentelemetry_appender_tracing;
114#[cfg(feature = "opentelemetry")]
115pub use opentelemetry_sdk;
116#[cfg(feature = "prometheus")]
117pub use prometheus_client;
118use scuffle_bootstrap::global::Global;
119use scuffle_bootstrap::service::Service;
120#[cfg(feature = "opentelemetry-traces")]
121pub use tracing_opentelemetry;
122
123#[cfg(feature = "opentelemetry")]
124pub mod opentelemetry;
125
/// Bootstrap service that exposes telemetry over HTTP.
///
/// When [`TelemetryConfig::bind_address`] returns an address, an HTTP server is
/// started on it that answers the following paths:
///
/// - `/health`: runs [`TelemetryConfig::health_check`].
/// - `/metrics`: text encoding of the Prometheus registry (`prometheus` feature).
/// - `/pprof/cpu`: captures a CPU profile (`pprof` feature, unix only).
/// - `/opentelemetry/flush`: flushes OpenTelemetry (`opentelemetry` feature).
///
/// Every other path is answered with `404 Not Found`.
pub struct TelemetrySvc;
174
175pub trait TelemetryConfig: Global {
177 fn enabled(&self) -> bool {
179 true
180 }
181
182 fn bind_address(&self) -> Option<std::net::SocketAddr> {
184 None
185 }
186
187 fn http_server_name(&self) -> &str {
189 "scuffle-bootstrap-telemetry"
190 }
191
192 fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
196 std::future::ready(Ok(()))
197 }
198
199 #[cfg(feature = "prometheus")]
206 fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
207 None
208 }
209
210 #[cfg(feature = "opentelemetry")]
217 fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
218 None
219 }
220}
221
222impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
223 async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
224 Ok(global.enabled())
225 }
226
227 async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
228 if let Some(bind_addr) = global.bind_address() {
229 let global = global.clone();
230
231 let service = scuffle_http::service::fn_http_service(move |req| {
232 let global = global.clone();
233 async move {
234 match req.uri().path() {
235 "/health" => health_check(&global, req).await,
236 #[cfg(feature = "prometheus")]
237 "/metrics" => metrics(&global, req).await,
238 #[cfg(all(feature = "pprof", unix))]
239 "/pprof/cpu" => pprof(&global, req).await,
240 #[cfg(feature = "opentelemetry")]
241 "/opentelemetry/flush" => opentelemetry_flush(&global).await,
242 _ => Ok(http::Response::builder()
243 .status(http::StatusCode::NOT_FOUND)
244 .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
245 }
246 }
247 });
248
249 scuffle_http::HttpServer::builder()
250 .bind(bind_addr)
251 .ctx(ctx)
252 .service_factory(scuffle_http::service::service_clone_factory(service))
253 .build()
254 .run()
255 .await
256 .context("server run")?;
257 } else {
258 ctx.done().await;
259 }
260
261 #[cfg(feature = "opentelemetry")]
262 if let Some(opentelemetry) = global.opentelemetry().cloned() {
263 if opentelemetry.is_enabled() {
264 tokio::task::spawn_blocking(move || opentelemetry.shutdown())
265 .await
266 .context("opentelemetry shutdown spawn")?
267 .context("opentelemetry shutdown")?;
268 }
269 }
270
271 Ok(())
272 }
273}
274
275async fn health_check<G: TelemetryConfig>(
276 global: &std::sync::Arc<G>,
277 _: http::Request<scuffle_http::body::IncomingBody>,
278) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
279 if let Err(err) = global.health_check().await {
280 tracing::error!("health check failed: {err}");
281 Ok(http::Response::builder()
282 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
283 .body(http_body_util::Full::new(format!("{err:#}").into()))?)
284 } else {
285 Ok(http::Response::builder()
286 .status(http::StatusCode::OK)
287 .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
288 }
289}
290
/// Handles `/metrics` by text-encoding the configured Prometheus registry.
///
/// Answers `404 Not Found` when no registry is configured and `500` when the
/// encoding fails. A successful response declares the OpenMetrics text content
/// type so scrapers do not have to guess the exposition format.
#[cfg(feature = "prometheus")]
async fn metrics<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
    _: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(registry) = global.prometheus_metrics_registry() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let mut buf = String::new();
    if prometheus_client::encoding::text::encode(&mut buf, registry).is_err() {
        tracing::error!("metrics encode failed");
        return http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new(Bytes::from_static(b"metrics encode failed")));
    }

    http::Response::builder()
        .status(http::StatusCode::OK)
        .header(
            http::header::CONTENT_TYPE,
            "application/openmetrics-text; version=1.0.0; charset=utf-8",
        )
        .body(http_body_util::Full::new(Bytes::from(buf)))
}
314
/// Handles `/pprof/cpu` by capturing a CPU profile and returning it as the body.
///
/// Recognised query parameters:
///
/// - `freq`: sampling frequency passed to `scuffle_pprof::Cpu::new` (default `100`).
/// - `duration`: capture length in whole seconds (default `5`).
/// - `ignore`: may be repeated; each value is added to the ignore list.
///
/// Unknown parameters are skipped. An unparsable `freq` or `duration` is
/// answered with `400 Bad Request`; a failed capture with `500`.
#[cfg(all(feature = "pprof", unix))]
async fn pprof<G: TelemetryConfig>(
    _: &std::sync::Arc<G>,
    req: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    /// Builds a response with the given status and body.
    fn respond(
        status: http::StatusCode,
        body: impl Into<Bytes>,
    ) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
        http::Response::builder()
            .status(status)
            .body(http_body_util::Full::new(body.into()))
    }

    let mut freq = 100;
    let mut duration = std::time::Duration::from_secs(5);
    let mut ignore_list = Vec::new();

    let params = req.uri().query().map(querystring::querify).unwrap_or_default();
    for (key, value) in params {
        match key {
            "freq" => match value.parse() {
                Ok(parsed) => freq = parsed,
                Err(err) => return respond(http::StatusCode::BAD_REQUEST, format!("invalid freq: {err:#}")),
            },
            "duration" => match value.parse() {
                Ok(secs) => duration = std::time::Duration::from_secs(secs),
                Err(err) => return respond(http::StatusCode::BAD_REQUEST, format!("invalid duration: {err:#}")),
            },
            "ignore" => ignore_list.push(value),
            _ => {}
        }
    }

    let cpu = scuffle_pprof::Cpu::new(freq, &ignore_list);

    // The capture blocks for `duration`, so run it off the async workers.
    let outcome = tokio::task::spawn_blocking(move || cpu.capture(duration)).await;
    match outcome {
        Ok(Ok(profile)) => respond(http::StatusCode::OK, profile),
        Ok(Err(err)) => {
            tracing::error!("cpu capture failed: {err:#}");
            respond(http::StatusCode::INTERNAL_SERVER_ERROR, format!("{err:#}"))
        }
        Err(err) => {
            tracing::error!("cpu capture failed: {err:#}");
            respond(http::StatusCode::INTERNAL_SERVER_ERROR, format!("{err:#}"))
        }
    }
}
372
/// Handles `/opentelemetry/flush` by flushing the configured OpenTelemetry instance.
///
/// Answers `404 Not Found` when no instance is configured, `200 ok` when the
/// instance is disabled or the flush succeeded, and `500` with the error text
/// when the flush (or the blocking task running it) failed.
#[cfg(feature = "opentelemetry")]
async fn opentelemetry_flush<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(otel) = global.opentelemetry().cloned() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let (status, body) = if !otel.is_enabled() {
        // Nothing to flush; report success.
        (http::StatusCode::OK, Bytes::from_static(b"ok"))
    } else {
        match tokio::task::spawn_blocking(move || otel.flush()).await {
            Ok(Ok(())) => (http::StatusCode::OK, Bytes::from_static(b"ok")),
            Ok(Err(err)) => {
                tracing::error!("opentelemetry flush failed: {err:#}");
                (http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(format!("{err:#}")))
            }
            Err(err) => {
                tracing::error!("opentelemetry flush spawn failed: {err:#}");
                (http::StatusCode::INTERNAL_SERVER_ERROR, Bytes::from(format!("{err:#}")))
            }
        }
    };

    http::Response::builder()
        .status(status)
        .body(http_body_util::Full::new(body))
}
407
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
#[cfg(all(
    feature = "opentelemetry-metrics",
    feature = "opentelemetry-traces",
    feature = "opentelemetry-logs"
))]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;

    #[cfg(all(feature = "pprof", unix))]
    use bytes::Bytes;
    use scuffle_bootstrap::{GlobalWithoutConfig, Service};

    use crate::{TelemetryConfig, TelemetrySvc};

    /// Fetches `/metrics`, turning a non-success status into an error.
    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
        reqwest::get(format!("http://{addr}/metrics"))
            .await
            .unwrap()
            .error_for_status()?
            .text()
            .await
    }

    /// Fetches `/health` and panics unless it answers with a success status.
    async fn request_health(addr: SocketAddr) -> String {
        reqwest::get(format!("http://{addr}/health"))
            .await
            .unwrap()
            .error_for_status()
            .expect("health check failed")
            .text()
            .await
            .expect("health check text")
    }

    /// Requests a CPU profile; the route only exists with the `pprof` feature on unix.
    #[cfg(all(feature = "pprof", unix))]
    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
            .await
            .unwrap()
            .error_for_status()?
            .bytes()
            .await
    }

    /// Calls `/opentelemetry/flush`, turning a non-success status into an error.
    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
            .await
            .unwrap()
            .error_for_status()
    }

    // The test registers a Prometheus registry, so it needs the `prometheus` feature.
    #[cfg(all(feature = "prometheus", not(valgrind)))]
    #[tokio::test]
    async fn telemetry_http_server() {
        use opentelemetry_sdk::logs::SdkLoggerProvider;
        use opentelemetry_sdk::metrics::SdkMeterProvider;
        use opentelemetry_sdk::trace::SdkTracerProvider;

        struct TestGlobal {
            bind_addr: SocketAddr,
            prometheus: prometheus_client::registry::Registry,
            open_telemetry: crate::opentelemetry::OpenTelemetry,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Port 0 lets the OS pick a free port. The listener is dropped when
                // `init` returns, so the service can bind the address itself.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                let mut prometheus = prometheus_client::registry::Registry::default();

                let exporter = scuffle_metrics::prometheus::exporter().build();
                prometheus.register_collector(exporter.collector());

                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
                opentelemetry::global::set_meter_provider(metrics.clone());

                let tracer = SdkTracerProvider::default();
                opentelemetry::global::set_tracer_provider(tracer.clone());

                let logger = SdkLoggerProvider::builder().build();

                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
                    .with_metrics(metrics)
                    .with_traces(tracer)
                    .with_logs(logger);

                Ok(Arc::new(TestGlobal {
                    bind_addr,
                    prometheus,
                    open_telemetry,
                }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }

            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
                Some(&self.prometheus)
            }

            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
                Some(&self.open_telemetry)
            }
        }

        #[scuffle_metrics::metrics]
        mod example {
            use scuffle_metrics::{CounterU64, MetricEnum};

            #[derive(MetricEnum)]
            pub enum Kind {
                Http,
                Grpc,
            }

            #[metrics(unit = "requests")]
            pub fn request(kind: Kind) -> CounterU64;
        }

        /// Checks that the scrape output reports the example counter at `expected`.
        fn assert_request_counter(metrics: &str, expected: u64) {
            assert!(metrics.contains("# UNIT example_request_requests requests\n"));
            assert!(metrics.contains("example_request_requests_total{"));
            assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
            assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
            assert!(metrics.contains("kind=\"Http\""));
            assert!(metrics.contains(format!("}} {expected}\n").as_str()));
            assert!(metrics.ends_with("# EOF\n"));
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        // A dedicated handler keeps this test independent of the process-wide
        // context, which another test in the same process could shut down.
        let handler = scuffle_context::Handler::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, handler.context()));

        // Give the server a moment to start listening.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.starts_with("# HELP target Information about the target\n"));
        assert!(metrics.contains("# TYPE target info\n"));
        assert!(metrics.contains("service_name=\"unknown_service\""));
        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert_request_counter(&metrics, 1);

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert_request_counter(&metrics, 2);

        #[cfg(all(feature = "pprof", unix))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));

            let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

            let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));
        }

        assert!(flush_opentelemetry(bind_addr).await.is_ok());

        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn empty_telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                // Port 0 lets the OS pick a free port. The listener is dropped when
                // `init` returns, so the service can bind the address itself.
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                Ok(Arc::new(TestGlobal { bind_addr }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        // A dedicated handler keeps this test independent of the process-wide
        // context, which another test in the same process could shut down.
        let handler = scuffle_context::Handler::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, handler.context()));

        // Give the server a moment to start listening.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        // No registry is configured, so the metrics endpoint must not be found.
        let res = request_metrics(bind_addr).await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));

        #[cfg(all(feature = "pprof", unix))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));
        }

        // No OpenTelemetry instance is configured either.
        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
        assert!(err.is_status());
        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }
}
664
/// Release history of this crate, as linked from the crate-level documentation.
// NOTE(review): the module body is presumably generated by the
// `scuffle_changelog::changelog` attribute; confirm against that crate.
#[cfg(feature = "docs")]
#[scuffle_changelog::changelog]
pub mod changelog {}