Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some goroutine leaks problem and Potential program crash issue #69

Open
cyb0225 opened this issue Feb 16, 2023 · 0 comments
Open

Some goroutine leaks problem and Potential program crash issue #69

cyb0225 opened this issue Feb 16, 2023 · 0 comments

Comments

@cyb0225
Copy link

cyb0225 commented Feb 16, 2023

func (u *userService) List(ctx context.Context, opts metav1.ListOptions) (*v1.UserList, error) {
users, err := u.store.Users().List(ctx, opts)
if err != nil {
log.L(ctx).Errorf("list users from storage failed: %s", err.Error())
return nil, errors.WithCode(code.ErrDatabase, err.Error())
}
wg := sync.WaitGroup{}
errChan := make(chan error, 1)
finished := make(chan bool, 1)
var m sync.Map
// Improve query efficiency in parallel
for _, user := range users.Items {
wg.Add(1)
go func(user *v1.User) {
defer wg.Done()
// some cost time process
policies, err := u.store.Policies().List(ctx, user.Name, metav1.ListOptions{})
if err != nil {
errChan <- errors.WithCode(code.ErrDatabase, err.Error())
return
}
m.Store(user.ID, &v1.User{
ObjectMeta: metav1.ObjectMeta{
ID: user.ID,
InstanceID: user.InstanceID,
Name: user.Name,
Extend: user.Extend,
CreatedAt: user.CreatedAt,
UpdatedAt: user.UpdatedAt,
},
Nickname: user.Nickname,
Email: user.Email,
Phone: user.Phone,
TotalPolicy: policies.TotalCount,
LoginedAt: user.LoginedAt,
})
}(user)
}
go func() {
wg.Wait()
close(finished)
}()
select {
case <-finished:
case err := <-errChan:
return nil, err
}
infos := make([]*v1.User, 0, len(users.Items))
for _, user := range users.Items {
info, _ := m.Load(user.ID)
infos = append(infos, info.(*v1.User))
}
log.L(ctx).Debugf("get %d users from backend storage.", len(infos))
return &v1.UserList{ListMeta: users.ListMeta, Items: infos}, nil
}

I think the design of this function errChan will have potential coroutine blocking problems, which will lead to coroutine leaks.
If more than one goroutine encounters an error, all goroutines will block except for the first error goroutine.
And here is my test demo, which confirmed my guess.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

func List() error {
	wg := sync.WaitGroup{}
	errChan := make(chan error, 1)
	finished := make(chan bool, 1)

	// arr is form 1 to 10
	arr := make([]int, 10)
	for i := 0; i < len(arr); i++ {
		arr[i] = i + 1
	}

	for _, v := range arr {
		wg.Add(1)

		go func(item int) {
			defer wg.Done()

			// We assume an even number of entries will get an error
			// And We send it to errChan
			if item%2 == 0 {
				for {
					// the select is used to detect whether the coroutine is blocked here
					select {
					case errChan <- errors.New(fmt.Sprint("error message: ", item)):
						return
					case <-time.After(time.Second * 2):
						fmt.Printf("goroutine %d get an error and blocked...\n", item)
					}
				}
			}

		}(v)
	}

	go func() {
		wg.Wait()
		close(finished)
	}()

	select {
	case <-finished:
	// If we get more than one error here,
	// the goroutine that sends the error will be blocked
	case err := <-errChan:
		return err
	}

	return nil
}

func main() {
	if err := List(); err != nil {
		fmt.Println("catch error:", err)
	}

	ch := make(chan struct{})
	<-ch
}

/*
Output:
catch error: error message: 6
goroutine 10 get an error and blocked...
goroutine 8 get an error and blocked...
goroutine 4 get an error and blocked...
goroutine 8 get an error and blocked...
goroutine 10 get an error and blocked...
goroutine 4 get an error and blocked...
goroutine 10 get an error and blocked...
goroutine 8 get an error and blocked...
goroutine 4 get an error and blocked...
goroutine 4 get an error and blocked...
goroutine 8 get an error and blocked...
goroutine 10 get an error and blocked...
goroutine 10 get an error and blocked...
goroutine 8 get an error and blocked...
goroutine 4 get an error and blocked...
*/

One way to solve this problem is to set errChan's cap to the length of arrary.
And I found that go-zero is use mapreduce to solve this kind of problem about concurrent processing of list data.

And another problem in this file is that if we use the naked goroutine without 'defer recover', the whole application will panic when the naked goroutine get an unpredictable panic.
for example,

package main

import "fmt"

// sub-goroutine panic will cause the application to crash
func main() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("catch: ", err)
		}
	}()

	go func() {
		panic("sub-goroutine panic")
	}()

	ch := make(chan struct{})
	<-ch
}

/*
Output:
panic: sub-goroutine panic

goroutine 6 [running]:
main.main.func2()
        E:/Projects/go/go-cas-demos/goroutine/main.go:14 +0x27
created by main.main
        E:/Projects/go/go-cas-demos/goroutine/main.go:13 +0x4
*/

I think we can refer to the design of go-zero, and uniformly use the packaged function to start the coroutine, so as to avoid the collapse of online business.

In the past few days, I am learning the beautiful design of go-zero. If I can, I will submit a pull request to solve these problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant