// packager/rust/src/telemetry.rs
use std::fmt;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::time::Duration;
use axum::Router;
use http::Request;
use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
use tracing::{Span, Subscriber};
use tracing::Instrument;
use tracing_subscriber::{
filter::{LevelFilter, Targets},
fmt::{
format::{Format, Writer},
FmtContext, FormatEvent, FormatFields, Layer,
},
layer::SubscriberExt,
prelude::*,
registry::{LookupSpan, Registry},
};
use uuid::Uuid;
use opentelemetry::{global, runtime::Tokio};
use tracing::Event;
/// Whether the OpenTelemetry/Jaeger export layer should be installed by
/// [`init_tracing`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OpenTelemetryConfig {
    /// Install the Jaeger agent pipeline and export spans.
    Enabled,
    /// Skip OpenTelemetry setup entirely.
    Disabled,
}
/// Whether the `tokio-console` instrumentation layer should be installed by
/// [`init_tracing`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TokioConsoleConfig {
    /// Spawn the console subscriber so the runtime can be inspected.
    Enabled,
    /// Do not spawn the console subscriber.
    Disabled,
}
/// Verbose event formatter for local debugging: prints the level and target,
/// then the full span scope from the root down (or `NO SPAN`), then the
/// event's fields, one event per line.
struct DebugFormat;

impl<S, N> FormatEvent<S, N> for DebugFormat
where
    S: Subscriber + for<'a> LookupSpan<'a>,
    N: for<'a> FormatFields<'a> + 'static,
{
    fn format_event(
        &self,
        ctx: &FmtContext<'_, S, N>,
        mut writer: Writer<'_>,
        event: &Event<'_>,
    ) -> fmt::Result {
        let meta = event.metadata();
        // Header: blank separator line, then level and target, tab-delimited.
        write!(&mut writer, "\n{}\t{}:\t", meta.level(), meta.target())?;
        // Dump every span in the event's scope (outermost first), or make the
        // absence of a scope explicit.
        match ctx.event_scope() {
            Some(scope) => {
                for span in scope.from_root() {
                    write!(writer, "span: {:?} {:?}\t", span.metadata(), span.id())?;
                }
            }
            None => write!(writer, "NO SPAN\t")?,
        }
        // Finally render the event's own fields with the configured field
        // formatter and terminate the line.
        ctx.field_format().format_fields(writer.by_ref(), event)?;
        writeln!(writer)
    }
}
pub async fn init_tracing<Func, T>(
opentelemetry_config: OpenTelemetryConfig,
tokio_console_config: TokioConsoleConfig,
args: crate::cmd::Args,
f: Func,
) -> T
where
Func: FnOnce(crate::cmd::Args) -> Pin<Box<dyn Future<Output = T>>>,
T: std::process::Termination,
{
let mut shutdown_functions: Vec<Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error>>>> =
vec![];
2023-08-29 21:34:01 +02:00
// default is the Full format, there is no way to specify this, but it can be
// overridden via builder methods
let stdout_format = Format::default()
.pretty()
.with_ansi(true)
.with_target(true)
.with_level(true)
.with_file(false);
2023-08-29 21:34:01 +02:00
let stdout_format = DebugFormat;
2023-08-29 21:34:01 +02:00
let stdout_layer = Layer::default()
.event_format(stdout_format)
.with_writer(io::stdout);
let stdout_filter = Targets::new()
2023-08-29 21:34:01 +02:00
.with_default(LevelFilter::WARN)
2023-08-29 21:34:01 +02:00
.with_targets(vec![
2023-08-29 21:34:01 +02:00
(env!("CARGO_PKG_NAME"), LevelFilter::DEBUG),
("runtime", LevelFilter::OFF),
("sqlx", LevelFilter::TRACE),
2023-08-29 21:34:01 +02:00
]);
2023-08-29 21:34:01 +02:00
let stdout_filter = Targets::new()
.with_default(LevelFilter::TRACE)
.with_targets(vec![("runtime", LevelFilter::OFF)])
.with_targets(vec![("tokio", LevelFilter::OFF)])
.with_targets(vec![("hyper", LevelFilter::OFF)]);
2023-08-29 21:34:01 +02:00
let stdout_layer = stdout_layer.with_filter(stdout_filter);
2023-08-29 21:34:01 +02:00
let console_layer = match tokio_console_config {
TokioConsoleConfig::Enabled => Some(console_subscriber::Builder::default().spawn()),
TokioConsoleConfig::Disabled => None,
};
let opentelemetry_layer = match opentelemetry_config {
OpenTelemetryConfig::Enabled => {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
// Sets up the machinery needed to export data to Jaeger
// There are other OTel crates that provide pipelines for the vendors
// mentioned earlier.
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.with_service_name(env!("CARGO_PKG_NAME"))
.with_max_packet_size(20_000)
.with_auto_split_batch(true)
2023-08-29 21:34:01 +02:00
.install_batch(Tokio)
2023-08-29 21:34:01 +02:00
.unwrap();
let opentelemetry_filter = {
Targets::new()
.with_default(LevelFilter::DEBUG)
.with_targets(vec![
(env!("CARGO_PKG_NAME"), LevelFilter::DEBUG),
("runtime", LevelFilter::OFF),
2023-08-29 21:34:01 +02:00
("tokio", LevelFilter::OFF),
("sqlx", LevelFilter::TRACE),
2023-08-29 21:34:01 +02:00
])
};
2023-08-29 21:34:01 +02:00
let opentelemetry_layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(opentelemetry_filter);
2023-08-29 21:34:01 +02:00
shutdown_functions.push(Box::new(|| {
println!("shutting down otel");
global::shutdown_tracer_provider();
Ok(())
}));
println!("set up otel");
Some(opentelemetry_layer)
}
OpenTelemetryConfig::Disabled => None,
};
2023-08-29 21:34:01 +02:00
let registry = Registry::default()
.with(console_layer)
2023-08-29 21:34:01 +02:00
.with(opentelemetry_layer)
2023-08-29 21:34:01 +02:00
// just an example, you can actuall pass Options here for layers that might be
// set/unset at runtime
.with(Some(stdout_layer))
.with(None::<Layer<_>>);
tracing::subscriber::set_global_default(registry).unwrap();
2023-08-29 21:34:01 +02:00
tracing::debug!("tracing setup finished");
tracing_log::log_tracer::Builder::new().init().unwrap();
2023-08-29 21:34:01 +02:00
let result = f(args)
.instrument(tracing::debug_span!(target: env!("CARGO_PKG_NAME"), env!("CARGO_PKG_NAME")))
.await;
2023-08-29 21:34:01 +02:00
for shutdown_func in shutdown_functions {
shutdown_func().unwrap();
}
result
2023-08-29 21:34:01 +02:00
}
/// Newtype that renders a [`Duration`] as its total number of whole
/// microseconds (no unit suffix), for compact structured-log fields.
struct Latency(Duration);

impl fmt::Display for Latency {
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        let micros = self.0.as_micros();
        write!(out, "{}", micros)
    }
}
/// Wraps `router` with HTTP request tracing middleware.
///
/// Each request runs inside a `packager::request` span tagged with a fresh
/// UUID; debug events are emitted on arrival and on response (with latency in
/// microseconds), and error events are emitted for failures.
pub fn init_request_tracing(router: Router) -> Router {
    let trace_layer = TraceLayer::new_for_http()
        .make_span_with(|_req: &Request<_>| {
            // A random id correlates every event belonging to this request.
            let request_id = Uuid::new_v4();
            tracing::debug_span!(
                target: "packager::request",
                "request",
                request_id = %request_id,
            )
        })
        .on_request(|req: &Request<_>, _span: &Span| {
            let request_headers = req.headers();
            let http_version = req.version();
            tracing::debug!(
                target: "packager::request",
                method = req.method().as_str(),
                path = req.uri().path(),
                http_version = ?http_version,
                request_headers = ?request_headers,
                "request received",
            );
        })
        .on_response(
            |res: &axum::response::Response, elapsed: Duration, _span: &Span| {
                let response_headers = res.headers();
                let latency = Latency(elapsed);
                tracing::debug!(
                    target: "packager::request",
                    latency = %latency,
                    status = res.status().as_str(),
                    response_headers = ?response_headers,
                    "finished processing request",
                );
            },
        )
        .on_failure(
            |failure: ServerErrorsFailureClass, elapsed: Duration, _span: &Span| {
                let latency = Latency(elapsed);
                match failure {
                    ServerErrorsFailureClass::StatusCode(code) => {
                        tracing::error!(
                            target: "packager::request",
                            latency = %latency,
                            "request failed with error response {}",
                            code,
                        );
                    }
                    ServerErrorsFailureClass::Error(message) => {
                        tracing::error!(
                            target: "packager::request",
                            latency = %latency,
                            "request failed: {}",
                            message,
                        );
                    }
                }
            },
        );
    router.layer(trace_layer)
}