This commit is contained in:
2024-04-28 15:52:36 +02:00
parent 97e90d8178
commit 3fe3e5036d
13 changed files with 939 additions and 841 deletions

1467
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -8,11 +8,11 @@ name = "packager"
path = "src/main.rs" path = "src/main.rs"
[features] [features]
jaeger = ["dep:opentelemetry", "dep:tracing-opentelemetry", "dep:opentelemetry-jaeger", "tokio/tracing"] opentelemetry = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:tracing-opentelemetry", "tokio/tracing"]
prometheus = ["dep:axum-prometheus"] prometheus = ["dep:axum-prometheus"]
tokio-console = ["dep:console-subscriber"] tokio-console = ["dep:console-subscriber"]
default = ["jaeger", "prometheus", "tokio-console"] default = ["opentelemetry", "prometheus", "tokio-console"]
[profile.dev] [profile.dev]
opt-level = 0 opt-level = 0
@@ -24,23 +24,22 @@ lto = "off"
version = "0.1" version = "0.1"
[dependencies.opentelemetry] [dependencies.opentelemetry]
version = "0.20" version = "0.22"
optional = true
[dependencies.opentelemetry_sdk]
version = "0.22"
optional = true optional = true
[dependencies.tracing-opentelemetry] [dependencies.tracing-opentelemetry]
version = "0.21" version = "0.23"
optional = true optional = true
[dependencies.tracing-log] [dependencies.tracing-log]
version = "0.1" version = "0.2"
[dependencies.opentelemetry-jaeger]
version = "0.19"
features = ["rt-tokio"]
optional = true
[dependencies.http] [dependencies.http]
version = "0.2" version = "1.1"
[dependencies.log] [dependencies.log]
version = "0.4" version = "0.4"
@@ -50,19 +49,19 @@ version = "4"
features = ["derive"] features = ["derive"]
[dependencies.axum] [dependencies.axum]
version = "0.6" version = "0.7"
features = ["headers", "macros"] features = ["macros"]
[dependencies.tokio] [dependencies.tokio]
version = "1" version = "1"
features = ["macros", "rt-multi-thread"] features = ["macros", "rt-multi-thread"]
[dependencies.console-subscriber] [dependencies.console-subscriber]
version = "0.1" version = "0.2"
optional = true optional = true
[dependencies.hyper] [dependencies.hyper]
version = "0.14" version = "1.3"
features = ["full"] features = ["full"]
[dependencies.tower] [dependencies.tower]
@@ -70,7 +69,7 @@ version = "0.4"
features = ["timeout"] features = ["timeout"]
[dependencies.tower-http] [dependencies.tower-http]
version = "0.4" version = "0.5"
features = ["trace", "request-id"] features = ["trace", "request-id"]
[dependencies.tracing] [dependencies.tracing]
@@ -84,7 +83,7 @@ version = "0.3"
features = ["json", "env-filter"] features = ["json", "env-filter"]
[dependencies.maud] [dependencies.maud]
version = "0.25" version = "0.26"
features = [ features = [
"axum", "axum",
] ]
@@ -123,14 +122,14 @@ features = ["derive"]
version = "0.1" version = "0.1"
[dependencies.axum-prometheus] [dependencies.axum-prometheus]
version = "0.4" version = "0.6"
optional = true optional = true
[dependencies.metrics] [dependencies.metrics]
version = "0.21" version = "0.22"
[dependencies.sha2] [dependencies.sha2]
version = "0.10" version = "0.10"
[dependencies.base64] [dependencies.base64]
version = "0.21" version = "0.22"

View File

@@ -1,9 +1,11 @@
use axum::{extract::State, middleware::Next, response::IntoResponse}; use axum::{
extract::{Request, State},
middleware::Next,
response::IntoResponse,
};
use futures::FutureExt; use futures::FutureExt;
use tracing::Instrument; use tracing::Instrument;
use hyper::Request;
use crate::models::user::User; use crate::models::user::User;
use super::models; use super::models;
@@ -16,11 +18,21 @@ pub enum Config {
} }
#[tracing::instrument(name = "check_auth", skip(state, request, next))] #[tracing::instrument(name = "check_auth", skip(state, request, next))]
pub async fn authorize<B>( pub async fn authorize(
State(state): State<AppState>, State(state): State<AppState>,
mut request: Request<B>, mut request: Request,
next: Next<B>, next: Next,
) -> Result<impl IntoResponse, Error> { ) -> Result<impl IntoResponse, Error> {
// We must not access `request` inside the async block above, otherwise there will be
// errors like the following:
//
// the trait `tower::Service<http::Request<axum::body::Body>>` is not implemented for
// `FromFn<fn(State<AppState>, Request<Body>, Next) -> impl Future<Output =
// Result<impl IntoResponse, Error>> {authorize}, AppState, Route, _>
//
// I am honestly not sure about the reason
let username_header = request.headers().get("x-auth-username");
let user = async { let user = async {
let auth: Result<Result<User, AuthError>, Error> = match state.auth_config { let auth: Result<Result<User, AuthError>, Error> = match state.auth_config {
Config::Disabled { assume_user } => { Config::Disabled { assume_user } => {
@@ -35,7 +47,7 @@ pub async fn authorize<B>(
}; };
Ok(user) Ok(user)
} }
Config::Enabled => match request.headers().get("x-auth-username") { Config::Enabled => match username_header {
None => Ok(Err(AuthError::AuthenticationHeaderMissing)), None => Ok(Err(AuthError::AuthenticationHeaderMissing)),
Some(username) => match username.to_str() { Some(username) => match username.to_str() {
Err(e) => Ok(Err(AuthError::AuthenticationHeaderInvalid { Err(e) => Ok(Err(AuthError::AuthenticationHeaderInvalid {
@@ -54,7 +66,6 @@ pub async fn authorize<B>(
}, },
}, },
}; };
auth auth
} }
.instrument(tracing::debug_span!("authorize")) .instrument(tracing::debug_span!("authorize"))
@@ -72,15 +83,17 @@ pub async fn authorize<B>(
format!("packager_auth_{}_total", { format!("packager_auth_{}_total", {
match auth { match auth {
Ok(_) => "success".to_string(), Ok(_) => "success".to_string(),
Err(ref e) => format!("failure_{}", e.to_prom_metric_name()), Err(ref e) => {
format!("failure_{}", e.to_prom_metric_name())
}
} }
}), }),
1,
&match &auth { &match &auth {
Ok(user) => vec![("username", user.username.clone())], Ok(user) => vec![("username", user.username.clone())],
Err(e) => e.to_prom_labels(), Err(e) => e.to_prom_labels(),
} }
); )
.increment(1);
auth auth
}) })
}) })
@@ -89,5 +102,5 @@ pub async fn authorize<B>(
.await??; .await??;
request.extensions_mut().insert(user); request.extensions_mut().insert(user);
Ok(next.run(request).await) Ok::<http::Response<axum::body::Body>, Error>(next.run(request).await)
} }

View File

@@ -43,7 +43,7 @@ pub struct Args {
#[arg(long)] #[arg(long)]
pub database_url: String, pub database_url: String,
#[cfg(feature = "jaeger")] #[cfg(feature = "opentelemetry")]
#[arg(long, value_enum, default_value_t = BoolArg::False)] #[arg(long, value_enum, default_value_t = BoolArg::False)]
pub enable_opentelemetry: BoolArg, pub enable_opentelemetry: BoolArg,

View File

@@ -134,7 +134,7 @@ pub mod route {
use crate::{models::user::User, AppState}; use crate::{models::user::User, AppState};
use axum::{ use axum::{
body::{BoxBody, HttpBody}, body::Body,
extract::{Path, Query, State}, extract::{Path, Query, State},
http::HeaderMap, http::HeaderMap,
response::Response, response::Response,
@@ -160,7 +160,7 @@ pub mod route {
headers: HeaderMap, headers: HeaderMap,
path: Path<Self::UrlParams>, path: Path<Self::UrlParams>,
form: Form<Self::Form>, form: Form<Self::Form>,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
} }
#[async_trait] #[async_trait]
@@ -176,7 +176,7 @@ pub mod route {
headers: HeaderMap, headers: HeaderMap,
query: Query<Self::QueryParams>, query: Query<Self::QueryParams>,
path: Path<Self::UrlParams>, path: Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
} }
#[async_trait] #[async_trait]
@@ -191,7 +191,7 @@ pub mod route {
state: State<AppState>, state: State<AppState>,
headers: HeaderMap, headers: HeaderMap,
path: Path<Self::UrlParams>, path: Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
async fn save( async fn save(
user: Extension<User>, user: Extension<User>,
@@ -199,14 +199,14 @@ pub mod route {
headers: HeaderMap, headers: HeaderMap,
path: Path<Self::UrlParams>, path: Path<Self::UrlParams>,
form: Form<Self::UpdateForm>, form: Form<Self::UpdateForm>,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
async fn cancel( async fn cancel(
user: Extension<User>, user: Extension<User>,
state: State<AppState>, state: State<AppState>,
headers: HeaderMap, headers: HeaderMap,
path: Path<Self::UrlParams>, path: Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
} }
#[async_trait] #[async_trait]
@@ -222,14 +222,14 @@ pub mod route {
headers: HeaderMap, headers: HeaderMap,
params: Self::UrlParams, params: Self::UrlParams,
value: bool, value: bool,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
async fn set_true( async fn set_true(
Extension(user): Extension<User>, Extension(user): Extension<User>,
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap, headers: HeaderMap,
Path(path): Path<Self::UrlParams>, Path(path): Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
<Self as ToggleFallback>::set(user, state, headers, path, true).await <Self as ToggleFallback>::set(user, state, headers, path, true).await
} }
@@ -238,15 +238,11 @@ pub mod route {
State(state): State<AppState>, State(state): State<AppState>,
headers: HeaderMap, headers: HeaderMap,
Path(path): Path<Self::UrlParams>, Path(path): Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
<Self as ToggleFallback>::set(user, state, headers, path, false).await <Self as ToggleFallback>::set(user, state, headers, path, false).await
} }
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState>;
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send;
} }
#[async_trait] #[async_trait]
@@ -267,13 +263,13 @@ pub mod route {
ctx: &crate::Context, ctx: &crate::Context,
state: AppState, state: AppState,
params: Self::UrlParams, params: Self::UrlParams,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
async fn on( async fn on(
Extension(user): Extension<User>, Extension(user): Extension<User>,
State(state): State<AppState>, State(state): State<AppState>,
Path(path): Path<Self::UrlParams>, Path(path): Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
let (ctx, state, params) = <Self as ToggleHtmx>::set(user, state, path, true).await?; let (ctx, state, params) = <Self as ToggleHtmx>::set(user, state, path, true).await?;
<Self as ToggleHtmx>::response(&ctx, state, params).await <Self as ToggleHtmx>::response(&ctx, state, params).await
} }
@@ -282,25 +278,16 @@ pub mod route {
Extension(user): Extension<User>, Extension(user): Extension<User>,
State(state): State<AppState>, State(state): State<AppState>,
Path(path): Path<Self::UrlParams>, Path(path): Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
let (ctx, state, params) = <Self as ToggleHtmx>::set(user, state, path, false).await?; let (ctx, state, params) = <Self as ToggleHtmx>::set(user, state, path, false).await?;
<Self as ToggleHtmx>::response(&ctx, state, params).await <Self as ToggleHtmx>::response(&ctx, state, params).await
} }
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState>;
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send;
} }
pub trait Toggle: ToggleHtmx + ToggleFallback { pub trait Toggle: ToggleHtmx + ToggleFallback {
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState> {
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send,
{
axum::Router::new() axum::Router::new()
.merge(<Self as ToggleHtmx>::router()) .merge(<Self as ToggleHtmx>::router())
.merge(<Self as ToggleFallback>::router()) .merge(<Self as ToggleFallback>::router())
@@ -318,14 +305,10 @@ pub mod route {
state: State<AppState>, state: State<AppState>,
headers: HeaderMap, headers: HeaderMap,
path: Path<Self::UrlParams>, path: Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error>; ) -> Result<Response<Body>, crate::Error>;
} }
pub trait Router: Create + Delete { pub trait Router: Create + Delete {
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState>;
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send;
} }
} }

View File

@@ -2,7 +2,7 @@ pub mod list;
pub use list::List; pub use list::List;
use axum::{ use axum::{
body::{BoxBody, HttpBody}, body::Body,
extract::{Form, Path, State as StateExtractor}, extract::{Form, Path, State as StateExtractor},
http::HeaderMap, http::HeaderMap,
response::{IntoResponse, Redirect, Response}, response::{IntoResponse, Redirect, Response},
@@ -653,7 +653,7 @@ impl route::Create for Todo {
headers: HeaderMap, headers: HeaderMap,
Path((trip_id,)): Path<Self::UrlParams>, Path((trip_id,)): Path<Self::UrlParams>,
Form(form): Form<Self::Form>, Form(form): Form<Self::Form>,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
let ctx = Context::build(current_user); let ctx = Context::build(current_user);
// method output is not required as we reload the whole trip todos anyway // method output is not required as we reload the whole trip todos anyway
let _todo_item = <Self as crud::Create>::create( let _todo_item = <Self as crud::Create>::create(
@@ -700,7 +700,7 @@ impl route::Delete for Todo {
StateExtractor(state): StateExtractor<AppState>, StateExtractor(state): StateExtractor<AppState>,
_headers: HeaderMap, _headers: HeaderMap,
Path((trip_id, todo_id)): Path<Self::UrlParams>, Path((trip_id, todo_id)): Path<Self::UrlParams>,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
let ctx = Context::build(current_user); let ctx = Context::build(current_user);
let deleted = <Self as crud::Delete>::delete( let deleted = <Self as crud::Delete>::delete(
&ctx, &ctx,
@@ -737,12 +737,7 @@ impl route::Delete for Todo {
} }
impl route::Router for Todo { impl route::Router for Todo {
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState> {
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send,
{
axum::Router::new() axum::Router::new()
.route("/new", axum::routing::post(<Self as route::Create>::create)) .route("/new", axum::routing::post(<Self as route::Create>::create))
.route( .route(
@@ -972,7 +967,7 @@ impl route::ToggleFallback for StateUpdate {
headers: HeaderMap, headers: HeaderMap,
(trip_id, todo_id): (Uuid, Uuid), (trip_id, todo_id): (Uuid, Uuid),
value: bool, value: bool,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
let ctx = Context::build(current_user); let ctx = Context::build(current_user);
<Self as crud::Toggle>::set( <Self as crud::Toggle>::set(
&ctx, &ctx,
@@ -988,12 +983,7 @@ impl route::ToggleFallback for StateUpdate {
Ok(Redirect::to(get_referer(&headers)?).into_response()) Ok(Redirect::to(get_referer(&headers)?).into_response())
} }
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState> {
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send,
{
axum::Router::new() axum::Router::new()
.route(Self::URL_TRUE, post(Self::set_true)) .route(Self::URL_TRUE, post(Self::set_true))
.route(Self::URL_FALSE, post(Self::set_false)) .route(Self::URL_FALSE, post(Self::set_false))
@@ -1023,7 +1013,7 @@ impl route::ToggleHtmx for StateUpdate {
ctx: &Context, ctx: &Context,
state: AppState, state: AppState,
(trip_id, todo_id): Self::UrlParams, (trip_id, todo_id): Self::UrlParams,
) -> Result<Response<BoxBody>, crate::Error> { ) -> Result<Response<Body>, crate::Error> {
let todo_item = Todo::find( let todo_item = Todo::find(
ctx, ctx,
&state.database_pool, &state.database_pool,
@@ -1047,12 +1037,7 @@ impl route::ToggleHtmx for StateUpdate {
.into_response()) .into_response())
} }
fn router<B>() -> axum::Router<AppState, B> fn router() -> axum::Router<AppState> {
where
B: HttpBody + Send + 'static,
<B as HttpBody>::Data: Send,
<B as HttpBody>::Error: std::error::Error + Sync + Send,
{
axum::Router::new() axum::Router::new()
.route(Self::URL_TRUE, post(Self::on)) .route(Self::URL_TRUE, post(Self::on))
.route(Self::URL_FALSE, post(Self::off)) .route(Self::URL_FALSE, post(Self::off))

View File

@@ -6,6 +6,7 @@ use std::str::FromStr;
use packager::{ use packager::{
auth, cli, models, routing, sqlite, telemetry, AppState, ClientState, Error, StartError, auth, cli, models, routing, sqlite, telemetry, AppState, ClientState, Error, StartError,
}; };
use tokio::net::TcpListener;
struct MainResult(Result<(), Error>); struct MainResult(Result<(), Error>);
@@ -41,7 +42,7 @@ async fn main() -> MainResult {
}; };
telemetry::tracing::init( telemetry::tracing::init(
#[cfg(feature = "jaeger")] #[cfg(feature = "opentelemetry")]
if args.enable_opentelemetry.into() { if args.enable_opentelemetry.into() {
telemetry::tracing::OpenTelemetryConfig::Enabled telemetry::tracing::OpenTelemetryConfig::Enabled
} else { } else {
@@ -117,19 +118,19 @@ async fn main() -> MainResult {
tracing::debug!("listening on {}", addr); tracing::debug!("listening on {}", addr);
if let Err(e) = axum::Server::try_bind(&addr) axum::serve(
.map_err(|e| { TcpListener::bind(&addr).await.map_err(|e| {
Error::Start(StartError::BindError { Error::Start(StartError::BindError {
addr, addr,
message: e.to_string(), message: e.to_string(),
}) })
})? })?,
.serve(app.into_make_service()) app,
)
.await .await
{ // Error = Infallible
return Err(<hyper::Error as Into<Error>>::into(e)); .unwrap();
} unreachable!()
Ok(())
}); });
// now we wait for all tasks. none of them are supposed to finish // now we wait for all tasks. none of them are supposed to finish

View File

@@ -325,7 +325,7 @@ impl InventoryItem {
weight: u32, weight: u32,
) -> Result<Uuid, Error> { ) -> Result<Uuid, Error> {
let user_id = ctx.user.id.to_string(); let user_id = ctx.user.id.to_string();
let weight = i64::try_from(weight).unwrap(); let weight = i64::from(weight);
let id_param = id.to_string(); let id_param = id.to_string();
crate::execute_returning_uuid!( crate::execute_returning_uuid!(

View File

@@ -1036,28 +1036,6 @@ impl Trip {
ctx: &Context, ctx: &Context,
pool: &sqlite::Pool, pool: &sqlite::Pool,
) -> Result<(), Error> { ) -> Result<(), Error> {
struct Row {
id: String,
name: String,
active: i32,
}
impl TryFrom<Row> for TripType {
type Error = Error;
fn try_from(row: Row) -> Result<Self, Self::Error> {
Ok(TripType {
id: Uuid::try_parse(&row.id)?,
name: row.name,
active: match row.active {
0 => false,
1 => true,
_ => unreachable!(),
},
})
}
}
let user_id = ctx.user.id.to_string(); let user_id = ctx.user.id.to_string();
let id = self.id.to_string(); let id = self.id.to_string();
let types = crate::query_all!( let types = crate::query_all!(
@@ -1066,7 +1044,7 @@ impl Trip {
component: sqlite::Component::Trips, component: sqlite::Component::Trips,
}, },
pool, pool,
Row, TripTypeRow,
TripType, TripType,
" "
SELECT SELECT
@@ -1406,6 +1384,28 @@ pub struct TripType {
pub active: bool, pub active: bool,
} }
struct TripTypeRow {
id: String,
name: String,
active: i32,
}
impl TryFrom<TripTypeRow> for TripType {
type Error = Error;
fn try_from(row: TripTypeRow) -> Result<Self, Self::Error> {
Ok(TripType {
id: Uuid::try_parse(&row.id)?,
name: row.name,
active: match row.active {
0 => false,
1 => true,
_ => unreachable!(),
},
})
}
}
impl TripsType { impl TripsType {
#[tracing::instrument] #[tracing::instrument]
pub async fn all(ctx: &Context, pool: &sqlite::Pool) -> Result<Vec<Self>, Error> { pub async fn all(ctx: &Context, pool: &sqlite::Pool) -> Result<Vec<Self>, Error> {

View File

@@ -1,7 +1,6 @@
use axum::{ use axum::{
error_handling::HandleErrorLayer, error_handling::HandleErrorLayer,
http::header::HeaderMap, http::{header::HeaderMap, StatusCode},
http::StatusCode,
middleware, middleware,
routing::{get, post}, routing::{get, post},
BoxError, Router, BoxError, Router,

View File

@@ -137,7 +137,7 @@ pub fn sqlx_query(
("query_type", classification.query_type.to_string()), ("query_type", classification.query_type.to_string()),
("query_component", classification.component.to_string()), ("query_component", classification.component.to_string()),
]); ]);
metrics::counter!("packager_database_queries_total", 1, &labels); metrics::counter!("packager_database_queries_total", &labels).increment(1);
} }
// This does not work, as the query*! macros expect a string literal for the query, so // This does not work, as the query*! macros expect a string literal for the query, so

View File

@@ -4,11 +4,10 @@ use axum::routing::get;
use axum::Router; use axum::Router;
use axum_prometheus::{Handle, MakeDefaultHandle, PrometheusMetricLayerBuilder}; use axum_prometheus::{Handle, MakeDefaultHandle, PrometheusMetricLayerBuilder};
use tokio::net::TcpListener;
use crate::{Error, StartError}; use crate::{Error, StartError};
pub struct LabelBool(bool);
/// Serves metrics on the specified `addr`. /// Serves metrics on the specified `addr`.
/// ///
/// You will get two outputs back: Another router, and a task that you have /// You will get two outputs back: Another router, and a task that you have
@@ -25,19 +24,19 @@ pub fn prometheus_server(
let app = Router::new().route("/metrics", get(|| async move { metric_handle.render() })); let app = Router::new().route("/metrics", get(|| async move { metric_handle.render() }));
let task = async move { let task = async move {
if let Err(e) = axum::Server::try_bind(&addr) axum::serve(
.map_err(|e| { TcpListener::bind(addr).await.map_err(|e| {
Error::Start(StartError::BindError { Error::Start(StartError::BindError {
message: e.to_string(),
addr, addr,
message: e.to_string(),
}) })
})? })?,
.serve(app.into_make_service()) app,
)
.await .await
{ // Error = Infallible
return Err(<hyper::Error as Into<Error>>::into(e)); .unwrap();
} unreachable!()
Ok(())
}; };
(router.layer(prometheus_layer), task) (router.layer(prometheus_layer), task)

View File

@@ -22,8 +22,10 @@ use tracing::Instrument;
use uuid::Uuid; use uuid::Uuid;
#[cfg(feature = "jaeger")] #[cfg(feature = "opentelemetry")]
use opentelemetry::{global, runtime::Tokio}; use opentelemetry::{global, trace::TracerProvider as _};
#[cfg(feature = "opentelemetry")]
use opentelemetry_sdk::trace::TracerProvider;
pub enum OpenTelemetryConfig { pub enum OpenTelemetryConfig {
Enabled, Enabled,
@@ -64,17 +66,8 @@ fn get_stdout_layer<
stdout_layer.boxed() stdout_layer.boxed()
} }
trait Forwarder { #[cfg(feature = "opentelemetry")]
type Config; fn get_opentelemetry_layer<
fn build(
config: Self::Config,
shutdown_functions: &mut Vec<ShutdownFunction>,
) -> Option<Box<dyn tracing_subscriber::Layer<dyn tracing::Subscriber>>>;
}
#[cfg(feature = "jaeger")]
fn get_jaeger_layer<
T: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>, T: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
>( >(
config: &OpenTelemetryConfig, config: &OpenTelemetryConfig,
@@ -82,16 +75,20 @@ fn get_jaeger_layer<
) -> Option<impl tracing_subscriber::Layer<T>> { ) -> Option<impl tracing_subscriber::Layer<T>> {
match config { match config {
OpenTelemetryConfig::Enabled => { OpenTelemetryConfig::Enabled => {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); global::set_text_map_propagator(
// Sets up the machinery needed to export data to Jaeger opentelemetry_sdk::propagation::TraceContextPropagator::new(),
);
// Sets up the machinery needed to export data to an opentelemetry endpoint.
// There are other OTel crates that provide pipelines for the vendors // There are other OTel crates that provide pipelines for the vendors
// mentioned earlier. // mentioned earlier.
let tracer = opentelemetry_jaeger::new_agent_pipeline() let provider = TracerProvider::builder()
.with_service_name(env!("CARGO_PKG_NAME")) // .with_service_name()
.with_max_packet_size(50_000) // .with_max_packet_size(50_000)
.with_auto_split_batch(true) // .with_auto_split_batch(true)
.install_batch(Tokio) // .install_batch(Tokio)
.unwrap(); .build();
let tracer = provider.tracer(env!("CARGO_PKG_NAME"));
let opentelemetry_filter = { let opentelemetry_filter = {
Targets::new() Targets::new()
@@ -122,7 +119,7 @@ fn get_jaeger_layer<
type ShutdownFunction = Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error>>>; type ShutdownFunction = Box<dyn FnOnce() -> Result<(), Box<dyn std::error::Error>>>;
pub async fn init<Func, T>( pub async fn init<Func, T>(
#[cfg(feature = "jaeger")] opentelemetry_config: OpenTelemetryConfig, #[cfg(feature = "opentelemetry")] opentelemetry_config: OpenTelemetryConfig,
#[cfg(feature = "tokio-console")] tokio_console_config: TokioConsoleConfig, #[cfg(feature = "tokio-console")] tokio_console_config: TokioConsoleConfig,
args: crate::cli::Args, args: crate::cli::Args,
f: Func, f: Func,
@@ -131,12 +128,12 @@ where
Func: FnOnce(crate::cli::Args) -> Pin<Box<dyn Future<Output = T>>>, Func: FnOnce(crate::cli::Args) -> Pin<Box<dyn Future<Output = T>>>,
T: std::process::Termination, T: std::process::Termination,
{ {
// mut is dependent on features (it's only required when jaeger is set), so // mut is dependent on features (it's only required when opentelemetry is set), so
// let's just disable the lint // let's just disable the lint
#[cfg(feature = "jaeger")] #[cfg(feature = "opentelemetry")]
let mut shutdown_functions: Vec<ShutdownFunction> = vec![]; let mut shutdown_functions: Vec<ShutdownFunction> = vec![];
#[cfg(not(feature = "jaeger"))] #[cfg(not(feature = "opentelemetry"))]
let shutdown_functions: Vec<ShutdownFunction> = vec![]; let shutdown_functions: Vec<ShutdownFunction> = vec![];
#[cfg(feature = "tokio-console")] #[cfg(feature = "tokio-console")]
@@ -147,16 +144,17 @@ where
let stdout_layer = get_stdout_layer(); let stdout_layer = get_stdout_layer();
#[cfg(feature = "jaeger")] #[cfg(feature = "opentelemetry")]
let jaeger_layer = get_jaeger_layer(&opentelemetry_config, &mut shutdown_functions); let opentelemetry_layer =
get_opentelemetry_layer(&opentelemetry_config, &mut shutdown_functions);
let registry = Registry::default(); let registry = Registry::default();
#[cfg(feature = "tokio-console")] #[cfg(feature = "tokio-console")]
let registry = registry.with(console_layer); let registry = registry.with(console_layer);
#[cfg(feature = "jaeger")] #[cfg(feature = "opentelemetry")]
let registry = registry.with(jaeger_layer); let registry = registry.with(opentelemetry_layer);
// just an example, you can actuall pass Options here for layers that might be // just an example, you can actuall pass Options here for layers that might be
// set/unset at runtime // set/unset at runtime