本文讲解的是golang.org/x/sync这个包中的errgroup

1、errgroup 的基础介绍

学习过 Go 的朋友都知道 Go 实现并发编程是比较容易的事情,只需要使用go关键字就可以开启一个 goroutine。那对于并发场景中,如何实现goroutine的协调控制呢?常见的一种方式是使用sync.WaitGroup 来进行协调控制。

使用过sync.WaitGroup 的朋友知道,sync.WaitGroup 虽然可以实现协调控制,但是不能传递错误,那该如何解决呢?聪明的你可能马上想到使用 chan 或者是 context来传递错误,确实是可以的。那接下来,我们一起看看官方是怎么实现上面的需求的呢?

1.1 errgroup的安装

安装命令:

go get golang.org/x/sync//下面的案例是基于v0.1.0 演示的go get golang.org/x/sync@v0.1.0

1.2 errgroup的基础例子

这里我们需要请求3个url来获取数据,假设请求url2时报错,url3耗时比较久,需要等一秒。

package mainimport ("errors""fmt""golang.org/x/sync/errgroup""strings""time")func main()  {queryUrls := map[string]string{"url1": "http://localhost/url1","url2": "http://localhost/url2","url3": "http://localhost/url3",}var eg errgroup.Groupvar results []stringfor _, url := range queryUrls {url := urleg.Go(func() error {result, err := query(url)if err != nil {return err}results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))return nil})}  // group 的wait方法,等待上面的 eg.Go 的协程执行完成,并且可以接受错误err := eg.Wait()if err != nil {fmt.Println("eg.Wait error:", err)return}for k, v := range results {fmt.Printf("%v ---> %v\n", k, v)}}func query(url string) (ret string, err error) {// 假设这里是发送请求,获取数据if strings.Contains(url, "url2") {// 假设请求 url2 时出现错误fmt.Printf("请求 %s 中....\n", url)return "", errors.New("请求超时")} else if strings.Contains(url, "url3") {// 假设 请求 url3 需要1秒time.Sleep(time.Second*1)}fmt.Printf("请求 %s 中....\n", url)return "success", nil}

执行结果:

请求 http://localhost/url2 中....请求 http://localhost/url1 中....请求 http://localhost/url3 中....eg.Wait error: 请求超时

果然,当其中一个goroutine出现错误时,会把goroutine中的错误传递出来。

我们自己运行一下上面的代码就会发现这样一个问题,请求 url2 出错了,但是依旧在请求 url3 。因为我们需要聚合 url1、url2、url3 的结果,所以当其中一个出现问题时,我们是可以做一个优化的,就是当其中一个出现错误时,取消还在执行的任务,直接返回结果,不用等待任务执行结果。

那应该如何做呢?

这里假设 url1 执行1秒,url2 执行报错,url3执行3秒。所以当url2报错后,就不用等url3执行结束就可以返回了。

package mainimport ("context""errors""fmt""golang.org/x/sync/errgroup""strings""time")func main()  {queryUrls := map[string]string{"url1": "http://localhost/url1","url2": "http://localhost/url2","url3": "http://localhost/url3",}var results []stringctx, cancel := context.WithCancel(context.Background())eg, errCtx := errgroup.WithContext(ctx)for _, url := range queryUrls {url := urleg.Go(func() error {result, err := query(errCtx, url)if err != nil {        //其实这里不用手动取消,看完源码就知道为啥了cancel()return err}results = append(results, fmt.Sprintf("url:%s -- ret: %v", url, result))return nil})}err := eg.Wait()if err != nil {fmt.Println("eg.Wait error:", err)return}for k, v := range results {fmt.Printf("%v ---> %v\n", k, v)}}func query(errCtx context.Context, url string) (ret string, err error) {fmt.Printf("请求 %s 开始....\n", url)// 假设这里是发送请求,获取数据if strings.Contains(url, "url2") {// 假设请求 url2 时出现错误time.Sleep(time.Second*2)return "", errors.New("请求出错")} else if strings.Contains(url, "url3") {// 假设 请求 url3 需要1秒select {case <- errCtx.Done():ret, err = "", errors.New("请求3被取消")returncase <- time.After(time.Second*3):fmt.Printf("请求 %s 结束....\n", url)return "success3", nil}} else {select {case <- errCtx.Done():ret, err = "", errors.New("请求1被取消")returncase <- time.After(time.Second):fmt.Printf("请求 %s 结束....\n", url)return "success1", nil}}}

执行结果:

请求 http://localhost/url2 开始....请求 http://localhost/url3 开始....请求 http://localhost/url1 开始....请求 http://localhost/url1 结束....eg.Wait error: 请求出错

2、errgroup源码分析

看了上面的例子,我们对errgroup有了一定了解,接下来,我们一起看看errgroup做了那些封装。

2.1 errgroup.Group

errgroup.Group源码如下:

// A Group is a collection of goroutines working on subtasks that are part of// the same overall task.//// A zero Group is valid, has no limit on the number of active goroutines,// and does not cancel on error.type Group struct {  // context 的 cancel 方法cancel func()wg sync.WaitGroup  //传递信号的通道,这里主要是用于控制并发创建 goroutine 的数量  //通过 SetLimit 设置过后,同时创建的goroutine 最大数量为nsem chan token  // 保证只接受一次错误errOnce sync.Once  // 最先返回的错误err     error}

看结构体中的内容,发现比原生的sync.WaitGroup多了下面的内容:

  • cancel func()
  • sem chan token
  • errOnce sync.Once
  • err error

2.2 WithContext 方法

// WithContext returns a new Group and an associated Context derived from ctx.//// The derived Context is canceled the first time a function passed to Go// returns a non-nil error or the first time Wait returns, whichever occurs// first.func WithContext(ctx context.Context) (*Group, context.Context) {ctx, cancel := context.WithCancel(ctx)return &Group{cancel: cancel}, ctx}

方法逻辑还是比较简单的,主要做了两件事:

  • 使用contextWithCancel()方法创建一个可取消的Context
  • context.WithCancel(ctx)创建的 cancel赋值给 Group中的cancel

2.3 Go

1.2 最后一个例子说,不用手动去执行 cancel 的原因就在这里。

g.cancel() //这里就是为啥不用手动执行 cancel的原因

// Go calls the given function in a new goroutine.// It blocks until the new goroutine can be added without the number of// active goroutines in the group exceeding the configured limit.//// The first call to return a non-nil error cancels the group's context, if the// group was created by calling WithContext. The error will be returned by Wait.func (g *Group) Go(f func() error) {if g.sem != nil {    //往 sem 通道中发送空结构体,控制并发创建 goroutine 的数量g.sem <- token{}}g.wg.Add(1)go func() {    // done()函数的逻辑就是当 f 执行完后,从 sem 取一条数据,并且 g.wg.Done()defer g.done()if err := f(); err != nil {g.errOnce.Do(func() { // 这里就是确保 g.err 只被赋值一次g.err = errif g.cancel != nil {g.cancel() //这里就是为啥不用手动执行 cancel的原因}})}}()}

2.4 TryGo

看注释,知道此函数的逻辑是:当正在执行的goroutine数量小于通过SetLimit()设置的数量时,可以启动成功,返回 true,否则启动失败,返回false。

// TryGo calls the given function in a new goroutine only if the number of// active goroutines in the group is currently below the configured limit.//// The return value reports whether the goroutine was started.func (g *Group) TryGo(f func() error) bool {if g.sem != nil {select {case g.sem <- token{}: // 当g.sem的缓冲区满了过后,就会执行default,也代表着未启动成功// Note: this allows barging iff channels in general allow barging.default:return false}}    //----主要看上面的逻辑,下面的逻辑和Go中的一样-------g.wg.Add(1)go func() {defer g.done()if err := f(); err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}}()return true}

2.5 Wait

代码逻辑很简单,这里主要注意这里:

//我看这里的时候,有点疑惑,为啥这里会去调用 cancel()方法呢?
//这里是为了代码的健壮性,用 context.WithCancel() 创建得到的 cancel,在代码执行完毕之前取消是一个好习惯
g.cancel()

// Wait blocks until all function calls from the Go method have returned, then// returns the first non-nil error (if any) from them.func (g *Group) Wait() error {  g.wg.Wait() //通过 g.wg.Wait() 阻塞等待所有的 goroutine 执行完if g.cancel != nil {    //我看这里的时候,有点疑惑,为啥这里会去调用 cancel()方法呢?    //这里是为了代码的健壮性,用 context.WithCancel() 创建得到的 cancel,在代码执行完毕之前取消是一个好习惯 g.cancel()}return g.err}

2.6 SetLimit

看代码的注释,我们知道:SetLimit的逻辑主要是限制同时执行的 goroutines 的数量为n,当n小于0时,没有限制。如果有运行的 goroutine,调用此方法会报错。

// SetLimit limits the number of active goroutines in this group to at most n.// A negative value indicates no limit.//// Any subsequent call to the Go method will block until it can add an active// goroutine without exceeding the configured limit.//// The limit must not be modified while any goroutines in the group are active.func (g *Group) SetLimit(n int) {if n < 0 {g.sem = nilreturn}if len(g.sem) != 0 {panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))}g.sem = make(chan token, n)}

3、errgroup 容易忽视的坑

这个坑是看别人的记录看到的,对errgroup不太熟悉时,是不小心确实容易掉进去,所以摘抄了过来,如果侵权,请联系删除,谢谢!

原文链接:并发编程包之 errgroup

需求:

开启多个Goroutine去缓存中设置数据,同时开启一个Goroutine去异步写日志,很快我的代码就写出来了:

package mainimport ("context""errors""fmt""golang.org/x/sync/errgroup""time")func main()  {g, ctx := errgroup.WithContext(context.Background())// 单独开一个协程去做其他的事情,不参与waitGroupgo WriteChangeLog(ctx)for i:=0 ; i< 3; i++{g.Go(func() error {return errors.New("访问redis失败\n")})}if err := g.Wait();err != nil{fmt.Printf("appear error and err is %s",err.Error())}time.Sleep(1 * time.Second)}func WriteChangeLog(ctx context.Context) error {select {case <- ctx.Done():return nilcase <- time.After(time.Millisecond * 50):fmt.Println("write changelog")}return nil}

结果:

appear error and err is 访问redis失败

代码看着没有问题,但是日志一直没有写入。这是为什么呢?

其实原因就是因为这个ctxerrgroup.WithContext方法返回的一个带取消的ctx,我们把这个ctx当作父context传入WriteChangeLog方法中了,如果errGroup取消了,也会导致上下文的context都取消了,所以WriteChangelog方法就一直执行不到。

这个点是我们在日常开发中想不到的,所以需要注意一下~。

解决方法:

解决方法就是在 go WriteChangeLog(context.Background()) 传入新的ctx

参考资料:

八. Go并发编程–errGroup

并发编程包之 errgroup

上面这个案例中讲了一个容易忽视的坑,大家可以看看