Skip to content

It is an efficient, flexible, and lightweight goroutine pool. It provides an easy way to deal with concurrent tasks with limited resource.

License

Notifications You must be signed in to change notification settings

viney-shih/goroutines

Repository files navigation

goroutines

GoDev Build Status Go Report Card codecov Coverage Status License Sourcegraph FOSSA Status

Package goroutines is an efficient, flexible, and lightweight goroutine pool written in Go. It provides a easy way to deal with several kinds of concurrent tasks with limited resource.

Inspired by fastsocket, the implementation is based on channel. It adopts pubsub model for dispatching tasks, and holding surplus tasks in queue if submitted more than the capacity of pool.

Features

  • Spawning and managing arbitrary number of asynchronous goroutines as a worker pool.
  • Dispatch tasks to workers through pubsub model with specified queue size.
  • Adjust the worker numbers based on the usage periodically.
  • Easy to use when dealing with concurrent one-time batch jobs.
  • Monitor current status by metrics

Table of Contents

Installation

go get github.com/viney-shih/goroutines

Get Started

Basic usage of Pool in blocking mode

By calling Schedule(), it schedules the task executed by worker (goroutines) in the Pool. It will be blocked until the workers accepting the request.

taskN := 7
rets := make(chan int, taskN)

// allocate a pool with 5 goroutines to deal with those tasks
p := goroutines.NewPool(5)
// don't forget to release the pool in the end
defer p.Release()

// assign tasks to asynchronous goroutine pool
for i := 0; i < taskN; i++ {
	idx := i
	p.Schedule(func() {
		// sleep and return the index
		time.Sleep(20 * time.Millisecond)
		rets <- idx
	})
}

// wait until all tasks done
for i := 0; i < taskN; i++ {
	fmt.Println("index:", <-rets)
}

// Unordered output:
// index: 3
// index: 1
// index: 2
// index: 4
// index: 5
// index: 6
// index: 0

Basic usage of Pool in nonblocking mode

By calling ScheduleWithTimeout(), it schedules the task executed by worker (goroutines) in the Pool within the specified period. If it exceeds the time and doesn't be accepted, it will return error ErrScheduleTimeout.

totalN, taskN := 5, 5
pause := make(chan struct{})
rets := make(chan int, taskN)

// allocate a pool with 5 goroutines to deal with those 5 tasks
p := goroutines.NewPool(totalN)
// don't forget to release the pool in the end
defer p.Release()

// full the workers which are stopped with the `pause`
for i := 0; i < taskN; i++ {
	idx := i
	p.ScheduleWithTimeout(50*time.Millisecond, func() {
		<-pause
		rets <- idx
	})
}

// no more chance to add any task in Pool, and return `ErrScheduleTimeout`
if err := p.ScheduleWithTimeout(50*time.Millisecond, func() {
	<-pause
	rets <- taskN
}); err != nil {
	fmt.Println(err.Error())
}

close(pause)
for i := 0; i < taskN; i++ {
	fmt.Println("index:", <-rets)
}

// Unordered output:
// schedule timeout
// index: 0
// index: 3
// index: 2
// index: 4
// index: 1

Advanced usage of Batch jobs

To deal with batch jobs and consider the performance, we need to run tasks concurrently. However, the use case usually happen once and need not maintain a Pool for reusing it. I wrap this patten and call it Batch. Here comes an example.

taskN := 11

// allocate a one-time batch job with 3 goroutines to deal with those tasks.
// no need to spawn extra goroutine by specifing the batch size consisting with the number of tasks.
b := goroutines.NewBatch(3, goroutines.WithBatchSize(taskN))
// don't forget to close batch job in the end
defer b.Close()

// pull all tasks to this batch queue
for i := 0; i < taskN; i++ {
	idx := i
	b.Queue(func() (interface{}, error) {
		// sleep and return the index
		time.Sleep(10 * time.Millisecond)
		return idx, nil
	})
}

// tell the batch that's all need to do
// DO NOT FORGET THIS OR GOROUTINES WILL DEADLOCK
b.QueueComplete()

for ret := range b.Results() {
	if ret.Error() != nil {
		panic("not expected")
	}

	fmt.Println("index:", ret.Value().(int))
}

// Unordered output:
// index: 3
// index: 1
// index: 2
// index: 4
// index: 5
// index: 6
// index: 10
// index: 7
// index: 9
// index: 8
// index: 0

See the examples, documentation and article for more details.

Options

PoolOption

The PoolOption interface is passed to NewPool when creating Pool.

• WithTaskQueueLength( length int )

It sets up the length of task queue for buffering tasks before sending to goroutines. The default queue length is 0.

• WithPreAllocWorkers( size int )

It sets up the number of workers to spawn when initializing Pool. Without specifying this, It initialize all numbers of goroutines consisting with Pool size at the beginning.

• WithWorkerAdjustPeriod( period time.Duration )

It sets up the duration to adjust the worker size, and needs to be used with WithPreAllocWorkers at the same time. By specifying both, it enables the mechanism to adjust the number of goroutines according to the usage dynamically.

BatchOption

The BatchOption interface is passed to NewBatch when creating Batch.

• WithBatchSize( size int )

It specifies the batch size used to forward tasks. By default, it needs to spawn an extra goroutine to prevent deadlocks. It's helpful by specifing the batch size consisting with the number of tasks without an extra goroutine (see the example). The default batch size is 10.

References

License

Apache-2.0

FOSSA Status

About

It is an efficient, flexible, and lightweight goroutine pool. It provides an easy way to deal with concurrent tasks with limited resource.

Topics

Resources

License

Security policy

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •  

Languages