这可能是最容易理解的 Go Mutex 源码剖析

6-1 channel

代码案例

code1 基本chan收发

package main

import "fmt"

func chanDemo() {
    //var c chan int // c == nil
    c := make(chan int)
    c <- 1
    c <- 2
    c <- 3
    n:= <-c
    fmt.Println(n)

}

func main() {
    chanDemo()
}

/**
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.chanDemo()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/channel.go:8 +0x37
main.main()
 */

死锁了

发的数据没人收是会deadlock的

1635301183810

code2 加上等待,避免主线程停止而导致其他chan都被干掉

package main

import (
    "fmt"
    "time"
)

func chanDemo() {
    //var c chan int // c == nil
    c := make(chan int)

    go func() {
        for  {
            n:= <- c
            fmt.Println(n)
        }
    }()

    c <- 1
    c <- 2
    //c <- 3
    //n := <-c
    //fmt.Println(n)
    time.Sleep(time.Millisecond)

}

func main() {
    chanDemo()
}

/**
1
2

Process finished with the exit code 0
*/

go语言函数是一等公民

go语言中的channel 也是一等公民

code3 worker提取出来

package main

import (
    "fmt"
    "time"
)

func worker(c chan int) {
    for {
        n := <-c
        fmt.Println(n)
    }
}

func chanDemo() {
    //var c chan int // c == nil
    c := make(chan int)

    go worker(c)

    c <- 1
    c <- 2
    //c <- 3
    //n := <-c
    //fmt.Println(n)
    time.Sleep(time.Millisecond)

}

func main() {
    chanDemo()
}

/**
1
2

Process finished with the exit code 0
*/

code4 chan可以通过外部参数传递进来

package main

import (
    "fmt"
    "time"
)

func worker(id int,c chan int) {
    for {
        fmt.Println(id,<-c)
    }
}

func chanDemo() {

    var cahnneles [10]chan int
    for i := 0; i < 10; i++ {
        //var c chan int // c == nil
        cahnneles[i] = make(chan int)
        go worker(i,cahnneles[i])
    }

    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'a' + i
    }

    time.Sleep(time.Millisecond)

}

func main() {
    chanDemo()
}

/**
5 102
1 98
2 99
3 100
4 101
0 97
6 103
7 104
9 106
8 105

Process finished with the exit code 0

*/

在打印

code5 格式化一下

package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for {
        fmt.Printf("%d,%c\n", id, <-c)
    }
}

func chanDemo() {

    var cahnneles [10]chan int
    for i := 0; i < 10; i++ {
        //var c chan int // c == nil
        cahnneles[i] = make(chan int)
        go worker(i, cahnneles[i])
    }

    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)

}

func main() {
    chanDemo()
}

/**
5,f
2,c
0,a
1,b
3,d
0,A
1,B
9,j
8,i
4,e
4,E
3,D
7,h
2,C
6,g
6,G
7,H
9,J
5,F
8,I

Process finished with the exit code 0


*/

goroutine 调度之后,先发的不一定会先收到

code6 并发从chan读取

package main

import (
    "fmt"
    "time"
)

func createWorker(id int) chan int {
    c := make(chan int)
    go func() {
        for {
            fmt.Printf("@%d---%c\n", id, <-c)
        }
    }()
    return c
}

func chanDemo() {

    var cahnneles [10]chan int
    for i := 0; i < 10; i++ {
        //var c chan int // c == nil
        cahnneles[i] = createWorker(i)
    }

    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)

}

func main() {
    chanDemo()
}

/**

@0---a
@8---i
@1---b
@1---B
@3---d
@4---e
@5---f
@6---g
@7---h
@0---A
@9---j
@2---c
@2---C
@5---F
@6---G
@7---H
@3---D
@4---E
@9---J
@8---I

Process finished with the exit code 0

*/

// // 告诉外面用的人 , 我这个channel怎么用

1635302653609

这样就不能收数据了,只能发 属性 (send-only type)

//n:= <-cahnneles[i] 
// ↑ Invalid operation: <-cahnneles[i] (receive from the send-only type chan<- int)

code7 chan<- int 只能够收/发 的类型 实现

package main

import (
    "fmt"
    "time"
)

// // 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan<- int { // 告诉外面用的人 , 我这个channel怎么用
    //
    c := make(chan int)
    go func() {
        for {
            fmt.Printf("@%d---%c\n", id, <-c)
        }
    }()
    return c
}

func chanDemo() {

    var cahnneles [10]chan<- int
    for i := 0; i < 10; i++ {
        //var c chan int // c == nil
        cahnneles[i] = createWorker(i)
        //n:= <-cahnneles[i]
        // ↑ Invalid operation: <-cahnneles[i] (receive from the send-only type chan<- int)
    }

    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        cahnneles[i] <- 'A' + i
    }
    time.Sleep(time.Millisecond)

}

func bufferedChannel() {
    c := make(chan int)
    c <- 1
}

func main() {
    //chanDemo()
    bufferedChannel()
}

/**
goroutine 1 [chan send]:
main.bufferedChannel(...)
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/channel.go:42
main.main()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/channel.go:47 +0x31

Process finished with the exit code 2

 */

这样就是到4个 追加的时候才报错

code8 加上缓冲区(指定长度)

func bufferedChannel() {
    // 加上缓冲区,大小为3
    c := make(chan int,3)
    c <- 1
    c <- 2
    c <- 3
    c <- 4

}

code9 chan 不用同步收发了就

package main

import (
    "fmt"
    "time"
)

func worker(id int,c chan int) {
    for {
        fmt.Printf("@%d---%c\n", id, <-c)
    }
}


func bufferedChannel() {
    // 加上缓冲区,大小为3
    c := make(chan int,3)
    go worker(0,c)
    c <- '1'
    c <- '2'
    c <- '3'
    //c <- 4
    time.Sleep(time.Millisecond)



}

func main() {
    //chanDemo()
    bufferedChannel()
}

/**
@0---1
@0---2
@0---3

Process finished with the exit code 0

 */

告诉接收方,何时发完了

code10 能够通过 err判断 chan是否 没消息了

package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for {
        n,ok := <-c
        if ok{
            fmt.Printf("@%d---%d\n", id,n)
        }else{
            break
        }
    }
}


func bufferedChannel() {
    // 加上缓冲区,大小为3
    c := make(chan int)
    go worker(0, c)
    c <- '1'
    c <- '2'
    c <- '3'
    c <- 'd'
    //c <- 4
    close(c)
    time.Sleep(time.Millisecond)

}

func main() {
    //chanDemo()
    bufferedChannel()
}

/**
@0---49
@0---50
@0---51
@0---100

Process finished with the exit code 0

*/

code11 chan的for in 遍历

func worker(id int, c chan int) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
    }
}
// 这样也可以

理论channel

  • channel
  • buffered channel
  • range
  • 理论基础:Communication Sequential Process (CSP)

go语言的创作者说

go语言的创作者说:

Don't communicate by sharing memory; sharememory by communicating.

不要通过共享内存来通讯;通过通信来共享内存

6-2 使用Channel等待任务结束

通过通信来共享内存

code01 顺序打印待修复

目前是按照顺序打印的
package main

import (
    "fmt"
    "time"
)

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        done <- true
    }
}

type worker struct {
    in   chan int
    done chan bool
}

// // 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i)
    }

    for i := 0; i < 10; i++ {
        workers[i].in <- 'a' + i
        res := <-workers[i].done
        print(res)
    }
    for i := 0; i < 10; i++ {
        workers[i].in <- 'A' + i
        res := <-workers[i].done
        print(res)
    }
    time.Sleep(time.Second)

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
    //bufferedChannel()
}

/**
Channel as first-class citizen
@0---97
@1---98
@2---99
@3---100
@4---101
@5---102
@6---103
@7---104
@8---105
@9---106
@0---65
@1---66
@2---67
@3---68
@4---69
@5---70
@6---71
@7---72
@8---73
@9---74
truetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetruetrue
Process finished with the exit code 0

*/

code02 报错了

package main

import (
    "fmt"
    "time"
)

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        done <- true
    }
}

type worker struct {
    in   chan int
    done chan bool
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}

func chanDemo() {

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i)
    }

    for i, w := range workers {
        w.in <- 'a' + i
        //res := <-workers[i].done
        //print(res)
    }
    for i, w := range workers {
        w.in <- 'A' + i
        //res := <-workers[i].done
        //print(res)
    }

    // wait for all of them
    for _, w := range workers {
        res := <-w.done
        res2 := <-w.done
        print(res)
        print(res2)
    }
    time.Sleep(time.Second)

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
}

/**
Channel as first-class citizen
@0---97
@4---101
@2---99
@3---100
@9---106
@5---102
@6---103
@8---105
@7---104
@1---98
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.chanDemo()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:45 +0x15d
main.main()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:63 +0x57

goroutine 6 [chan send]:



*/

大写字母那部分,发完了,没有来接收的,发会阻塞住

因为 channel 没有长度

code03 等待的也 开goroutine

func doWork(id int, c chan int, done chan bool) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        go func() {
            done <- true
        }()
    }
}
//这样就可以了

这是因为我们需要等2次,如果我们只是等一次就不用的

code04 waitgroup方式来进行判断

package main

import (
    "fmt"
    "sync"
    //"time"
)

func doWork(
    id int, c chan int, wg *sync.WaitGroup,
) {
    for n := range c {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        wg.Done()
    }
}

type worker struct {
    in chan int
    wg *sync.WaitGroup
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int, wg *sync.WaitGroup) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in: make(chan int),
        wg: wg,
    }
    go doWork(id, w.in, wg)
    return w
}

func chanDemo() {

    var wg sync.WaitGroup

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }
    wg.Add(20)

    for i, w := range workers {
        w.in <- 'a' + i
        //wg.Add(1)

    }
    for i, w := range workers {
        w.in <- 'A' + i
    }

    // wait for all of themtime.Sleep(time.Second)
    wg.Wait()

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
}

/**
Channel as first-class citizen
@0---97
@4---101
@2---99
@3---100
@9---106
@5---102
@6---103
@8---105
@7---104
@1---98
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.chanDemo()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:45 +0x15d
main.main()
    E:/Projects/GolandProjects/go-camp/mooc/code/learngo/channel/done/channel.go:63 +0x57

goroutine 6 [chan send]:


*/

code05 函数是一等公民之再次改进

package main

import (
    "fmt"
    "sync"
    //"time"
)

func doWork(id int, w worker) {
    for n := range w.in {
        fmt.Printf("@%d---%d\n", id, n)
        // 通知外面 做完了( channel 是一等公民)
        w.done()
    }
}

type worker struct {
    in   chan int
    done func()
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int, wg *sync.WaitGroup) worker { // 告诉外面用的人 , 我这个channel怎么用
    //
    w := worker{
        in: make(chan int),
        // 函数式编程 
        // 匿名函数来赋值
        done: func() {
            wg.Done()
        },
    }
    go doWork(id, w)
    return w
}

func chanDemo() {

    var wg sync.WaitGroup

    var workers [10]worker
    for i := 0; i < 10; i++ {
        workers[i] = createWorker(i, &wg)
    }
    wg.Add(20)

    for i, w := range workers {
        w.in <- 'a' + i
        //wg.Add(1)

    }
    for i, w := range workers {
        w.in <- 'A' + i
    }

    // wait for all of themtime.Sleep(time.Second)
    wg.Wait()

}

func main() {
    fmt.Println("Channel as first-class citizen")
    chanDemo()
}

/**
Channel as first-class citizen
@0---97
@1---98
@2---99
@2---67
@3---100
@3---68
@4---101
@4---69
@5---102
@5---70
@6---103
@6---71
@7---104
@7---72
@8---105
@8---73
@9---106
@9---74
@0---65
@1---66

Process finished with the exit code 0


*/

6-3 使用Channel进行树的遍历

code01

package tree

import "fmt"

type Node struct {
    Value       int
    Left, Right *Node
}

func (node Node) Print() {
    fmt.Print(node.Value, " ")
}

func (node *Node) SetValue(value int) {
    if node == nil {
        fmt.Println("Setting Value to nil " +
            "node. Ignored.")
        return
    }
    node.Value = value
}

func CreateNode(value int) *Node {
    return &Node{Value: value}
}
package tree

import "fmt"

func (node *Node) Traverse() {
    node.TraverseFunc(func(n *Node) {
        n.Print()
    })
    fmt.Println()
}

func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }

    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}

func (node *Node) TraverseWithChannel() chan *Node {
    out := make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out)
    }()
    return out
}

6-4 Select

使用select 进行调度

code1 非阻塞式的处理

package main

import "fmt"

func main() {
    var c1, c2 chan int
    select {
    case n := <-c1:
        fmt.Println("received from c1:", n)
    case n := <-c2:
        fmt.Println("received from c2:", n)
    default:
        fmt.Println("no val received")
    }

}
/**
no val received

Process finished with the exit code 0
非阻塞式的处理
 */

code02 死循环

func main() {
    var c1, c2 chan int
    for {
        select {
        case n := <-c1:
            fmt.Println("received from c1:", n)
        case n := <-c2:
            fmt.Println("received from c2:", n)
        default:
            fmt.Println("no val received")
        }
    }

}

code03 改进

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func main() {
    var c1, c2 = generator(), generator()
    for {
        select {
        case n := <-c1:
            fmt.Println("received from c1:", n)
        case n := <-c2:
            fmt.Println("received from c2:", n)
        //default:
        //    fmt.Println("no val received")
        }
    }

}

/**
received from c1: 0
received from c2: 0
received from c2: 1
received from c1: 1
received from c1: 2
received from c2: 2
received from c1: 3
received from c2: 3
received from c2: 4
received from c2: 5
received from c1: 4
received from c1: 5
received from c1: 6
received from c2: 6
received from c1: 7
received from c2: 7
received from c1: 8

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

*/

code04 temp 改进中

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for {
        n, ok := <-c
        if ok {
            fmt.Printf("@%d---%d\n", id, n)
        } else {
            break
        }
    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    hasValue := false
    for {

        var activeWorker chan<- int
        if hasValue {
            activeWorker = work
        }
        select {
        case n = <-c1:
            hasValue = true
        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
@0---3

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

 */

code05 sleep长一点就会出问题

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(5 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    hasValue := false
    for {

        var activeWorker chan<- int
        if hasValue {
            activeWorker = work
        }
        select {
        case n = <-c1:
            hasValue = true
        case n = <-c2:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }

}

/**
@0---0
@0---7
@0---12
@0---18


*/

新的数据会把之前的数据冲掉

code06 吧收到的n,存下来排队

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
@0---3
@0---3


*/

code07 让程序 10秒钟后结束

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    tm := time.After(10 * time.Second)
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
@0---3
@0---3
@0---4
bye

Process finished with the exit code 0

*/

code08 超时的情况

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    tm := time.After(10 * time.Second)
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-time.After(800 * time.Millisecond):
            // 如果超过 800 毫秒 之内没有生成数据
            fmt.Println("timeout")
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

/**
@0---0
@0---0
@0---1
@0---1
@0---2
@0---2
timeout
@0---3
timeout
@0---3
timeout
@0---4
bye

Process finished with the exit code 0


*/

code09 定时的加入 每秒显示长度

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(
                time.Duration(rand.Intn(1500)) * time.Millisecond,
            )
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(1 * time.Second)
        fmt.Printf("@%d---%d\n", id, n)

    }
}

// 告诉外面用的人 , 我这个channel怎么用
func createWorker(id int) chan int { // 告诉外面用的人 , 我这个channel怎么用
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var work = createWorker(0)

    n := 0
    var values []int
    tm := time.After(10 * time.Second)
    tick := time.Tick(time.Second)
    for {

        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = work
            activeValue = values[0]
        }
        select {
        case n = <-c1:
            values = append(values, n)
        case n = <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-time.After(800 * time.Millisecond):
            // 如果超过 800 毫秒 之内没有生成数据
            fmt.Println("timeout")
        case <-tick:
            fmt.Println("queue len is", len(values))
        case <-tm:
            fmt.Println("bye")
            return
        }
    }

}

/**
queue len is 3
@0---0
queue len is 4
@0---0
queue len is 5
@0---1
queue len is 10
@0---1
queue len is 10
@0---2
queue len is 11
@0---2
queue len is 12
@0---3
queue len is 13
@0---3
queue len is 14
@0---4
queue len is 15
bye

Process finished with the exit code 0

*/

6-5 传统同步机制

传统同步机制

  • WaitGroup
  • Mutex
  • Cond

code0 init atomic

package main

import (
    "fmt"
    "time"
)

type atomicInt int

func (a *atomicInt) increment() {
    *a++
}
func (a *atomicInt) get() int {
    return int(*a)
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a)
}
/**
2

Process finished with the exit code 0
 */
PS E:\Projects\GolandProjects\go-camp\mooc\code\learngo\basic\atomic> go run -race .\atomic.go
==================
WARNING: DATA RACE
Read at 0x00c00000e0d8 by main goroutine:
  main.main()
      E:/Projects/GolandProjects/go-camp/mooc/code/learngo/basic/atomic/atomic.go:24 +0xee

Previous write at 0x00c00000e0d8 by goroutine 7:
  main.(*atomicInt).increment()
      E:/Projects/GolandProjects/go-camp/mooc/code/learngo/basic/atomic/atomic.go:11 +0x45
  main.main.func1()
      E:/Projects/GolandProjects/go-camp/mooc/code/learngo/basic/atomic/atomic.go:21 +0x2e

Goroutine 7 (finished) created at:
  main.main()

data race 存在

code01 加上锁的

package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock  sync.Mutex
}

func (a *atomicInt) increment() {
    a.lock.Lock()

    defer a.lock.Unlock()
    a.value++
}
func (a *atomicInt) get() int {
    a.lock.Lock()

    defer a.lock.Unlock()
    return a.value
}

func main() {
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}

/**
2

Process finished with the exit code 0
*/
PS E:\Projects\GolandProjects\go-camp\mooc\code\learngo\basic\atomic> go run -race .\atomic.go
2
PS E:\Projects\GolandProjects\go-camp\mooc\code\learngo\basic\atomic>

这回没有data race了

code 02 部分加锁 (匿名函数)

func (a *atomicInt) increment() {
    fmt.Println("safe increment")
    func() {
        a.lock.Lock()

        defer a.lock.Unlock()
        a.value++
    }()
}
safe increment
safe increment
2

Process finished with the exit code 0

协程Coroutine

  • 轻量级“线程”
  • 非抢占式多任务处理,由协程主动交出控制权
  • 编译器/解释器/虚拟机层面的多任务
  • 多个协程可能在一个或多个线程上运行

这可能是最容易理解的 Go Mutex 源码剖析

6-6 并发模式(上)

code00 init

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen() chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("message %d", i)
            i++
        }
    }()
    return c
}
func main() {
    // 生成消息
    m := msgGen()
    for {
        fmt.Println(<-m)

    }

}

/**
message 0
message 1
message 2
message 3
message 4
message 5
message 6

 */

code01 加上方向

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen() <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("message %d", i)
            i++
        }
    }()
    return c
}
func main() {
    // 生成消息
    m := msgGen()
    for {
        fmt.Println(<-m)
        //m<- "abc"
        // 没有办法发数据
    }

}

/**
message 0
message 1
message 2
message 3
message 4
message 5
message 6

*/

code 02 开两个 msgGen

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen() <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("message %d", i)
            i++
        }
    }()
    return c
}
func main() {
    // 生成消息
    m1 := msgGen()
    m2 := msgGen()
    for {
        fmt.Println(<-m1)
        fmt.Println(<-m2)
        //m<- "abc"
        // 没有办法发数据
    }

}

/**
message 0
message 0
message 1
message 1
message 2
message 2
message 3
message 3
message 4
...

*/

code03 加上参数

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen(name string) <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("service %s: message %d", name, i)
            i++
        }
    }()
    return c
}
func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")
    for {
        fmt.Println(<-m1)
        fmt.Println(<-m2)
        //m<- "abc"
        // 没有办法发数据
    }

}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
service 服务B: message 1
service 服务A: message 2
service 服务B: message 2
service 服务A: message 3
service 服务B: message 3
service 服务A: message 4
*/

上面是交替的等待,显然是不对的

code 04 让他们交替

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen(name string) <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("service %s: message %d", name, i)
            i++
        }
    }()
    return c
}

func fanIn(c1, c2 <-chan string) chan string {
    c := make(chan string)
    go func() {
        // todo c1,c2 如何调度
        for {
            c <- <-c1
        }
    }()
    go func() {
        for {
            c <- <-c2
        }
    }()
    return c
}
func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")
    m := fanIn(m1, m2)

    for {
        fmt.Println(<-m)
        //m<- "abc"
        // 没有办法发数据
    }

}

/**
service 服务A: message 0
service 服务B: message 0
service 服务B: message 1
service 服务A: message 1
service 服务B: message 2
service 服务B: message 3
service 服务B: message 4
service 服务A: message 2
service 服务B: message 5
service 服务B: message 6
service 服务A: message 3
service 服务B: message 7
service 服务A: message 4
service 服务A: message 5
service 服务B: message 8

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

*/

code 05 select 方式

func fanInBySelect(c1, c2 <-chan string) chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case m := <-c1:
                c <- m
            case m := <-c2:
                c <- m
            }
        }
    }()
    return c
}

比较 非select 和 select

6-7 并发模式(下)

code00 出现问题 goroutine循环变量的坑

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen(name string) <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(2000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("service %s: message %d", name, i)
            i++
        }
    }()
    return c
}

func fanIn(chs ...<-chan string) chan string {
    c := make(chan string)
    for _, ch := range chs {
        go func() {
            // todo c1,c2 如何调度
            for {
                c <- <-ch
            }
        }()
    }

    return c
}

func fanInBySelect(c1, c2 <-chan string) chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case m := <-c1:
                c <- m
            case m := <-c2:
                c <- m
            }
        }
    }()
    return c
}

func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")
    m3 := msgGen("服务C")
    m := fanIn(m1, m2, m3)

    for {
        fmt.Println(<-m)
        //m<- "abc"
        // 没有办法发数据
    }

}

/**
service 服务C: message 0
service 服务C: message 1
service 服务C: message 2
service 服务C: message 3
service 服务C: message 4
service 服务C: message 5
service 服务C: message 6

Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

*/

go语言forin循环的一个bug,导致只能拿到最后一次的

go 开routine 的时候不会拿值,等到 调用的时候才会拿值

code01 解决方式self

使用temp

func fanIn(chs ...<-chan string) chan string {
    c := make(chan string)
    var ch_temp  <-chan string
    for _, ch := range chs {
        ch_temp = ch
        go func() {
            // todo c1,c2 如何调度
            for {
                c <- <-ch_temp
            }
        }()
    }

    return c
}

或者这样

func fanIn(chs ...<-chan string) chan string {
    c := make(chan string)
    for _, ch := range chs {
        chTemp := ch
        go func() {
            // todo c1,c2 如何调度
            for {
                c <- <-chTemp
            }
        }()
    }
    return c
}

终极解决,因为go语言全是值传递

所以

func fanIn(chs ...<-chan string) chan string {
    c := make(chan string)
    for _, ch := range chs {
        //chTemp := ch
        go func(in <-chan string) {
            // todo c1,c2 如何调度
            for {
                c <- <-in
            }
        }(ch)
    }
    return c
}

1635323116816

6-8 并发任务的控制

code00 select 等待 (非阻塞等待)


func nonBlockingWait(c <-chan string) (string, bool) {
    select {
    case m := <-c:
        return m, true
    default:
        return "", false
    }
}

func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")

    for {
        fmt.Println(<-m1)
        if m, ok := nonBlockingWait(m2); ok {
            fmt.Println(m)
        }else {
            fmt.Println("no message from serve")
        }
    }
}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
service 服务B: message 1
service 服务A: message 2
no message from serve
service 服务A: message 3
no message from serve
service 服务A: message 4
no message from serve
service 服务A: message 5
service 服务B: message 2
 */

code01 timeout 支持设置等待时间(超时机制)


func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) {
    select {
    case m := <-c:
        return m, true
    case <-time.After(timeout):
        // 没有等到
        return "", false
    }
}

func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")

    for {
        fmt.Println(<-m1)
        if m, ok := timeoutWait(m2,2*time.Second); ok {
            fmt.Println(m)
        } else {
            fmt.Println("no message from serve")
        }
    }
}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
service 服务B: message 1
service 服务A: message 2
service 服务B: message 2
service 服务A: message 3
service 服务B: message 3
service 服务A: message 4

*/

当 等待时间小于任务间隔时间 ,可能会出现 等待 (走到else {no message from serve")分支


func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) {
    select {
    case m := <-c:
        return m, true
    case <-time.After(timeout):
        // 没有等到
        return "", false
    }
}

func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")

    for {
        fmt.Println(<-m1)
        if m, ok := timeoutWait(m2,1*time.Second); ok {
            fmt.Println(m)
        } else {
            fmt.Println("no message from serve")
        }
    }
}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
service 服务B: message 1
service 服务A: message 2
no message from serve
service 服务A: message 3
service 服务B: message 2

*/

code02 处理中断

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan 是一等公民
func msgGen(name string) <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
            // sprintf 作为字符串打印
            c <- fmt.Sprintf("service %s: message %d", name, i)
            i++
        }
    }()
    return c
}

func fanIn(chs ...<-chan string) chan string {
    c := make(chan string)
    for _, ch := range chs {
        //chTemp := ch
        go func(in <-chan string) {
            // todo c1,c2 如何调度
            for {
                c <- <-in
            }
        }(ch)
    }
    return c
}

func fanInBySelect(c1, c2 <-chan string) chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case m := <-c1:
                c <- m
            case m := <-c2:
                c <- m
            }
        }
    }()
    return c
}

func nonBlockingWait(c <-chan string) (string, bool) {
    select {
    case m := <-c:
        return m, true
    default:
        return "", false
    }
}

func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) {
    select {
    case m := <-c:
        return m, true
    case <-time.After(timeout):
        // 没有等到
        return "", false
    }
}

func main() {
    // 生成消息
    m1 := msgGen("服务A")
    m2 := msgGen("服务B")

    for i:=0;i<5; i++{
        fmt.Println(<-m1)
        if m, ok := timeoutWait(m2,1*time.Second); ok {
            fmt.Println(m)
        } else {
            fmt.Println("no message from serve")
        }
    }
}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
no message from serve
service 服务A: message 2
service 服务B: message 1
service 服务A: message 3
no message from serve
service 服务A: message 4
service 服务B: message 2

Process finished with the exit code 0


*/

考虑时间场景中: 我的等待代码部分,是正在做事情,而我不希望 异常中断

准备做成: 能够通知我 退出 信号

// chan bool 或者 chan struct{}
// 这个 chan struct{} 里面没有任何的数据,他比bool更加的省空间
func msgGen(name string, done chan struct{}) <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            // 判断 done 是否在
            select {
            case <-time.After(time.Millisecond * time.Duration(rand.Intn(5000))):
                c <- fmt.Sprintf("service %s: message %d", name, i)
            case <-done:
                // 证明我是主动的退出
                fmt.Println("cleaning up")
                return
            }
            i++
        }
    }()
    return c
}

主函数


func main() {

    done :=make(chan struct{})
    // 生成消息
    m1 := msgGen("服务A",done)
    m2 := msgGen("服务B",done)

    for i := 0; i < 5; i++ {
        fmt.Println(<-m1)
        if m, ok := timeoutWait(m2, 1*time.Second); ok {
            fmt.Println(m)
        } else {
            fmt.Println("no message from serve")
            //time.Sleep(2*time.Second)
        }
    }
    done<- struct{}{}
    // 送完停止信号,为了让看到结果,再等他个2s钟
    time.Sleep(2*time.Second)

}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
no message from serve
service 服务A: message 2
service 服务B: message 1
service 服务A: message 3
no message from serve
service 服务A: message 4
service 服务B: message 2
cleaning up

Process finished with the exit code 0

*/

大部分的chan都是单向的

这个done可以做成双向的,可以在清理完后,再通知主函数可以正常退出了

code03 优雅的退出

服务器的优雅退出

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// chan bool 或者 chan struct{}
// 这个 chan struct{} 里面没有任何的数据,他比bool更加的省空间
func msgGen(name string, done chan struct{}) <-chan string {
    c := make(chan string)
    go func() {
        i := 0
        for {
            // 判断 done 是否在
            select {
            case <-time.After(time.Millisecond * time.Duration(rand.Intn(5000))):
                c <- fmt.Sprintf("service %s: message %d", name, i)
            case <-done:
                // 证明我是主动的退出
                fmt.Println("cleaning up")
                time.Sleep(2*time.Second)
                fmt.Println("cleaning over")
                //done <- "cleaning over"
                done<- struct{}{}
                return
            }
            i++
        }
    }()
    return c
}

func fanIn(chs ...<-chan string) chan string {
    c := make(chan string)
    for _, ch := range chs {
        //chTemp := ch
        go func(in <-chan string) {
            // todo c1,c2 如何调度
            for {
                c <- <-in
            }
        }(ch)
    }
    return c
}

func fanInBySelect(c1, c2 <-chan string) chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case m := <-c1:
                c <- m
            case m := <-c2:
                c <- m
            }
        }
    }()
    return c
}

func nonBlockingWait(c <-chan string) (string, bool) {
    select {
    case m := <-c:
        return m, true
    default:
        return "", false
    }
}

func timeoutWait(c <-chan string, timeout time.Duration) (string, bool) {
    select {
    case m := <-c:
        return m, true
    case <-time.After(timeout):
        // 没有等到
        return "", false
    }
}



func main() {

    done :=make(chan struct{})
    // 生成消息
    m1 := msgGen("服务A",done)
    m2 := msgGen("服务B",done)

    for i := 0; i < 5; i++ {
        fmt.Println(<-m1)
        if m, ok := timeoutWait(m2, 1*time.Second); ok {
            fmt.Println(m)
        } else {
            fmt.Println("no message from serve")
            //time.Sleep(2*time.Second)
        }
    }
    done<- struct{}{}
    // 送完停止信号,为了让看到结果,再等他个2s钟
    <-done

}

/**
service 服务A: message 0
service 服务B: message 0
service 服务A: message 1
no message from serve
service 服务A: message 2
service 服务B: message 1
service 服务A: message 3
no message from serve
service 服务A: message 4
service 服务B: message 2
cleaning up
cleaning over

Process finished with the exit code 0


*/


results matching ""

    No results matching ""