最近一个项目里,我们的核心服务开始承受越来越大的压力。这是一个典型的微服务架构,由多个 Rust 服务构成,彼此通过 Tonic (gRPC) 通信。问题出在一个下游的依赖服务上,它偶尔会因为负载过高或内部错误而响应变慢,甚至完全不可用。最初的重试逻辑很简单,但这在下游服务故障时引发了“重试风暴”,瞬间压垮了我们自己的服务实例。
单个实例内的熔断器无法解决问题。在 Kubernetes 环境下,我们的服务有多个 Pod,流量被负载均衡到任意一个。一个 Pod 里的熔断器跳闸了,但其他 Pod 还在继续向故障服务发送请求。我们需要的是一个所有实例共享状态的分布式熔断器。同时,为了保护下游服务,一个全局的、分布式的请求限流器也变得至关重要。
我们的技术栈是 Tonic,它底层依赖 Tower 服务抽象。这意味着最优雅的实现方式是构建自定义的 tower::Layer 中间件。状态共享方面,Redis 是不二之选:它性能极高,并且其原子操作(特别是 Lua 脚本)是实现分布式锁和计数器的完美工具。
这次复盘的目标,就是记录如何从零开始,为 Tonic 服务构建一套基于 Redis 的、生产级的分布式限流与熔断中间件。
第一步:设计与实现分布式限流中间件
限流的首要目标是防止系统被突发流量打垮。我们选择滑动窗口日志(Sliding Window Log)算法,它比固定窗口或令牌桶更精确,能平滑处理边界情况下的流量尖峰。
其基本思路是:为每个请求记录一个带时间戳的日志。当新请求到达时,首先移除窗口之外的旧日志,然后统计窗口内剩余的日志数量。如果数量未超过阈值,则允许请求通过并记录新日志;否则,拒绝请求。
在分布式环境中,这个日志必须存储在所有实例都能访问的地方。Redis 的 Sorted Set 是完美的数据结构,我们可以用时间戳作为 score,用一个唯一标识(例如 UUID)作为 member。
原子性是关键:Lua 脚本
直接使用多个 Redis 命令(ZREMRANGEBYSCORE, ZCARD, ZADD)会产生竞态条件。在高并发下,两个实例可能同时读取到一个未满的窗口,然后都添加了新日志,导致实际请求数超出阈值。因此,必须将整个逻辑封装在一个 Lua 脚本中,保证其原子执行。
这是我们的核心 Lua 脚本 sliding_window_limiter.lua:
-- ARGV[1]: key - a unique key for the resource being rate-limited
-- ARGV[2]: window_size_ms - the time window in milliseconds
-- ARGV[3]: limit - the maximum number of requests allowed in the window
-- ARGV[4]: current_time_ms - current unix timestamp in milliseconds
-- ARGV[5]: request_id - a unique identifier for the current request
local key = ARGV[1]
local window_size = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local current_time = tonumber(ARGV[4])
local request_id = ARGV[5]
-- 1. 移除窗口之外的旧请求记录
-- score (时间戳) 小于 (current_time - window_size) 的都被移除
local window_start = current_time - window_size
redis.call('ZREMRANGEBYSCORE', key, '-inf', '(' .. window_start)
-- 2. 获取当前窗口内的请求数量
local current_count = redis.call('ZCARD', key)
-- 3. 检查是否超过限制
if current_count < limit then
-- 未超限,添加当前请求记录
redis.call('ZADD', key, current_time, request_id)
-- 设置一个过期时间,防止冷数据永久占用内存
redis.call('EXPIRE', key, math.ceil(window_size / 1000) * 2)
return 1 -- 代表允许
else
return 0 -- 代表拒绝
end
这个脚本确保了“清理-检查-添加”三步操作的原子性,彻底杜绝了竞态条件。注意 EXPIRE 的设置,这是一个很好的防御性措施,避免某个限流 key 因为不再被访问而永久留在内存中。
在 Rust 中实现 Tower::Layer
现在,我们将这个逻辑包装成一个 Tower 中间件。
首先,定义 Layer 和 Service 的结构体。
// src/middleware/rate_limiter.rs
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use redis::{AsyncCommands, Script};
use tonic::{body::BoxBody, transport::Body, Status};
use http::{Request, Response};
use tower::{Layer, Service};
use uuid::Uuid;
// Lua 脚本的常量
const LUA_SCRIPT: &str = include_str!("sliding_window_limiter.lua");
// 配置
#[derive(Clone)]
pub struct RateLimiterConfig {
pub key_prefix: String,
pub window_size: Duration,
pub limit: u64,
}
// Layer
#[derive(Clone)]
pub struct RateLimiterLayer {
pub client: Arc<redis::Client>,
pub config: Arc<RateLimiterConfig>,
}
impl<S> Layer<S> for RateLimiterLayer {
type Service = RateLimiterService<S>;
fn layer(&self, inner: S) -> Self::Service {
RateLimiterService {
inner,
client: self.client.clone(),
config: self.config.clone(),
script: Arc::new(Script::new(LUA_SCRIPT)),
}
}
}
// Service
#[derive(Clone)]
pub struct RateLimiterService<S> {
inner: S,
client: Arc<redis::Client>,
config: Arc<RateLimiterConfig>,
script: Arc<Script>,
}
// ... Service 实现细节将在下面展示
接下来是 Service trait 的核心实现 poll_ready 和 call。
// ... continuation of src/middleware/rate_limiter.rs
impl<S> Service<Request<Body>> for RateLimiterService<S>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// 限流检查发生在 call 方法中,这里直接透传内部服务的状态
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let mut clone = self.clone();
Box::pin(async move {
let conn_result = clone.client.get_async_connection().await;
let mut conn = match conn_result {
Ok(c) => c,
Err(e) => {
// 在真实项目中,这里需要详细的日志记录
eprintln!("RateLimiter: Failed to connect to Redis: {}", e);
// Redis 连接失败,这是一个关键的权衡点。我们选择“故障开放”(Fail Open),
// 即允许请求通过,保证核心业务可用性,牺牲临时的限流保护。
// 另一种选择是“故障关闭”(Fail Closed),返回错误,这会更安全但可能中断服务。
return clone.inner.call(req).await;
}
};
let current_time_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
// 这里的 key 可以根据请求的路径、用户IP、API Key等信息动态生成
// 为简化,我们使用一个固定的 key
let key = format!("{}:{}", clone.config.key_prefix, req.uri().path());
let request_id = Uuid::new_v4().to_string();
let allowed: Result<i32, _> = clone.script
.key(key)
.arg(clone.config.window_size.as_millis() as u64)
.arg(clone.config.limit)
.arg(current_time_ms)
.arg(request_id)
.invoke_async(&mut conn)
.await;
match allowed {
Ok(1) => {
// 允许通行
clone.inner.call(req).await
},
Ok(0) => {
// 拒绝
let res = Response::builder()
.status(429) // Too Many Requests
.body(BoxBody::empty())
.unwrap();
// 这里需要将 http::Response<BoxBody> 转换为 S::Response
// 这通常需要一个转换函数,或者假设 S::Response 就是 Response<BoxBody>
// 在 Tonic 中,我们需要返回一个 gRPC Status
// 但在 Tower 中间件层面,我们操作的是 http::Request/Response
// 一个更健壮的方式是返回一个特定的错误,让上层转换为 Status::ResourceExhausted
// 为了简化,我们直接返回一个空的错误响应
// THIS IS A SIMPLIFICATION. In a real Tonic setup, you'd map this to a gRPC Status error.
let resp = http::Response::new(tonic::body::empty_body());
let grpc_status = Status::resource_exhausted("Rate limit exceeded");
let (mut parts, _body) = resp.into_parts();
let status_header = grpc_status.to_header_map().unwrap();
parts.headers.extend(status_header);
Ok(Response::from_parts(parts, BoxBody::empty()))
},
Ok(_) => {
// 不应该发生
eprintln!("RateLimiter: Unexpected Lua script response");
clone.inner.call(req).await
}
Err(e) => {
// Redis 命令执行失败,同样采取 Fail Open 策略
eprintln!("RateLimiter: Redis command failed: {}", e);
clone.inner.call(req).await
}
}
})
}
}
注意 call 方法中的错误处理。当 Redis 连接失败或命令执行失败时,我们选择让请求通过(Fail Open)。这是基于可用性优先的考虑。在金融等对安全要求极高的场景,可能会选择 Fail Closed。这是一个必须根据业务场景做出的重要架构决策。
第二步:构建更复杂的分布式熔断器
熔断器的状态机(Closed, Open, Half-Open)在分布式环境下变得复杂。我们需要在 Redis 中原子地更新状态、失败次数和时间戳。
- Closed: 正常状态,请求可以通过。但需要记录连续失败次数。当失败次数达到阈值,状态变为 Open。
- Open: 熔断状态,所有请求立即被拒绝,避免冲击下游服务。此状态有一个持续时间(
reset_timeout),超时后进入 Half-Open。 - Half-Open: 试探状态。允许一个(或少量)请求通过。如果请求成功,则认为下游服务已恢复,状态转为 Closed。如果失败,则回到 Open 状态,并重置计时器。
Redis 数据结构与 Lua 脚本
我们将使用一个 Redis Hash 来存储一个熔断器的所有状态信息。
-
state: “closed”, “open”, “half-open” (用 0, 1, 2 表示) -
failures: 连续失败次数 -
last_failure_time: 最后一次失败的时间戳 -
opened_at: 进入 Open 状态的时间戳
挑战在于如何原子地处理状态转换,特别是 Half-Open 状态下“只允许一个请求通过”的逻辑。
stateDiagram-v2
[*] --> Closed: Initial State
Closed --> Open: on failure_threshold_reached
Open --> HalfOpen: after reset_timeout
HalfOpen --> Closed: on success
HalfOpen --> Open: on failure
Closed --> Closed: on success (reset failures)
Closed --> Closed: on failure (increment failures)
我们需要两个 Lua 脚本:一个用于请求前的检查,一个用于请求后的结果上报。
1. check_and_acquire_permit.lua (请求前调用)
-- ARGV[1]: key - a unique key for the circuit breaker
-- ARGV[2]: current_time_ms - current unix timestamp in milliseconds
-- ARGV[3]: reset_timeout_ms - duration to stay in Open state
local key = ARGV[1]
local current_time = tonumber(ARGV[2])
local reset_timeout = tonumber(ARGV[3])
local state_data = redis.call('HGETALL', key)
local state = 0 -- 0: Closed, 1: Open, 2: Half-Open
local opened_at = 0
if #state_data > 0 then
for i=1, #state_data, 2 do
if state_data[i] == 'state' then
state = tonumber(state_data[i+1])
elseif state_data[i] == 'opened_at' then
opened_at = tonumber(state_data[i+1])
end
end
end
if state == 1 then -- Open state
if current_time > (opened_at + reset_timeout) then
-- Timeout expired, transition to Half-Open
redis.call('HSET', key, 'state', 2)
return 2 -- Permit as Half-Open
else
return 1 -- Reject as Open
end
elseif state == 2 then -- Half-Open state
-- This is a simplification. A robust implementation would use a lock.
-- For example, use SET with NX and EX to allow only one probe.
-- Let's implement that:
local lock_key = key .. ":half_open_lock"
local lock = redis.call('SET', lock_key, '1', 'NX', 'PX', '1000') -- 1 sec lock
if lock then
return 2 -- Permit as Half-Open probe
else
return 1 -- Reject, another probe is in flight
end
else -- Closed state
return 0 -- Permit as Closed
end
这个脚本处理了 Open 到 Half-Open 的自动转换,并且通过 SETNX 机制巧妙地解决了分布式环境下的 Half-Open 探针唯一性问题。
2. report_execution_result.lua (请求后调用)
-- ARGV[1]: key - the circuit breaker key
-- ARGV[2]: success - "1" for success, "0" for failure
-- ARGV[3]: current_time_ms
-- ARGV[4]: failure_threshold - consecutive failures to trip
-- ARGV[5]: generation - a generation number to prevent stale updates
local key = ARGV[1]
local success = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local failure_threshold = tonumber(ARGV[4])
local request_generation = tonumber(ARGV[5])
local state = tonumber(redis.call('HGET', key, 'state') or 0)
local current_generation = tonumber(redis.call('HGET', key, 'generation') or 0)
-- Stale update detection
if request_generation < current_generation then
return -- This result is from a previous generation, ignore it.
end
if success == 1 then
if state == 2 then -- Half-Open success -> transition to Closed
-- Reset the breaker state
redis.call('HMSET', key, 'state', 0, 'failures', 0, 'generation', current_generation + 1)
redis.call('DEL', key .. ":half_open_lock")
elseif state == 0 then -- Closed success -> reset failure count
redis.call('HSET', key, 'failures', 0)
end
else -- Failure
if state == 2 then -- Half-Open failure -> back to Open
redis.call('HMSET', key, 'state', 1, 'opened_at', current_time, 'generation', current_generation + 1)
redis.call('DEL', key .. ":half_open_lock")
elseif state == 0 then -- Closed failure
local failures = redis.call('HINCRBY', key, 'failures', 1)
redis.call('HSET', key, 'last_failure_time', current_time)
if failures >= failure_threshold then
-- Trip the breaker -> transition to Open
redis.call('HMSET', key, 'state', 1, 'opened_at', current_time, 'generation', current_generation + 1)
end
end
end
这个脚本的复杂性更高。它处理了成功和失败两种情况下的状态转换。特别地,引入了 generation 字段来解决一个棘手的问题:当熔断器从 Half-Open 成功变为 Closed 后,之前那个慢速的 Half-Open 探针请求可能才返回失败结果,这个过期的失败结果不应该再次将熔断器置为 Open 状态。generation 确保了只有最新的状态转换才能生效。
在 Rust 中实现熔断器中间件
结构与限流器类似,但 call 方法的逻辑要复杂得多,因为它需要在 inner.call() 的前后都与 Redis 交互。
// src/middleware/circuit_breaker.rs (structs and boilerplate similar to rate_limiter)
// ...
// Inside the impl<S> Service<Request<Body>> for CircuitBreakerService<S>
fn call(&mut self, req: Request<Body>) -> Self::Future {
let mut clone = self.clone();
Box::pin(async move {
// Step 1: Check permit before calling inner service
let mut conn = match clone.client.get_async_connection().await {
Ok(c) => c,
Err(e) => {
eprintln!("CircuitBreaker: Redis connection failed: {}", e);
return clone.inner.call(req).await; // Fail open
}
};
let key = clone.config.key_prefix.clone();
let current_time_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
let permit_result: Result<i32, _> = clone.check_script
.key(&key)
.arg(current_time_ms)
.arg(clone.config.reset_timeout.as_millis())
.invoke_async(&mut conn)
.await;
let generation: u64 = conn.hget(&key, "generation").await.unwrap_or(0);
match permit_result {
Ok(0) | Ok(2) => { // Permit as Closed or Half-Open
// Proceed with the actual call
let response_future = clone.inner.call(req);
let response = response_future.await;
// Step 2: Report result after the call
let success = response.is_ok();
let report_success = if success { 1 } else { 0 };
let report_script = clone.report_script.clone();
let report_key = key.clone();
let report_config = clone.config.clone();
// Reporting is best-effort and happens in the background
tokio::spawn(async move {
if let Ok(mut conn) = clone.client.get_async_connection().await {
let current_time_ms_report = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
let _: Result<(), _> = report_script
.key(report_key)
.arg(report_success)
.arg(current_time_ms_report)
.arg(report_config.failure_threshold)
.arg(generation) // Pass the generation number read before the call
.invoke_async(&mut conn)
.await;
}
});
response
},
Ok(1) => { // Reject as Open
// Return gRPC Status UNAVAILABLE
let resp = http::Response::new(tonic::body::empty_body());
let grpc_status = Status::unavailable("Circuit breaker is open");
let (mut parts, _body) = resp.into_parts();
let status_header = grpc_status.to_header_map().unwrap();
parts.headers.extend(status_header);
Ok(Response::from_parts(parts, BoxBody::empty()))
},
Err(e) => {
eprintln!("CircuitBreaker: Redis check failed: {}", e);
clone.inner.call(req).await // Fail open
}
_ => { // Should not happen
eprintln!("CircuitBreaker: Unexpected Lua script response");
clone.inner.call(req).await // Fail open
}
}
})
}
这里的实现将结果上报放到了一个 tokio::spawn 中,使其成为一个“即发即忘”的操作。这是为了不让熔断器状态更新的延迟影响到主请求的响应时间。这是一个性能上的权衡,代价是可能会丢失极少数的状态更新。在多数场景下,这是可以接受的。
整合与应用
现在,我们有了两个独立的、生产级的 Tower::Layer。将它们应用到一个 Tonic 服务上非常简单。
首先,Cargo.toml 需要包含这些依赖:
[dependencies]
tonic = "0.10"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
http = "0.2"
tower = "0.4"
redis = { version = "0.23", features = ["tokio-comp"] }
uuid = { version = "1.4", features = ["v4"] }
serde = { version = "1.0", features = ["derive"] }
然后是 Tonic 服务的启动代码:
// src/main.rs
use tonic::{transport::Server, Request, Response, Status};
use tower::ServiceBuilder;
use std::sync::Arc;
use std::time::Duration;
// Import our middleware
mod middleware;
use middleware::rate_limiter::{RateLimiterLayer, RateLimiterConfig};
use middleware::circuit_breaker::{CircuitBreakerLayer, CircuitBreakerConfig};
// Define a gRPC service for demonstration
pub mod hello_world {
tonic::include_proto!("helloworld");
}
use hello_world::{
greeter_server::{Greeter, GreeterServer},
HelloRequest, HelloReply,
};
#[derive(Default)]
pub struct MyGreeter;
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
// Simulate a service that can fail
if rand::random::<f32>() > 0.5 {
// Simulate slow response
tokio::time::sleep(Duration::from_millis(200)).await;
Ok(Response::new(HelloReply {
message: format!("Hello {}!", request.into_inner().name),
}))
} else {
// Simulate failure
Err(Status::internal("Something went wrong"))
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();
let redis_client = Arc::new(redis::Client::open("redis://127.0.0.1/")?);
// Middleware Configuration
let rate_limiter_config = Arc::new(RateLimiterConfig {
key_prefix: "grpc:greeter:ratelimit".to_string(),
window_size: Duration::from_secs(10),
limit: 5,
});
let circuit_breaker_config = Arc::new(CircuitBreakerConfig {
key_prefix: "grpc:greeter:cb".to_string(),
failure_threshold: 3,
reset_timeout: Duration::from_secs(30),
});
// Build the middleware stack
let layer = ServiceBuilder::new()
.layer(RateLimiterLayer {
client: redis_client.clone(),
config: rate_limiter_config,
})
.layer(CircuitBreakerLayer {
client: redis_client.clone(),
config: circuit_breaker_config,
})
.into_inner();
println!("GreeterServer listening on {}", addr);
Server::builder()
.layer(layer)
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
中间件的顺序很重要。在这里,RateLimiterLayer 在外层,CircuitBreakerLayer 在内层。这意味着请求会先通过限流检查,如果被限流,就不会计入熔断器的失败统计,这通常是我们想要的行为。
局限与展望
这套方案并非银弹。首先,它强依赖于 Redis 的可用性和性能。每一次 gRPC 请求都至少伴随着一次到 Redis 的网络往返,这会增加请求的基线延迟。对于延迟极度敏感的服务,需要仔细评估这个开销。在我们的场景中,几十毫秒的延迟是可以接受的,远好于服务雪崩。
其次,Redis 实例本身也可能成为瓶颈或单点故障。生产环境中需要高可用的 Redis 集群(如 Sentinel 或 Redis Cluster)来缓解这个问题。我们选择的“故障开放”策略也是对这种风险的一种妥协。
未来的迭代方向可以考虑:
- 性能优化:在服务实例本地增加一个内存缓存(如 moka-rs),短时间内(例如100毫秒)缓存熔断器的 Open 状态。如果一个熔断器刚跳闸,本地缓存可以立即拒绝请求,避免了对 Redis 的无效查询。这需要处理缓存与 Redis 状态的最终一致性问题。
- 更智能的策略:目前的熔断基于连续失败次数。可以引入更高级的策略,如基于失败率的熔断。限流也可以采用自适应算法,根据下游服务的健康状况(如响应延迟)动态调整限流阈值。
- 可观测性:将熔断器的状态转换、限流事件作为指标暴露给 Prometheus。这样可以建立监控告警,让我们实时了解系统韧性层的运行状况,而不是等到用户报告问题。