client-go和golang源码中的技巧( 二 )


package mainimport ("fmt""sync")var speakCh = make(chan string)var stopReadChan = make(chan struct{})var stopWriteChan = make(chan struct{})func readChan(stopCh <-chan struct{}){for {select {case words := <- speakCh:fmt.Println("received:",words)case <- stopCh:fmt.Println("stop read!")return}}}func writeChan(stopCh <-chan struct{}){for {select {case <- stopCh:fmt.Println("stop write!")close(stopReadChan)returndefault:}speakCh <- "hi"time.Sleep(time.Second*2)}}func main(){go readChan(stopReadChan)go writeChan(stopWriteChan)time.Sleep(time.Second*6)close(stopWriteChan)time.Sleep(time.Second*6)}结果:received: hireceived: hireceived: histop write!stop read!

  • 协程间使用context进行同步
context用于对协程进行管理 , 如主动退出协程 , 超时退出协程等 , 可以看作是使用chan管理协程的扩展 。 在使用时首先创建一个context , 使用cancel()可以取消context , 并使用Done()返回的chan管理协程 。
官方推荐的用法如下:
func Stream(ctx context.Context, out chan<- Value) error {for {v, err := DoSomething(ctx)if err != nil {return err}select {case <-ctx.Done():return ctx.Err()case out <- v:}}}下例中使用context.WithCancel创建一个context , 使用cancel()给这一组context发送信号 , 在协程中使用Done()处理退出事件 。
package mainimport ("fmt""context")func main(){ctx,cancel := context.WithCancel(context.Background())go testCtx(ctx,"ctx1")go testCtx(ctx,"ctx2")go testCtx(ctx,"ctx3")time.Sleep(time.Second*3)cancel()time.Sleep(time.Second*5)}func testCtx(ctx context.Context, name string) error{for {select {case <-ctx.Done():fmt.Println("ctx.Done:",name)return ctx.err()default:fmt.Println("default:",name)time.Sleep(time.Second*2)}}}结果:default: ctx1default: ctx3default: ctx2default: ctx3default: ctx1default: ctx2ctx.Done: ctx1ctx.Done: ctx3ctx.Done: ctx2创建context的方式如下 , 其余三个可以看作是WithCancel的扩展
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)//需要主动取消contextfunc WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)//在deadline时间点后取消contextfunc WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) //在超时后取消contextfunc WithValue(parent Context, key, val interface{}) Context再看一个WithTimeout的例子 , 下面设置context的超时时间为3s且没有主动cancel() , 3s超时后可以看到该context对应的协程正常退出
func main(){ctx,_ := context.WithTimeout(context.Background(),time.Second*3)go testCtx(ctx,"ctx1")go testCtx(ctx,"ctx2")go testCtx(ctx,"ctx3")time.Sleep(time.Second*5)}结果:default: ctx3default: ctx1default: ctx2default: ctx3default: ctx1default: ctx2ctx.Done: ctx3ctx.Done: ctx2ctx.Done: ctx1context可以看作是一个树 , 当cancel一个context时 , 会同时cancle它的子context 。 下面首先创建一个ctx , 然后在此ctx下面创建一个subctx 。 当执行cancle() ctx时会同时cancel() 该的subctx 。
context.Background()就是已经实现的首个context 。
func main(){ctx,cancel := context.WithCancel(context.Background())subctx,_ := context.WithCancel(ctx)go testCtx(ctx,"ctx1")go testCtx(subctx,"subctx1")go testCtx(subctx,"subctx2")time.Sleep(time.Second*3)canclel()time.Sleep(time.Second*10)}结果:default: subctx2default: ctx1default: subctx1default: subctx2default: ctx1default: subctx1timeoutctx.Done: ctx1ctx.Done: subctx1ctx.Done: subctx2下例中仅cancel() subctx , 可以看到并没有影响subctx的parent 。
func main(){ctx, _:= context.WithCancel(context.Background())subctx,subcancel := context.WithCancel(ctx)go testCtx(ctx,"ctx1")go testCtx(subctx,"subctx1")go testCtx(subctx,"subctx2")time.Sleep(time.Second*3)subcancel()time.Sleep(time.Second*10)}结果:default: subctx1default: subctx2default: ctx1default: ctx1default: subctx1default: subctx2timeoutctx.Done: subctx2default: ctx1ctx.Done: subctx1default: ctx1default: ctx1default: ctx1default: ctx1
  • wait.Group(k8s.io/apimachinery/pkg/util/wait/wait.go)
client-go中的wait.Group创造性地将sync.WaitGroup与chan和ctx结合 , 实现了协程间同步和等待全部Group中的协程结束的功能 。 由于StartWithChannel和StartWithContext的入参函数类型比较固定 , 因此使用上并不通用 , 但可以作为参考 , 在实际中扩展使用 。 下例中给出了简单用法 。
func (g *Group) Wait() func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{}))func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))func main(){f1:= func(ctx context.Context) {for {select {case <- ctx.Done():returndefault:fmt.Println("hi11")time.Sleep(time.Second)}}}wg := wait.Group{}ctx, cancel := context.WithCancel(context.Background())wg.StartWithContext(ctx,f1)time.Sleep(time.Second*3)cancel()wg.Wait()}结果:hihihi