Compare commits

..

5 Commits

Author SHA1 Message Date
8d2a503a89 Release 0.2.0 2024-10-22 23:29:35 +02:00
a8def7bf94 Update examples 2024-10-22 23:29:35 +02:00
9823faceac Add README.md 2024-10-22 23:27:39 +02:00
443d4e98ee Add additional methods 2024-10-22 23:08:19 +02:00
22b1be0390 Replace JSON with CBOR 2024-10-22 23:08:19 +02:00
6 changed files with 275 additions and 57 deletions

View File

@@ -14,7 +14,7 @@ members = ["examples"]
tokio = { version = "1.*", default-features = false, features = [] } tokio = { version = "1.*", default-features = false, features = [] }
serde = { version = "1.0.*", default-features = false, features = ["std", "derive"] } serde = { version = "1.0.*", default-features = false, features = ["std", "derive"] }
chrono = { version = "0.4.*", default-features = false, features = ["std", "now", "serde"] } chrono = { version = "0.4.*", default-features = false, features = ["std", "now", "serde"] }
serde_json = { version = "1.0.*", default-features = false, features = ["std"] } ciborium = { version = "0.2.*", default-features = false, features = ["std"] }
redis = { version = "0.23.*", default-features = false, features = ["serde", "aio", "tokio-comp"] } redis = { version = "0.23.*", default-features = false, features = ["serde", "aio", "tokio-comp"] }
redlock = { version = "2.0.*", default-features = false } redlock = { version = "2.0.*", default-features = false }
tracing = { version = "0.1.*", default-features = false, features = ["attributes"] } tracing = { version = "0.1.*", default-features = false, features = ["attributes"] }

68
README.md Normal file
View File

@@ -0,0 +1,68 @@
# objcache
Cache serde-able objects with Redis.
## Quick overview
Imagine you have the following:
1) A struct or enum that implements `Serialize` and `Deserialize`
```rust
#[derive(Serialize, Deserialize)]
struct Item {
value: usize,
}
```
2) An expensive function that creates an instance of that struct or enum
```rust
async fn get_item(v: usize) -> Result<Item, Error> {
// your business logic here
Ok(Item { value: v })
}
```
3) A redis connection
```rust
let client = RedisClient::new((IpAddr::V4(Ipv4Addr::LOCALHOST), 6379)).unwrap();
```
This library lets you add a cache to your application by `wrap()`ing your function:
```rust
let item = client.wrap(
get_item,
&RedisCacheArgs {
lock_name: b"item_lock",
key_name: b"item",
expiry: Duration::from_secs(5),
},
)(100)
.await?;
assert_eq!(item.value, 100);
```
That's it! Any call to the wrapped function will reuse the latest response if it is not
older than 5 seconds.
## Not implemented
* Cache depending on the input parameters
* Non-async wrap functions (should be easy to implement with some refactoring)
## Status
More of a proof of concept; there may be some big hidden issues.
## Available methods
|Method|Use case|
|---|---|
|`wrap()`|You want to unconditionally cache a response|
|`wrap_opt()`|Your function returns an `Option<T>` and you only want to cache `Some` responses|
|`wrap_const()`|You want to select whether or not to cache depending on a separate parameter taking `CacheDecision`|
|`wrap_on()`|You have a function that takes your `Item` and returns a `CacheDecision`|

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "objcache_examples" name = "objcache_examples"
version = "0.1.0" version = "0.2.0"
edition = "2021" edition = "2021"
rust-version = "1.81" rust-version = "1.81"
@@ -12,5 +12,5 @@ path = "src/main.rs"
tokio = { version = "1.*", default-features = false, features = ["rt-multi-thread", "macros"] } tokio = { version = "1.*", default-features = false, features = ["rt-multi-thread", "macros"] }
tracing = { version = "0.1.40", default-features = false } tracing = { version = "0.1.40", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi"] }
serde = { version = "1.0.210", default-features = false, features = ["std", "derive"] } serde = { version = "1.0.213", default-features = false, features = ["std", "derive"] }
objcache = { path = "../" } objcache = { path = "../" }

View File

@@ -11,7 +11,7 @@ use objcache::{CacheError, Client, RedisCacheArgs, RedisClient};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct Item { struct Item {
val: usize, value: usize,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -32,7 +32,7 @@ impl From<CacheError> for Error {
} }
async fn get_item(v: usize) -> Result<Item, Error> { async fn get_item(v: usize) -> Result<Item, Error> {
Ok(Item { val: v }) Ok(Item { value: v })
} }
#[tokio::main] #[tokio::main]
@@ -59,7 +59,7 @@ async fn main() -> Result<(), Error> {
println!("{item:?}"); println!("{item:?}");
let item = client.wrap( let item = client.wrap(
|v| Box::pin(async move { Ok::<_, Error>(Item { val: v }) }), |v| Box::pin(async move { Ok::<_, Error>(Item { value: v }) }),
&RedisCacheArgs { &RedisCacheArgs {
lock_name: b"item_lock", lock_name: b"item_lock",
key_name: b"item", key_name: b"item",
@@ -68,6 +68,8 @@ async fn main() -> Result<(), Error> {
)(100) )(100)
.await?; .await?;
assert_eq!(item.value, 100);
println!("{item:?}"); println!("{item:?}");
Ok(()) Ok(())

View File

@@ -14,8 +14,14 @@ impl From<redis::RedisError> for CacheError {
} }
} }
impl From<serde_json::Error> for CacheError { impl<T: fmt::Debug> From<ciborium::ser::Error<T>> for CacheError {
fn from(value: serde_json::Error) -> Self { fn from(value: ciborium::ser::Error<T>) -> Self {
Self::Serde(value.to_string())
}
}
impl<T: fmt::Debug> From<ciborium::de::Error<T>> for CacheError {
fn from(value: ciborium::de::Error<T>) -> Self {
Self::Serde(value.to_string()) Self::Serde(value.to_string())
} }
} }

View File

@@ -9,9 +9,33 @@ use tracing::Level;
mod error; mod error;
pub use error::CacheError; pub use error::CacheError;
const LOCK_TTL: usize = 60_000; // milliseconds
const LOCK_RETRY_TIME: Duration = Duration::from_secs(1);
#[derive(Serialize, Deserialize)]
struct CacheItem<Item> {
timestamp: DateTime<Utc>,
payload: Item,
}
#[tracing::instrument(skip_all, level = Level::TRACE)]
async fn query_cache<Item>(
conn: &mut redis::aio::MultiplexedConnection,
key_name: &[u8],
) -> Result<Option<CacheItem<Item>>, CacheError>
where
Item: DeserializeOwned,
{
Ok(conn
.get::<&[u8], Option<Vec<u8>>>(key_name)
.await?
.map(|s| ciborium::from_reader(&s[..]))
.transpose()?)
}
#[derive(Clone)] #[derive(Clone)]
pub struct RedisClient { pub struct RedisClient {
redis: redis::Client, client: redis::Client,
} }
pub struct RedisCacheArgs<'a> { pub struct RedisCacheArgs<'a> {
@@ -20,6 +44,15 @@ pub struct RedisCacheArgs<'a> {
pub expiry: Duration, pub expiry: Duration,
} }
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum CacheDecision {
Cache,
NoCache,
}
type OutFunc<'f, Args, I, E> =
Box<dyn FnOnce(Args) -> Pin<Box<dyn Future<Output = Result<I, E>> + Send + 'f>> + 'f>;
pub trait Client<T> pub trait Client<T>
where where
Self: Sized, Self: Sized,
@@ -34,15 +67,87 @@ where
&'c self, &'c self,
f: Func, f: Func,
cache_args: &'a RedisCacheArgs<'a>, cache_args: &'a RedisCacheArgs<'a>,
) -> Box<dyn FnOnce(Args) -> Pin<Box<dyn Future<Output = Result<Item, E>> + Send + 'f>> + 'f> ) -> OutFunc<'f, Args, Item, E>
where where
Func: Fn(Args) -> Inner + Sync + Send + 'f, Func: Fn(Args) -> Inner + Sync + Send + 'f,
Inner: Future<Output = Result<Item, E>> + Send + 'f, Inner: Future<Output = Result<Item, E>> + Send + 'f,
Args: Send + 'f, Args: Send + 'f,
Item: Send + Serialize + DeserializeOwned, Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>, E: From<CacheError>,
'a: 'f, 'a: 'f,
'c: 'f; 'c: 'f;
fn wrap_const<'c, 'f, 'a, Func, Inner, Args, Item, E>(
&'c self,
f: Func,
cache_args: &'a RedisCacheArgs<'a>,
do_cache: CacheDecision,
) -> OutFunc<'f, Args, Item, E>
where
Func: Fn(Args) -> Inner + Sync + Send + 'f,
Inner: Future<Output = Result<Item, E>> + Send + 'f,
Args: Send + 'f,
Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>,
'a: 'f,
'c: 'f;
fn wrap_opt<'c, 'f, 'a, Func, Inner, Args, Item, E>(
&'c self,
f: Func,
cache_args: &'a RedisCacheArgs<'a>,
) -> OutFunc<'f, Args, Option<Item>, E>
where
Func: Fn(Args) -> Inner + Sync + Send + 'f,
Inner: Future<Output = Result<Option<Item>, E>> + Send + 'f,
Args: Send + 'f,
Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>,
'a: 'f,
'c: 'f;
fn wrap_on<'c, 'f, 'ca, 'fa, 'cf, Func, CacheFunc, Inner, Args, Item, E>(
&'c self,
f: Func,
cache_args: &'ca RedisCacheArgs<'ca>,
do_cache: CacheFunc,
) -> OutFunc<'f, Args, Item, E>
where
Func: Fn(Args) -> Inner + Sync + Send + 'f,
CacheFunc: for<'i> Fn(&'i Item) -> CacheDecision + Send + Sync + 'cf,
Inner: Future<Output = Result<Item, E>> + Send + 'f,
Args: Send + 'fa,
Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>,
'ca: 'f,
'c: 'f,
'cf: 'f,
'fa: 'f;
}
#[tracing::instrument(skip_all, level = Level::TRACE)]
async fn write_cache<Item>(
conn: &mut redis::aio::MultiplexedConnection,
key_name: &[u8],
payload: &Item,
) -> Result<(), CacheError>
where
Item: Serialize,
{
let cache_item = CacheItem {
timestamp: Utc::now(),
payload,
};
let _: () = conn
.set(key_name, {
let mut buf = Vec::new();
ciborium::into_writer(&cache_item, &mut buf)?;
buf
})
.await?;
Ok(())
} }
impl Client<redis::Client> for RedisClient { impl Client<redis::Client> for RedisClient {
@@ -50,52 +155,87 @@ impl Client<redis::Client> for RedisClient {
fn new((ip, port): (net::IpAddr, u16)) -> Result<Self, CacheError> { fn new((ip, port): (net::IpAddr, u16)) -> Result<Self, CacheError> {
Ok(Self { Ok(Self {
redis: redis::Client::open((ip.to_string(), port))?, client: redis::Client::open((ip.to_string(), port))?,
}) })
} }
fn get(&self) -> &redis::Client { fn get(&self) -> &redis::Client {
&self.redis &self.client
} }
fn wrap<'c, 'f, 'a, Func, Inner, Args, Item, E>( fn wrap<'c, 'f, 'a, Func, Inner, Args, Item, E>(
&'c self, &'c self,
f: Func, f: Func,
cache_args: &'a RedisCacheArgs<'a>, cache_args: &'a RedisCacheArgs<'a>,
) -> Box<dyn FnOnce(Args) -> Pin<Box<dyn Future<Output = Result<Item, E>> + Send + 'f>> + 'f> ) -> OutFunc<'f, Args, Item, E>
where where
Func: Fn(Args) -> Inner + Sync + Send + 'f, Func: Fn(Args) -> Inner + Sync + Send + 'f,
Inner: Future<Output = Result<Item, E>> + Send + 'f, Inner: Future<Output = Result<Item, E>> + Send + 'f,
Args: Send + 'f, Args: Send + 'f,
Item: Send + Serialize + DeserializeOwned, Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>, E: From<CacheError>,
'a: 'f, 'a: 'f,
'c: 'f, 'c: 'f,
{ {
const LOCK_TTL: usize = 60_000; // milliseconds self.wrap_const(f, cache_args, CacheDecision::Cache)
const LOCK_RETRY_TIME: Duration = Duration::from_secs(1);
#[derive(Serialize, Deserialize)]
struct CacheItem<Item> {
timestamp: DateTime<Utc>,
payload: Item,
} }
#[tracing::instrument(skip_all, level = Level::TRACE)] fn wrap_opt<'c, 'f, 'a, Func, Inner, Args, Item, E>(
async fn query_cache<Item>( &'c self,
conn: &mut redis::aio::MultiplexedConnection, f: Func,
key_name: &[u8], cache_args: &'a RedisCacheArgs<'a>,
) -> Result<Option<CacheItem<Item>>, CacheError> ) -> OutFunc<'f, Args, Option<Item>, E>
where where
Item: DeserializeOwned, Func: Fn(Args) -> Inner + Sync + Send + 'f,
Inner: Future<Output = Result<Option<Item>, E>> + Send + 'f,
Args: Send + 'f,
Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>,
'a: 'f,
'c: 'f,
{ {
Ok(conn self.wrap_on(f, cache_args, |item| {
.get::<&[u8], Option<String>>(key_name) item.as_ref()
.await? .map_or(CacheDecision::NoCache, |_| CacheDecision::Cache)
.map(|s| serde_json::from_str(&s)) })
.transpose()?)
} }
fn wrap_const<'c, 'f, 'a, Func, Inner, Args, Item, E>(
&'c self,
f: Func,
cache_args: &'a RedisCacheArgs<'a>,
do_cache: CacheDecision,
) -> OutFunc<'f, Args, Item, E>
where
Func: Fn(Args) -> Inner + Sync + Send + 'f,
Inner: Future<Output = Result<Item, E>> + Send + 'f,
Args: Send + 'f,
Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>,
'a: 'f,
'c: 'f,
{
self.wrap_on(f, cache_args, move |_item| do_cache)
}
fn wrap_on<'c, 'f, 'ca, 'fa, 'cf, Func, CacheFunc, Inner, Args, Item, E>(
&'c self,
f: Func,
cache_args: &'ca RedisCacheArgs<'ca>,
do_cache: CacheFunc,
) -> OutFunc<'f, Args, Item, E>
where
Func: Fn(Args) -> Inner + Sync + Send + 'f,
CacheFunc: for<'i> Fn(&'i Item) -> CacheDecision + Send + Sync + 'cf,
Inner: Future<Output = Result<Item, E>> + Send + 'f,
Args: Send + 'fa,
Item: Serialize + DeserializeOwned + Send + Sync,
E: From<CacheError>,
'ca: 'f,
'c: 'f,
'cf: 'f,
'fa: 'f,
{
Box::new(move |args: Args| { Box::new(move |args: Args| {
Box::pin(async move { Box::pin(async move {
let expiry = TimeDelta::from_std(cache_args.expiry) let expiry = TimeDelta::from_std(cache_args.expiry)
@@ -133,25 +273,20 @@ impl Client<redis::Client> for RedisClient {
.in_scope(|| async { .in_scope(|| async {
let payload = f(args).await?; let payload = f(args).await?;
let cache_item = CacheItem { match do_cache(&payload) {
timestamp: Utc::now(), CacheDecision::NoCache => Ok(payload),
payload, CacheDecision::Cache => {
};
let _: () = conn
.set(
cache_args.key_name,
serde_json::to_string(&cache_item)
.map_err(Into::into)?,
)
.await
.map_err(Into::into)?;
lock_manager.unlock(&lock); lock_manager.unlock(&lock);
tracing::trace!("cache updated"); tracing::trace!("cache updated");
Ok(cache_item.payload) write_cache(&mut conn, cache_args.key_name, &payload)
.await?;
Ok(payload)
}
}
}) })
.await .await
} else { } else {
// Could not get lock because it's already. so some other process is already // Could not get lock because it's already taken. So some other process is already
// gathering data. Wait for it to finish and then just return // gathering data. Wait for it to finish and then just return
// the cached response. // the cached response.
tracing::trace!("could not acquire lock"); tracing::trace!("could not acquire lock");
@@ -186,11 +321,18 @@ impl Client<redis::Client> for RedisClient {
break Ok(response.payload); break Ok(response.payload);
} }
None => { None => {
break Err(CacheError::Consistency( tracing::trace!("no cache item returned, generating own response");
"cached item expected but not found" let payload = f(args).await?;
.to_owned(), break match do_cache(&payload) {
) CacheDecision::NoCache => Ok(payload),
.into()) CacheDecision::Cache => {
write_cache(&mut conn, cache_args.key_name, &payload)
.await?;
lock_manager.unlock(&lock);
tracing::trace!("cache updated");
Ok(payload)
}
}
} }
} }
} }