0%

问题:为什么PBFT的节点数量是3f+1?

pbft的论文提到这样2段话,可以很好的解决这个问题:


page3

在存在f个faulty节点的情况下,3f+1是保证系统安全性和活跃性的最小的总节点数量。当存在f个节点不响应的情况下,需要n-f个正常节点达成共识需要保障n-f > f。另外一种情况:f个响应的节点是错误的(响应错误数据),f个节点没有响应,但他们不是faulty的,所以要保证好的响应的节点 - 好的未响应的节点 - 坏的响应的节点 > 坏的节点,即需要n - f -f > fn > 3f。但n只能是整数,所以n >= 3f + 1


page3

副本节点的数量设为R,为了简便使R = 3f + 1。尽管存在副本节点数量多于3f+1的情况,比如3f+2, 3f+3但多出来的节点没有带来任何改善,反而降低了系统的性能,因为需要更多的通信量

为什么这样说呢?

3f+2, 3f+3的实际能容错的节点数量与3f+1是相同的,即只能容忍f个faulty的节点。

根据论文,当n = 3f+1时,

  • 在Prepare进入Commit阶段,需要2f个Prepare消息,即需要n - 1 - f条Prepare消息。
  • 在Commit阶段完成,需要2f+1条Commit消息,即需要n - f条Commit消息。

请思考,当n = 3f+2时,

  • 在Prepare进入Commit阶段,需要n - 1 - f个Prepare消息,即2f+1需要条Prepare消息。
  • 在Commit阶段完成,需要n - 1 - f条Commit消息,即需要2f+1条Commit消息。
  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/23/why-pbft-using-3f-plus-1/
关注公众号,获取最新Golang文章

这篇文章总结了channel的11种常用操作,以一个更高的视角看待channel,会给大家带来对channel更全面的认识。

在介绍11种操作前,先简要介绍下channel的使用场景、基本操作和注意事项。

channel的使用场景

把channel用在数据流动的地方

  1. 消息传递、消息过滤
  2. 信号广播
  3. 事件订阅与广播
  4. 请求、响应转发
  5. 任务分发
  6. 结果汇总
  7. 并发控制
  8. 同步与异步

channel的基本操作和注意事项

channel存在3种状态

  1. nil,未初始化的状态,只进行了声明,或者手动赋值为nil
  2. active,正常的channel,可读或者可写
  3. closed,已关闭,千万不要误认为关闭channel后,channel的值是nil

channel可进行3种操作

  1. 关闭

把这3种操作和3种channel状态可以组合出9种情况

操作 nil的channel 正常channel 已关闭channel
<- ch 阻塞 成功或阻塞 读到零值
ch <- 阻塞 成功或阻塞 panic
close(ch) panic 成功 panic

对于nil通道的情况,也并非完全遵循上表,有1个特殊场景:当nil的通道在select的某个case中时,这个case会阻塞,但不会造成死锁。

参考代码请看:https://dave.cheney.net/2014/03/19/channel-axioms

下面介绍使用channel的10种常用操作。

1. 使用for range读channel

场景

当需要不断从channel读取数据时。

原理

使用for-range读取channel,这样既安全又便利,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。

用法

1
2
3
for x := range ch{
fmt.Println(x)
}

2. 使用v,ok := <-ch + select操作判断channel是否关闭

场景

v,ok := <-ch + select操作判断channel是否关闭

原理

ok的结果和含义:
- true:读到通道数据,不确定是否关闭,可能channel还有保存的数据,但channel已关闭。
- false:通道关闭,无数据读到。

从关闭的channel读值读到是channel所传递数据类型的零值,这个零值有可能是发送者发送的,也可能是channel关闭了。

_, ok := <-ch与select配合使用的,当ok为false时,代表了channel已经close。下面解释原因, _,ok := <-ch对应的函数是func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool),入参block含义是当前goroutine是否可阻塞,当block为false代表的是select操作,不可阻塞当前goroutine的在channel操作,否则是普通操作(即_, ok不在select中)。返回值selected代表当前操作是否成功,主要为select服务,返回received代表是否从channel读到有效值。它有3种返回值情况:

  1. block为false,即执行select时,如果channel为空,返回(false,false),代表select操作失败,没接收到值。
  2. 否则,如果channel已经关闭,并且没有数据,ep即接收数据的变量设置为零值,返回(true,false),代表select操作成功,但channel已关闭,没读到有效值。
  3. 否则,其他读到有效数据的情况,返回(true,ture)。

我们考虑_, ok := <-chselect结合使用的情况。

情况1:当chanrecv返回(false,false)时,本质是select操作失败了,所以相关的case会阻塞,不会执行,比如下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
ch := make(chan int)
select {
case v, ok := <-ch:
fmt.Printf("v: %v, ok: %v\n", v, ok)
default:
fmt.Println("nothing")
}
}

// 结果:
// nothing

情况2:下面的结果会是零值和false:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
ch := make(chan int)

// 增加关闭
close(ch)

select {
case v, ok := <-ch:
fmt.Printf("v: %v, ok: %v\n", v, ok)
}
}

// v: 0, ok: false

情况3的received为true,即_, ok中的ok为true,不做讨论了,只讨论ok为false的情况。

最后ok为false的时候,只有情况2,此时channel必然已经关闭,我们便可以在select中用ok判断channel是否已经关闭。

用法

下面例子展示了,向channel写数据然后关闭,依然可以从已关闭channel读到有效数据,但channel关闭且没有数据时,读不到有效数据,ok为false,可以确定当前channel已关闭。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// demo_select6.go
func main() {
ch := make(chan int, 1)

// 发送1个数据关闭channel
ch <- 1
close(ch)
print("close channel\n")

// 不停读数据直到channel没有有效数据
for {
select {
case v, ok := <-ch:
print("v: ", v, ", ok:", ok, "\n")
if !ok {
print("channel is close\n")
return
}
default:
print("nothing\n")
}
}
}

// 结果
// close channel
// v: 1, ok:true
// v: 0, ok:false
// channel is close

更多见golang_step_by_step/channel/ok仓库中ok和select的示例,或者阅读channel源码。

3. 使用select处理多个channel

场景

需要对多个通道进行同时处理,但只处理最先发生的channel时

原理

select可以同时监控多个通道的情况,只处理未阻塞的case。当通道为nil时,对应的case永远为阻塞,无论读写。特殊关注:普通情况下,对nil的通道写操作是要panic的

用法

1
2
3
4
5
6
7
8
9
// 分配job时,如果收到关闭的通知则退出,不分配job
func (h *Handler) handle(job *Job) {
select {
case h.jobCh<-job:
return
case <-h.stopCh:
return
}
}

4. 使用channel的声明控制读写权限

场景

协程对某个通道只读或只写时

目的:

  1. 使代码更易读、更易维护,
  2. 防止只读协程对通道进行写数据,但通道已关闭,造成panic。

用法

  • 如果协程对某个channel只有写操作,则这个channel声明为只写。
  • 如果协程对某个channel只有读操作,则这个channe声明为只读。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 只有generator进行对outCh进行写操作,返回声明
// <-chan int,可以防止其他协程乱用此通道,造成隐藏bug
func generator(int n) <-chan int {
outCh := make(chan int)
go func(){
for i:=0;i<n;i++{
outCh<-i
}
}()
return outCh
}

// consumer只读inCh的数据,声明为<-chan int
// 可以防止它向inCh写数据
func consumer(inCh <-chan int) {
for x := range inCh {
fmt.Println(x)
}
}

5. 使用缓冲channel增强并发

场景

异步

原理

有缓冲通道可供多个协程同时处理,在一定程度可提高并发性。

用法

1
2
3
4
5
// 无缓冲
ch1 := make(chan int)
ch2 := make(chan int, 0)
// 有缓冲
ch3 := make(chan int, 1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 使用5个`do`协程同时处理输入数据
func test() {
inCh := generator(100)
outCh := make(chan int, 10)

for i := 0; i < 5; i++ {
go do(inCh, outCh)
}

for r := range outCh {
fmt.Println(r)
}
}

func do(inCh <-chan int, outCh chan<- int) {
for v := range inCh {
outCh <- v * v
}
}

6. 为操作加上超时

场景

需要超时控制的操作

原理

使用selecttime.After,看操作和定时器哪个先返回,处理先完成的,就达到了超时控制的效果

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func doWithTimeOut(timeout time.Duration) (int, error) {
select {
case ret := <-do():
return ret, nil
case <-time.After(timeout):
return 0, errors.New("timeout")
}
}

func do() <-chan int {
outCh := make(chan int)
go func() {
// do work
}()
return outCh
}

7. 使用time实现channel无阻塞读写

场景

并不希望在channel的读写上浪费时间

原理

是为操作加上超时的扩展,这里的操作是channel的读或写

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func unBlockRead(ch chan int) (x int, err error) {
select {
case x = <-ch:
return x, nil
case <-time.After(time.Microsecond):
return 0, errors.New("read time out")
}
}

func unBlockWrite(ch chan int, x int) (err error) {
select {
case ch <- x:
return nil
case <-time.After(time.Microsecond):
return errors.New("read time out")
}
}

注:time.After等待可以替换为default,则是channel阻塞时,立即返回的效果

8. 使用close(ch)关闭所有下游协程

场景

退出时,显示通知所有协程退出

原理

所有读ch的协程都会收到close(ch)的信号

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (h *Handler) Stop() {
close(h.stopCh)

// 可以使用WaitGroup等待所有协程退出
}

// 收到停止后,不再处理请求
func (h *Handler) loop() error {
for {
select {
case req := <-h.reqCh:
go handle(req)
case <-h.stopCh:
return
}
}
}

9. 使用chan struct{}作为信号channel

场景

使用channel传递信号,而不是传递数据时

原理

没数据需要传递时,传递空struct

用法

1
2
3
4
5
6
// 上例中的Handler.stopCh就是一个例子,stopCh并不需要传递任何数据
// 只是要给所有协程发送退出的信号
type Handler struct {
stopCh chan struct{}
reqCh chan *Request
}

10. 使用channel传递结构体的指针而非结构体

场景

使用channel传递结构体数据时

原理

channel本质上传递的是数据的拷贝,拷贝的数据越小传输效率越高,传递结构体指针,比传递结构体更高效

用法

1
2
3
4
reqCh chan *Request

// 好过
reqCh chan Request

11. 使用channel传递channel

场景

使用场景有点多,通常是用来获取结果。

原理

channel可以用来传递变量,channel自身也是变量,可以传递自己。

用法

下面示例展示了有序展示请求的结果,另一个示例可以见另外文章的版本3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func main() {
reqs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9}

// 存放结果的channel的channel
outs := make(chan chan int, len(reqs))
var wg sync.WaitGroup
wg.Add(len(reqs))
for _, x := range reqs {
o := handle(&wg, x)
outs <- o
}

go func() {
wg.Wait()
close(outs)
}()

// 读取结果,结果有序
for o := range outs {
fmt.Println(<-o)
}
}

// handle 处理请求,耗时随机模拟
func handle(wg *sync.WaitGroup, a int) chan int {
out := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
out <- a
wg.Done()
}()
return out
}

推荐阅读

本文介绍的channel特性,大多在过去的文章中已详细介绍,可按需求阅读。

  1. Golang并发模型:并发协程的优雅退出
  2. Golang并发模型:轻松入门select
  3. Golang并发模型:select进阶
  4. Golang并发模型:轻松入门协程池
  5. Golang并发模型:再也不愁选channel还是选锁
  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/20/golang-channel-all-usage/
关注公众号,获取最新Golang文章。
一起学Golang-分享有料的Go语言技术

周末又到了,为大家准备了一份实用干货:如何使用channel和Mutex解决并发问题,利用周末的好时光,配上音乐,思考一下吧🤔。

来,问自己个问题:面对并发问题,是用channel解决,还是用Mutex解决

如果自己心里还没有清晰的答案,那就读下这篇文章,你会了解到:

  • 使用channel解决并发问题的核心思路和示例
  • channel擅长解决什么样的并发问题,Mutex擅长解决什么样的并发问题
  • 一个并发问题该怎么入手解解决
  • 一个重要的plus思维

前戏

前面很多篇的文章都在围绕channel介绍,而只有前一篇sync的文章介绍到了Mutex,不是我偏心,而是channel在Golang是first class级别的,设计在语言特性中的,而Mutex只是一个包中的。这就注定了一个是主角,一个是配角。

并且Golang还有一个并发座右铭,在《Effective Go》的channel介绍中写到:

Share memory by communicating, don’t communicate by sharing memory.
通过通信共享内存,而不是通过共享内存而通信。

Golang以如此明显的方式告诉我们:面对并发问题,你首先想到的应该是channel,因为channel是线程安全的并且不会有数据冲突,比锁好用多了

既生瑜,何生亮。既然有channel了,为啥还提供sync.Mutex呢?

主角不是万能的,他也需要配角。在Golang里,channel也不是万能的,这是由channel的特性和局限造成的。

下面就给大家介绍channel的特点、核心方法和缺点。

channel解决并发问题的思路和示例

channel的核心是数据流动,关注到并发问题中的数据流动,把流动的数据放到channel中,就能使用channel解决这个并发问题。这个思路是从Go语言的核心开发者的演讲中学来的,然而视频我已经找不到了,不然直接共享给大家,他提到了Golang并发的核心实践的4个点:

DataFlow -> Drawing -> Pipieline -> Exiting

DataFlow指数据流动,Drawing指把数据流动画出来,Pipeline指的是流水线,Exit指协程的退出。DataFlow + Drawing就是我提到到channel解决并发问题的思路,Pipeline和Exit是具体的实践模式,Pipeline和Exit我都写过文章,有需要自取:

下面我使用例子具体解释DataFlow + Drawing。借用《Golang并发的次优选择:sync包》中银行的例子,介绍如何使用channel解决例子中银行的并发问题:银行支持多个用户的同时操作。顺便看下同一个并发问题,使用channel和Mutex解决是什么差别。

一起分析下多个用户同时操作银行的数据流动:

  1. 每个人都可以向银行发起请求,请求可以是存、取、查3种操作,并且包含操作时必要的数据,包含的数据只和自身相关。
  2. 银行处理请求后给用户发送响应,包含的数据只和操作用户相关。

你一定发现了上面的数据流动:

  1. 请求数据:个人请求数据流向银行。
  2. 响应数据:银行处理结果数据流向用户。

channel是数据流动的通道/管道,为流动的数据建立通道,这里需要建立2类channel:

  1. reqCh:传送请求的channel,把请求从个人发送给银行。
  2. retCh:传送响应的channel,把响应从银行发给个人。

我们把channel添加到上图中,得到下面的图:

以上就是从数据流动的角度,发现如何使用channel解决并发问题。思路有了,再思考下代码层面需要怎么做:

  1. 银行:
    1. 定义银行,只保存1个map即可
    2. 银行操作:接收和解析请求,并把请求分发给存、取、查函数
    3. 实现存、取、查函数:处理请求,并把结果写入到用户提供的响应通道
  2. 定义请求和响应
  3. 用户:创建请求和接收响应的通道,发送请求后等待响应,提取响应结果
  4. mian函数:创建银行和用户间的请求通道,创建银行、用户等协程,并等待操作完成

以上,我们这个并发问题的逻辑实现和各块工作就清晰了,写起来也方便、简单。代码实现有200多行,公众号不方便查看,可以点阅读原文,一键直达

代码不能贴了,运行结果还是可以的,为了方便理解结果,介绍下示例代码做了什么。main函数创建了银行、小明、小刚3个并发协程:

  1. 银行:从reqCh接收请求,依次处理每个请求,直到通道关闭,把请求交给处理函数,处理函数把结果写入到请求中的retCh
  2. 用户小明:创建了存100、取20、查余额的3个请求,每个请求得到响应后,再把下一个请求写入到reqCh
  3. 用户小刚:流程和小明相同,但存100取200,造成取钱操作失败,他查询下自己又多少钱,得到100。

main函数最后使用WaitGroup等待小明、小刚结束后退出。

下面是运行结果:

1
2
3
4
5
6
7
8
$ go run channel_map.go
xiaogang deposite 100 success
xiaoming deposite 100 success
xiaogang withdraw 200 failed
xiaoming withdraw 20 success
xiaogang has 100
xiaoming has 80
Bank exit

这一遭搞完,发现啥没有?用Mutex直接加锁、解锁完事了,但channel搞出来一坨,是不是用channel解决这个问题不太适合?是的。对于当前这个问题,和Mutex的方案相比,channel的方案显的有点“重”,不够简洁、高效、易用

但这个例子展示了3点:

  1. 使用channel解决并发问题的核心在于关注数据的流动
  2. channel不一定是某个并发问题最好的解决方案
  3. map在并发中,可以不用锁进行保护,而是使用channel

现在,回到了开篇的问题:同一个并发问题,你是用channel解决,还是用mutex解决?下面,一起看看怎么选择。

channel和mutex的选择

面对一个并发问题的时候,应当选择合适的并发方式:channel还是mutex。选择的依据是他们的能力/特性:channel的能力是让数据流动起来,擅长的是数据流动的场景,《Channel or Mutex》中给了3个数据流动的场景:

  1. 传递数据的所有权,即把某个数据发送给其他协程
  2. 分发任务,每个任务都是一个数据
  3. 交流异步结果,结果是一个数据

mutex的能力是数据不动,某段时间只给一个协程访问数据的权限擅长数据位置固定的场景,《Channel or Mutex》中给了2个数据不动场景:

  1. 缓存
  2. 状态,我们银行例子中的map就是一种状态

提供解决并发问题的一个思路

  1. 先找到数据的流动,并且还要画出来,数据流动的路径换成channel,channel的两端设计成协程
  2. 基于画出来的图设计简要的channel方案,代码需要做什么
  3. 这个方案是不是有点复杂,是不是用Mutex更好一点?设计一个简要的Mutex方案,对比&选择易做的、高效的

channel + mutex思维

面对并发问题,除了channel or mutex,你还有另外一个选择:channel plus mutex

一个大并发问题,可以分解成很多小的并发问题,每个小的并发都可以单独选型:channel or mutex。但对于整个大的问题,通常不是channel or mutex,而是channel plus mutex。

如果你是认为是channel and mutex也行,但我更喜欢plus,体现相互配合

总结

读到这里,感觉这篇文章头重脚轻,channel的讲了很多,而channel和mutex的选择却讲的很少。在channel和mutex的选择,实际并没有一个固定答案,也没有固定的方法,但提供了一个简单的思路:设计出channel和Mutex的简单方案,然后选择最适合当前业务、问题的那个。

思考比结论更重要,希望你有所收获

  1. 关注数据的流动,就可以使用channel解决并发问题。
  2. 不流动的数据,如果存在并发访问,尝试使用sync.Mutex保护数据。
  3. channel不一定某个并发问题的最优解。
  4. 不要害怕、拒绝使用mutex,如果mutex是问题的最优解,那就大胆使用。
  5. 对于大问题,channel plus mutex也许才是更好的方案。

参考资料

  1. Effective Go》,https://golang.org/doc/effective_go.html#sharing
  2. Mutex Or Channel》,https://github.com/golang/go/wiki/MutexOrChannel

文章推荐

  1. Golang并发模型:轻松入门流水线模型
  2. Golang并发模型:轻松入门流水线FAN模式
  3. Golang并发模型:并发协程的优雅退出
  4. Golang并发的次优选择:sync包
  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2019/01/14/golang-channel-and-mutex/
关注公众号,获取最新Golang文章。
一起学Golang-分享有料的Go语言技术

我们都知道Golang并发优选channel,但channel不是万能的,Golang为我们提供了另一种选择:sync。通过这篇文章,你会了解sync包最基础、最常用的方法,至于sync和channel之争留给下一篇文章。

sync包提供了基础的异步操作方法,比如互斥锁(Mutex)、单次执行(Once)和等待组(WaitGroup),这些异步操作主要是为低级库提供,上层的异步/并发操作最好选用通道和通信。

sync包提供了:

  1. Mutex:互斥锁
  2. RWMutex:读写锁
  3. WaitGroup:等待组
  4. Once:单次执行
  5. Cond:信号量
  6. Pool:临时对象池
  7. Map:自带锁的map

这篇文章是sync包的入门文章,所以只介绍常用的结构和方法:MutexRWMutexWaitGroupOnce,而CondPoolMap留给大家自行探索,或有需求再介绍。

阅读全文 »

goroutine是非常轻量的,不会暂用太多资源,基本上有多少任务,我们可以开多少goroutine去处理。但有时候,我们还是想控制一下。

比如,我们有A、B两类工作,不想把太多资源花费在B类务上,而是花在A类任务上。对于A,我们可以来1个开一个goroutine去处理,对于B,我们可以使用一个协程池,协程池里有5个线程去处理B类任务,这样B消耗的资源就不会太多。

控制使用资源并不是协程池目的,使用协程池是为了更好并发、程序鲁棒性、容错性等。废话少说,快速入门协程池才是这篇文章的目的。

协程池指的是预先分配固定数量的goroutine处理相同的任务,和线程池是类似的,不同点是协程池中处理任务的是协程,线程池中处理任务的是线程。

最简单的协程池模型

简单协程池模型

上面这个图展示了最简单的协程池的样子。先把协程池作为一个整体看,它使用2个通道,左边的jobCh是任务通道,任务会从这个通道中流进来,右边的retCh是结果通道,协程池处理任务后得到的结果会写入这个通道。至于协程池中,有多少协程处理任务,这是外部不关心的。

看一下协程池内部,图中画了5个goroutine,实际goroutine的数量是依具体情况而定的。协程池内每个协程都从jobCh读任务、处理任务,然后将结果写入到retCh

示例

模型看懂了,看个小例子吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func workerPool(n int, jobCh <-chan int, retCh chan<- string) {
for i := 0; i < n; i++ {
go worker(i, jobCh, retCh)
}
}

func worker(id int, jobCh <-chan int, retCh chan<- string) {
cnt := 0
for job := range jobCh {
cnt++
ret := fmt.Sprintf("worker %d processed job: %d, it's the %dth processed by me.", id, job, cnt)
retCh <- ret
}
}

workerPool()会创建1个简单的协程池,协程的数量可以通入参数n执行,并且还指定了jobChretCh两个参数。

worker()是协程池中的协程,入参分布是它的ID、job通道和结果通道。使用for-rangejobCh读取任务,直到jobCh关闭,然后一个最简单的任务:生成1个字符串,证明自己处理了某个任务,并把字符串作为结果写入retCh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func main() {
jobCh := genJob(10000)
retCh := make(chan string, 10000)
workerPool(5, jobCh, retCh)

time.Sleep(time.Second)
close(retCh)
for ret := range retCh {
fmt.Println(ret)
}
}

func genJob(n int) <-chan int {
jobCh := make(chan int, 200)
go func() {
for i := 0; i < n; i++ {
jobCh <- i
}
close(jobCh)
}()

return jobCh
}

main()启动genJob获取存放任务的通道jobCh,然后创建retCh,它的缓存空间是200,并使用workerPool启动一个有5个协程的协程池。1s之后,关闭retCh,然后开始从retCh中读取协程池处理结果,并打印。

genJob启动一个协程,并生产n个任务,写入到jobCh

示例运行结果如下,一共产生了10个任务,显示大部分工作都被worker 2这个协程抢走了,如果我们设置的任务成千上万,协程池长时间处理任务,每个协程处理的工作数量就会均衡很多。

1
2
3
4
5
6
7
8
9
10
11
➜ go run simple_goroutine_pool.go
worker 2 processed job: 4
worker 2 processed job: 5
worker 2 processed job: 6
worker 2 processed job: 7
worker 2 processed job: 8
worker 2 processed job: 9
worker 0 processed job: 1
worker 3 processed job: 2
worker 4 processed job: 3
worker 1 processed job: 0

回顾

最简单的协程池模型就这么简单,再回头看下协程池及周边由哪些组成:

  1. 协程池内的一定数量的协程。
  2. 任务队列,即jobCh,存在协程池不能立即处理任务的情况,所以需要队列把任务先暂存。
  3. 结果队列,即retCh,同上,协程池处理任务的结果,也存在不能被下游立刻提取的情况,要暂时保存。

协程池最简要(核心)的逻辑是所有协程从任务读取任务,处理后把结果存放到结果队列。

示例源码

本文所有示例源码,及历史文章、代码都存储在Github:https://github.com/Shitaibin/golang_step_by_step/tree/master/goroutine_pool

Go并发系列文章

  1. Golang并发模型:轻松入门流水线模型
  2. Golang并发模型:轻松入门流水线FAN模式
  3. Golang并发模型:并发协程的优雅退出
  4. Golang并发模型:轻松入门select
  5. Golang并发模型:select进阶
  6. Golang并发模型:轻松入门协程池
  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:http://lessisbetter.site/2018/12/11/gongzhonghao-articles/
关注公众号,获取最新Golang文章。
一起学Golang-分享有料的Go语言技术

之前的文章都提到过,Golang的并发模型都来自生活,select也不例外。举个例子:我们都知道一句话,“吃饭睡觉打豆豆”,这一句话里包含了3件事:

  1. 妈妈喊你吃饭,你去吃饭。
  2. 时间到了,要睡觉。
  3. 没事做,打豆豆。

在Golang里,select就是干这个事的:到吃饭了去吃饭,该睡觉了就睡觉,没事干就打豆豆。

结束发散,我们看下select的功能,以及它能做啥。

select功能

在多个通道上进行读或写操作,让函数可以处理多个事情,但1次只处理1个。以下特性也都必须熟记于心:

  1. 每次执行select,都会只执行其中1个case或者执行default语句。
  2. 当没有case或者default可以执行时,select则阻塞,等待直到有1个case可以执行。
  3. 当有多个case可以执行时,则随机选择1个case执行。
  4. case后面跟的必须是读或者写通道的操作,否则编译出错。

select长下面这个样子,由selectcase组成,default不是必须的,如果没其他事可做,可以省略default

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
readCh := make(chan int, 1)
writeCh := make(chan int, 1)

y := 1
select {
case x := <-readCh:
fmt.Printf("Read %d\n", x)
case writeCh <- y:
fmt.Printf("Write %d\n", y)
default:
fmt.Println("Do what you want")
}
}

我们创建了readChwriteCh2个通道:

  1. readCh中没有数据,所以case x := <-readCh读不到数据,所以这个case不能执行。
  2. writeCh是带缓冲区的通道,它里面是空的,可以写入1个数据,所以case writeCh <- y可以执行。
  3. case可以执行,所以default不会执行。

这个测试的结果是

1
2
$ go run example.go
Write 1

用打豆豆实践select

来,我们看看select怎么实现打豆豆:eat()函数会启动1个协程,该协程先睡几秒,事件不定,然后喊你吃饭,main()函数中的sleep是个定时器,每3秒喊你吃1次饭,select则处理3种情况:

  1. eatCh中读到数据,代表有人喊我吃饭,我要吃饭了。
  2. sleep.C中读到数据,代表闹钟时间到了,我要睡觉。
  3. default是,没人喊我吃饭,也不到时间睡觉,我就打豆豆。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import (
"fmt"
"time"
"math/rand"
)

func eat() chan string {
out := make(chan string)
go func (){
rand.Seed(time.Now().UnixNano())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
out <- "Mom call you eating"
close(out)
}()
return out
}


func main() {
eatCh := eat()
sleep := time.NewTimer(time.Second * 3)
select {
case s := <-eatCh:
fmt.Println(s)
case <- sleep.C:
fmt.Println("Time to sleep")
default:
fmt.Println("Beat DouDou")
}
}

由于前2个case都要等待一会,所以都不能执行,所以执行default,运行结果一直是打豆豆:

1
2
$ go run x.go
Beat DouDou

现在我们不打豆豆了,你把default和下面的打印注释掉,多运行几次,有时候会吃饭,有时候会睡觉,比如这样:

1
2
3
4
5
6
$ go run x.go
Mom call you eating
$ go run x.go
Time to sleep
$ go run x.go
Time to sleep

select很简单但功能很强大,它让golang的并发功能变的更强大。这篇文章写的啰嗦了点,重点是为下一篇文章做铺垫,下一篇我们将介绍下select的高级用法。

select的应用场景很多,让我总结一下,放在下一篇文章中吧。

完整代码

可在Github查看:https://github.com/Shitaibin/golang_step_by_step/tree/master/golang_select

并发系列文章推荐

  1. 如果这篇文章对你有帮助,不妨关注下我的Github,有文章会收到通知。
  2. 本文作者:大彬
  3. 如果喜欢本文,随意转载,但请保留此原文链接:https://mp.weixin.qq.com/s/ACh-TGlPo72r4e6pbh52vg

一起学Golang-分享有料的Go语言技术

公众号:一起学Golang,分享有料的Go语言技术。以下是公众号历史文章,希望对你有用:

  1. Golang并发模型:轻松入门流水线模型
  2. Golang并发模型:轻松入门流水线FAN模式
  3. Golang并发模型:并发协程的优雅退出
  4. Golang并发模型:轻松入门select
  5. Golang并发模型:select进阶
  6. Golang并发模型:轻松入门协程池
  7. Golang并发的次优选择:sync包
  8. Golang并发:再也不愁选channel还是选锁
阅读全文 »

区块链就是何交易打交道,我们今天就介绍下,交易处理过程中的一个重要组成部分:txpool。这篇文章主要从功能角度介绍,通过这篇文章会了解:

  1. txpool的在交易中的位置和作用。
  2. txpool的功能,核心组成部分queued和pending。
  3. txpool如何实现它的功能。
  4. txpool源码的重要关注点。
阅读全文 »

goroutine作为Golang并发的核心,我们不仅要关注它们的创建和管理,当然还要关注如何合理的退出这些协程,不(合理)退出不然可能会造成阻塞、panic、程序行为异常、数据结果不正确等问题。这篇文章介绍,如何合理的退出goroutine,减少软件bug。

goroutine在退出方面,不像线程和进程,不能通过某种手段强制关闭它们,只能等待goroutine主动退出。但也无需为退出、关闭goroutine而烦恼,下面就介绍3种优雅退出goroutine的方法,只要采用这种最佳实践去设计,基本上就可以确保goroutine退出上不会有问题,尽情享用。

1:使用for-range退出

for-range是使用频率很高的结构,常用它来遍历数据,**range能够感知channel的关闭,当channel被发送数据的协程关闭时,range就会结束**,接着退出for循环。

阅读全文 »