顺晟科技
2021-08-28 09:41:15
155
背景
在选择负载平衡算法时,您希望满足以下要求:
具有分区和机房调度亲和力。
每次选择的节点可能的负载更低
选择尽快响应的节点
不需要手动介入故障节点
当一个节点发生故障时,负载平衡算法允许主动隔离节点
故障节点恢复后,可以主动恢复该节点的流量分布
基于这种思考,go-zero选择了p2c EWMA算法。
算法的核心思想
P2c
选择2选择(P2C)2选择1 3360多个节点中随机选择两个节点。
在Go-zero,如果随机选择三次,一次选定节点的衰弱条件满足要求,就会停止选择,采用两个节点。(威廉莎士比亚,《北方执行报》(Northern Exposure))。
EWMA
“经济加权移动平均(EWMA)指数移动加权平均法:”是指每个数字的加权系数随着时间呈指数级增长,随着越来越近,数字加权系数越大,反映了最近一段时间的平均值。
官方:
变量解释:
Vt:表示第t个请求的EWMA值
Vt-13360表示T-1申请的EWMA值。
:是常数
EWMA算法的缺点
EWMA与一般计算平均算法相比,不需要保持所有数字,计算量大大减少,存储资源也减少。
传统的计算平均算法对网络时间不敏感,但EWMA可以经常通过申请调整EWMA,快速监控网络悬停或整体平均。
当申请比较频繁时,表明节点网络负载增加,此时,当节点想监控解决申请所需的时间(侧面反映节点的负载状态)时,相应地降低。EWMA越小,EWMA值就越接近这个时间,这样就可以快速监控网络毛刺。
当申请不太频繁时,我们提高值。这样计算的EWMA值越接近平均值,
贝塔计算
Go-zero采用牛顿冷却定律的衰减函数模型计算EWMA算法的值3360。
其中 t是申请两次的距离,e,k是常数
GRPC自定义负载平衡器的实现
首先,必须实施Google . golang . org/GRPC/Balancer/Build/base . go/picker builder接口,该接口会在有服务节点更新时调用接口上的构建方法
Type PickerBuilder介面{
//build returns a picker that will be used by grpc to pick a subconn .
Build(Info Pickerbuildinfo)Balancer . picker
}
它还实现了Google . golang . org/grpc/balancer/balancer . go/picker接口。此接口执行第二次负载平衡,为应用程序过滤节点
Type Picker界面{
信息拾取信息(pick)(pick result,error)
}
注册最初在负载平衡映射中实现的负载平衡器
Go-zero负载平衡辅助逻辑
每当每个节点更新时,gRPC都会调用Build方法,在Build中保留所有节点信息。
GRPC在收集节点解决请求时调用Pick方法获取节点。Go-zero在Pick方法中实现p2c算法,过滤节点,通过节点的EWMA值计算负载状态,并为gRPC应用程序返回负载较低的节点。
申请结束后,gRPC将由PickResult .调用Done方法。go-zero存储了这次申请需要很多时间的信息等,计算了EWMA值,计算了下次申请时可以计算负荷等的应用程序。
负载平衡代码分析
保留服务的所有节点信息
为了解决此次申请,需要保留保存节点所需的时间、EWMA等信息。go-zero为每个节点设计了以下配置:
Type subConn struct {
Addr resolver。Address
Conn balancer。SubConn
用于保持Lag uint64 //EWMA值
Inflight int64 //预约后节点上正在解决的请求总数
Success uint64 //用于在一段时间内识别这种一致的衰弱状态
请求int 64//用于保存请求总数
用于为Last int64 //EWMA值计算保留最后一个请求时间
Pick int64 //保留上次选择的时间
}
P2cPicker是balancer。实施Picker接口,conns保留服务的所有节点信息
Type p2cPicker结构{
Conns []*subConn //保留所有节点的信息
R *rand。Rand
Stamp *syncx。AtomicDuration
Lock sync。Mutex
}
GRPC在节点更新时调用Build方法传递所有节点信息。在此,将每个节点信息保存为subConn配置。以P2cPicker结构一起保存
构建func(b* p2 cpickerbuilder)(info base . pickerbuildinfo)balancer . picker {
.
Var conns []*subConn
Forconn,conninfo:=range ready SCS {
Conns=append(conns,subConn{
Addr: connInfo。Address,
Conn: conn,
Success: initSuccess,
})。
}
Return p2cPicker{
Conns: conns,
r : rand . new(rand . new source(time . now())。UNIX nano(),
stamp 3360 syncx . newatomicduration(),
}
}
随机过滤节点信息。这里有三种情况。
只有一个服务节点可以间接返回给gRPC应用程序
有两个服务节点,它们使用EWMA值计算负载,并为gRPC应用程序返回负载较低的节点
有多个服务节点。在这种情况下,通过p2c算法选择两个节点,将负载较低的节点返回到gRPC应用程序(与负载状态相比)。
辅助实现代码如下:
Switch len(p.conns) {
case 03360//无节点,返回错误
Return emptypickresult,balancer.errnosubconnavailable
CASE 13360//存在间接返回此节点的节点
Chosen=p.choose(p.conns[0],nil)
CASE 23360//两个节点,负载计算,负载较低的节点返回
Chosen=p.choose (p.conns [0],p.conns [1])
有多个Default 3360//节点,p2c比两个节点的负载过滤两个节点,并返回负载较低的节点
Var node1、node2 *subConn
//3随机选择两个节点
FOR I :=0;I pickTimesI {
A :=p.r.Intn(len(p.conns))
B :=p.r.Intn(len(p.conns)-1)
If b=a {
B
}
节点1=p.conns [a]
节点2=p.conns [b]
//当这次选择的节点达到衰弱的要求时,停止选择
if node 1 . healthy()node 2 . healthy(){
布雷克
}
}
//与两个节点的负载状态相比,选择低负载
Chosen=p.choose(node1,node2)
}
Load计算节点的负载状态
以下choose方法调用load方法来计算节点负载。
计算负载的公式为: load=EWMA * inflight
在此简要说明一下,EWMA对统一申请需要很长时间,inflight是以后节点正在解决申请的数量,乘以后,将大致计算以后节点的网络负载。
加载FUNC(C * Subconn)()Int 64 {
通过//EWMA计算节点的负载状态。加1是为了防止0人的情况
lag :=int 64(math . sqrt(float 64(atomic . loaduint 64(c . lag)1)))
load 3360=lag *(atomic . loadint 64(c . inflight)1)
If load==0 {
Return penalty
}
Return load
}
申请结束、节点EWMA更新等信息
节点正在解决请求的总数减少1。
为了计算最后一个节点解决申请的差异和计算EWMA的EWMA值,请保持索赔解决结束的时间。
计算此请求需要很长时间,并且EWMA值保留在节点的lag属性中
计算节点的弱化状态保留在节点的success属性中
func(p * p2c picker)builddonefunc(c * subconn)func(info balancer . done info){
Start :=int64(timex .Now())
return func(info balancer . done info){
//正在解决的申请数减去1
Atomic .AddInt64(c.inflight,-1)
Now :=timex。Now()
//保留这次申请结束时的时间,最后申请时提取时间
last 3360=atomic . swapint 64(c . last,int64 (now))
Td :=int64(now)-last
If TD 0 {
Td=0
}
利用//牛顿冷却定律的衰减函数模型计算EWMA算法的值。
w :=math . exp(float 64(-TD)/float 64(decay time))
//保管这次申请书需要很多时间
Lag:=int64 (now)-启动
If lag 0 {
Lag=0
}
olag :=atomic . loaduint 64(c . lag)
If olag==0 {
W=0
}
//EWMA值计算
Atomic.storeuint64 (c.lag,uint 64(float 64(olag)* w float 64(lag)*(1-w))
Success :=initSuccess
If info。Err!=nil!Codes。Acceptable(info .Err) {
Success=0
}
o succ :=atomic . loaduint 64(c . success)
Atomic.storeuint64 (c.success,uint 64(float 64(osucc)* w float 64(success)*(1-w))
Stamp :=p.stamp.Load()
If now-stamp=logInterval {
If p.stamp.compareandswap (stamp,now) {
P.logStats()
}
}
}
}
16
2022-03
28
2021-08
28
2021-08
28
2021-08
28
2021-08
28
2021-08