diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 9c73453..f896ae6 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -175,6 +175,29 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-prometheus" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97def327c5481791abb57ac295bfc70f2e1a0727675b7dbf74bd1b27a72b6fd8" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures", + "futures-core", + "http", + "http-body", + "matchit", + "metrics", + "metrics-exporter-prometheus", + "once_cell", + "pin-project", + "tokio", + "tower", + "tower-http", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -397,6 +420,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae211234986c545741a7dc064309f67ee1e5ad243d0e48335adc0484d960bcc7" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-queue" version = "0.3.8" @@ -712,6 +748,15 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.14.0" @@ -935,6 +980,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "ipnet" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" + [[package]] name = "is-terminal" version = "0.4.9" @@ -1024,6 +1075,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "mach2" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" +dependencies = [ + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1078,6 +1138,70 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" +dependencies = [ + "base64 0.21.2", + "hyper", + "indexmap 1.9.3", + "ipnet", + "metrics", + "metrics-util", + "quanta", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.29", +] + +[[package]] +name = "metrics-util" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown 0.13.1", + "metrics", + "num_cpus", + "quanta", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.17" @@ -1304,6 +1428,7 @@ name = "packager" version = "0.1.0" dependencies = [ "axum", + "axum-prometheus", "clap", "console-subscriber", "futures", @@ -1431,6 +1556,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "portable-atomic" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f32154ba0af3a075eefa1eda8bb414ee928f62303a54ea85b8d6638ff1a6ee9e" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1502,6 +1633,22 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +dependencies = [ + "crossbeam-utils", + "libc", + "mach2", + "once_cell", + "raw-cpuid", + "wasi", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.33" @@ -1541,6 +1688,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1811,6 +1967,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" + [[package]] name = "slab" version = "0.4.9" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 3816b2e..c0b943d 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -104,3 +104,7 @@ features = ["derive"] [dependencies.serde_variant] version = "0.1" + +[dependencies] +axum-prometheus = "0.4" + diff --git a/rust/prometheus.yml b/rust/prometheus.yml new file mode 100644 index 0000000..b9b836e --- /dev/null +++ b/rust/prometheus.yml @@ -0,0 +1,11 @@ +global: + scrape_interval: 10s + +scrape_configs: + - job_name: "prometheus" + static_configs: + - targets: ['localhost:9090'] + + - job_name: "packager" + static_configs: + - targets: ['host.docker.internal:3001'] diff --git a/rust/src/cmd.rs b/rust/src/cmd.rs index e40298d..8279cf1 100644 --- a/rust/src/cmd.rs +++ b/rust/src/cmd.rs @@ -1,3 +1,4 @@ +use crate::{Error, StartError}; use clap::{Parser, Subcommand, ValueEnum}; #[derive(ValueEnum, Clone, Copy, Debug)] @@ -8,9 +9,26 @@ pub enum BoolArg { impl From for bool { fn from(arg: BoolArg) -> bool { + arg.bool() + } +} + +impl BoolArg { + fn bool(self) -> bool { + match self { + Self::True => true, + Self::False => false, + } + } +} + +// this is required because the required_if* functions match against the +// *raw* value, before parsing is done +impl From for clap::builder::OsStr { + fn from(arg: BoolArg) -> clap::builder::OsStr { match arg { - BoolArg::True => true, - BoolArg::False => false, + BoolArg::True => "true".into(), + BoolArg::False => "false".into(), } } } @@ -27,6 +45,15 @@ pub struct Args { #[arg(long, value_enum, default_value_t = BoolArg::False)] pub enable_tokio_console: BoolArg, + #[arg(long, value_enum, default_value_t = BoolArg::False)] + pub enable_prometheus: BoolArg, + + #[arg(long, value_enum, required_if_eq("enable_prometheus", BoolArg::True))] + pub prometheus_port: Option, + + #[arg(long, value_enum, required_if_eq("enable_prometheus", BoolArg::True))] + pub prometheus_bind: Option, + #[command(subcommand)] pub command: Command, } @@ -67,3 +94,19 @@ pub struct UserCreate { #[arg(long)] pub fullname: String, } + +impl Args { + pub fn get() -> Result { + let args = Args::parse(); + + if !args.enable_prometheus.bool() + && (args.prometheus_port.is_some() || args.prometheus_bind.is_some()) + { + Err(Error::Start(StartError::CallError { + message: "do not set prometheus options when prometheus is not enabled".to_string(), + })) + } else { + Ok(args) + } + } +} diff --git a/rust/src/error.rs b/rust/src/error.rs index e7baa99..b786f16 100644 --- a/rust/src/error.rs +++ b/rust/src/error.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::net::SocketAddr; use crate::models; use crate::view; @@ -49,6 +50,7 @@ pub enum Error { Request(RequestError), Start(StartError), Command(CommandError), + Exec(tokio::task::JoinError), } impl std::error::Error for Error {} @@ -60,6 +62,7 @@ impl fmt::Display for Error { Self::Request(request_error) => write!(f, "Request error: {request_error}"), Self::Start(start_error) => write!(f, "{start_error}"), Self::Command(command_error) => write!(f, "{command_error}"), + Self::Exec(join_error) => write!(f, "{join_error}"), } } } @@ -82,6 +85,22 @@ impl From for Error { } } +impl From for Error { + fn from(value: tokio::task::JoinError) -> Self { + Self::Exec(value) + } +} + +impl From<(String, std::net::AddrParseError)> for Error { + fn from(value: (String, std::net::AddrParseError)) -> Self { + let (input, error) = value; + Self::Start(StartError::AddrParse { + input, + message: error.to_string(), + }) + } +} + impl IntoResponse for Error { fn into_response(self) -> Response { match self { @@ -139,6 +158,10 @@ impl IntoResponse for Error { StatusCode::INTERNAL_SERVER_ERROR, view::ErrorPage::build(&command_error.to_string()), ), + Self::Exec(join_error) => ( + StatusCode::INTERNAL_SERVER_ERROR, + view::ErrorPage::build(&join_error.to_string()), + ), } .into_response() } @@ -146,8 +169,11 @@ impl IntoResponse for Error { #[derive(Debug)] pub enum StartError { + CallError { message: String }, DatabaseInitError { message: String }, DatabaseMigrationError { message: String }, + AddrParse { input: String, message: String }, + BindError { addr: SocketAddr, message: String }, } impl std::error::Error for StartError {} @@ -155,12 +181,21 @@ impl std::error::Error for StartError {} impl fmt::Display for StartError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { + Self::CallError { message } => { + write!(f, "invalid invocation: {message}") + } Self::DatabaseInitError { message } => { write!(f, "database initialization error: {message}") } Self::DatabaseMigrationError { message } => { write!(f, "database migration error: {message}") } + Self::AddrParse { message, input } => { + write!(f, "error parsing \"{input}\": {message}") + } + Self::BindError { message, addr } => { + write!(f, "error binding network interface {addr}: {message}") + } } } } diff --git a/rust/src/main.rs b/rust/src/main.rs index 9888820..3d45270 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -3,9 +3,9 @@ use std::pin::Pin; use std::process::ExitCode; use std::str::FromStr; -use packager::{auth, cmd, models, routing, sqlite, telemetry, AppState, ClientState, Error}; - -use clap::Parser; +use packager::{ + auth, cmd, models, routing, sqlite, telemetry, AppState, ClientState, Error, StartError, +}; struct MainResult(Result<(), Error>); @@ -27,9 +27,19 @@ impl From for MainResult { } } +impl From for MainResult { + fn from(error: tokio::task::JoinError) -> Self { + Self(Err(error.into())) + } +} + #[tokio::main] async fn main() -> MainResult { - let args = cmd::Args::parse(); + let args = match cmd::Args::get() { + Ok(args) => args, + Err(e) => return e.into(), + }; + telemetry::init_tracing( if args.enable_opentelemetry.into() { telemetry::OpenTelemetryConfig::Enabled @@ -72,26 +82,68 @@ async fn main() -> MainResult { let app = routing::router(state); let app = telemetry::init_request_tracing(app); - let addr = SocketAddr::from(( - IpAddr::from_str(&serve_args.bind) - .map_err(|error| { - format!( - "error parsing bind address {}: {}", - &serve_args.bind, error - ) - }) - .unwrap(), - serve_args.port, - )); - tracing::debug!("listening on {}", addr); - if let Err(e) = axum::Server::try_bind(&addr) - .map_err(|error| format!("error binding to {}: {}", addr, error)) - .unwrap() - .serve(app.into_make_service()) + let mut join_set = tokio::task::JoinSet::new(); + + let app = if args.enable_prometheus.into() { + // we `require_if()` prometheus port & bind when `enable_prometheus` is set, so + // this cannot fail + + let bind = args.prometheus_bind.unwrap(); + let port = args.prometheus_port.unwrap(); + + let ip = IpAddr::from_str(&bind); + + let addr = match ip { + Err(e) => return <_ as Into>::into((bind, e)).into(), + Ok(ip) => SocketAddr::from((ip, port)), + }; + + let (app, task) = telemetry::prometheus_server(app, addr); + join_set.spawn(task); + app + } else { + app + }; + + join_set.spawn(async move { + let addr = SocketAddr::from(( + IpAddr::from_str(&serve_args.bind) + .map_err(|e| (serve_args.bind, e))?, + serve_args.port, + )); + + tracing::debug!("listening on {}", addr); + + if let Err(e) = axum::Server::try_bind(&addr) + .map_err(|e| { + Error::Start(StartError::BindError { + addr, + message: e.to_string(), + }) + })? + .serve(app.into_make_service()) + .await + { + return Err(>::into(e)); + } + Ok(()) + }); + + // now we wait for all tasks. none of them are supposed to finish + + // EXPECT: join_set cannot be empty as it will always at least contain the main_handle + let result = join_set + .join_next() .await - { - return >::into(e).into(); - } + .expect("join_set is empty, this is a bug"); + + // EXPECT: We never expect a JoinError, as all threads run infinitely + let result = result.expect("thread panicked"); + + // If we get an Ok(()), something weird happened + let result = result.expect_err("thread ran to completion"); + + return result.into(); } cmd::Command::Admin(admin_command) => match admin_command { cmd::Admin::User(cmd) => match cmd { diff --git a/rust/src/telemetry.rs b/rust/src/telemetry.rs index 8902f6b..773a9da 100644 --- a/rust/src/telemetry.rs +++ b/rust/src/telemetry.rs @@ -4,11 +4,14 @@ use std::io; use std::pin::Pin; use std::time::Duration; +use axum::routing::get; use axum::Router; use http::Request; use tower_http::{classify::ServerErrorsFailureClass, trace::TraceLayer}; use tracing::Span; +use axum_prometheus::{Handle, MakeDefaultHandle, PrometheusMetricLayerBuilder}; + use tracing::Instrument; use tracing_subscriber::{ filter::{LevelFilter, Targets}, @@ -19,6 +22,8 @@ use tracing_subscriber::{ }; use uuid::Uuid; +use crate::{Error, StartError}; + use opentelemetry::{global, runtime::Tokio}; pub enum OpenTelemetryConfig { @@ -227,3 +232,33 @@ pub fn init_request_tracing(router: Router) -> Router { ), ) } + +pub fn prometheus_server( + router: Router, + addr: std::net::SocketAddr, +) -> (Router, impl Future>) { + let (prometheus_layer, metric_handle) = PrometheusMetricLayerBuilder::new() + .with_prefix(env!("CARGO_PKG_NAME")) + .with_metrics_from_fn(|| Handle::make_default_handle()) + .build_pair(); + + let app = Router::new().route("/metrics", get(|| async move { metric_handle.render() })); + + let task = async move { + if let Err(e) = axum::Server::try_bind(&addr) + .map_err(|e| { + Error::Start(StartError::BindError { + message: e.to_string(), + addr, + }) + })? + .serve(app.into_make_service()) + .await + { + return Err(>::into(e)); + } + Ok(()) + }; + + (router.layer(prometheus_layer), task) +} diff --git a/rust/start-prometheus.sh b/rust/start-prometheus.sh new file mode 100755 index 0000000..4d9040d --- /dev/null +++ b/rust/start-prometheus.sh @@ -0,0 +1,5 @@ +#!/usr/bin/env bash + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +docker run --add-host host.docker.internal:host-gateway --rm --name packager-prometheus -p 9090:9090 -v $SCRIPT_DIR/prometheus.yml:/etc/prometheus/prometheus.yml docker.io/prom/prometheus "${@}" diff --git a/rust/vendor/sqlx b/rust/vendor/sqlx index 6115889..1744240 160000 --- a/rust/vendor/sqlx +++ b/rust/vendor/sqlx @@ -1 +1 @@ -Subproject commit 611588900e500c7b634fcfcd8f138f0f5786527a +Subproject commit 17442400f0ff0f71e25e94d3b83cd18bd51e7961