channel #
遍历一个未关闭的channel会造成死循环
channel的使用场景 #
把channel用在数据流动的地方:
- 消息传递、消息过滤
- 信号广播
- 事件订阅与广播
- 请求、响应转发
- 任务分发
- 结果汇总
- 并发控制
- 同步与异步
- …
channel的基本操作和注意事项 #
channel存在3种状态
:
- nil,未初始化的状态,只进行了声明,或者手动赋值为
nil
- active,正常的channel,可读或者可写
- closed,已关闭,千万不要误认为关闭channel后,channel的值是nil
channel可进行3种操作
:
- 读
- 写
- 关闭
把这3种操作和3种channel状态可以组合出9种情况
:
操作 | nil的channel | 正常channel | 已关闭channel |
---|---|---|---|
<- ch | 阻塞 | 成功或阻塞 | 读到零值 |
ch <- | 阻塞 | 成功或阻塞 | panic |
close(ch) | panic | 成功 | panic |
对于nil通道的情况,也并非完全遵循上表,有1个特殊场景:当nil
的通道在select
的某个case
中时,这个case会阻塞,但不会造成死锁。
如何判断通道为空 #
if len(channel) == 0 {
// 通道为空
}
select {
case <-channel:
// 通道不为空,可以接收元素
default:
// 通道为空
}
如何判断通道已关闭 #
v, ok := <-ch
通道各种花里胡哨用法
https://learnku.com/articles/71310
https://cloud.tencent.com/developer/article/1911948
https://segmentfault.com/a/1190000017958702
https://www.jianshu.com/p/554e210bdca4
https://www.cnblogs.com/jiujuan/p/16014608.html
https://colobu.com/2016/04/14/Golang-Channels/
1、一个经典的算法题 #
有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出自己的编号,要求写一个程序,让输出的编号总是按照 1、2、3、4、1、2、3、4… 的顺序打印出来
func main() {
// 4个channel
chs := make([]chan int, 4)
for i, _ := range chs {
chs[i] = make(chan int)
// 开4个协程
go func(i int) {
for {
// 获取当前channel值并打印
v := <-chs[i]
fmt.Println(v + 1)
time.Sleep(time.Second)
// 把下一个值写入下一个channel,等待下一次消费
chs[(i+1)%4] <- (v + 1) % 4
}
}(i)
}
// 往第一个塞入0
chs[0] <- 0
select {}
}
2、限流器 #
func main() {
// 每次处理3个请求
chLimit := make(chan struct{}, 3)
for i := 0; i < 20; i++ {
chLimit <- struct{}{}
go func(i int) {
fmt.Println("下游服务处理逻辑...", i)
time.Sleep(time.Second * 3)
<-chLimit
}(i)
}
time.Sleep(30 * time.Second)
}
如果觉得 sleep 太丑太暴力,可以用 waitGroup 控制结束时机
var wg sync.WaitGroup
func main() {
// 每次处理3个请求
chLimit := make(chan struct{}, 3)
for i := 0; i < 20; i++ {
chLimit <- struct{}{}
wg.Add(1)
go func(i int) {
fmt.Println("下游服务处理逻辑...", i)
time.Sleep(time.Second * 3)
<-chLimit
wg.Done()
}(i)
}
wg.Wait()
}
3、优雅退出 #
func main() {
var closing = make(chan struct{})
var closed = make(chan struct{})
go func() {
for {
select {
case <-closing:
return
default:
fmt.Println("业务逻辑...")
time.Sleep(1 * time.Second)
}
}
}()
termChan := make(chan os.Signal)
// 监听退出信号
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
<-termChan
// 退出中
close(closing)
// 退出之前清理一下
go doCleanup(closed)
select {
case <-closed:
case <-time.After(time.Second):
log.Println("清理超时不等了")
}
log.Println("优雅退出")
}
func doCleanup(closed chan struct{}) {
time.Sleep(time.Minute)
// 清理完后退出
close(closed)
}
4、实现互斥锁 #
初始化一个缓冲区为 1 的 channel,放入元素代表一把锁,谁获取到这个元素就代表获取了这把锁,释放锁的时候再把这个元素放回 channel
type Mutex struct {
ch chan struct{}
}
// 初始化锁
func NewMutex() *Mutex {
mu := &Mutex{make(chan struct{}, 1)}
mu.ch <- struct{}{}
return mu
}
// 加锁,阻塞获取
func (m *Mutex) Lock() {
<- m.ch
}
// 释放锁
func (m *Mutex) Unlock() {
select {
// 成功写入channel代表释放成功
case m.ch <- struct{}{}:
default:
panic("unlock of unlocked mutex")
}
}
// 尝试获取锁
func (m *Mutex) TryLock() bool {
select {
case <-m.ch:
return true
default:
}
return false
}
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
timer := time.NewTimer(timeout)
select {
case <-m.ch:
// 成功获取锁关闭定时器
timer.Stop()
return true
case <-timer.C:
}
// 获取锁超时
return false
}
// 是否上锁
func (m *Mutex) IsLocked() bool {
return len(m.ch) == 0
}
func main() {
m := NewMutex()
ok := m.TryLock()
log.Printf("locked v %v\n", ok)
ok = m.TryLock()
log.Printf("locked v %v\n", ok)
go func() {
time.Sleep(5*time.Second)
m.Unlock()
}()
ok = m.LockTimeout(10*time.Second)
log.Printf("LockTimeout v %v\n", ok)
}
定向通道 #
我们在将一个 channel 变量传递到一个函数时,可以通过将其指定为单向 channel 变量,从而限制该函数中可以对此 channel 的操作,比如只能往这个 channel 写,或者只能从这个 channel 读。
(1)单向 channel 变量的声明 单向 channel 变量的声明非常简单,只能发送的通道类型为chan<-,只能接收的通道类型为<-chan,格式如下:
var 通道实例 chan<- 元素类型 // 只能发送通道
var 通道实例 <-chan 元素类型 // 只能接收通道
使用for range读channel #
当需要不断从channel读取数据时。
原理 #
使用for-range
读取channel,这样既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。
用法 #
for x := range ch{
fmt.Println(x)
}
使用v,ok := <-ch
+ select
操作判断channel是否关闭
#
v,ok := <-ch
+ select
操作判断channel是否关闭
原理 #
ok的结果和含义:
- `true`:读到通道数据,不确定是否关闭,可能channel还有保存的数据,但channel已关闭。
- `false`:通道关闭,无数据读到。
从关闭的channel读值读到是channel所传递数据类型的零值,这个零值有可能是发送者发送的,也可能是channel关闭了。
_, ok := <-ch
与select配合使用的,当ok为false时,代表了channel已经close。下面解释原因,_,ok := <-ch
对应的函数是func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
,入参block含义是当前goroutine是否可阻塞,当block为false代表的是select操作,不可阻塞当前goroutine的在channel操作,否则是普通操作(即_, ok
不在select中)。返回值selected代表当前操作是否成功,主要为select服务,返回received代表是否从channel读到有效值。它有3种返回值情况:
- block为false,即执行select时,如果channel为空,返回(false,false),代表select操作失败,没接收到值。
- 否则,如果channel已经关闭,并且没有数据,ep即接收数据的变量设置为零值,返回(true,false),代表select操作成功,但channel已关闭,没读到有效值。
- 否则,其他读到有效数据的情况,返回(true,ture)。
我们考虑_, ok := <-ch
和select
结合使用的情况。
情况1:当chanrecv返回(false,false)时,本质是select操作失败了,所以相关的case会阻塞,不会执行,比如下面的代码:
func main() {
ch := make(chan int)
select {
case v, ok := <-ch:
fmt.Printf("v: %v, ok: %v\n", v, ok)
default:
fmt.Println("nothing")
}
}
// 结果:
// nothing
情况2:下面的结果会是零值和false:
func main() {
ch := make(chan int)
// 增加关闭
close(ch)
select {
case v, ok := <-ch:
fmt.Printf("v: %v, ok: %v\n", v, ok)
}
}
// v: 0, ok: false
情况3的received
为true,即_, ok
中的ok
为true,不做讨论了,只讨论ok
为false的情况。
最后ok
为false的时候,只有情况2,此时channel必然已经关闭,我们便可以在select
中用ok
判断channel是否已经关闭。
用法 #
下面例子展示了,向channel写数据然后关闭,依然可以从已关闭channel读到有效数据,但channel关闭且没有数据时,读不到有效数据,ok为false,可以确定当前channel已关闭。
// demo_select6.go
func main() {
ch := make(chan int, 1)
// 发送1个数据关闭channel
ch <- 1 close(ch)
print("close channel\n")
// 不停读数据直到channel没有有效数据
for {
select {
case v, ok := <-ch:
print("v: ", v, ", ok:", ok, "\n")
if !ok {
print("channel is close\n")
return
}
default:
print("nothing\n")
}
}
}
// 结果
// close channel
// v: 1, ok:true
// v: 0, ok:false
// channel is close
使用select处理多个channel #
需要对多个通道进行同时处理,但只处理最先发生的channel时
原理 #
select
可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case永远为阻塞,无论读写。特殊关注:普通情况下,对nil的通道写操作是要panic的。
用法 #
// 分配job时,如果收到关闭的通知则退出,不分配job
func (h *Handler) handle(job *Job) {
select {
case h.jobCh<-job:
return
case <-h.stopCh:
return
}
}
使用channel的声明控制读写权限 #
协程对某个通道只读或只写时
目的:
- 使代码更易读、更易维护,
- 防止只读协程对通道进行写数据,但通道已关闭,造成panic。
用法 #
- 如果协程对某个channel只有写操作,则这个channel声明为只写。
- 如果协程对某个channel只有读操作,则这个channe声明为只读。
// 只有generator进行对outCh进行写操作,返回声明
// <-chan int,可以防止其他协程乱用此通道,造成隐藏bug
func generator(int n) <-chan int {
outCh := make(chan int)
go func(){
for i:=0;i<n;i++{
outCh<-i
}
}()
return outCh
}
// consumer只读inCh的数据,声明为<-chan int
// 可以防止它向inCh写数据
func consumer(inCh <-chan int) {
for x := range inCh {
fmt.Println(x)
}
}
使用缓冲channel增强并发 #
异步
原理 #
有缓冲通道可供多个协程同时处理,在一定程度可提高并发性。
用法 #
// 无缓冲
ch1 := make(chan int)
ch2 := make(chan int, 0)
// 有缓冲
ch3 := make(chan int, 1)
// 使用5个`do`协程同时处理输入数据
func test() {
inCh := generator(100)
outCh := make(chan int, 10)
for i := 0; i < 5; i++ {
go do(inCh, outCh)
}
for r := range outCh {
fmt.Println(r)
}
}
func do(inCh <-chan int, outCh chan<- int) {
for v := range inCh {
outCh <- v * v
}
}
为操作加上超时 #
需要超时控制的操作
原理 #
使用select
和time.After
,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果
用法 #
func doWithTimeOut(timeout time.Duration) (int, error) {
select {
case ret := <-do():
return ret, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}
func do() <-chan int {
outCh := make(chan int)
go func() {
// do work
}()
return outCh
}
使用time实现channel无阻塞读写 #
并不希望在channel的读写上浪费时间
原理 #
是为操作加上超时的扩展,这里的操作是channel的读或写
用法 #
func unBlockRead(ch chan int) (x int, err error) {
select {
case x = <-ch:
return x, nil
case <-time.After(time.Microsecond):
return 0, errors.New("read time out")
}
}
func unBlockWrite(ch chan int, x int) (err error) {
select {
case ch <- x:
return nil
case <-time.After(time.Microsecond):
return errors.New("read time out")
}
}
注:time.After等待可以替换为default,则是channel阻塞时,立即返回的效果
使用close(ch)
关闭所有下游协程
#
退出时,显示通知所有协程退出
原理 #
所有读ch
的协程都会收到close(ch)
的信号
用法 #
func (h *Handler) Stop() {
close(h.stopCh)
// 可以使用WaitGroup等待所有协程退出
}
// 收到停止后,不再处理请求
func (h *Handler) loop() error {
for {
select {
case req := <-h.reqCh:
go handle(req)
case <-h.stopCh:
return
}
}
}
使用chan struct{}
作为信号channel
#
使用channel传递信号,而不是传递数据时
原理 #
没数据需要传递时,传递空struct
用法 #
// 上例中的Handler.stopCh就是一个例子,stopCh并不需要传递任何数据
// 只是要给所有协程发送退出的信号
type Handler struct {
stopCh chan struct{}
reqCh chan *Request
}
使用channel传递结构体的指针而非结构体 #
使用channel传递结构体数据时
原理 #
channel本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效
用法 #
reqCh chan *Request
// 好过
reqCh chan Request
使用channel传递channel #
使用场景有点多,通常是用来获取结果。
原理 #
channel可以用来传递变量,channel自身也是变量,可以传递自己。