The gRPC-go Require
- Go 1.25+
- gRPC 1.78.0+
Each version corresponds to the corresponding version of GRPC-go.
For example, tag: v1.78.0 -> grpc-go v1.78.0
- 多种负载均衡算法: 轮询、加权轮询、随机加权轮询、最少连接、最小响应时间、P2C
- 熔断器: 自动隔离故障节点
- 节点过滤: 按版本、元数据、地址过滤节点(可选功能,需显式启用)
- 子集选择: 大规模部署时限制连接数量
- 平滑切换: Picker 更新时不丢失请求
- 动态端点: 运行时动态更新服务节点
- 服务发现: 支持静态、轮询、etcd、Consul 等多种服务发现方式
v1.45.0 & v1.46.0-dev running panic like this:
panic: runtime error: comparing uncomparable type MyAttribute
goroutine 1 [running]:
google.golang.org/grpc/attributes.(*Attributes).Equal(0xc00015e4b0, 0xc00013c140)
/project/vendor/google.golang.org/grpc/attributes/attributes.go:95 +0x194
google.golang.org/grpc/resolver.addressMapEntryList.find({0xc00013c1b8, 0x1, 0x8000102}, {{0xc00015e4c8, 0x13}, {0xc00015e4c8, 0xd}, 0xc00013c140, 0x0, 0x0, ...})
/project/vendor/google.golang.org/grpc/resolver/map.go:49 +0xb9
google.golang.org/grpc/resolver.(*AddressMap).Get(0xc00013c188, {{0xc00015e4c8, 0x13}, {0xc00015e4c8, 0xd}, 0xc00013c140, 0x0, 0x0, {0x0, 0x0}})
/project/vendor/google.golang.org/grpc/resolver/map.go:59 +0x94
mesh-sidecar/grpclient_balancer/balancer.(*baseBalancer).UpdateClientConnState(0xc00013b500, {{{0xc000414380, 0x3, 0x3}, 0x0, 0x0}, {0x0, 0x0}})
Your attribute needs implement Equal function. https://github.com/grpc/grpc-go/blob/v1.46.0-dev/attributes/attributes.go#L91
type MyAttribute struct {
attr string
}
func (ma MyAttribute) Equal(o interface{}) bool {
return true
}The gRPC client-side load balancing to work need to main components, the naming resolver and the load balancing policy
The infra image source itnext.io
The gRPC client and server applications used in the example are based on the proto/echo & load_balancing examples found on the gRPC-go examples with the following modifications:
- round robin balancer.RoundRobinBalanceName
- weighted round robin balancer.WeightedRobinBalanceName
- random weighted round robin balancer.RandomWeightedRobinBalanceName
- minimum connection number balancer.MinConnectBalanceName
- minimum response consume balancer.MinRespTimeBalanceName, keep 10 response consume time, remove maximum and minimum and then take the average value.
- P2C (Power of Two Choices) balancer.P2CBalancerName - 随机选择两个节点,选择负载更低的那个,参考 Kratos 实现
项目提供了丰富的使用示例,位于 examples/ 目录:
| 目录 | 说明 |
|---|---|
| examples/basic | 基本负载均衡算法使用示例 |
| examples/circuitbreaker | 熔断器功能使用示例 |
| examples/p2c | P2C 负载均衡算法示例 |
| examples/filter | 节点过滤功能示例 |
| examples/subset | 子集选择功能示例 |
| examples/graceful | 平滑切换 Picker 示例 |
| examples/dynamic | 动态端点更新示例 |
| examples/discovery | 服务发现功能使用示例 |
| examples/comprehensive | 综合使用最佳实践 |
| 配置项 | 类型 | 说明 |
|---|---|---|
Endpoints |
[]string |
服务端点列表 |
BalanceName |
string |
负载均衡算法名称 |
EnableCircuitBreaker |
bool |
是否启用熔断器 |
EnableNodeFilter |
bool |
是否启用节点过滤功能(默认关闭) |
Discovery |
discovery.Discovery |
服务发现实现(设置后 Endpoints 被忽略) |
DiscoveryPollInterval |
time.Duration |
服务发现轮询间隔(默认 30s) |
package main
import (
"github.com/xkeyideal/grpcbalance/grpclient"
"github.com/xkeyideal/grpcbalance/grpclient/balancer"
)
func main() {
cfg := &grpclient.Config{
Endpoints: []string{"127.0.0.1:50051", "127.0.0.1:50052"},
BalanceName: balancer.RoundRobinBalanceName,
EnableHealthCheck: true,
EnableCircuitBreaker: true, // 启用熔断器
}
client, err := grpclient.NewClient(cfg)
if err != nil {
panic(err)
}
defer client.Close()
conn := client.ActiveConnection()
// 使用 conn 创建 gRPC 客户端...
}import (
"github.com/xkeyideal/grpcbalance/grpclient/balancer"
)
// 先注册 P2C 负载均衡器
balancer.RegisterP2CBalance()
cfg := &grpclient.Config{
Endpoints: addrs,
BalanceName: balancer.P2CBalancerName,
EnableHealthCheck: true,
}节点过滤是按请求选择子集节点的能力,常用于灰度/多 AZ/多机房/租户隔离等场景。
注意:picker.MetadataFilterKey(当前值为 _x_grpc_metadata_)是框架内部保留 key,用于在 resolver.Address.Attributes 中保存“整张 metadata map”。
请不要在你的服务发现 metadata(例如 discovery.Endpoint.Metadata)里使用同名 key;该 key 会被忽略以避免覆盖内部结构。
支持的过滤方式(可组合使用):
picker.LabelSelectorFilter(selector)(推荐):一条 selector 表达更复杂的条件picker.VersionFilter(version)/picker.VersionPrefixFilter(prefix):按版本过滤picker.MetadataFilter(key, value)/picker.MetadataExistsFilter(key):按 metadata 过滤picker.AddressFilter(addrs...)/picker.ExcludeAddressFilter(addrs...):按地址白/黑名单picker.HealthyFilter(checker):按自定义健康检查过滤
使用前提:配置里必须显式打开 EnableNodeFilter。
import (
"context"
"github.com/xkeyideal/grpcbalance/grpclient"
"github.com/xkeyideal/grpcbalance/grpclient/balancer"
"github.com/xkeyideal/grpcbalance/grpclient/picker"
)
cfg := &grpclient.Config{
Endpoints: addrs,
BalanceName: balancer.RoundRobinBalanceName,
EnableNodeFilter: true,
EnableHealthCheck: true,
}
_ = cfg
// 每次 RPC 调用时,把过滤器放进 ctx
ctx := picker.WithNodeFilter(context.Background(), picker.MetadataFilter("env", "prod"))
_ = ctx当你能把服务发现/注册中心的 metadata(比如 env=prod、region=cn-north、lane=canary、version=2.1.0)注入到 resolver.Address.Attributes 后,用 LabelSelectorFilter 通常是最省心也最强的写法。
约定:
- selector 优先读取
Attributes[key](例如env、region、lane等) - 当
Attributes[key]缺失时,会回退读取Attributes[picker.MetadataFilterKey]里的 map(仅当其类型为map[string]string)
import (
"context"
"github.com/xkeyideal/grpcbalance/grpclient/picker"
)
f, err := picker.LabelSelectorFilter("env=prod, lane notin (dev), v@^1.1.0 || >=2")
if err != nil {
// selector 解析失败时返回 error(不建议 panic)
return
}
ctx := picker.WithNodeFilter(context.Background(), f)
resp, err := client.SomeRPC(ctx, req)
_ = resp
_ = err更多 selector 语法见 label/README.md。
ctx := picker.WithNodeFilter(context.Background(),
picker.VersionPrefixFilter("v2."),
picker.MetadataFilter("env", "prod"),
picker.ExcludeAddressFilter("127.0.0.1:50052"),
)
resp, err := client.SomeRPC(ctx, req)
_ = resp
_ = errimport (
"github.com/xkeyideal/grpcbalance/grpclient/resolver"
)
// 从 100 个节点中选择 10 个
selector := resolver.NewSubsetSelector(resolver.SubsetConfig{
SubsetSize: 10,
ClientKey: "my-service",
})
subset := selector.Select(allAddrs)import (
"github.com/xkeyideal/grpcbalance/grpclient"
"github.com/xkeyideal/grpcbalance/grpclient/discovery"
)
// 静态发现
staticDiscovery := discovery.NewStaticDiscovery([]string{
"127.0.0.1:8081",
"127.0.0.1:8082",
})
// 轮询发现(自定义获取端点的函数)
pollingDiscovery := discovery.NewPollingDiscovery(
discovery.DiscoveryFunc(func(ctx context.Context) ([]discovery.Endpoint, error) {
return fetchEndpointsFromRegistry()
}),
30*time.Second,
)
cfg := &grpclient.Config{
BalanceName: balancer.RoundRobinBalanceName,
Discovery: pollingDiscovery, // 设置后 Endpoints 被忽略
EnableHealthCheck: true,
}详细的变更记录请参阅 upgrade.md。
-
Modify naming resolver with your requirements, first set attributes.Attributes for per endpoint address, second when one endpoint attributes.Attributes changed then update subConn state.
-
Implement yourself balancer & picker function, then based on attributes.Attributes picker subConn in
Pick(balancer.PickInfo) (balancer.PickResult, error)
Apache 2.0 license.
