基于 ZooKeeper Watcher 机制实现的轻量级跨进程异步事件总线。无需依赖消息队列(MQ),利用 ZooKeeper 临时节点 + Redis 缓存 + DelayQueue 超时兜底,实现发布-订阅模式。
┌──────────────────────────────────────┐
│ Listener │
│ addListener(id, cb, timeout) │
│ onMessage(response) 回调 │
└──────────┬───────────────────────────┘
│ 创建临时节点 + 注册 Watcher
│ Watcher 触发
▼
┌────────────────┴───────────────────────────┐
│ ZooKeeper │
│ /event/bus/watcher/{requestId} │
│ 临时节点 (Ephemeral) + Curator Watcher │
└────────────────┬───────────────────────────┘
│ 更新节点数据
│ 变更通知
▼
┌──────────────────────────────────────┐
│ Publisher │
│ publish(id, response) │
│ 检查 Redis + ZK 确认活跃 │
└──────────────────────────────────────┘
┌──────────────────────┐ ┌──────────────────────┐
│ DelayQueue │ │ Redis │
│ 超时兜底 │ │ 存在标志 │
│ take() 阻塞等待 │ │ TTL=2×timeout │
│ 超时 → timeout 回调 │ │ 轻量级过滤 │
└──────────────────────┘ └──────────────────────┘
sequenceDiagram
participant L as Listener
participant R as Redis
participant Z as ZooKeeper
participant P as Publisher
participant D as DelayQueue
L->>R: 设置存在标志 (TTL=2*timeout)
L->>Z: 创建临时节点
L->>Z: 注册 Curator Watcher
L->>D: 启动超时计时
P->>R: 检查标志是否存在
alt 监听活跃
P->>Z: 更新节点数据
Z->>L: Watcher 触发
L->>L: callback.onMessage(response)
L->>Z: 删除节点并清理
else 无监听或超时
P->>P: 直接返回
D->>L: 执行 timeout 回调
L->>Z: 清理节点
end
Loading
Listener 调用 addListener(requestId, callback, timeout, timeoutCallback)
Listener 在 Redis 中设置存在标志(TTL = 2 × timeoutSeconds)
Listener 在 ZooKeeper 创建临时节点 /event/bus/watcher/{requestId}
Listener 注册 Curator Watcher 监听该节点数据变更
Listener 将请求元数据放入 DelayQueue,开始超时计时
Publisher 检查 Redis 和 ZK 确认有活跃监听者,若无则直接返回
Publisher 更新 ZK 节点数据,触发 Watcher
Watcher 读取新数据,调用 callback.onMessage(response),清理资源
超时分支 :若超时未收到响应,DelayQueue 消费者线程执行 timeout 回调
public interface IAsyncListener {
void addListener (String requestId ,
AsyncCallback callback ,
Integer timeoutSeconds ,
Runnable timeout );
}
public interface IAsyncPublisher {
void publish (String requestId , String response );
}
public interface AsyncCallback {
void onMessage (String response );
}
// Listener 端
asyncListener .addListener (requestId ,
(response ) -> log .info ("收到回调数据: {}" , response ),
30 , () -> log .info ("超时未收到响应" ));
// Publisher 端
asyncPublisher .publish (requestId , "response data" );
参数
默认值
说明
ZOOKEEPER_HOSTS
127.0.0.1:2181
ZooKeeper 连接地址
REDIS_HOST
127.0.0.1:6379
Redis 连接地址
REDIS_PASSWORD
-
Redis 密码(如有)
依赖
用途
curator-recipes 5.5.0
ZooKeeper 客户端操作
redisson-spring-boot-starter 3.24.3
Redis 客户端
Spring Boot 2.7.18
应用框架
hutool-all 5.8.23
工具库
src/main/java/io/github/passio/distributed/
DistributedEventBusApplication.java # Spring Boot 入口
eventbus/
constants/NodePath.java # ZK 路径生成
service/
AsyncCallback.java # 回调接口
IAsyncListener.java # 监听端接口
IAsyncPublisher.java # 发布端接口
impl/AsyncListener.java # 监听实现(核心)
impl/AsyncPublisher.java # 发布实现
zookeeper/
configuration/ZookeeperConfiguration.java # CuratorFramework Bean
repository/ZookeeperRepository.java # ZK 操作封装
properties/ZookeeperProperties.java # ZK 配置属性
src/main/resources/
application.yml # 应用配置
src/test/java/io/github/passio/distributed/eventbus/
AsyncEventBusTest.java # 集成测试