goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。
select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。
// $GOROOT/src/net/http/server.go
// Excerpt: a value c is obtained from srv.newConn(rw), and its serve method is
// then started on a new goroutine with connCtx as the argument.
c := srv.newConn(rw)
go c.serve(connCtx)
// T is the element type of the channel returned by the spawn sketch; its
// fields are elided ({...}) because this is illustrative pseudo-code.
type T struct {...}
// spawn creates an unbuffered chan T, starts a goroutine that calls f, and
// returns the channel to the caller. The elided parts (...) of the goroutine
// body stand for whatever work surrounds the call to f.
func spawn(f func()) chan T {
c := make(chan T)
go func() {
// The goroutine captures the channel variable c through the closure and uses
// it to communicate with the goroutine that called spawn.
...
f()
...
}()
return c
}
// main obtains the channel from spawn; the actual exchange over the channel is
// left out of this sketch.
func main() {
c := spawn(func(){})
// Use the channel variable c to communicate with the newly created goroutine.
}
// 一次性任务
// $GOROOT/src/net/dial.go
// DialContext (excerpt; the surrounding logic is elided with ...).
//
// The visible part bridges d.Cancel to context cancellation: when d.Cancel is
// non-nil, ctx is replaced by a cancellable child context and a short-lived
// goroutine is started that calls cancel once oldCancel yields a value or is
// closed.
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
...
if oldCancel := d.Cancel; oldCancel != nil {
subCtx, cancel := context.WithCancel(ctx)
// The deferred cancel fires when DialContext returns, which also lets the
// watcher goroutine below exit through its subCtx.Done() case.
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
// Nothing to do: the sub-context is already done, so the goroutine just ends.
}
}()
ctx = subCtx
}
...
}
// 后台任务
// $GOROOT/src/runtime/mgc.go
// gcBgMarkStartWorkers walks allp and, for every p whose gcBgMarkWorker field
// is still zero, starts a gcBgMarkWorker goroutine for it.
func gcBgMarkStartWorkers() {
// Every P has one G running in the background for marking.
for _, p := range allp {
if p.gcBgMarkWorker == 0 {
go gcBgMarkWorker(p) // create one goroutine per P to run gcBgMarkWorker
// NOTE(review): presumably this blocks until the new worker signals
// work.bgMarkReady, and the note is then reset for the next iteration —
// confirm against the runtime note API.
notetsleepg(&work.bgMarkReady, -1)
noteclear(&work.bgMarkReady)
}
}
}
// gcBgMarkWorker (excerpt) is the body of the goroutine started by
// gcBgMarkStartWorkers. After its elided setup it enters an endless loop.
func gcBgMarkWorker(_p_ *p) {
gp := getg()
...
for { // stays resident in the background to handle GC work
...
}
}
// join
// worker sleeps for as many seconds as its first argument says.
//
// It returns immediately when called without arguments or when the first
// argument is not an int. Any further arguments are ignored.
func worker(args ...interface{}) {
	if len(args) > 0 {
		if secs, isInt := args[0].(int); isInt {
			time.Sleep(time.Duration(secs) * time.Second)
		}
	}
}
// spawn runs f(args...) on a new goroutine and returns a channel on which a
// single value is sent once f has returned. Receiving from that channel is how
// the caller waits for (joins) the goroutine.
func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
	finished := make(chan struct{})
	run := func() {
		f(args...)
		// Unbuffered send: this goroutine stays alive until the caller receives.
		finished <- struct{}{}
	}
	go run()
	return finished
}
// main starts one worker with the argument 5 and blocks until the worker's
// completion signal arrives.
func main() {
	workerDone := spawn(worker, 5)
	println("spawn a worker goroutine")

	// Join point: nothing below runs until the goroutine reports completion.
	<-workerDone
	println("worker done")
}
// 附带结果
// OK is a sentinel error created with the text "ok"; the error-returning
// worker in this snippet returns it when it finishes normally.
// NOTE(review): idiomatic Go signals success with a nil error — confirm that a
// non-nil "ok" value is intended here.
var OK = errors.New("ok")
// worker sleeps for as many seconds as its first argument says and reports how
// it went.
//
// It returns an "invalid args" error when called without arguments, an
// "invalid interval arg" error when the first argument is not an int, and OK
// after a completed sleep.
func worker(args ...interface{}) error {
	if len(args) == 0 {
		return errors.New("invalid args")
	}
	switch secs := args[0].(type) {
	case int:
		time.Sleep(time.Duration(secs) * time.Second)
		return OK
	default:
		return errors.New("invalid interval arg")
	}
}
// spawn runs f(args...) on a new goroutine and returns a channel that delivers
// f's return value exactly once. Receiving from the channel both joins the
// goroutine and yields its result.
func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
	outcome := make(chan error)
	go func() {
		err := f(args...)
		// Unbuffered send: the goroutine waits here until the caller receives.
		outcome <- err
	}()
	return outcome
}
// main runs two workers one after the other and prints what each returned:
// the first is given the argument 5, the second is given no arguments.
func main() {
	first := spawn(worker, 5)
	println("spawn worker1")
	fmt.Println("worker1 done:", <-first)

	second := spawn(worker)
	println("spawn worker2")
	fmt.Println("worker2 done:", <-second)
}
// 多个 goroutine
// worker sleeps for the number of seconds carried in args[0].
//
// Calls with no arguments, or with a first argument that is not an int, return
// without sleeping.
func worker(args ...interface{}) {
	if len(args) == 0 {
		return
	}
	switch secs := args[0].(type) {
	case int:
		time.Sleep(time.Second * time.Duration(secs))
	}
}
// spawnGroup starts n goroutines that each call f(args...), and returns a
// channel on which a single value is sent after all n have finished.
//
// Each goroutine prints "worker-<i>: done" when its call to f returns.
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
	var pending sync.WaitGroup

	runOne := func(id int) {
		label := fmt.Sprintf("worker-%d:", id)
		f(args...)
		println(label, "done")
		pending.Done() // this worker is finished
	}
	for id := 0; id < n; id++ {
		pending.Add(1)
		go runOne(id)
	}

	allDone := make(chan struct{})
	// A separate goroutine waits for the group so spawnGroup itself returns
	// immediately.
	go func() {
		pending.Wait()
		allDone <- struct{}{}
	}()
	return allDone
}
// main starts five workers that each receive the argument 3 and waits for the
// whole group to finish.
func main() {
	groupDone := spawnGroup(5, worker, 3)
	println("spawn a group of workers")

	// Blocks until every worker in the group has returned.
	<-groupDone
	println("group workers done")
}
// 超时
// main starts five workers that each receive the argument 30, then waits for
// the group for at most five seconds and reports which happened first.
func main() {
	groupDone := spawnGroup(5, worker, 30)
	println("spawn a group of workers")

	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()

	select {
	case <-groupDone:
		println("group workers done")
	case <-deadline.C:
		println("wait group workers exit timeout!")
	}
}
// 通知退出并等待
// worker handles job j by sleeping for j seconds.
func worker(j int) {
	pause := time.Duration(j) * time.Second
	time.Sleep(pause)
}
// spawn starts a worker goroutine and returns the channel used to stop it.
//
// Protocol: the caller sends any string on the returned channel to request an
// exit; the goroutine answers with "ok" on the same channel and then
// terminates. Jobs received from the (simulated) job channel are handed to f.
func spawn(f func(int)) chan string {
	quit := make(chan string)
	go func() {
		// Simulated job channel: it is nil, so receiving from it blocks forever
		// and only the quit case can ever fire.
		var job chan int
		for {
			select {
			case j := <-job:
				f(j)
			case <-quit:
				quit <- "ok"
				// Actually exit after acknowledging. Without this return the
				// goroutine loops back into select and lingers forever.
				return
			}
		}
	}()
	return quit
}
// main starts a worker, lets it run for five seconds, asks it to exit, and
// then waits up to ten seconds for the worker's acknowledgement.
func main() {
	ctl := spawn(worker)
	println("spawn a worker goroutine")
	time.Sleep(5 * time.Second)

	// Tell the newly created goroutine to exit.
	println("notify the worker to exit...")
	ctl <- "exit"

	deadline := time.NewTimer(10 * time.Second)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		println("wait worker exit timeout")
	case status := <-ctl:
		println("worker done:", status)
	}
}
// 通知多个goroutine退出并等待
// worker processes job j, which here simply means sleeping for j seconds.
func worker(j int) {
	seconds := time.Duration(j)
	time.Sleep(seconds * time.Second)
}
// spawnGroup starts n worker goroutines that take jobs from an internal job
// channel and run f on each job, and returns the channel used to stop them.
//
// Protocol: the caller sends one value on the returned channel to request
// shutdown. The group then closes the job channel (which every worker
// observes), waits for all workers to return, and sends one value back on the
// same channel as the acknowledgement.
func spawnGroup(n int, f func(int)) chan struct{} {
	quit := make(chan struct{})
	job := make(chan int)
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done() // guarantee Done runs before the goroutine exits
			name := fmt.Sprintf("worker-%d:", i)
			for {
				j, ok := <-job
				if !ok {
					// job was closed: that is the exit broadcast.
					println(name, "done")
					return
				}
				// Run the job with the function the caller supplied, not with a
				// package-level function that merely has a compatible signature.
				f(j)
			}
		}(i)
	}

	go func() {
		<-quit
		close(job) // broadcast to all worker goroutines
		wg.Wait()
		quit <- struct{}{}
	}()

	return quit
}
// main starts a group of five workers, lets them run for five seconds, asks
// the group to exit, and then waits up to five seconds for the acknowledgement.
func main() {
	ctl := spawnGroup(5, worker)
	println("spawn a group of workers")
	time.Sleep(5 * time.Second)

	// Tell the group of worker goroutines to exit.
	println("notify the worker group to exit...")
	ctl <- struct{}{}

	deadline := time.NewTimer(5 * time.Second)
	defer deadline.Stop()

	select {
	case <-ctl:
		println("group workers done")
	case <-deadline.C:
		println("wait group workers exit timeout!")
	}
}
// 并行和串行超时退出
// GracefullyShutdowner is implemented by anything that can be shut down given
// a time budget.
type GracefullyShutdowner interface {
// Shutdown receives waitTimeout and reports the outcome as an error.
// NOTE(review): presumably waitTimeout is the longest the implementation may
// take; the interface itself does not enforce that — confirm with implementers.
Shutdown(waitTimeout time.Duration) error
}
// ShutdownerFunc adapts an ordinary function with the signature
// func(time.Duration) error so it can be used where a value with a Shutdown
// method is required.
type ShutdownerFunc func(time.Duration) error

// Shutdown invokes the underlying function with waitTimeout and returns its
// result unchanged.
func (fn ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
	err := fn(waitTimeout)
	return err
}
// ConcurrentShutdown shuts all shutdowners down in parallel, giving each the
// full waitTimeout, and waits at most waitTimeout for all of them.
//
// It returns nil when every Shutdown call has returned in time and a
// "wait timeout" error otherwise. The errors returned by the individual
// Shutdown calls are not inspected.
func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
	done := make(chan struct{})
	go func() {
		var wg sync.WaitGroup
		for _, g := range shutdowners {
			wg.Add(1)
			go func(shutdowner GracefullyShutdowner) {
				defer wg.Done()
				shutdowner.Shutdown(waitTimeout)
			}(g)
		}
		wg.Wait()
		// Signal by closing rather than sending: close never blocks, so this
		// goroutine still exits when the caller has already given up on the
		// timeout and nobody is receiving any more.
		close(done)
	}()

	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return errors.New("wait timeout")
	}
}
// SequentialShutdown shuts the shutdowners down one after another, in the
// order given, within a total budget of waitTimeout.
//
// Each shutdowner is handed whatever is left of the budget when its turn
// comes. The function returns nil once all have returned in time and a
// "wait timeout" error as soon as the budget runs out; shutdowners whose turn
// has not come yet are then never started. The errors returned by the
// individual Shutdown calls are not inspected.
func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
	start := time.Now()

	// A single timer covers the whole sequence. It fires waitTimeout after
	// start, which is exactly the moment the remaining budget reaches zero, so
	// there is no need to Reset it (on a still-active timer) for every step.
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	for _, g := range shutdowners {
		left := waitTimeout - time.Since(start)

		// Capacity 1 lets the goroutine finish its send and exit even if we
		// have already returned because of the timeout.
		done := make(chan struct{}, 1)
		// The remaining budget is passed by value so the goroutine does not
		// read a variable the loop keeps changing.
		go func(shutdowner GracefullyShutdowner, budget time.Duration) {
			shutdowner.Shutdown(budget)
			done <- struct{}{}
		}(g, left)

		select {
		case <-done:
			// This one finished in time; move on to the next.
		case <-timer.C:
			return errors.New("wait timeout")
		}
	}
	return nil
}
// pipe
// newNumGenerator returns a receive-only channel that yields count consecutive
// integers beginning at start and is closed afterwards.
func newNumGenerator(start, count int) <-chan int {
	numbers := make(chan int)
	produce := func() {
		for n, end := start, start+count; n < end; n++ {
			numbers <- n
		}
		// Closing tells the consumer that the sequence is complete.
		close(numbers)
	}
	go produce()
	return numbers
}
// filterOdd filters odd numbers out: an even input is passed through with
// true, an odd input yields (0, false).
func filterOdd(in int) (int, bool) {
	if in%2 == 0 {
		return in, true
	}
	return 0, false
}
// square returns in multiplied by itself; the boolean is always true, meaning
// the value is never filtered out.
func square(in int) (int, bool) {
	squared := in * in
	return squared, true
}
// spawn builds one pipeline stage: a goroutine reads every value from in,
// applies f, and forwards the result on the returned channel whenever f
// reports true. The returned channel is closed once in has been drained.
func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
	stageOut := make(chan int)
	go func() {
		for item := range in {
			result, keep := f(item)
			if !keep {
				continue
			}
			stageOut <- result
		}
		close(stageOut)
	}()
	return stageOut
}
// main wires a three-stage pipeline — generate 1..20, filterOdd, square — and
// prints every value that comes out of the last stage.
func main() {
	numbers := newNumGenerator(1, 20)
	filtered := spawn(filterOdd, numbers)
	squares := spawn(square, filtered)

	for sq := range squares {
		println(sq)
	}
}
// 扇入扇出
// newNumGenerator starts a goroutine that emits the integers start,
// start+1, ..., start+count-1 on the returned channel and then closes it.
func newNumGenerator(start, count int) <-chan int {
	src := make(chan int)
	go func() {
		limit := start + count
		for next := start; next < limit; next++ {
			src <- next
		}
		close(src) // no more numbers: lets range loops over src terminate
	}()
	return src
}
// filterOdd drops odd numbers. For an even input it returns (in, true); for an
// odd input it returns (0, false).
func filterOdd(in int) (int, bool) {
	switch in % 2 {
	case 0:
		return in, true
	default:
		return 0, false
	}
}
// square computes in*in and always reports true, so every input produces an
// output.
func square(in int) (int, bool) {
	const keep = true
	return in * in, keep
}
// spawnGroup builds one pipeline stage made of num parallel workers.
//
// Fan-out: all workers receive from the same in channel, so each input value
// is handled by exactly one of them; a worker forwards f's result on its own
// output channel whenever f reports true. Fan-in: the per-worker outputs are
// merged into the single returned channel, which is closed after in has been
// drained and every worker has finished. The order of values on the returned
// channel is not defined.
//
// Each worker prints "<name>-<i>: begin to work..." when it starts and
// "<name>-<i>: work done" when it finishes.
func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
	groupOut := make(chan int)
	outSlice := make([]chan int, 0, num)

	for i := 0; i < num; i++ {
		out := make(chan int)
		go func(i int) {
			name := fmt.Sprintf("%s-%d:", name, i)
			// A real newline escape; a doubled backslash would print the two
			// characters '\' and 'n' instead of ending the line.
			fmt.Printf("%s begin to work...\n", name)
			for v := range in {
				r, ok := f(v)
				if ok {
					out <- r
				}
			}
			close(out)
			fmt.Printf("%s work done\n", name)
		}(i)
		outSlice = append(outSlice, out)
	}

	// Fan-in: merge every worker's output channel into groupOut.
	//
	//   out --\
	//          \
	//   out ---- --> groupOut
	//          /
	//   out --/
	//
	go func() {
		var wg sync.WaitGroup
		for _, out := range outSlice {
			wg.Add(1)
			go func(out <-chan int) {
				defer wg.Done()
				for v := range out {
					groupOut <- v
				}
			}(out)
		}
		// groupOut may only be closed after every forwarder has stopped sending.
		wg.Wait()
		close(groupOut)
	}()

	return groupOut
}
// main wires a fan-out/fan-in pipeline: numbers 1..20 go through a group of
// three filterOdd workers and then a group of two square workers, and every
// value leaving the last group is printed.
func main() {
	source := newNumGenerator(1, 20)
	filtered := spawnGroup("filterOdd", 3, filterOdd, source)
	squared := spawnGroup("square", 2, square, filtered)

	// Wait so that all the goroutines above are up before consuming; this only
	// makes the printed output easier to follow.
	time.Sleep(3 * time.Second)

	for v := range squared {
		fmt.Println(v)
	}
}
// 超时取消
// result is what first returns for a successful query.
type result struct {
// value holds the response body converted to a string.
value string
}
// first sends a GET request to every server concurrently and returns the body
// of whichever response arrives first.
//
// If no response arrives within 500 milliseconds it returns a "timeout" error.
// When first returns — either way — the shared context is cancelled, which
// aborts the requests that are still in flight and releases their goroutines.
func first(servers ...*httptest.Server) (result, error) {
	c := make(chan result)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queryFunc := func(i int, server *httptest.Server) {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, server.URL, nil)
		if err != nil {
			log.Printf("query goroutine-%d: http NewRequest error: %s\n", i, err)
			return
		}
		log.Printf("query goroutine-%d: send request...\n", i)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			log.Printf("query goroutine-%d: get return error: %s\n", i, err)
			return
		}
		defer resp.Body.Close()
		log.Printf("query goroutine-%d: get response\n", i)

		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Printf("query goroutine-%d: read body error: %s\n", i, err)
			return
		}
		// Only one result is ever received. A goroutine that finishes after
		// first has returned must not block on the send forever, so it also
		// watches the (by then cancelled) context.
		select {
		case c <- result{value: string(body)}:
		case <-ctx.Done():
		}
	}

	for i, serv := range servers {
		go queryFunc(i, serv)
	}

	timer := time.NewTimer(500 * time.Millisecond)
	defer timer.Stop()

	select {
	case r := <-c:
		return r, nil
	case <-timer.C:
		return result{}, errors.New("timeout")
	}
}
// fakeWeatherServer starts a test HTTP server that logs each request, waits
// interval milliseconds, and then answers with "<name>:ok".
//
// The caller owns the returned server and should Close it when done.
func fakeWeatherServer(name string, interval int) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Printf("%s receive a http request\n", name)
		time.Sleep(time.Duration(interval) * time.Millisecond)
		if _, err := w.Write([]byte(name + ":ok")); err != nil {
			log.Printf("%s write response error: %s\n", name, err)
		}
	}))
}
func main() {
result, err := first(fakeWeatherServer("open-weather-1", 200),
fakeWeatherServer("open-weather-2", 1000),
fakeWeatherServer("open-weather-3", 600))
if err != nil {
log.Println("invoke first error:", err)
return
}
fmt.Println(result)
time.Sleep(10 * time.Second)
}