阿里双11同款,流量防卫兵 Sentinel go 源码解读( 四 )

BaseStatNode 对外提供了读写接口 , 其数据写入 BaseStatNode.arr , 读取接口则依赖 BaseStatNode.metric 。 BaseStatNode.arr 是在 NewBaseStatNode() 中创建的 , 指针 SlidingWindowMetric.real 也指向它 。
ResourceNode 则顾名思义 , 其代表了某资源和它的 Metrics 存储 ResourceNode.BaseStatNode 。
全局变量 resNodeMap 存储了所有资源的 Metrics 指标数据 。
3 限流流程本节只分析 Sentinel 库提供的最基础的流量整形功能 -- 限流 , 限流算法多种多样 , 可以使用其内置的算法 , 用户自己也可以进行扩展 。
限流过程有三步步骤:

  • 1 针对特定 Resource 构造其 EntryContext , 存储其 Metrics、限流开始时间等 , Sentinel 称之为 StatPrepareSlot;
  • 2 依据 Resource 的限流算法判定其是否应该进行限流 , 并给出限流判定结果 , Sentinel 称之为 RuleCheckSlot;补充:这个限流算法是一系列判断方法的合集(SlotChain);
  • 3 判定之后 , 除了用户自身根据判定结果执行相应的 action , Sentinel 也需要根据判定结果执行自身的 Action , 以及把整个判定流程所使用的的时间 RT 等指标存储下来 , Sentinel 称之为 StatSlot 。
整体流程如下图所示:
阿里双11同款,流量防卫兵 Sentinel go 源码解读文章插图
3.1 Slot
针对 Check 三个步骤 , 有三个对应的 Slot 分别定义如下:
// StatPrepareSlot is responsible for some preparation before statistic// For example: init structure and so ontype StatPrepareSlot interface {// Prepare function do some initialization// Such as: init statistic structure、node and etc// The result of preparing would store in EntryContext// All StatPrepareSlots execute in sequence// Prepare function should not throw panic.Prepare(ctx *EntryContext)}// RuleCheckSlot is rule based checking strategy// All checking rule must implement this interface.type RuleCheckSlot interface {// Check function do some validation// It can break off the slot pipeline// Each TokenResult will return check result// The upper logic will control pipeline according to SlotResult.Check(ctx *EntryContext) *TokenResult}// StatSlot is responsible for counting all custom biz metrics.// StatSlot would not handle any panic, and pass up all panic to slot chaintype StatSlot interface {// OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass// StatSlots will do some statistic logic, such as QPS、log、etcOnEntryPassed(ctx *EntryContext)// OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute// It may be inbound flow control or outbound cir// StatSlots will do some statistic logic, such as QPS、log、etc// blockError introduce the block detailOnEntryBlocked(ctx *EntryContext, blockError *BlockError)// OnCompleted function will be invoked when chain exits.// The semantics of OnCompleted is the entry passed and completed// Note: blocked entry will not call this functionOnCompleted(ctx *EntryContext)}抛却 Prepare 和 Stat , 可以简单的认为:所谓的 slot , 就是 sentinel 提供的某个流控组件 。
值得注意的是 , 根据注释 StatSlot.OnCompleted 只有在 RuleCheckSlot.Check 通过才会执行 , 用于计算从请求开始到结束所使用的 RT 等 Metrics 。
3.2 Prepare
// core/base/slot_chain.go// StatPrepareSlot is responsible for some preparation before statistic// For example: init structure and so ontype StatPrepareSlot interface {// Prepare function do some initialization// Such as: init statistic structure、node and etc// The result of preparing would store in EntryContext// All StatPrepareSlots execute in sequence// Prepare function should not throw panic.Prepare(ctx *EntryContext)}// core/stat/stat_prepare_slot.gotype ResourceNodePrepareSlot struct {}func (s *ResourceNodePrepareSlot) Prepare(ctx *base.EntryContext) {node := GetOrCreateResourceNode(ctx.Resource.Name(), ctx.Resource.Classification())// Set the resource node to the context.ctx.StatNode = node}如前面解释 , Prepare 主要是构造存储 Resource Metrics 所使用的 ResourceNode 。 所有 Resource 的 StatNode 都会存储在 package 级别的全局变量 core/stat/node_storage.go:resNodeMap [type: map[string]*ResourceNode] 中 , 函数 GetOrCreateResourceNode 用于根据 Resource Name 从 resNodeMap 中获取其对应的 StatNode , 如果不存在则创建一个 StatNode 并存入 resNodeMap 。
3.3 Check
RuleCheckSlot.Check() 执行流程:
  • 1 根据 Resource 名称获取其所有的 Rule 集合;
  • 2 遍历 Rule 集合 , 对 Resource 依次执行 Check , 任何一个 Rule 判定 Resource 需要进行限流【Blocked】则返回 , 否则放行 。
type Slot struct {}func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {res := ctx.Resource.Name()tcs := getTrafficControllerListFor(res)result := ctx.RuleCheckResult// Check rules in orderfor _, tc := range tcs {r := canPassCheck(tc, ctx.StatNode, ctx.Input.AcquireCount)if r == nil {// nil means passcontinue}if r.Status() == base.ResultStatusBlocked {return r}if r.Status() == base.ResultStatusShouldWait {if waitMs := r.WaitMs(); waitMs > 0 {// Handle waiting action.time.Sleep(time.Duration(waitMs) * time.Millisecond)}continue}}return result}func canPassCheck(tc *TrafficShapingController, node base.StatNode, acquireCount uint32) *base.TokenResult {return canPassCheckWithFlag(tc, node, acquireCount, 0)}func canPassCheckWithFlag(tc *TrafficShapingController, node base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {return checkInLocal(tc, node, acquireCount, flag)}func checkInLocal(tc *TrafficShapingController, resStat base.StatNode, acquireCount uint32, flag int32) *base.TokenResult {return tc.PerformChecking(resStat, acquireCount, flag)}