Sentinel Go, the Traffic Guard Behind Alibaba's Double 11: A Source Code Walkthrough (Part 3)


  • 1 core/circuitbreaker/circuit_breaker.go: slowRequestCounter, the underlying data type of the slowRequestLeapArray used by slowRtCircuitBreaker
// core/circuitbreaker/circuit_breaker.go
type slowRequestCounter struct {
	slowCount  uint64
	totalCount uint64
}
  • 2 core/circuitbreaker/circuit_breaker.go: errorCounter, the underlying data type of the errorCounterLeapArray used by errorRatioCircuitBreaker
// core/circuitbreaker/circuit_breaker.go
type errorCounter struct {
	errorCount uint64
	totalCount uint64
}
1.1 MetricBucket
BucketWrap can be regarded as a time-bucket template; the concrete bucket entity is MetricBucket, defined as follows:
// MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span).
// Note that all operations of the MetricBucket are required to be thread-safe.
type MetricBucket struct {
	// Value of statistic
	counter [base.MetricEventTotal]int64
	minRt   int64
}
MetricBucket stores five types of metric:
// There are five events to record
// pass + block == Total
const (
	// sentinel rules check pass
	MetricEventPass MetricEvent = iota
	// sentinel rules check block
	MetricEventBlock
	MetricEventComplete
	// Biz error, used for circuit breaker
	MetricEventError
	// request execute rt, unit is millisecond
	MetricEventRt
	// hack for the number of event
	MetricEventTotal
)
2 AtomicBucketWrapArray
Each bucket records only its own start time and metric values. Shared values such as the time span of a bucket are kept once in AtomicBucketWrapArray, which is defined as follows:
// atomic BucketWrap array to resolve race condition
// AtomicBucketWrapArray can not append or delete element after initializing
type AtomicBucketWrapArray struct {
	// The base address for real data array
	base unsafe.Pointer
	// The length of slice(array), it can not be modified.
	length int
	data   []*BucketWrap
}
AtomicBucketWrapArray.base holds the pointer to the first element of the data region behind the AtomicBucketWrapArray.data slice. Because AtomicBucketWrapArray.data is a fixed-length slice, AtomicBucketWrapArray.base can store the starting address of that memory region directly, which speeds up access.
In addition, AtomicBucketWrapArray.data stores pointers to BucketWrap, not BucketWrap values.
NewAtomicBucketWrapArrayWithTime() warms the array up by creating all of the time buckets in advance.
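To make the role of the base pointer concrete, here is a minimal sketch of a fixed-length pointer array that is read and swapped atomically through its base address. It is not Sentinel's code: the names bucket, atomicBucketArray, newAtomicBucketArray, slot, get and compareAndSet are hypothetical, the pre-fill simply lays buckets out from a given start time, and unsafe.Add assumes Go 1.17 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// bucket is a hypothetical stand-in for BucketWrap.
type bucket struct {
	startMs uint64
}

// atomicBucketArray is a hypothetical, simplified analogue of AtomicBucketWrapArray.
type atomicBucketArray struct {
	base   unsafe.Pointer // address of data[0]
	length int            // fixed after construction
	data   []*bucket      // keeps the backing array alive for the GC
}

// newAtomicBucketArray pre-creates every bucket (the "warm-up" step).
func newAtomicBucketArray(n int, bucketLenMs, startMs uint64) *atomicBucketArray {
	if n <= 0 {
		return nil
	}
	data := make([]*bucket, n)
	for i := range data {
		data[i] = &bucket{startMs: startMs + uint64(i)*bucketLenMs}
	}
	return &atomicBucketArray{base: unsafe.Pointer(&data[0]), length: n, data: data}
}

// slot returns the address of element idx, computed from the base address.
func (a *atomicBucketArray) slot(idx int) (unsafe.Pointer, bool) {
	if idx < 0 || idx >= a.length {
		return nil, false
	}
	return unsafe.Add(a.base, uintptr(idx)*unsafe.Sizeof(a.data[0])), true
}

// get atomically loads the bucket pointer stored at idx.
func (a *atomicBucketArray) get(idx int) *bucket {
	p, ok := a.slot(idx)
	if !ok {
		return nil
	}
	return (*bucket)(atomic.LoadPointer((*unsafe.Pointer)(p)))
}

// compareAndSet replaces the pointer at idx only if it still equals old.
func (a *atomicBucketArray) compareAndSet(idx int, old, next *bucket) bool {
	p, ok := a.slot(idx)
	if !ok {
		return false
	}
	return atomic.CompareAndSwapPointer((*unsafe.Pointer)(p), unsafe.Pointer(old), unsafe.Pointer(next))
}

func main() {
	arr := newAtomicBucketArray(5, 200, 0)
	old := arr.get(1)
	swapped := arr.compareAndSet(1, old, &bucket{startMs: 1200})
	fmt.Println(swapped, arr.get(1).startMs)
}
```

Storing pointers in the slots is what makes this work: replacing a bucket is a single pointer-sized compare-and-swap, so readers never observe a half-written bucket.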
2.2 Time wheel
1 leapArray
// Give a diagram to illustrate
// Suppose current time is 888, bucketLengthInMs is 200ms,
// intervalInMs is 1000ms, LeapArray will build the below windows
//   B0       B1      B2     B3      B4
//   |_______|_______|_______|_______|_______|
//  1000    1200    1400    1600    800    (1000)
//                                        ^
//                                      time=888
type LeapArray struct {
	bucketLengthInMs uint32
	sampleCount      uint32
	intervalInMs     uint32
	array            *AtomicBucketWrapArray
	// update lock
	updateLock mutex
}
The members of LeapArray are:
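  • bucketLengthInMs is the time span of a single bucket, in milliseconds;
  • sampleCount is the number of time buckets;
  • intervalInMs is the length of the whole time window, in milliseconds.
The ASCII diagram in the comment illustrates what each field means.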
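The numbers in that diagram can be reproduced with a little arithmetic. The sketch below is an assumption about how a timestamp maps to a bucket, not a copy of Sentinel's code; bucketIndex and bucketStart are hypothetical helper names.

```go
package main

import "fmt"

// bucketIndex maps a timestamp to a slot in the ring of sampleCount buckets.
// Hypothetical helper: divide time into bucket-sized steps, then wrap around.
func bucketIndex(nowMs uint64, bucketLengthInMs, sampleCount uint32) int {
	return int((nowMs / uint64(bucketLengthInMs)) % uint64(sampleCount))
}

// bucketStart rounds a timestamp down to the start of its bucket.
func bucketStart(nowMs uint64, bucketLengthInMs uint32) uint64 {
	return nowMs - nowMs%uint64(bucketLengthInMs)
}

func main() {
	const (
		bucketLengthInMs uint32 = 200
		sampleCount      uint32 = 5
	)
	for _, now := range []uint64{888, 1000, 1399} {
		fmt.Printf("now=%d idx=%d start=%d\n",
			now,
			bucketIndex(now, bucketLengthInMs, sampleCount),
			bucketStart(now, bucketLengthInMs))
	}
}
```

With bucketLengthInMs = 200 and sampleCount = 5, time 888 lands in slot 4 with start time 800, and time 1000 wraps around to slot 0 with start time 1000, which matches B4 and B0 in the diagram.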
The core function of LeapArray is LeapArray.currentBucketOfTime(), which returns the time bucket (BucketWrap) that corresponds to a given point in time. The code is as follows:
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
	if now <= 0 {
		return nil, errors.New("Current time is less than 0.")
	}
	idx := la.calculateTimeIdx(now)
	bucketStart := calculateStartTime(now, la.bucketLengthInMs)
	for { //spin to get the current BucketWrap
		old := la.array.get(idx)
		if old == nil {
			// because la.array.data had initiated when new la.array
			// theoretically, here is not reachable
			newWrap := …
2 BucketLeapArray
leapArray implements the main body of the sliding time window; the interface exposed to callers is BucketLeapArray:
// The implementation of sliding window based on LeapArray (as the sliding window infrastructure)
// and MetricBucket (as the data type). The MetricBucket is used to record statistic
// metrics per minimum time unit (i.e. the bucket time span).
type BucketLeapArray struct {
	data     LeapArray
	dataType string
}
As the comment on this struct shows, the entity behind its time window BucketWrap is MetricBucket.
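Since the bucket entity is MetricBucket, whose comment requires every operation to be thread-safe, recording an event can be pictured as an atomic add on the counter slot for that event. The following is only an illustrative sketch under that assumption; metricEvent, metricBucket, add and get are hypothetical lowercase names, not Sentinel's API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// metricEvent mirrors the shape of the event enum: the last constant
// doubles as the number of events, so it can size the counter array.
type metricEvent int

const (
	eventPass metricEvent = iota
	eventBlock
	eventComplete
	eventError
	eventRt
	eventTotal // number of events
)

// metricBucket is a hypothetical analogue of MetricBucket.
type metricBucket struct {
	counter [eventTotal]int64
}

// add atomically increments the counter for one event type.
func (mb *metricBucket) add(ev metricEvent, n int64) {
	atomic.AddInt64(&mb.counter[ev], n)
}

// get atomically reads the counter for one event type.
func (mb *metricBucket) get(ev metricEvent) int64 {
	return atomic.LoadInt64(&mb.counter[ev])
}

func main() {
	b := &metricBucket{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.add(eventPass, 1) // concurrent writers, no lock needed
		}()
	}
	wg.Wait()
	fmt.Println(b.get(eventPass))
}
```

Indexing a fixed array by event type keeps the hot path down to one atomic instruction per recorded event.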
2.3 Reading and writing metric data
SlidingWindowMetric
// SlidingWindowMetric represents the sliding window metric wrapper.
// It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket
// SlidingWindowMetric is used for SentinelRules and BucketLeapArray is used for monitor
// BucketLeapArray is per resource, and SlidingWindowMetric support only read operation.
type SlidingWindowMetric struct {
	bucketLengthInMs uint32
	sampleCount      uint32
	intervalInMs     uint32
	real             *BucketLeapArray
}
SlidingWindowMetric is a wrapper around BucketLeapArray that exposes only a read-only interface.
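One way to picture such a read-only wrapper: it holds no data of its own and, when asked, sums the buckets of the underlying store that still fall inside its interval. The sketch below assumes that behaviour and uses hypothetical types (bucket, window, readOnlyMetric) and a simplified validity check; it is not Sentinel's actual range computation.

```go
package main

import "fmt"

// bucket is a hypothetical bucket holding only a pass count.
type bucket struct {
	startMs uint64
	pass    int64
}

// window is a hypothetical writer-side store of buckets.
type window struct {
	buckets []bucket
}

// readOnlyMetric is a hypothetical analogue of SlidingWindowMetric:
// it stores no metrics itself and only reads from the underlying store.
type readOnlyMetric struct {
	intervalInMs uint32
	real         *window
}

// passSum adds up the pass counts of buckets that started within the
// last intervalInMs milliseconds relative to nowMs.
func (m *readOnlyMetric) passSum(nowMs uint64) int64 {
	var sum int64
	for _, b := range m.real.buckets {
		if b.startMs <= nowMs && nowMs-b.startMs < uint64(m.intervalInMs) {
			sum += b.pass
		}
	}
	return sum
}

func main() {
	w := &window{buckets: []bucket{
		{0, 1}, {200, 2}, {400, 3}, {600, 4}, {800, 5},
	}}
	full := &readOnlyMetric{intervalInMs: 1000, real: w}
	short := &readOnlyMetric{intervalInMs: 400, real: w}
	fmt.Println(full.passSum(888), short.passSum(888))
}
```

Because the wrapper carries its own interval, several readers with different window lengths can share one underlying store.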
ResourceNode
type BaseStatNode struct {
	sampleCount  uint32
	intervalMs   uint32
	goroutineNum int32
	arr          *sbase.BucketLeapArray
	metric       *sbase.SlidingWindowMetric
}

type ResourceNode struct {
	BaseStatNode
	resourceName string
	resourceType base.ResourceType
}

// core/stat/node_storage.go
type ResourceNodeMap map[string]*ResourceNode

var (
	inboundNode = NewResourceNode(base.TotalInBoundResourceName, base.ResTypeCommon)
	resNodeMap  = make(ResourceNodeMap)
	rnsMux      = new(sync.RWMutex)
)
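A map guarded by a sync.RWMutex, as declared above, is commonly used with a get-or-create pattern: look up under the read lock first, and take the write lock only when the entry is missing. The sketch below illustrates that general pattern with hypothetical names (resourceNode, nodeMap, nodeMux, getOrCreateNode); it is an assumption about typical usage, not Sentinel's own lookup function.

```go
package main

import (
	"fmt"
	"sync"
)

// resourceNode is a hypothetical stand-in for ResourceNode.
type resourceNode struct {
	name string
}

var (
	nodeMap = make(map[string]*resourceNode)
	nodeMux sync.RWMutex
)

// getOrCreateNode returns the node for name, creating it on first use.
func getOrCreateNode(name string) *resourceNode {
	// Fast path: most calls find an existing node under the read lock.
	nodeMux.RLock()
	n, ok := nodeMap[name]
	nodeMux.RUnlock()
	if ok {
		return n
	}

	// Slow path: re-check under the write lock, because another
	// goroutine may have created the node in between.
	nodeMux.Lock()
	defer nodeMux.Unlock()
	if n, ok = nodeMap[name]; ok {
		return n
	}
	n = &resourceNode{name: name}
	nodeMap[name] = n
	return n
}

func main() {
	a := getOrCreateNode("GET:/orders")
	b := getOrCreateNode("GET:/orders")
	fmt.Println(a == b, len(nodeMap))
}
```

The second check under the write lock is what guarantees that each resource name ends up with exactly one node.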