Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8d2a503a89 | |||
| a8def7bf94 | |||
| 9823faceac | |||
| 443d4e98ee | |||
| 22b1be0390 |
@@ -14,7 +14,7 @@ members = ["examples"]
|
|||||||
tokio = { version = "1.*", default-features = false, features = [] }
|
tokio = { version = "1.*", default-features = false, features = [] }
|
||||||
serde = { version = "1.0.*", default-features = false, features = ["std", "derive"] }
|
serde = { version = "1.0.*", default-features = false, features = ["std", "derive"] }
|
||||||
chrono = { version = "0.4.*", default-features = false, features = ["std", "now", "serde"] }
|
chrono = { version = "0.4.*", default-features = false, features = ["std", "now", "serde"] }
|
||||||
serde_json = { version = "1.0.*", default-features = false, features = ["std"] }
|
ciborium = { version = "0.2.*", default-features = false, features = ["std"] }
|
||||||
redis = { version = "0.23.*", default-features = false, features = ["serde", "aio", "tokio-comp"] }
|
redis = { version = "0.23.*", default-features = false, features = ["serde", "aio", "tokio-comp"] }
|
||||||
redlock = { version = "2.0.*", default-features = false }
|
redlock = { version = "2.0.*", default-features = false }
|
||||||
tracing = { version = "0.1.*", default-features = false, features = ["attributes"] }
|
tracing = { version = "0.1.*", default-features = false, features = ["attributes"] }
|
||||||
|
|||||||
68
README.md
Normal file
68
README.md
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
# objcache
|
||||||
|
|
||||||
|
Cache serde-able objects with Redis.
|
||||||
|
|
||||||
|
## Quick overview
|
||||||
|
|
||||||
|
Imagine you have the following:
|
||||||
|
|
||||||
|
1) A struct or enum that implements `Serialize` and `Deserialize`
|
||||||
|
|
||||||
|
```rust
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct Item {
|
||||||
|
value: usize,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
2) An expensive function that creates an instance of that struct or enum
|
||||||
|
|
||||||
|
```rust
|
||||||
|
async fn get_item(v: usize) -> Result<Item, Error> {
|
||||||
|
// your business logic here
|
||||||
|
Ok(Item { value: v })
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
3) A redis connection
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let client = RedisClient::new((IpAddr::V4(Ipv4Addr::LOCALHOST), 6379)).unwrap();
|
||||||
|
```
|
||||||
|
|
||||||
|
This library lets you add a cache to your application by `wrap()`ing your function:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let item = client.wrap(
|
||||||
|
get_item,
|
||||||
|
&RedisCacheArgs {
|
||||||
|
lock_name: b"item_lock",
|
||||||
|
key_name: b"item",
|
||||||
|
expiry: Duration::from_secs(5),
|
||||||
|
},
|
||||||
|
)(100)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
assert_eq!(item.value, 100);
|
||||||
|
```
|
||||||
|
|
||||||
|
That's it! Any call to the wrapped function will reuse the latest response if it is not
|
||||||
|
older than 5 seconds.
|
||||||
|
|
||||||
|
## Not implemented
|
||||||
|
|
||||||
|
* Cache depending on the input parameters
|
||||||
|
* Non-async wrap functions (should be easy to implement with some refactoring)
|
||||||
|
|
||||||
|
## Status
|
||||||
|
|
||||||
|
More of a proof of concept, there may be some big hidden issues.
|
||||||
|
|
||||||
|
## Available methods
|
||||||
|
|
||||||
|
|Method|Use case|
|
||||||
|
|---|---|
|
||||||
|
|`wrap()`|You want to unconditionally cache a response|
|
||||||
|
|`wrap_opt()`|Your function returns an `Option<T>` and you only want to cache `Some` responses|
|
||||||
|
|`wrap_const()`|You want to select whether or not to cache depending on a separate parameter taking `CacheDecision`|
|
||||||
|
|`wrap_on()`|You have a function that takes your `Item` and returns a `CacheDecision`|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "objcache_examples"
|
name = "objcache_examples"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
rust-version = "1.81"
|
rust-version = "1.81"
|
||||||
|
|
||||||
@@ -12,5 +12,5 @@ path = "src/main.rs"
|
|||||||
tokio = { version = "1.*", default-features = false, features = ["rt-multi-thread", "macros"] }
|
tokio = { version = "1.*", default-features = false, features = ["rt-multi-thread", "macros"] }
|
||||||
tracing = { version = "0.1.40", default-features = false }
|
tracing = { version = "0.1.40", default-features = false }
|
||||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi"] }
|
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi"] }
|
||||||
serde = { version = "1.0.210", default-features = false, features = ["std", "derive"] }
|
serde = { version = "1.0.213", default-features = false, features = ["std", "derive"] }
|
||||||
objcache = { path = "../" }
|
objcache = { path = "../" }
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ use objcache::{CacheError, Client, RedisCacheArgs, RedisClient};
|
|||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
struct Item {
|
struct Item {
|
||||||
val: usize,
|
value: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -32,7 +32,7 @@ impl From<CacheError> for Error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn get_item(v: usize) -> Result<Item, Error> {
|
async fn get_item(v: usize) -> Result<Item, Error> {
|
||||||
Ok(Item { val: v })
|
Ok(Item { value: v })
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -59,7 +59,7 @@ async fn main() -> Result<(), Error> {
|
|||||||
println!("{item:?}");
|
println!("{item:?}");
|
||||||
|
|
||||||
let item = client.wrap(
|
let item = client.wrap(
|
||||||
|v| Box::pin(async move { Ok::<_, Error>(Item { val: v }) }),
|
|v| Box::pin(async move { Ok::<_, Error>(Item { value: v }) }),
|
||||||
&RedisCacheArgs {
|
&RedisCacheArgs {
|
||||||
lock_name: b"item_lock",
|
lock_name: b"item_lock",
|
||||||
key_name: b"item",
|
key_name: b"item",
|
||||||
@@ -68,6 +68,8 @@ async fn main() -> Result<(), Error> {
|
|||||||
)(100)
|
)(100)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
assert_eq!(item.value, 100);
|
||||||
|
|
||||||
println!("{item:?}");
|
println!("{item:?}");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
10
src/error.rs
10
src/error.rs
@@ -14,8 +14,14 @@ impl From<redis::RedisError> for CacheError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<serde_json::Error> for CacheError {
|
impl<T: fmt::Debug> From<ciborium::ser::Error<T>> for CacheError {
|
||||||
fn from(value: serde_json::Error) -> Self {
|
fn from(value: ciborium::ser::Error<T>) -> Self {
|
||||||
|
Self::Serde(value.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: fmt::Debug> From<ciborium::de::Error<T>> for CacheError {
|
||||||
|
fn from(value: ciborium::de::Error<T>) -> Self {
|
||||||
Self::Serde(value.to_string())
|
Self::Serde(value.to_string())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
230
src/lib.rs
230
src/lib.rs
@@ -9,9 +9,33 @@ use tracing::Level;
|
|||||||
mod error;
|
mod error;
|
||||||
pub use error::CacheError;
|
pub use error::CacheError;
|
||||||
|
|
||||||
|
const LOCK_TTL: usize = 60_000; // milliseconds
|
||||||
|
const LOCK_RETRY_TIME: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct CacheItem<Item> {
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
payload: Item,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all, level = Level::TRACE)]
|
||||||
|
async fn query_cache<Item>(
|
||||||
|
conn: &mut redis::aio::MultiplexedConnection,
|
||||||
|
key_name: &[u8],
|
||||||
|
) -> Result<Option<CacheItem<Item>>, CacheError>
|
||||||
|
where
|
||||||
|
Item: DeserializeOwned,
|
||||||
|
{
|
||||||
|
Ok(conn
|
||||||
|
.get::<&[u8], Option<Vec<u8>>>(key_name)
|
||||||
|
.await?
|
||||||
|
.map(|s| ciborium::from_reader(&s[..]))
|
||||||
|
.transpose()?)
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct RedisClient {
|
pub struct RedisClient {
|
||||||
redis: redis::Client,
|
client: redis::Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RedisCacheArgs<'a> {
|
pub struct RedisCacheArgs<'a> {
|
||||||
@@ -20,6 +44,15 @@ pub struct RedisCacheArgs<'a> {
|
|||||||
pub expiry: Duration,
|
pub expiry: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, Clone, Copy)]
|
||||||
|
pub enum CacheDecision {
|
||||||
|
Cache,
|
||||||
|
NoCache,
|
||||||
|
}
|
||||||
|
|
||||||
|
type OutFunc<'f, Args, I, E> =
|
||||||
|
Box<dyn FnOnce(Args) -> Pin<Box<dyn Future<Output = Result<I, E>> + Send + 'f>> + 'f>;
|
||||||
|
|
||||||
pub trait Client<T>
|
pub trait Client<T>
|
||||||
where
|
where
|
||||||
Self: Sized,
|
Self: Sized,
|
||||||
@@ -34,15 +67,87 @@ where
|
|||||||
&'c self,
|
&'c self,
|
||||||
f: Func,
|
f: Func,
|
||||||
cache_args: &'a RedisCacheArgs<'a>,
|
cache_args: &'a RedisCacheArgs<'a>,
|
||||||
) -> Box<dyn FnOnce(Args) -> Pin<Box<dyn Future<Output = Result<Item, E>> + Send + 'f>> + 'f>
|
) -> OutFunc<'f, Args, Item, E>
|
||||||
where
|
where
|
||||||
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
||||||
Args: Send + 'f,
|
Args: Send + 'f,
|
||||||
Item: Send + Serialize + DeserializeOwned,
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
E: From<CacheError>,
|
E: From<CacheError>,
|
||||||
'a: 'f,
|
'a: 'f,
|
||||||
'c: 'f;
|
'c: 'f;
|
||||||
|
|
||||||
|
fn wrap_const<'c, 'f, 'a, Func, Inner, Args, Item, E>(
|
||||||
|
&'c self,
|
||||||
|
f: Func,
|
||||||
|
cache_args: &'a RedisCacheArgs<'a>,
|
||||||
|
do_cache: CacheDecision,
|
||||||
|
) -> OutFunc<'f, Args, Item, E>
|
||||||
|
where
|
||||||
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
|
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
||||||
|
Args: Send + 'f,
|
||||||
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
|
E: From<CacheError>,
|
||||||
|
'a: 'f,
|
||||||
|
'c: 'f;
|
||||||
|
|
||||||
|
fn wrap_opt<'c, 'f, 'a, Func, Inner, Args, Item, E>(
|
||||||
|
&'c self,
|
||||||
|
f: Func,
|
||||||
|
cache_args: &'a RedisCacheArgs<'a>,
|
||||||
|
) -> OutFunc<'f, Args, Option<Item>, E>
|
||||||
|
where
|
||||||
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
|
Inner: Future<Output = Result<Option<Item>, E>> + Send + 'f,
|
||||||
|
Args: Send + 'f,
|
||||||
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
|
E: From<CacheError>,
|
||||||
|
'a: 'f,
|
||||||
|
'c: 'f;
|
||||||
|
|
||||||
|
fn wrap_on<'c, 'f, 'ca, 'fa, 'cf, Func, CacheFunc, Inner, Args, Item, E>(
|
||||||
|
&'c self,
|
||||||
|
f: Func,
|
||||||
|
cache_args: &'ca RedisCacheArgs<'ca>,
|
||||||
|
do_cache: CacheFunc,
|
||||||
|
) -> OutFunc<'f, Args, Item, E>
|
||||||
|
where
|
||||||
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
|
CacheFunc: for<'i> Fn(&'i Item) -> CacheDecision + Send + Sync + 'cf,
|
||||||
|
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
||||||
|
Args: Send + 'fa,
|
||||||
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
|
E: From<CacheError>,
|
||||||
|
'ca: 'f,
|
||||||
|
'c: 'f,
|
||||||
|
'cf: 'f,
|
||||||
|
'fa: 'f;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip_all, level = Level::TRACE)]
|
||||||
|
async fn write_cache<Item>(
|
||||||
|
conn: &mut redis::aio::MultiplexedConnection,
|
||||||
|
key_name: &[u8],
|
||||||
|
payload: &Item,
|
||||||
|
) -> Result<(), CacheError>
|
||||||
|
where
|
||||||
|
Item: Serialize,
|
||||||
|
{
|
||||||
|
let cache_item = CacheItem {
|
||||||
|
timestamp: Utc::now(),
|
||||||
|
payload,
|
||||||
|
};
|
||||||
|
|
||||||
|
let _: () = conn
|
||||||
|
.set(key_name, {
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
ciborium::into_writer(&cache_item, &mut buf)?;
|
||||||
|
buf
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client<redis::Client> for RedisClient {
|
impl Client<redis::Client> for RedisClient {
|
||||||
@@ -50,52 +155,87 @@ impl Client<redis::Client> for RedisClient {
|
|||||||
|
|
||||||
fn new((ip, port): (net::IpAddr, u16)) -> Result<Self, CacheError> {
|
fn new((ip, port): (net::IpAddr, u16)) -> Result<Self, CacheError> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
redis: redis::Client::open((ip.to_string(), port))?,
|
client: redis::Client::open((ip.to_string(), port))?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get(&self) -> &redis::Client {
|
fn get(&self) -> &redis::Client {
|
||||||
&self.redis
|
&self.client
|
||||||
}
|
}
|
||||||
|
|
||||||
fn wrap<'c, 'f, 'a, Func, Inner, Args, Item, E>(
|
fn wrap<'c, 'f, 'a, Func, Inner, Args, Item, E>(
|
||||||
&'c self,
|
&'c self,
|
||||||
f: Func,
|
f: Func,
|
||||||
cache_args: &'a RedisCacheArgs<'a>,
|
cache_args: &'a RedisCacheArgs<'a>,
|
||||||
) -> Box<dyn FnOnce(Args) -> Pin<Box<dyn Future<Output = Result<Item, E>> + Send + 'f>> + 'f>
|
) -> OutFunc<'f, Args, Item, E>
|
||||||
where
|
where
|
||||||
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
||||||
Args: Send + 'f,
|
Args: Send + 'f,
|
||||||
Item: Send + Serialize + DeserializeOwned,
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
E: From<CacheError>,
|
E: From<CacheError>,
|
||||||
'a: 'f,
|
'a: 'f,
|
||||||
'c: 'f,
|
'c: 'f,
|
||||||
{
|
{
|
||||||
const LOCK_TTL: usize = 60_000; // milliseconds
|
self.wrap_const(f, cache_args, CacheDecision::Cache)
|
||||||
const LOCK_RETRY_TIME: Duration = Duration::from_secs(1);
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct CacheItem<Item> {
|
|
||||||
timestamp: DateTime<Utc>,
|
|
||||||
payload: Item,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all, level = Level::TRACE)]
|
fn wrap_opt<'c, 'f, 'a, Func, Inner, Args, Item, E>(
|
||||||
async fn query_cache<Item>(
|
&'c self,
|
||||||
conn: &mut redis::aio::MultiplexedConnection,
|
f: Func,
|
||||||
key_name: &[u8],
|
cache_args: &'a RedisCacheArgs<'a>,
|
||||||
) -> Result<Option<CacheItem<Item>>, CacheError>
|
) -> OutFunc<'f, Args, Option<Item>, E>
|
||||||
where
|
where
|
||||||
Item: DeserializeOwned,
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
|
Inner: Future<Output = Result<Option<Item>, E>> + Send + 'f,
|
||||||
|
Args: Send + 'f,
|
||||||
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
|
E: From<CacheError>,
|
||||||
|
'a: 'f,
|
||||||
|
'c: 'f,
|
||||||
{
|
{
|
||||||
Ok(conn
|
self.wrap_on(f, cache_args, |item| {
|
||||||
.get::<&[u8], Option<String>>(key_name)
|
item.as_ref()
|
||||||
.await?
|
.map_or(CacheDecision::NoCache, |_| CacheDecision::Cache)
|
||||||
.map(|s| serde_json::from_str(&s))
|
})
|
||||||
.transpose()?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn wrap_const<'c, 'f, 'a, Func, Inner, Args, Item, E>(
|
||||||
|
&'c self,
|
||||||
|
f: Func,
|
||||||
|
cache_args: &'a RedisCacheArgs<'a>,
|
||||||
|
do_cache: CacheDecision,
|
||||||
|
) -> OutFunc<'f, Args, Item, E>
|
||||||
|
where
|
||||||
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
|
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
||||||
|
Args: Send + 'f,
|
||||||
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
|
E: From<CacheError>,
|
||||||
|
'a: 'f,
|
||||||
|
'c: 'f,
|
||||||
|
{
|
||||||
|
self.wrap_on(f, cache_args, move |_item| do_cache)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wrap_on<'c, 'f, 'ca, 'fa, 'cf, Func, CacheFunc, Inner, Args, Item, E>(
|
||||||
|
&'c self,
|
||||||
|
f: Func,
|
||||||
|
cache_args: &'ca RedisCacheArgs<'ca>,
|
||||||
|
do_cache: CacheFunc,
|
||||||
|
) -> OutFunc<'f, Args, Item, E>
|
||||||
|
where
|
||||||
|
Func: Fn(Args) -> Inner + Sync + Send + 'f,
|
||||||
|
CacheFunc: for<'i> Fn(&'i Item) -> CacheDecision + Send + Sync + 'cf,
|
||||||
|
Inner: Future<Output = Result<Item, E>> + Send + 'f,
|
||||||
|
Args: Send + 'fa,
|
||||||
|
Item: Serialize + DeserializeOwned + Send + Sync,
|
||||||
|
E: From<CacheError>,
|
||||||
|
'ca: 'f,
|
||||||
|
'c: 'f,
|
||||||
|
'cf: 'f,
|
||||||
|
'fa: 'f,
|
||||||
|
{
|
||||||
Box::new(move |args: Args| {
|
Box::new(move |args: Args| {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let expiry = TimeDelta::from_std(cache_args.expiry)
|
let expiry = TimeDelta::from_std(cache_args.expiry)
|
||||||
@@ -133,25 +273,20 @@ impl Client<redis::Client> for RedisClient {
|
|||||||
.in_scope(|| async {
|
.in_scope(|| async {
|
||||||
let payload = f(args).await?;
|
let payload = f(args).await?;
|
||||||
|
|
||||||
let cache_item = CacheItem {
|
match do_cache(&payload) {
|
||||||
timestamp: Utc::now(),
|
CacheDecision::NoCache => Ok(payload),
|
||||||
payload,
|
CacheDecision::Cache => {
|
||||||
};
|
|
||||||
let _: () = conn
|
|
||||||
.set(
|
|
||||||
cache_args.key_name,
|
|
||||||
serde_json::to_string(&cache_item)
|
|
||||||
.map_err(Into::into)?,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(Into::into)?;
|
|
||||||
lock_manager.unlock(&lock);
|
lock_manager.unlock(&lock);
|
||||||
tracing::trace!("cache updated");
|
tracing::trace!("cache updated");
|
||||||
Ok(cache_item.payload)
|
write_cache(&mut conn, cache_args.key_name, &payload)
|
||||||
|
.await?;
|
||||||
|
Ok(payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
} else {
|
} else {
|
||||||
// Could not get lock because it's already. so some other process is already
|
// Could not get lock because it's already taken. So some other process is already
|
||||||
// gathering data. Wait for it to finish and then just return
|
// gathering data. Wait for it to finish and then just return
|
||||||
// the cached response.
|
// the cached response.
|
||||||
tracing::trace!("could not acquire lock");
|
tracing::trace!("could not acquire lock");
|
||||||
@@ -186,11 +321,18 @@ impl Client<redis::Client> for RedisClient {
|
|||||||
break Ok(response.payload);
|
break Ok(response.payload);
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
break Err(CacheError::Consistency(
|
tracing::trace!("no cache item returned, generating own response");
|
||||||
"cached item expected but not found"
|
let payload = f(args).await?;
|
||||||
.to_owned(),
|
break match do_cache(&payload) {
|
||||||
)
|
CacheDecision::NoCache => Ok(payload),
|
||||||
.into())
|
CacheDecision::Cache => {
|
||||||
|
write_cache(&mut conn, cache_args.key_name, &payload)
|
||||||
|
.await?;
|
||||||
|
lock_manager.unlock(&lock);
|
||||||
|
tracing::trace!("cache updated");
|
||||||
|
Ok(payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user