goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。

select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。

// $GOROOT/src/net/http/server.go
// Excerpt: a conn value is built from rw via srv.newConn, and its serve method
// is then started on a new goroutine with connCtx.
c := srv.newConn(rw)
go c.serve(connCtx)
// T is a placeholder element type for the channel created by spawn below;
// its fields are elided in these notes.
type T struct {...}

// spawn creates a channel, starts a goroutine that runs f, and returns the
// channel to the caller. The elided parts ("...") stand for the code that
// would use the channel.
func spawn(f func()) chan T {
    c := make(chan T)
    go func() {
        // Use channel c (captured by the closure) to communicate with the
        // goroutine that called spawn.
        ...
        f()
        ...
    }()
    
    return c
}

// main shows the calling side of spawn: it keeps the returned channel so it
// can talk to the goroutine that spawn started.
func main() {
    c := spawn(func(){})
    // Use channel c to communicate with the newly created goroutine.
}
// 一次性任务
// $GOROOT/src/net/dial.go

// DialContext (excerpt; unrelated code is elided with "...") illustrates a
// short-lived, one-off goroutine. When d.Cancel is non-nil, ctx is replaced by
// a cancellable child context and a helper goroutine cancels that child as
// soon as d.Cancel delivers a value or is closed.
func (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
    ...
    if oldCancel := d.Cancel; oldCancel != nil {
        subCtx, cancel := context.WithCancel(ctx)
        // Cancelling on return also unblocks the helper goroutine below via
        // subCtx.Done(), so it never outlives this call.
        defer cancel()
        go func() {
            select {
            case <-oldCancel:
                // Propagate the old-style cancellation signal to the context.
                cancel()
            case <-subCtx.Done():
                // The context ended first; nothing left to do.
            }
        }()
        ctx = subCtx
    }
    ...
}
// 后台任务
// $GOROOT/src/runtime/mgc.go
// gcBgMarkStartWorkers walks allp and, for every P whose gcBgMarkWorker field
// is still zero, starts a gcBgMarkWorker goroutine for it.
func gcBgMarkStartWorkers() {
    // Each P has a G that runs in the background and is dedicated to marking.
    for _, p := range allp {
        if p.gcBgMarkWorker == 0 {
            go gcBgMarkWorker(p) // create one goroutine per P to run gcBgMarkWorker
            // NOTE(review): presumably blocks until the new worker signals
            // work.bgMarkReady, then re-arms the note for the next P — confirm
            // against the runtime's note implementation.
            notetsleepg(&work.bgMarkReady, -1)
            noteclear(&work.bgMarkReady)
        }
    }
}

// gcBgMarkWorker (excerpt; body elided with "...") is the function run by the
// goroutines started in gcBgMarkStartWorkers. Its endless for loop is what
// makes it a long-lived background task.
func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { // stays resident in the background, handling GC work
        ...
    }
}
// join
// worker simulates a unit of work by sleeping. The first argument must be an
// int giving the number of seconds to sleep; with no arguments, or a first
// argument of any other type, worker returns immediately.
func worker(args ...interface{}) {
    if len(args) > 0 {
        if seconds, isInt := args[0].(int); isInt {
            time.Sleep(time.Duration(seconds) * time.Second)
        }
    }
}

// spawn runs f(args...) on a new goroutine and returns a channel on which a
// single value is sent once f has returned. Receiving from that channel is
// how the caller "joins" the goroutine. The channel is unbuffered and is not
// closed.
func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
    finished := make(chan struct{})
    run := func() {
        f(args...)
        finished <- struct{}{} // signal completion to whoever is waiting
    }
    go run()
    return finished
}

// main starts one worker that sleeps for five seconds and blocks until the
// worker reports completion.
func main() {
    joined := spawn(worker, 5)
    println("spawn a worker goroutine")

    // Block here until the worker goroutine signals that it has finished.
    <-joined
    println("worker done")
}
// 附带结果

// OK is the sentinel error value that worker returns after a successful run.
var OK = errors.New("ok")

// worker sleeps for the number of seconds given by its first argument and
// reports the outcome as an error value:
//   - no arguments:               "invalid args"
//   - first argument not an int:  "invalid interval arg"
//   - otherwise:                  the OK sentinel, after sleeping
func worker(args ...interface{}) error {
    if len(args) < 1 {
        return errors.New("invalid args")
    }

    switch seconds := args[0].(type) {
    case int:
        time.Sleep(time.Duration(seconds) * time.Second)
        return OK
    default:
        return errors.New("invalid interval arg")
    }
}

// spawn runs f(args...) on a new goroutine and returns an unbuffered channel
// that delivers f's return value exactly once. Receiving from the channel
// both waits for the goroutine and collects its result.
func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
    results := make(chan error)
    go func(out chan<- error) {
        err := f(args...)
        out <- err
    }(results)
    return results
}

// main runs two workers one after the other and prints the result each one
// reports: the first gets a valid interval, the second gets no arguments.
func main() {
    firstResult := spawn(worker, 5)
    println("spawn worker1")
    fmt.Println("worker1 done:", <-firstResult)

    secondResult := spawn(worker)
    println("spawn worker2")
    fmt.Println("worker2 done:", <-secondResult)
}
// 多个 goroutine
// worker sleeps for args[0] seconds. It does nothing when called without
// arguments or when the first argument is not an int.
func worker(args ...interface{}) {
    if len(args) < 1 {
        return
    }

    seconds, isInt := args[0].(int)
    if !isInt {
        return
    }

    pause := time.Duration(seconds) * time.Second
    time.Sleep(pause)
}

// spawnGroup starts n goroutines that each call f(args...) and print
// "worker-<i>: done" when f returns. It returns an unbuffered channel that
// receives a single value once every goroutine in the group has finished.
func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
    allDone := make(chan struct{})
    var pending sync.WaitGroup

    for id := 0; id < n; id++ {
        pending.Add(1)
        label := fmt.Sprintf("worker-%d:", id)
        go func() {
            defer pending.Done()
            f(args...)
            println(label, "done")
        }()
    }

    // A separate goroutine waits for the whole group so that spawnGroup
    // itself can return immediately.
    go func() {
        pending.Wait()
        allDone <- struct{}{}
    }()

    return allDone
}

// main starts five workers that each sleep three seconds and waits for the
// whole group to finish.
func main() {
    groupDone := spawnGroup(5, worker, 3)
    println("spawn a group of workers")

    // Blocks until every worker in the group has returned.
    <-groupDone
    println("group workers done")
}
// 超时
// main waits for a group of 30-second workers, but gives up after five
// seconds and reports a timeout instead.
func main() {
    groupDone := spawnGroup(5, worker, 30)
    println("spawn a group of workers")

    deadline := time.NewTimer(5 * time.Second)
    defer deadline.Stop()

    // Whichever happens first wins: the group finishing or the deadline.
    select {
    case <-groupDone:
        println("group workers done")
    case <-deadline.C:
        println("wait group workers exit timeout!")
    }
}
// 通知退出并等待
// worker simulates handling job j by sleeping for j seconds.
func worker(j int) {
    pause := time.Duration(j) * time.Second
    time.Sleep(pause)
}

// spawn starts a worker goroutine and returns the channel used to control it.
//
// Protocol on the returned channel: the caller sends any string to ask the
// worker to quit; the worker replies with "ok" on the same channel and then
// exits. While running, the worker would call f for every job it receives.
func spawn(f func(int)) chan string {
    quit := make(chan string)
    go func() {
        // Stand-in for a real job queue. It is nil, so receiving from it
        // blocks forever and only the quit case can ever fire.
        var job chan int
        for {
            select {
            case j := <-job:
                f(j)
            case <-quit:
                quit <- "ok"
                // Leave the loop after acknowledging. Without this return the
                // goroutine would go back to the select and never terminate.
                return
            }
        }
    }()
    return quit
}

// main starts a worker, lets it run for five seconds, asks it to quit and
// then waits up to ten seconds for the worker's acknowledgement.
func main() {
    control := spawn(worker)
    println("spawn a worker goroutine")

    time.Sleep(5 * time.Second)

    // Ask the newly created goroutine to exit.
    println("notify the worker to exit...")
    control <- "exit"

    deadline := time.NewTimer(10 * time.Second)
    defer deadline.Stop()

    select {
    case <-deadline.C:
        println("wait worker exit timeout")
    case reply := <-control:
        println("worker done:", reply)
    }
}
// 通知多个goroutine退出并等待
// worker simulates processing job j; the job value is the number of seconds
// to sleep.
func worker(j int) {
    seconds := time.Duration(j)
    time.Sleep(seconds * time.Second)
}

// spawnGroup starts n worker goroutines that call f for every job received
// from a shared internal job channel, and returns the channel used to shut
// the group down.
//
// Protocol on the returned channel: the caller sends one value to request
// shutdown; once every worker has exited, one value is sent back on the same
// channel as the acknowledgement.
func spawnGroup(n int, f func(int)) chan struct{} {
    quit := make(chan struct{})
    job := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done() // guarantees Done runs before the goroutine exits
            name := fmt.Sprintf("worker-%d:", i)
            // The loop ends when job is closed, which is the exit signal.
            for j := range job {
                // Run the job with the caller-supplied function, not a
                // hard-coded one, so the f parameter is honoured.
                f(j)
            }
            println(name, "done")
        }(i)
    }

    go func() {
        <-quit
        close(job) // closing broadcasts the exit signal to every worker
        wg.Wait()
        quit <- struct{}{}
    }()

    return quit
}

// main starts a group of five workers, asks the group to exit after five
// seconds and waits at most five more seconds for the acknowledgement.
func main() {
    control := spawnGroup(5, worker)
    println("spawn a group of workers")

    time.Sleep(5 * time.Second)

    // Tell the worker goroutine group to exit.
    println("notify the worker group to exit...")
    control <- struct{}{}

    deadline := time.NewTimer(5 * time.Second)
    defer deadline.Stop()

    select {
    case <-control:
        println("group workers done")
    case <-deadline.C:
        println("wait group workers exit timeout!")
    }
}
// 并行和串行超时退出

// GracefullyShutdowner is implemented by anything that can be shut down
// within a time budget.
type GracefullyShutdowner interface {
	// Shutdown is given waitTimeout as its time budget and returns an error
	// describing the outcome.
	Shutdown(waitTimeout time.Duration) error
}

// ShutdownerFunc adapts an ordinary function to the GracefullyShutdowner
// interface.
type ShutdownerFunc func(time.Duration) error

// Shutdown calls f(waitTimeout) and returns its result.
func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
	return f(waitTimeout)
}

// ConcurrentShutdown calls Shutdown on all shutdowners at the same time, each
// on its own goroutine and each with the full waitTimeout as its budget.
//
// It returns nil once every Shutdown call has returned, or a "wait timeout"
// error if that has not happened within waitTimeout. After a timeout the
// outstanding Shutdown calls keep running in the background; their helper
// goroutines end as soon as those calls return.
func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
	done := make(chan struct{})

	go func() {
		var wg sync.WaitGroup
		for _, g := range shutdowners {
			wg.Add(1)
			go func(shutdowner GracefullyShutdowner) {
				defer wg.Done()
				// Individual results are deliberately not reported: the
				// caller only learns whether everything finished in time.
				_ = shutdowner.Shutdown(waitTimeout)
			}(g)
		}
		wg.Wait()
		// close never blocks, so this goroutine can finish even when the
		// caller has already given up waiting. A send on an unbuffered
		// channel would strand it forever after a timeout.
		close(done)
	}()

	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	select {
	case <-done:
		return nil
	case <-timer.C:
		return errors.New("wait timeout")
	}
}

// SequentialShutdown calls Shutdown on the shutdowners one after another, in
// argument order. waitTimeout is the budget for the whole sequence: each
// shutdowner is given whatever is left of it when its turn comes.
//
// It returns nil when every Shutdown call has returned in time. It returns a
// "wait timeout" error as soon as the overall budget runs out, in which case
// the remaining shutdowners are not invoked and the call in flight keeps
// running in the background.
func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
	start := time.Now()

	// One timer covers the whole sequence. The deadline is always
	// start+waitTimeout, so it never needs to be reset between steps.
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	for _, g := range shutdowners {
		left := waitTimeout - time.Since(start)

		done := make(chan struct{})
		go func(shutdowner GracefullyShutdowner) {
			// close never blocks, so this goroutine can finish even if the
			// caller has already returned because of a timeout.
			defer close(done)
			// The result is deliberately not reported: the caller only
			// learns whether the sequence finished in time.
			_ = shutdowner.Shutdown(left)
		}(g)

		select {
		case <-done:
			// Move on to the next shutdowner.
		case <-timer.C:
			return errors.New("wait timeout")
		}
	}

	return nil
}
// pipe
// newNumGenerator returns a receive-only channel that yields the count
// consecutive integers beginning at start and is then closed. The numbers
// are produced by a goroutine, one per receive.
func newNumGenerator(start, count int) <-chan int {
    nums := make(chan int)
    go func() {
        defer close(nums)
        for n, end := start, start+count; n < end; n++ {
            nums <- n
        }
    }()
    return nums
}

// filterOdd drops odd numbers: for an even in it returns (in, true), for an
// odd in it returns (0, false).
func filterOdd(in int) (int, bool) {
    if isEven := in%2 == 0; isEven {
        return in, true
    }
    return 0, false
}

// square returns in multiplied by itself. The second result is always true,
// so a pipeline stage using square never drops a value.
func square(in int) (int, bool) {
    squared := in * in
    return squared, true
}

// spawn builds one pipeline stage. A goroutine applies f to every value read
// from in and forwards the result to the returned channel whenever f's
// second result is true. The returned channel is closed after in has been
// closed and drained.
func spawn(f func(int) (int, bool), in <-chan int) <-chan int {
    results := make(chan int)

    go func() {
        defer close(results)
        for item := range in {
            if mapped, keep := f(item); keep {
                results <- mapped
            }
        }
    }()

    return results
}

// main wires a three-stage pipeline (generate 1..20, keep the even numbers,
// square them) and prints every value that comes out of the last stage.
func main() {
    source := newNumGenerator(1, 20)
    evens := spawn(filterOdd, source)
    squares := spawn(square, evens)

    for sq := range squares {
        println(sq)
    }
}
// 扇入扇出
// newNumGenerator starts a goroutine that sends start, start+1, ... (count
// values in total) on the returned channel and closes the channel afterwards.
func newNumGenerator(start, count int) <-chan int {
    out := make(chan int)
    produce := func() {
        limit := start + count
        for next := start; next < limit; next++ {
            out <- next
        }
        close(out)
    }
    go produce()
    return out
}

// filterOdd lets even numbers through unchanged and rejects odd ones. The
// boolean result tells the caller whether the value should be kept.
func filterOdd(in int) (int, bool) {
    switch in % 2 {
    case 0:
        return in, true
    default:
        return 0, false
    }
}

// square maps in to in*in and always asks the caller to keep the result.
func square(in int) (int, bool) {
    const keep = true
    return in * in, keep
}

// spawnGroup builds one pipeline stage that is served by num goroutines.
//
// Fan-out: all num workers read from the shared channel in, apply f to each
// value and, when f's second result is true, write the first result to their
// own private output channel. Each worker prints a line when it starts and
// when it is done; name is used as the prefix of those lines.
//
// Fan-in: the private output channels are merged into the single channel
// that is returned. It is closed once every worker has finished, which
// happens after in has been closed and drained.
func spawnGroup(name string, num int, f func(int) (int, bool), in <-chan int) <-chan int {
    groupOut := make(chan int)
    var outSlice []chan int
    for i := 0; i < num; i++ {
        out := make(chan int)
        go func(i int) {
            name := fmt.Sprintf("%s-%d:", name, i)
            // "\n" must be a real newline escape; a doubled backslash would
            // print the two characters '\' and 'n' instead of ending the line.
            fmt.Printf("%s begin to work...\n", name)

            for v := range in {
                r, ok := f(v)
                if ok {
                    out <- r
                }
            }
            close(out)
            fmt.Printf("%s work done\n", name)
        }(i)
        outSlice = append(outSlice, out)
    }

    // Fan-in:
    //
    // out --\
    //        \
    // out ---- --> groupOut
    //        /
    // out --/
    //
    go func() {
        var wg sync.WaitGroup
        for _, out := range outSlice {
            wg.Add(1)
            go func(out <-chan int) {
                defer wg.Done()
                for v := range out {
                    groupOut <- v
                }
            }(out)
        }
        // Only close the merged channel after every forwarder has drained
        // its worker's output.
        wg.Wait()
        close(groupOut)
    }()

    return groupOut
}

// main runs a two-stage fan-out/fan-in pipeline over the numbers 1..20:
// three "filterOdd" workers feed two "square" workers, and every final value
// is printed.
func main() {
    source := newNumGenerator(1, 20)
    filtered := spawnGroup("filterOdd", 3, filterOdd, source)
    squared := spawnGroup("square", 2, square, filtered)

    // Wait until the goroutines above are ready, so the output is easier to
    // follow.
    time.Sleep(3 * time.Second)

    for v := range squared {
        fmt.Println(v)
    }
}
// 超时取消

// result carries the outcome of one successful query made by first.
type result struct {
	value string // the response body, as text
}

// first sends a GET request to every server concurrently and returns the
// body of whichever response arrives first. If no response has arrived
// within 500 milliseconds it returns a "timeout" error.
//
// All requests share one context that is cancelled when first returns, so
// requests still in flight are aborted at that point.
func first(servers ...*httptest.Server) (result, error) {
	// One buffer slot per query: a goroutine whose response arrives after
	// first has returned can still deliver its result and exit instead of
	// blocking forever on the send.
	c := make(chan result, len(servers))
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queryFunc := func(i int, server *httptest.Server) {
		req, err := http.NewRequest("GET", server.URL, nil)
		if err != nil {
			log.Printf("query goroutine-%d: http NewRequest error: %s", i, err)
			return
		}
		req = req.WithContext(ctx)

		log.Printf("query goroutine-%d: send request...", i)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			log.Printf("query goroutine-%d: get return error: %s", i, err)
			return
		}
		defer resp.Body.Close()
		log.Printf("query goroutine-%d: get response", i)

		// A failed read (for example because the request was cancelled while
		// the body was streaming) must not be reported as a result.
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			log.Printf("query goroutine-%d: read body error: %s", i, err)
			return
		}

		c <- result{
			value: string(body),
		}
	}

	for i, serv := range servers {
		go queryFunc(i, serv)
	}

	select {
	case r := <-c:
		return r, nil
	case <-time.After(500 * time.Millisecond):
		return result{}, errors.New("timeout")
	}
}

// fakeWeatherServer starts and returns a test HTTP server. For every request
// it logs the request, sleeps for interval milliseconds and then answers with
// the body "<name>:ok".
func fakeWeatherServer(name string, interval int) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// log.Printf ends the line itself; an escaped "\\n" in the format
		// would be logged as the two characters '\' and 'n'.
		log.Printf("%s receive a http request", name)
		time.Sleep(time.Duration(interval) * time.Millisecond)
		// The write can fail when the client has already gone away, which is
		// expected for requests that were cancelled; report it rather than
		// dropping the error silently.
		if _, err := w.Write([]byte(name + ":ok")); err != nil {
			log.Printf("%s write response error: %s", name, err)
		}
	}))
}

func main() {
	result, err := first(fakeWeatherServer("open-weather-1", 200),
		fakeWeatherServer("open-weather-2", 1000),
		fakeWeatherServer("open-weather-3", 600))
	if err != nil {
		log.Println("invoke first error:", err)
		return
	}

	fmt.Println(result)
	time.Sleep(10 * time.Second)
}

channel

sync