基于 Redis 和 Tower Middleware 实现 Tonic 服务的分布式熔断与限流


最近一个项目里,我们的核心服务开始承受越来越大的压力。这是一个典型的微服务架构,由多个 Rust 服务构成,彼此通过 Tonic (gRPC) 通信。问题出在一个下游的依赖服务上,它偶尔会因为负载过高或内部错误而响应变慢,甚至完全不可用。最初的重试逻辑很简单,但这在下游服务故障时引发了“重试风暴”,瞬间压垮了我们自己的服务实例。

单个实例内的熔断器无法解决问题。在 Kubernetes 环境下,我们的服务有多个 Pod,流量被负载均衡到任意一个。一个 Pod 里的熔断器跳闸了,但其他 Pod 还在继续向故障服务发送请求。我们需要的是一个所有实例共享状态的分布式熔断器。同时,为了保护下游服务,一个全局的、分布式的请求限流器也变得至关重要。

我们的技术栈是 Tonic,它底层依赖 Tower 服务抽象。这意味着最优雅的实现方式是构建自定义的 tower::Layer 中间件。状态共享方面,Redis 是不二之选:它性能极高,并且其原子操作(特别是 Lua 脚本)是实现分布式锁和计数器的完美工具。

这次复盘的目标,就是记录如何从零开始,为 Tonic 服务构建一套基于 Redis 的、生产级的分布式限流与熔断中间件。

第一步:设计与实现分布式限流中间件

限流的首要目标是防止系统被突发流量打垮。我们选择滑动窗口日志(Sliding Window Log)算法,它比固定窗口或令牌桶更精确,能平滑处理边界情况下的流量尖峰。

其基本思路是:为每个请求记录一个带时间戳的日志。当新请求到达时,首先移除窗口之外的旧日志,然后统计窗口内剩余的日志数量。如果数量未超过阈值,则允许请求通过并记录新日志;否则,拒绝请求。

在分布式环境中,这个日志必须存储在所有实例都能访问的地方。Redis 的 Sorted Set 是完美的数据结构,我们可以用时间戳作为 score,用一个唯一标识(例如 UUID)作为 member。

原子性是关键:Lua 脚本

直接使用多个 Redis 命令(ZREMRANGEBYSCORE, ZCARD, ZADD)会产生竞态条件。在高并发下,两个实例可能同时读取到一个未满的窗口,然后都添加了新日志,导致实际请求数超出阈值。因此,必须将整个逻辑封装在一个 Lua 脚本中,保证其原子执行。

这是我们的核心 Lua 脚本 sliding_window_limiter.lua:

-- ARGV[1]: key - a unique key for the resource being rate-limited
-- ARGV[2]: window_size_ms - the time window in milliseconds
-- ARGV[3]: limit - the maximum number of requests allowed in the window
-- ARGV[4]: current_time_ms - current unix timestamp in milliseconds
-- ARGV[5]: request_id - a unique identifier for the current request

local key = ARGV[1]
local window_size = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
local current_time = tonumber(ARGV[4])
local request_id = ARGV[5]

-- 1. 移除窗口之外的旧请求记录
-- score (时间戳) 小于 (current_time - window_size) 的都被移除
local window_start = current_time - window_size
redis.call('ZREMRANGEBYSCORE', key, '-inf', '(' .. window_start)

-- 2. 获取当前窗口内的请求数量
local current_count = redis.call('ZCARD', key)

-- 3. 检查是否超过限制
if current_count < limit then
    -- 未超限,添加当前请求记录
    redis.call('ZADD', key, current_time, request_id)
    -- 设置一个过期时间,防止冷数据永久占用内存
    redis.call('EXPIRE', key, math.ceil(window_size / 1000) * 2) 
    return 1 -- 代表允许
else
    return 0 -- 代表拒绝
end

这个脚本确保了“清理-检查-添加”三步操作的原子性,彻底杜绝了竞态条件。注意 EXPIRE 的设置,这是一个很好的防御性措施,避免某个限流 key 因为不再被访问而永久留在内存中。

在 Rust 中实现 Tower::Layer

现在,我们将这个逻辑包装成一个 Tower 中间件。

首先,定义 LayerService 的结构体。

// src/middleware/rate_limiter.rs

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    sync::Arc,
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use redis::{AsyncCommands, Script};
use tonic::{body::BoxBody, transport::Body, Status};
use http::{Request, Response};
use tower::{Layer, Service};
use uuid::Uuid;

// Lua 脚本的常量
const LUA_SCRIPT: &str = include_str!("sliding_window_limiter.lua");

// 配置
#[derive(Clone)]
pub struct RateLimiterConfig {
    pub key_prefix: String,
    pub window_size: Duration,
    pub limit: u64,
}

// Layer
#[derive(Clone)]
pub struct RateLimiterLayer {
    pub client: Arc<redis::Client>,
    pub config: Arc<RateLimiterConfig>,
}

impl<S> Layer<S> for RateLimiterLayer {
    type Service = RateLimiterService<S>;

    fn layer(&self, inner: S) -> Self::Service {
        RateLimiterService {
            inner,
            client: self.client.clone(),
            config: self.config.clone(),
            script: Arc::new(Script::new(LUA_SCRIPT)),
        }
    }
}

// Service
#[derive(Clone)]
pub struct RateLimiterService<S> {
    inner: S,
    client: Arc<redis::Client>,
    config: Arc<RateLimiterConfig>,
    script: Arc<Script>,
}

// ... Service 实现细节将在下面展示

接下来是 Service trait 的核心实现 poll_readycall

// ... continuation of src/middleware/rate_limiter.rs

impl<S> Service<Request<Body>> for RateLimiterService<S>
where
    S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // 限流检查发生在 call 方法中,这里直接透传内部服务的状态
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request<Body>) -> Self::Future {
        let mut clone = self.clone();
        Box::pin(async move {
            let conn_result = clone.client.get_async_connection().await;
            let mut conn = match conn_result {
                Ok(c) => c,
                Err(e) => {
                    // 在真实项目中,这里需要详细的日志记录
                    eprintln!("RateLimiter: Failed to connect to Redis: {}", e);
                    // Redis 连接失败,这是一个关键的权衡点。我们选择“故障开放”(Fail Open),
                    // 即允许请求通过,保证核心业务可用性,牺牲临时的限流保护。
                    // 另一种选择是“故障关闭”(Fail Closed),返回错误,这会更安全但可能中断服务。
                    return clone.inner.call(req).await;
                }
            };

            let current_time_ms = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis();

            // 这里的 key 可以根据请求的路径、用户IP、API Key等信息动态生成
            // 为简化,我们使用一个固定的 key
            let key = format!("{}:{}", clone.config.key_prefix, req.uri().path());
            let request_id = Uuid::new_v4().to_string();

            let allowed: Result<i32, _> = clone.script
                .key(key)
                .arg(clone.config.window_size.as_millis() as u64)
                .arg(clone.config.limit)
                .arg(current_time_ms)
                .arg(request_id)
                .invoke_async(&mut conn)
                .await;

            match allowed {
                Ok(1) => {
                    // 允许通行
                    clone.inner.call(req).await
                },
                Ok(0) => {
                    // 拒绝
                    let res = Response::builder()
                        .status(429) // Too Many Requests
                        .body(BoxBody::empty())
                        .unwrap();
                    // 这里需要将 http::Response<BoxBody> 转换为 S::Response
                    // 这通常需要一个转换函数,或者假设 S::Response 就是 Response<BoxBody>
                    // 在 Tonic 中,我们需要返回一个 gRPC Status
                    // 但在 Tower 中间件层面,我们操作的是 http::Request/Response
                    // 一个更健壮的方式是返回一个特定的错误,让上层转换为 Status::ResourceExhausted
                    // 为了简化,我们直接返回一个空的错误响应
                    // THIS IS A SIMPLIFICATION. In a real Tonic setup, you'd map this to a gRPC Status error.
                     let resp = http::Response::new(tonic::body::empty_body());
                     let grpc_status = Status::resource_exhausted("Rate limit exceeded");
                     let (mut parts, _body) = resp.into_parts();
                     let status_header = grpc_status.to_header_map().unwrap();
                     parts.headers.extend(status_header);
                     Ok(Response::from_parts(parts, BoxBody::empty()))
                },
                Ok(_) => {
                    // 不应该发生
                    eprintln!("RateLimiter: Unexpected Lua script response");
                    clone.inner.call(req).await
                }
                Err(e) => {
                    // Redis 命令执行失败,同样采取 Fail Open 策略
                    eprintln!("RateLimiter: Redis command failed: {}", e);
                    clone.inner.call(req).await
                }
            }
        })
    }
}

注意 call 方法中的错误处理。当 Redis 连接失败或命令执行失败时,我们选择让请求通过(Fail Open)。这是基于可用性优先的考虑。在金融等对安全要求极高的场景,可能会选择 Fail Closed。这是一个必须根据业务场景做出的重要架构决策。

第二步:构建更复杂的分布式熔断器

熔断器的状态机(Closed, Open, Half-Open)在分布式环境下变得复杂。我们需要在 Redis 中原子地更新状态、失败次数和时间戳。

  • Closed: 正常状态,请求可以通过。但需要记录连续失败次数。当失败次数达到阈值,状态变为 Open。
  • Open: 熔断状态,所有请求立即被拒绝,避免冲击下游服务。此状态有一个持续时间(reset_timeout),超时后进入 Half-Open。
  • Half-Open: 试探状态。允许一个(或少量)请求通过。如果请求成功,则认为下游服务已恢复,状态转为 Closed。如果失败,则回到 Open 状态,并重置计时器。

Redis 数据结构与 Lua 脚本

我们将使用一个 Redis Hash 来存储一个熔断器的所有状态信息。

  • state: “closed”, “open”, “half-open” (用 0, 1, 2 表示)
  • failures: 连续失败次数
  • last_failure_time: 最后一次失败的时间戳
  • opened_at: 进入 Open 状态的时间戳

挑战在于如何原子地处理状态转换,特别是 Half-Open 状态下“只允许一个请求通过”的逻辑。

stateDiagram-v2
    [*] --> Closed: Initial State
    Closed --> Open: on failure_threshold_reached
    Open --> HalfOpen: after reset_timeout
    HalfOpen --> Closed: on success
    HalfOpen --> Open: on failure
    Closed --> Closed: on success (reset failures)
    Closed --> Closed: on failure (increment failures)

我们需要两个 Lua 脚本:一个用于请求前的检查,一个用于请求后的结果上报。

1. check_and_acquire_permit.lua (请求前调用)

-- ARGV[1]: key - a unique key for the circuit breaker
-- ARGV[2]: current_time_ms - current unix timestamp in milliseconds
-- ARGV[3]: reset_timeout_ms - duration to stay in Open state

local key = ARGV[1]
local current_time = tonumber(ARGV[2])
local reset_timeout = tonumber(ARGV[3])

local state_data = redis.call('HGETALL', key)
local state = 0 -- 0: Closed, 1: Open, 2: Half-Open
local opened_at = 0

if #state_data > 0 then
    for i=1, #state_data, 2 do
        if state_data[i] == 'state' then
            state = tonumber(state_data[i+1])
        elseif state_data[i] == 'opened_at' then
            opened_at = tonumber(state_data[i+1])
        end
    end
end

if state == 1 then -- Open state
    if current_time > (opened_at + reset_timeout) then
        -- Timeout expired, transition to Half-Open
        redis.call('HSET', key, 'state', 2)
        return 2 -- Permit as Half-Open
    else
        return 1 -- Reject as Open
    end
elseif state == 2 then -- Half-Open state
    -- This is a simplification. A robust implementation would use a lock.
    -- For example, use SET with NX and EX to allow only one probe.
    -- Let's implement that:
    local lock_key = key .. ":half_open_lock"
    local lock = redis.call('SET', lock_key, '1', 'NX', 'PX', '1000') -- 1 sec lock
    if lock then
        return 2 -- Permit as Half-Open probe
    else
        return 1 -- Reject, another probe is in flight
    end
else -- Closed state
    return 0 -- Permit as Closed
end

这个脚本处理了 Open 到 Half-Open 的自动转换,并且通过 SETNX 机制巧妙地解决了分布式环境下的 Half-Open 探针唯一性问题。

2. report_execution_result.lua (请求后调用)

-- ARGV[1]: key - the circuit breaker key
-- ARGV[2]: success - "1" for success, "0" for failure
-- ARGV[3]: current_time_ms
-- ARGV[4]: failure_threshold - consecutive failures to trip
-- ARGV[5]: generation - a generation number to prevent stale updates

local key = ARGV[1]
local success = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local failure_threshold = tonumber(ARGV[4])
local request_generation = tonumber(ARGV[5])

local state = tonumber(redis.call('HGET', key, 'state') or 0)
local current_generation = tonumber(redis.call('HGET', key, 'generation') or 0)

-- Stale update detection
if request_generation < current_generation then
    return -- This result is from a previous generation, ignore it.
end

if success == 1 then
    if state == 2 then -- Half-Open success -> transition to Closed
        -- Reset the breaker state
        redis.call('HMSET', key, 'state', 0, 'failures', 0, 'generation', current_generation + 1)
        redis.call('DEL', key .. ":half_open_lock")
    elseif state == 0 then -- Closed success -> reset failure count
        redis.call('HSET', key, 'failures', 0)
    end
else -- Failure
    if state == 2 then -- Half-Open failure -> back to Open
        redis.call('HMSET', key, 'state', 1, 'opened_at', current_time, 'generation', current_generation + 1)
        redis.call('DEL', key .. ":half_open_lock")
    elseif state == 0 then -- Closed failure
        local failures = redis.call('HINCRBY', key, 'failures', 1)
        redis.call('HSET', key, 'last_failure_time', current_time)
        if failures >= failure_threshold then
            -- Trip the breaker -> transition to Open
            redis.call('HMSET', key, 'state', 1, 'opened_at', current_time, 'generation', current_generation + 1)
        end
    end
end

这个脚本的复杂性更高。它处理了成功和失败两种情况下的状态转换。特别地,引入了 generation 字段来解决一个棘手的问题:当熔断器从 Half-Open 成功变为 Closed 后,之前那个慢速的 Half-Open 探针请求可能才返回失败结果,这个过期的失败结果不应该再次将熔断器置为 Open 状态。generation 确保了只有最新的状态转换才能生效。

在 Rust 中实现熔断器中间件

结构与限流器类似,但 call 方法的逻辑要复杂得多,因为它需要在 inner.call() 的前后都与 Redis 交互。

// src/middleware/circuit_breaker.rs (structs and boilerplate similar to rate_limiter)

// ...
// Inside the impl<S> Service<Request<Body>> for CircuitBreakerService<S>

fn call(&mut self, req: Request<Body>) -> Self::Future {
    let mut clone = self.clone();
    Box::pin(async move {
        // Step 1: Check permit before calling inner service
        let mut conn = match clone.client.get_async_connection().await {
            Ok(c) => c,
            Err(e) => {
                eprintln!("CircuitBreaker: Redis connection failed: {}", e);
                return clone.inner.call(req).await; // Fail open
            }
        };

        let key = clone.config.key_prefix.clone();
        let current_time_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
        
        let permit_result: Result<i32, _> = clone.check_script
            .key(&key)
            .arg(current_time_ms)
            .arg(clone.config.reset_timeout.as_millis())
            .invoke_async(&mut conn)
            .await;
        
        let generation: u64 = conn.hget(&key, "generation").await.unwrap_or(0);

        match permit_result {
            Ok(0) | Ok(2) => { // Permit as Closed or Half-Open
                // Proceed with the actual call
                let response_future = clone.inner.call(req);
                let response = response_future.await;

                // Step 2: Report result after the call
                let success = response.is_ok();
                let report_success = if success { 1 } else { 0 };
                
                let report_script = clone.report_script.clone();
                let report_key = key.clone();
                let report_config = clone.config.clone();

                // Reporting is best-effort and happens in the background
                tokio::spawn(async move {
                    if let Ok(mut conn) = clone.client.get_async_connection().await {
                        let current_time_ms_report = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis();
                        let _: Result<(), _> = report_script
                            .key(report_key)
                            .arg(report_success)
                            .arg(current_time_ms_report)
                            .arg(report_config.failure_threshold)
                            .arg(generation) // Pass the generation number read before the call
                            .invoke_async(&mut conn)
                            .await;
                    }
                });

                response
            },
            Ok(1) => { // Reject as Open
                // Return gRPC Status UNAVAILABLE
                let resp = http::Response::new(tonic::body::empty_body());
                let grpc_status = Status::unavailable("Circuit breaker is open");
                let (mut parts, _body) = resp.into_parts();
                let status_header = grpc_status.to_header_map().unwrap();
                parts.headers.extend(status_header);
                Ok(Response::from_parts(parts, BoxBody::empty()))
            },
            Err(e) => {
                eprintln!("CircuitBreaker: Redis check failed: {}", e);
                clone.inner.call(req).await // Fail open
            }
            _ => { // Should not happen
                eprintln!("CircuitBreaker: Unexpected Lua script response");
                clone.inner.call(req).await // Fail open
            }
        }
    })
}

这里的实现将结果上报放到了一个 tokio::spawn 中,使其成为一个“即发即忘”的操作。这是为了不让熔断器状态更新的延迟影响到主请求的响应时间。这是一个性能上的权衡,代价是可能会丢失极少数的状态更新。在多数场景下,这是可以接受的。

整合与应用

现在,我们有了两个独立的、生产级的 Tower::Layer。将它们应用到一个 Tonic 服务上非常简单。

首先,Cargo.toml 需要包含这些依赖:

[dependencies]
tonic = "0.10"
prost = "0.12"
tokio = { version = "1", features = ["full"] }
http = "0.2"
tower = "0.4"
redis = { version = "0.23", features = ["tokio-comp"] }
uuid = { version = "1.4", features = ["v4"] }
serde = { version = "1.0", features = ["derive"] }

然后是 Tonic 服务的启动代码:

// src/main.rs

use tonic::{transport::Server, Request, Response, Status};
use tower::ServiceBuilder;
use std::sync::Arc;
use std::time::Duration;

// Import our middleware
mod middleware;
use middleware::rate_limiter::{RateLimiterLayer, RateLimiterConfig};
use middleware::circuit_breaker::{CircuitBreakerLayer, CircuitBreakerConfig};


// Define a gRPC service for demonstration
pub mod hello_world {
    tonic::include_proto!("helloworld");
}
use hello_world::{
    greeter_server::{Greeter, GreeterServer},
    HelloRequest, HelloReply,
};

#[derive(Default)]
pub struct MyGreeter;

#[tonic::async_trait]
impl Greeter for MyGreeter {
    async fn say_hello(
        &self,
        request: Request<HelloRequest>,
    ) -> Result<Response<HelloReply>, Status> {
        // Simulate a service that can fail
        if rand::random::<f32>() > 0.5 {
             // Simulate slow response
            tokio::time::sleep(Duration::from_millis(200)).await;
            Ok(Response::new(HelloReply {
                message: format!("Hello {}!", request.into_inner().name),
            }))
        } else {
             // Simulate failure
            Err(Status::internal("Something went wrong"))
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:50051".parse()?;
    let greeter = MyGreeter::default();

    let redis_client = Arc::new(redis::Client::open("redis://127.0.0.1/")?);

    // Middleware Configuration
    let rate_limiter_config = Arc::new(RateLimiterConfig {
        key_prefix: "grpc:greeter:ratelimit".to_string(),
        window_size: Duration::from_secs(10),
        limit: 5,
    });

    let circuit_breaker_config = Arc::new(CircuitBreakerConfig {
        key_prefix: "grpc:greeter:cb".to_string(),
        failure_threshold: 3,
        reset_timeout: Duration::from_secs(30),
    });

    // Build the middleware stack
    let layer = ServiceBuilder::new()
        .layer(RateLimiterLayer {
            client: redis_client.clone(),
            config: rate_limiter_config,
        })
        .layer(CircuitBreakerLayer {
            client: redis_client.clone(),
            config: circuit_breaker_config,
        })
        .into_inner();

    println!("GreeterServer listening on {}", addr);

    Server::builder()
        .layer(layer)
        .add_service(GreeterServer::new(greeter))
        .serve(addr)
        .await?;

    Ok(())
}

中间件的顺序很重要。在这里,RateLimiterLayer 在外层,CircuitBreakerLayer 在内层。这意味着请求会先通过限流检查,如果被限流,就不会计入熔断器的失败统计,这通常是我们想要的行为。

局限与展望

这套方案并非银弹。首先,它强依赖于 Redis 的可用性和性能。每一次 gRPC 请求都至少伴随着一次到 Redis 的网络往返,这会增加请求的基线延迟。对于延迟极度敏感的服务,需要仔细评估这个开销。在我们的场景中,几十毫秒的延迟是可以接受的,远好于服务雪崩。

其次,Redis 实例本身也可能成为瓶颈或单点故障。生产环境中需要高可用的 Redis 集群(如 Sentinel 或 Redis Cluster)来缓解这个问题。我们选择的“故障开放”策略也是对这种风险的一种妥协。

未来的迭代方向可以考虑:

  1. 性能优化:在服务实例本地增加一个内存缓存(如 moka-rs),短时间内(例如100毫秒)缓存熔断器的 Open 状态。如果一个熔断器刚跳闸,本地缓存可以立即拒绝请求,避免了对 Redis 的无效查询。这需要处理缓存与 Redis 状态的最终一致性问题。
  2. 更智能的策略:目前的熔断基于连续失败次数。可以引入更高级的策略,如基于失败率的熔断。限流也可以采用自适应算法,根据下游服务的健康状况(如响应延迟)动态调整限流阈值。
  3. 可观测性:将熔断器的状态转换、限流事件作为指标暴露给 Prometheus。这样可以建立监控告警,让我们实时了解系统韧性层的运行状况,而不是等到用户报告问题。

  目录