Design and Implementation of an Asynchronous Cache System in Rust
An asynchronous cache system in Rust leverages the language's memory safety and async features to build a high-performance component. The design covers LRU/LFU policies, concurrency safety (Arc/Mutex), a TTL expiration mechanism, and error handling. The implementation is based on HashMap and the tokio library, and is integrated into a user-sync service, an order-processing service, and a monitoring service. Optimizations include atomic operations to reduce lock contention, batch operations to raise throughput, and connection pooling for database access. Common problems (cache penetration, breakdown, and avalanche) are addressed with a Bloom filter, per-key mutexes, and randomized expiration times, respectively. The system suits high-concurrency web architectures where response time and throughput matter.
1. Introduction
Caching is a core component of modern web application architecture and can significantly improve a system's performance and responsiveness. By keeping frequently accessed data in a fast in-memory store, a cache reduces requests to the database or external APIs, lowering latency and raising throughput. Rust's async features and memory-safety guarantees make it well suited to building high-performance, reliable asynchronous cache systems.
This article examines the design and implementation of an asynchronous cache system, covering caching strategies, data-structure choice, concurrency safety, memory management, error handling, and expiration. It then demonstrates, through hands-on integrations, how to use the cache in a user-sync service, an order-processing service, and a monitoring service, and how to optimize its performance.
2. Core Concepts of an Asynchronous Cache System
2.1 Caching Strategies
A caching strategy determines how data is stored in and evicted from the cache. Common strategies include:
- LRU (Least Recently Used): evicts the entry that was accessed least recently.
- LFU (Least Frequently Used): evicts the entry with the lowest access frequency.
- FIFO (First In First Out): evicts the entry that entered the cache earliest.
- TTL (Time To Live): entries expire automatically after a fixed time in the cache.
2.2 Characteristics of Asynchronous Operations
The asynchronous operations of a cache system have the following characteristics:
- Non-blocking: async operations do not block threads, which improves concurrency.
- High throughput: async operations handle many requests at the same time.
- Resource utilization: async operations make more effective use of CPU and memory.
2.3 Concurrency Safety
An asynchronous cache must handle many tasks accessing shared data simultaneously, so concurrency safety is essential. Rust offers several tools for this, such as Arc, Mutex, RwLock, and atomic types.
3. Design Principles for an Asynchronous Cache System
3.1 Concurrency-Safe Design
The cache must guarantee that concurrent tasks accessing shared data cannot cause data races. We can combine Arc with Mutex or RwLock to share state safely across tasks.
Thread-safe sharing with Arc and Mutex:
use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, V>>>,
}

impl<K, V> Cache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
    V: Clone,
{
    pub fn new() -> Self {
        Cache {
            data: Arc::new(Mutex::new(HashMap::new())),
        }
    }
    pub async fn get(&self, key: K) -> Option<V> {
        let data = self.data.lock().await;
        data.get(&key).cloned()
    }
    pub async fn put(&self, key: K, value: V) {
        let mut data = self.data.lock().await;
        data.insert(key, value);
    }
    pub async fn remove(&self, key: K) {
        let mut data = self.data.lock().await;
        data.remove(&key);
    }
}
3.2 Memory-Management Design
The cache must manage memory responsibly, avoiding leaks and unbounded growth. Arc manages the lifetime of the shared data, and a HashMap stores the entries; bounding growth additionally requires a maximum size and an eviction policy, which section 6.1 adds.
use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, V>>>,
}

impl<K, V> Cache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
    V: Clone,
{
    pub fn new() -> Self {
        Cache {
            data: Arc::new(Mutex::new(HashMap::new())),
        }
    }
    pub async fn get(&self, key: K) -> Option<V> {
        let data = self.data.lock().await;
        data.get(&key).cloned()
    }
    pub async fn put(&self, key: K, value: V) {
        let mut data = self.data.lock().await;
        data.insert(key, value);
    }
    pub async fn remove(&self, key: K) {
        let mut data = self.data.lock().await;
        data.remove(&key);
    }
}
3.3 Error-Handling Design
The cache must handle a variety of failures: network errors, database errors, failed cache operations, and so on. A custom error type unifies error handling across the system.
use thiserror::Error;

#[derive(Error, Debug)]
pub enum CacheError {
    #[error("Key not found")]
    KeyNotFound,
    #[error("Invalid key")]
    InvalidKey,
    #[error("Cache operation failed")]
    OperationFailed,
    #[error(transparent)]
    Unexpected(#[from] anyhow::Error),
}
3.4 Expiration-Mechanism Design
The cache needs an expiration mechanism so that entries are dropped automatically once their time-to-live elapses. The tokio::time module provides the timer primitives for a periodic cleanup task.
use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time;

#[derive(Clone)]
pub struct CacheEntry<V> {
    value: V,
    expiration: SystemTime,
}

impl<V> CacheEntry<V> {
    pub fn new(value: V, ttl: Duration) -> Self {
        let expiration = SystemTime::now() + ttl;
        CacheEntry { value, expiration }
    }
    pub fn is_expired(&self) -> bool {
        SystemTime::now() > self.expiration
    }
}

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
}

impl<K, V> Cache<K, V>
where
    // 'static is required because the cleanup task is moved into tokio::spawn.
    K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    pub fn new(ttl: Duration) -> Self {
        let cache = Cache {
            data: Arc::new(Mutex::new(HashMap::new())),
            ttl,
        };
        cache.start_cleanup_task();
        cache
    }
    fn start_cleanup_task(&self) {
        let data = self.data.clone();
        let ttl = self.ttl;
        tokio::spawn(async move {
            loop {
                time::sleep(ttl).await;
                let mut data = data.lock().await;
                data.retain(|_, entry| !entry.is_expired());
            }
        });
    }
    pub async fn get(&self, key: K) -> Option<V> {
        let mut data = self.data.lock().await;
        if let Some(entry) = data.get(&key) {
            if entry.is_expired() {
                data.remove(&key);
                None
            } else {
                Some(entry.value.clone())
            }
        } else {
            None
        }
    }
    pub async fn put(&self, key: K, value: V) {
        let mut data = self.data.lock().await;
        let entry = CacheEntry::new(value, self.ttl);
        data.insert(key, entry);
    }
    pub async fn remove(&self, key: K) {
        let mut data = self.data.lock().await;
        data.remove(&key);
    }
}
4. Implementing the Asynchronous Cache System
4.1 Choosing a Data Structure
The cache's data structure must support fast lookup, insertion, and deletion. HashMap is a natural fit as the underlying store, offering average O(1) lookup, insertion, and deletion.
The entry and cache types are defined as follows (the full implementation, including the cleanup task and the get/put/remove methods, is shown in section 3.4):

use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};

#[derive(Clone)]
pub struct CacheEntry<V> {
    value: V,
    expiration: SystemTime,
}

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
}
4.2 Implementing the Asynchronous Operations
The cache's asynchronous operations are lookup, insertion, deletion, and cleanup of expired entries. tokio::sync::Mutex provides task-safe shared access (its lock() is awaited rather than blocking the thread), and tokio::time drives the periodic cleanup.
The access methods on the Cache from section 3.4; each awaits the async mutex instead of blocking:

    pub async fn get(&self, key: K) -> Option<V> {
        let mut data = self.data.lock().await;
        if let Some(entry) = data.get(&key) {
            if entry.is_expired() {
                data.remove(&key);
                None
            } else {
                Some(entry.value.clone())
            }
        } else {
            None
        }
    }
    pub async fn put(&self, key: K, value: V) {
        let mut data = self.data.lock().await;
        let entry = CacheEntry::new(value, self.ttl);
        data.insert(key, entry);
    }
    pub async fn remove(&self, key: K) {
        let mut data = self.data.lock().await;
        data.remove(&key);
    }
4.3 Implementing the Expiration Mechanism
The expiration mechanism periodically removes stale entries. tokio::time::sleep schedules the cleanup pass, and each entry's expiration timestamp is checked both during cleanup and on read.
The expiry-related pieces of the section 3.4 implementation:

    // Per-entry expiry check
    pub fn is_expired(&self) -> bool {
        SystemTime::now() > self.expiration
    }

    // Background cleanup task spawned from Cache::new
    fn start_cleanup_task(&self) {
        let data = self.data.clone();
        let ttl = self.ttl;
        tokio::spawn(async move {
            loop {
                time::sleep(ttl).await;
                let mut data = data.lock().await;
                data.retain(|_, entry| !entry.is_expired());
            }
        });
    }
4.4 Implementing Error Handling
Error handling should funnel every failure into a single error type with friendly messages. The thiserror crate makes defining such a type concise, and From impls let `?` convert foreign errors automatically.
use thiserror::Error;

#[derive(Error, Debug)]
pub enum CacheError {
    #[error("Key not found")]
    KeyNotFound,
    #[error("Invalid key")]
    InvalidKey,
    #[error("Cache operation failed")]
    OperationFailed,
    #[error(transparent)]
    Unexpected(#[from] anyhow::Error),
}

impl From<std::io::Error> for CacheError {
    fn from(e: std::io::Error) -> Self {
        CacheError::Unexpected(e.into())
    }
}

impl From<std::num::ParseIntError> for CacheError {
    fn from(e: std::num::ParseIntError) -> Self {
        CacheError::Unexpected(e.into())
    }
}

// Needed so the sqlx-based examples later in this article can use `?`
// to convert sqlx::Error into CacheError.
impl From<sqlx::Error> for CacheError {
    fn from(e: sqlx::Error) -> Self {
        CacheError::Unexpected(e.into())
    }
}
5. Integrating the Cache into Real Projects
5.1 Caching in the User-Sync Service
We integrate the cache into the user-sync service to cache user data fetched from a third-party API.
use crate::sync::{ThirdPartyUser, sync_users};
use crate::config::Config;
use async_cache::{Cache, CacheError};
use std::time::Duration;

pub struct UserCache {
    cache: Cache<i32, ThirdPartyUser>,
}

impl UserCache {
    pub fn new(ttl: Duration) -> Self {
        UserCache {
            cache: Cache::new(ttl),
        }
    }
    pub async fn get_user(&self, user_id: i32) -> Result<Option<ThirdPartyUser>, CacheError> {
        Ok(self.cache.get(user_id).await)
    }
    pub async fn put_user(&self, user_id: i32, user: ThirdPartyUser) -> Result<(), CacheError> {
        // Cache::put is infallible, so no `?` here.
        self.cache.put(user_id, user).await;
        Ok(())
    }
    pub async fn remove_user(&self, user_id: i32) -> Result<(), CacheError> {
        self.cache.remove(user_id).await;
        Ok(())
    }
    pub async fn sync_users(&self, config: &Config) -> Result<(), CacheError> {
        let third_party_users = sync_users(config).await?;
        for user in third_party_users {
            self.put_user(user.id, user).await?;
        }
        Ok(())
    }
}
The service wiring with axum combines the config and the cache into one application state, so handlers can receive both through the State extractor (Config is assumed to be Clone):

use axum::{extract::{Path, State}, http::StatusCode, response::IntoResponse, routing::{get, post}, Router};
use std::sync::Arc;
use std::time::Duration;
use user_sync_service::config::Config;
use user_sync_service::cache::UserCache;

#[derive(Clone)]
struct AppState {
    config: Config,
    user_cache: Arc<UserCache>,
}

async fn health() -> impl IntoResponse {
    StatusCode::OK
}

async fn sync_users(State(state): State<AppState>) -> impl IntoResponse {
    match state.user_cache.sync_users(&state.config).await {
        Ok(_) => StatusCode::ACCEPTED,
        Err(e) => {
            tracing::error!("User sync failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
}

async fn get_user(State(state): State<AppState>, Path(user_id): Path<i32>) -> impl IntoResponse {
    match state.user_cache.get_user(user_id).await {
        Ok(Some(user)) => (StatusCode::OK, format!("{:?}", user)).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => {
            tracing::error!("Get user failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}

#[tokio::main]
async fn main() {
    let config = Config::from_env().unwrap();
    let state = AppState {
        config,
        user_cache: Arc::new(UserCache::new(Duration::from_secs(3600))),
    };
    let app = Router::new()
        .route("/health", get(health))
        .route("/api/users/sync", post(sync_users))
        .route("/api/users/:id", get(get_user))
        .with_state(state);
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
5.2 Caching in the Order-Processing Service
We integrate the cache into the order-processing service to cache order data and product information.
use crate::process::{Order, Product};
use crate::config::Config;
use async_cache::{Cache, CacheError};
use std::time::Duration;

pub struct OrderCache {
    order_cache: Cache<i32, Order>,
    product_cache: Cache<i32, Product>,
}

impl OrderCache {
    pub fn new(order_ttl: Duration, product_ttl: Duration) -> Self {
        OrderCache {
            order_cache: Cache::new(order_ttl),
            product_cache: Cache::new(product_ttl),
        }
    }
    pub async fn get_order(&self, order_id: i32) -> Result<Option<Order>, CacheError> {
        Ok(self.order_cache.get(order_id).await)
    }
    pub async fn put_order(&self, order_id: i32, order: Order) -> Result<(), CacheError> {
        // Cache::put is infallible, so no `?` here.
        self.order_cache.put(order_id, order).await;
        Ok(())
    }
    pub async fn remove_order(&self, order_id: i32) -> Result<(), CacheError> {
        self.order_cache.remove(order_id).await;
        Ok(())
    }
    pub async fn get_product(&self, product_id: i32) -> Result<Option<Product>, CacheError> {
        Ok(self.product_cache.get(product_id).await)
    }
    pub async fn put_product(&self, product_id: i32, product: Product) -> Result<(), CacheError> {
        self.product_cache.put(product_id, product).await;
        Ok(())
    }
    pub async fn remove_product(&self, product_id: i32) -> Result<(), CacheError> {
        self.product_cache.remove(product_id).await;
        Ok(())
    }
}
The service wiring, again with a combined application state (Config is assumed to be Clone):

use axum::{extract::{Path, State}, http::StatusCode, response::IntoResponse, routing::{get, post}, Router};
use std::sync::Arc;
use std::time::Duration;
use order_processing_service::config::Config;
use order_processing_service::cache::OrderCache;
use order_processing_service::process;

#[derive(Clone)]
struct AppState {
    config: Config,
    order_cache: Arc<OrderCache>,
}

async fn health() -> impl IntoResponse {
    StatusCode::OK
}

async fn process_order(State(_state): State<AppState>) -> impl IntoResponse {
    match process::process_orders().await {
        Ok(_) => StatusCode::ACCEPTED,
        Err(e) => {
            tracing::error!("Order processing failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        }
    }
}

async fn get_order(State(state): State<AppState>, Path(order_id): Path<i32>) -> impl IntoResponse {
    match state.order_cache.get_order(order_id).await {
        Ok(Some(order)) => (StatusCode::OK, format!("{:?}", order)).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => {
            tracing::error!("Get order failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}

async fn get_product(State(state): State<AppState>, Path(product_id): Path<i32>) -> impl IntoResponse {
    match state.order_cache.get_product(product_id).await {
        Ok(Some(product)) => (StatusCode::OK, format!("{:?}", product)).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => {
            tracing::error!("Get product failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}

#[tokio::main]
async fn main() {
    let config = Config::from_env().unwrap();
    let state = AppState {
        config,
        order_cache: Arc::new(OrderCache::new(Duration::from_secs(3600), Duration::from_secs(86400))),
    };
    let app = Router::new()
        .route("/health", get(health))
        .route("/api/orders/process", post(process_order))
        .route("/api/orders/:id", get(get_order))
        .route("/api/products/:id", get(get_product))
        .with_state(state);
    axum::Server::bind(&"0.0.0.0:3001".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
5.3 Caching in the Monitoring Service
We integrate the cache into the monitoring service to cache system state and performance metrics.
use crate::monitor::{SystemState, PerformanceMetric};
use crate::config::Config;
use async_cache::{Cache, CacheError};
use std::time::Duration;

pub struct MonitoringCache {
    system_state_cache: Cache<String, SystemState>,
    performance_metric_cache: Cache<String, PerformanceMetric>,
}

impl MonitoringCache {
    pub fn new(system_state_ttl: Duration, performance_metric_ttl: Duration) -> Self {
        MonitoringCache {
            system_state_cache: Cache::new(system_state_ttl),
            performance_metric_cache: Cache::new(performance_metric_ttl),
        }
    }
    pub async fn get_system_state(&self, key: &str) -> Result<Option<SystemState>, CacheError> {
        Ok(self.system_state_cache.get(key.to_string()).await)
    }
    pub async fn put_system_state(&self, key: &str, state: SystemState) -> Result<(), CacheError> {
        // Cache::put is infallible, so no `?` here.
        self.system_state_cache.put(key.to_string(), state).await;
        Ok(())
    }
    pub async fn remove_system_state(&self, key: &str) -> Result<(), CacheError> {
        self.system_state_cache.remove(key.to_string()).await;
        Ok(())
    }
    pub async fn get_performance_metric(&self, key: &str) -> Result<Option<PerformanceMetric>, CacheError> {
        Ok(self.performance_metric_cache.get(key.to_string()).await)
    }
    pub async fn put_performance_metric(&self, key: &str, metric: PerformanceMetric) -> Result<(), CacheError> {
        self.performance_metric_cache.put(key.to_string(), metric).await;
        Ok(())
    }
    pub async fn remove_performance_metric(&self, key: &str) -> Result<(), CacheError> {
        self.performance_metric_cache.remove(key.to_string()).await;
        Ok(())
    }
}
The service wiring, with path parameters extracted via Path<String> (Config is assumed to be Clone):

use axum::{extract::{Path, State, WebSocketUpgrade}, http::StatusCode, response::IntoResponse, routing::get, Router};
use std::sync::Arc;
use std::time::Duration;
use monitoring_service::config::Config;
use monitoring_service::cache::MonitoringCache;
use monitoring_service::monitor;

#[derive(Clone)]
struct AppState {
    config: Config,
    monitoring_cache: Arc<MonitoringCache>,
}

async fn health() -> impl IntoResponse {
    StatusCode::OK
}

async fn websocket_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    monitor::handle_websocket_connection(ws).await
}

async fn get_system_state(State(state): State<AppState>, Path(key): Path<String>) -> impl IntoResponse {
    match state.monitoring_cache.get_system_state(&key).await {
        Ok(Some(system_state)) => (StatusCode::OK, format!("{:?}", system_state)).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => {
            tracing::error!("Get system state failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}

async fn get_performance_metric(State(state): State<AppState>, Path(key): Path<String>) -> impl IntoResponse {
    match state.monitoring_cache.get_performance_metric(&key).await {
        Ok(Some(metric)) => (StatusCode::OK, format!("{:?}", metric)).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => {
            tracing::error!("Get performance metric failed: {:?}", e);
            StatusCode::INTERNAL_SERVER_ERROR.into_response()
        }
    }
}

#[tokio::main]
async fn main() {
    let config = Config::from_env().unwrap();
    let state = AppState {
        config,
        monitoring_cache: Arc::new(MonitoringCache::new(Duration::from_secs(60), Duration::from_secs(30))),
    };
    let app = Router::new()
        .route("/health", get(health))
        .route("/ws", get(websocket_handler))
        .route("/api/system-state/:key", get(get_system_state))
        .route("/api/performance-metric/:key", get(get_performance_metric))
        .with_state(state);
    axum::Server::bind(&"0.0.0.0:3002".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}
6. Performance Optimization
6.1 Using Atomic Operations
Atomic operations let us track per-entry statistics (such as access counts) without extra locking, and those counts can drive size-based eviction of the least-used entries.
use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time;
use std::sync::atomic::{AtomicUsize, Ordering};

// No #[derive(Clone)] here: AtomicUsize is not Clone.
pub struct CacheEntry<V> {
    value: V,
    expiration: SystemTime,
    access_count: AtomicUsize,
}

impl<V> CacheEntry<V> {
    pub fn new(value: V, ttl: Duration) -> Self {
        let expiration = SystemTime::now() + ttl;
        CacheEntry {
            value,
            expiration,
            access_count: AtomicUsize::new(0),
        }
    }
    pub fn is_expired(&self) -> bool {
        SystemTime::now() > self.expiration
    }
    pub fn increment_access_count(&self) {
        // A relaxed atomic add needs only a shared reference, no lock.
        self.access_count.fetch_add(1, Ordering::Relaxed);
    }
    pub fn get_access_count(&self) -> usize {
        self.access_count.load(Ordering::Relaxed)
    }
}

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
    max_size: usize,
}

// Evict the least-frequently-used entries until the map is back under max_size.
fn evict_lfu<K, V>(data: &mut HashMap<K, CacheEntry<V>>, max_size: usize)
where
    K: std::hash::Hash + Eq + Clone,
{
    if data.len() > max_size {
        // Clone the keys first so no borrows of `data` are held while removing.
        let mut keyed: Vec<(K, usize)> = data
            .iter()
            .map(|(k, e)| (k.clone(), e.get_access_count()))
            .collect();
        // Ascending by access count: least-used entries come first.
        keyed.sort_by_key(|&(_, count)| count);
        let to_remove = keyed.len() - max_size;
        for (key, _) in keyed.into_iter().take(to_remove) {
            data.remove(&key);
        }
    }
}

impl<K, V> Cache<K, V>
where
    K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    pub fn new(ttl: Duration, max_size: usize) -> Self {
        let cache = Cache {
            data: Arc::new(Mutex::new(HashMap::new())),
            ttl,
            max_size,
        };
        cache.start_cleanup_task();
        cache
    }
    fn start_cleanup_task(&self) {
        let data = self.data.clone();
        let ttl = self.ttl;
        let max_size = self.max_size;
        tokio::spawn(async move {
            loop {
                time::sleep(ttl).await;
                let mut data = data.lock().await;
                data.retain(|_, entry| !entry.is_expired());
                evict_lfu(&mut data, max_size);
            }
        });
    }
    pub async fn get(&self, key: K) -> Option<V> {
        let mut data = self.data.lock().await;
        if let Some(entry) = data.get(&key) {
            if entry.is_expired() {
                data.remove(&key);
                None
            } else {
                entry.increment_access_count();
                Some(entry.value.clone())
            }
        } else {
            None
        }
    }
    pub async fn put(&self, key: K, value: V) {
        let mut data = self.data.lock().await;
        let entry = CacheEntry::new(value, self.ttl);
        data.insert(key, entry);
        evict_lfu(&mut data, self.max_size);
    }
    pub async fn remove(&self, key: K) {
        let mut data = self.data.lock().await;
        data.remove(&key);
    }
}
6.2 Using Batch Operations
Batch operations amortize lock acquisition over many keys: one lock round-trip serves an entire batch instead of one per key, which reduces contention and raises throughput.
use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time;

#[derive(Clone)]
pub struct CacheEntry<V> {
    value: V,
    expiration: SystemTime,
}

impl<V> CacheEntry<V> {
    pub fn new(value: V, ttl: Duration) -> Self {
        let expiration = SystemTime::now() + ttl;
        CacheEntry { value, expiration }
    }
    pub fn is_expired(&self) -> bool {
        SystemTime::now() > self.expiration
    }
}

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
}

impl<K, V> Cache<K, V>
where
    K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    pub fn new(ttl: Duration) -> Self {
        let cache = Cache {
            data: Arc::new(Mutex::new(HashMap::new())),
            ttl,
        };
        cache.start_cleanup_task();
        cache
    }
    fn start_cleanup_task(&self) {
        let data = self.data.clone();
        let ttl = self.ttl;
        tokio::spawn(async move {
            loop {
                time::sleep(ttl).await;
                let mut data = data.lock().await;
                data.retain(|_, entry| !entry.is_expired());
            }
        });
    }
    // Each batch method takes the lock exactly once for the whole batch.
    pub async fn get_batch(&self, keys: Vec<K>) -> Vec<Option<V>> {
        let data = self.data.lock().await;
        keys.iter()
            .map(|key| {
                data.get(key)
                    .filter(|entry| !entry.is_expired())
                    .map(|entry| entry.value.clone())
            })
            .collect()
    }
    pub async fn put_batch(&self, items: Vec<(K, V)>) {
        let mut data = self.data.lock().await;
        for (key, value) in items {
            let entry = CacheEntry::new(value, self.ttl);
            data.insert(key, entry);
        }
    }
    pub async fn remove_batch(&self, keys: Vec<K>) {
        let mut data = self.data.lock().await;
        for key in keys {
            data.remove(&key);
        }
    }
}
6.3 Using a Connection Pool
A connection pool manages database (or external-API) connections, avoiding the cost of opening a new connection per request and bounding the number of concurrent connections.
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::time::Duration;
use async_cache::{Cache, CacheError};

pub struct DatabaseCache {
    pool: PgPool,
    cache: Cache<String, String>,
}

impl DatabaseCache {
    pub async fn new(url: &str, pool_size: u32, ttl: Duration) -> Result<Self, CacheError> {
        let pool = PgPoolOptions::new()
            .max_connections(pool_size)
            .connect(url)
            .await?;
        Ok(DatabaseCache {
            pool,
            cache: Cache::new(ttl),
        })
    }
    pub async fn get(&self, key: &str) -> Result<Option<String>, CacheError> {
        if let Some(value) = self.cache.get(key.to_string()).await {
            return Ok(Some(value));
        }
        let value = sqlx::query_scalar!("SELECT value FROM cache WHERE key = $1", key)
            .fetch_optional(&self.pool)
            .await?;
        if let Some(value) = &value {
            self.cache.put(key.to_string(), value.clone()).await;
        }
        Ok(value)
    }
    pub async fn put(&self, key: &str, value: &str) -> Result<(), CacheError> {
        sqlx::query!(
            "INSERT INTO cache (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2",
            key,
            value
        )
        .execute(&self.pool)
        .await?;
        self.cache.put(key.to_string(), value.to_string()).await;
        Ok(())
    }
    pub async fn remove(&self, key: &str) -> Result<(), CacheError> {
        sqlx::query!("DELETE FROM cache WHERE key = $1", key)
            .execute(&self.pool)
            .await?;
        self.cache.remove(key.to_string()).await;
        Ok(())
    }
}
7. Common Problems and Solutions
7.1 Problem 1: Cache Penetration
Problem: cache penetration occurs when requests (often malicious) target keys that do not exist, so every request falls through to the database or external API and degrades performance.
Solution: use a Bloom filter to decide quickly whether a key can exist at all, or cache a sentinel value for keys known to be absent.
use std::sync::Arc;
use tokio::sync::Mutex;
use bloomfilter::Bloom;
use async_cache::{Cache, CacheError};
use std::time::Duration;

pub struct BloomFilterCache {
    // bloomfilter crate (1.x-style API: set/check). Note that a standard
    // Bloom filter supports insertion and membership tests, not removal.
    bloom: Arc<Mutex<Bloom<str>>>,
    cache: Cache<String, String>,
    db: sqlx::PgPool,
}

impl BloomFilterCache {
    pub async fn new(db: sqlx::PgPool, ttl: Duration) -> Result<Self, CacheError> {
        let bloom = Arc::new(Mutex::new(Bloom::new_for_fp_rate(10_000, 0.01)));
        let cache = Cache::new(ttl);
        Ok(BloomFilterCache { bloom, cache, db })
    }
    pub async fn get(&self, key: &str) -> Result<Option<String>, CacheError> {
        // If the filter says "absent", the key definitely does not exist:
        // skip both the cache and the database.
        if !self.bloom.lock().await.check(key) {
            return Ok(None);
        }
        if let Some(value) = self.cache.get(key.to_string()).await {
            return Ok(Some(value));
        }
        let value = sqlx::query_scalar!("SELECT value FROM cache WHERE key = $1", key)
            .fetch_optional(&self.db)
            .await?;
        if let Some(value) = &value {
            self.cache.put(key.to_string(), value.clone()).await;
        }
        // A "present" answer the database contradicts is a false positive;
        // Bloom filters cannot delete, so we simply tolerate it.
        Ok(value)
    }
    pub async fn put(&self, key: &str, value: &str) -> Result<(), CacheError> {
        sqlx::query!(
            "INSERT INTO cache (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2",
            key,
            value
        )
        .execute(&self.db)
        .await?;
        self.cache.put(key.to_string(), value.to_string()).await;
        self.bloom.lock().await.set(key);
        Ok(())
    }
    pub async fn remove(&self, key: &str) -> Result<(), CacheError> {
        sqlx::query!("DELETE FROM cache WHERE key = $1", key)
            .execute(&self.db)
            .await?;
        self.cache.remove(key.to_string()).await;
        // The key stays in the Bloom filter (no removal is possible);
        // subsequent gets fall through to the database and return None.
        Ok(())
    }
}
7.2 Problem 2: Cache Breakdown
Problem: cache breakdown occurs when a hot key expires and a burst of requests for it all hit the database or external API at once.
Solution: use a per-key mutex (or a distributed lock, such as Redis SETNX) so only one request rebuilds the value while the rest wait and then read the refreshed cache.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use async_cache::{Cache, CacheError};
use std::time::Duration;

pub struct MutexCache {
    cache: Cache<String, String>,
    db: sqlx::PgPool,
    // Per-key locks are stored behind Arc so a lock can be cloned out of
    // the map and held after the map itself has been unlocked.
    locks: Arc<Mutex<HashMap<String, Arc<Mutex<()>>>>>,
}

impl MutexCache {
    pub async fn new(db: sqlx::PgPool, ttl: Duration) -> Result<Self, CacheError> {
        Ok(MutexCache {
            cache: Cache::new(ttl),
            db,
            locks: Arc::new(Mutex::new(HashMap::new())),
        })
    }
    pub async fn get(&self, key: &str) -> Result<Option<String>, CacheError> {
        if let Some(value) = self.cache.get(key.to_string()).await {
            return Ok(Some(value));
        }
        // Clone the per-key lock out of the map, then release the map lock.
        let lock = {
            let mut locks = self.locks.lock().await;
            locks
                .entry(key.to_string())
                .or_insert_with(|| Arc::new(Mutex::new(())))
                .clone()
        };
        let _guard = lock.lock().await;
        // Double-check: another request may have refilled the cache while
        // we were waiting for the per-key lock.
        if let Some(value) = self.cache.get(key.to_string()).await {
            return Ok(Some(value));
        }
        let value = sqlx::query_scalar!("SELECT value FROM cache WHERE key = $1", key)
            .fetch_optional(&self.db)
            .await?;
        if let Some(value) = &value {
            self.cache.put(key.to_string(), value.clone()).await;
        }
        Ok(value)
    }
    pub async fn put(&self, key: &str, value: &str) -> Result<(), CacheError> {
        sqlx::query!(
            "INSERT INTO cache (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2",
            key,
            value
        )
        .execute(&self.db)
        .await?;
        self.cache.put(key.to_string(), value.to_string()).await;
        Ok(())
    }
    pub async fn remove(&self, key: &str) -> Result<(), CacheError> {
        sqlx::query!("DELETE FROM cache WHERE key = $1", key)
            .execute(&self.db)
            .await?;
        self.cache.remove(key.to_string()).await;
        let mut locks = self.locks.lock().await;
        locks.remove(key);
        Ok(())
    }
}
7.3 Problem 3: Cache Avalanche
Problem: cache avalanche occurs when a large fraction of entries expire at the same moment, so every request falls through to the database or external API and the system collapses under the load.
Solution: randomize expiration times (jitter), use layered caches, or pre-warm the cache so that expirations are spread out in time.
use std::sync::Arc;
use tokio::sync::Mutex;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use tokio::time;
use rand::Rng;

#[derive(Clone)]
pub struct CacheEntry<V> {
    value: V,
    expiration: SystemTime,
}

impl<V> CacheEntry<V> {
    pub fn new(value: V, ttl: Duration) -> Self {
        // Add up to 300 seconds of random jitter so entries written
        // together do not all expire together.
        let jitter = rand::thread_rng().gen_range(0..300) as u64;
        let expiration = SystemTime::now() + ttl + Duration::from_secs(jitter);
        CacheEntry { value, expiration }
    }
    pub fn is_expired(&self) -> bool {
        SystemTime::now() > self.expiration
    }
}

#[derive(Clone)]
pub struct Cache<K, V> {
    data: Arc<Mutex<HashMap<K, CacheEntry<V>>>>,
    ttl: Duration,
}

impl<K, V> Cache<K, V>
where
    K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    pub fn new(ttl: Duration) -> Self {
        let cache = Cache {
            data: Arc::new(Mutex::new(HashMap::new())),
            ttl,
        };
        cache.start_cleanup_task();
        cache
    }
    fn start_cleanup_task(&self) {
        let data = self.data.clone();
        let ttl = self.ttl;
        tokio::spawn(async move {
            loop {
                time::sleep(ttl).await;
                let mut data = data.lock().await;
                data.retain(|_, entry| !entry.is_expired());
            }
        });
    }
    pub async fn get(&self, key: K) -> Option<V> {
        let mut data = self.data.lock().await;
        if let Some(entry) = data.get(&key) {
            if entry.is_expired() {
                data.remove(&key);
                None
            } else {
                Some(entry.value.clone())
            }
        } else {
            None
        }
    }
    pub async fn put(&self, key: K, value: V) {
        let mut data = self.data.lock().await;
        let entry = CacheEntry::new(value, self.ttl);
        data.insert(key, entry);
    }
    pub async fn remove(&self, key: K) {
        let mut data = self.data.lock().await;
        data.remove(&key);
    }
}
8. Summary
An asynchronous cache is a core component of modern web application architecture, markedly improving performance and responsiveness. Rust's async features and memory-safety guarantees make it a strong fit for building such a system.
This article walked through the design and implementation of an asynchronous cache: caching strategies, data-structure choice, concurrency safety, memory management, error handling, and expiration. It showed how to integrate the cache into user-sync, order-processing, and monitoring services, and covered performance optimizations along with solutions to the common penetration, breakdown, and avalanche problems.
With this material you should be able to reason about how an asynchronous cache works, implement one yourself, and apply it to build efficient, reliable caching in real projects.