All Articles

Golang Pipeline

1. 前言

学习一门新语言最简单的就是那些顺序执行的语言,会了PHP,换JAVA、Python、Ruby,无非就是语法更换下,上stackoverflow查下:“python字符串拼接”之类的,看下怎么写就完事了。难的是那些有特殊编程思维的语言,比如说JavaScript,异步原生,写代码要非常小心注意异步的返回如何处理,否则就会明明看着是先A后B,执行的时候却成了先B再A。就这方面来说,Go语言也属于后者,属于那种语法看会简单,实际学精很难的语言,而Golang难就难在goroutine以及channel带来的功能上。

之前在查看Go语言的gRPC第三方库的时候,其代码对channel的运用非常灵活:grpc-go-pool/pool.go。深感需要加深这块的理解。

在学习的时候,找到了一篇官方之前的博客,时间比较早了(2014年),但还算有用:Go Concurrency Patterns: Pipelines and cancellation

不过语言还是一直在发展的,目前在进行批量的routine控制以及退出触发的方面,最佳的解决方案是context

接下来,一点点开始。

2. Channel实验范例

在阅读官方博文之前,先自己着手做一些范例代码,加深对于Golang Basic > 2.4 通道 / 信道 channel的理解。

我做了点简单的演示范例:dist-system-practice/golang/src/experiment/channel/

只需要结果的话,可以直接读README:dist-system-practice/golang/src/experiment/channel/README.md

当中有几点比较有意思:

  • slice值进出channel之后,地址没有改变,和pointer是一样的结果
  • channel在无buffer的情况下,很容易就会阻塞,导致routine睡眠,必须小心处理
  • 带buffer的channel,在buffer耗尽之后也是同样的结果,实际上来说也必须小心阻塞
  • 虽然向关闭的通道写数据会引起panic,但向关闭的通道读数据不会,只会获得通道类型的默认值

有一张图,很好地将一系列状态以及操作结果整理了出来,可以参考:

3. Go Concurrency Patterns: Pipelines and cancellation

这里并不会通篇翻译:Go Concurrency Patterns: Pipelines and cancellation,有需要的可以看这篇

此外,本章后半也会涉及到context,毕竟这家伙才是现在的准标准。

3.1 什么是Pipeline

  • 通过输入channel从上游接收值
  • 对这些数据执行某些函数,通常是生成一些新的值
  • 通过输出channel发送值到下游

而在这一系列的交棒过程中,错误的发生和事务的取消是正常的情况,必须要进行处理,否则的话作为下游的routine可能会一直等待已经错误退出或取消事务的上游routine,导致资源泄露。使用的难点就在这里。

3.2 Cancel: Channel Done

官方博文中的代码范例写的非常散,因为有多处重复修改,这里提供一份完整拼接的版本:experiment/pipeline/pipeline.go

官方博文通篇都阅读了的话,撇开行文中的Dummy业务逻辑不谈,可以了解到该文主要还是谈了如何对出问题的routine进行退出通知。官方的博文中使用的方法是申明一个类型为struct{}的Dummy通道,关闭该通道来进行退出消息的传送。

通过:

go func() {
	defer close(out) // HL
	for _, n := range nums {
		select {
		case out <- n:
		case <-done:
			return // HL
		}
	}
}()

以这样的方式,监听done通道的关闭,退出routine函数,并设置defer函数,在routine函数退出的时候,清理对应的资源并关闭通道。这种编码解决方法确实能解决问题,但非常难看,也没有层级退出的概念。

3.3 Cancel: Context

done通道确实能在功能上达到要求,但尚欠缺了一些要素,因此后续就有context这个包的出现。相关的学习可以阅读:快速掌握 Golang context 包,简单示例。此外,还有官方博客:Go Concurrency Patterns: Context

一些概念和规范这里做下引用:

Context 的调用应该是链式的,通过WithCancelWithDeadlineWithTimeoutWithValue派生出新的 Context。当父 Context 被取消时,其派生的所有 Context 都将取消。

通过context.WithXXX都将返回新的 Context 和 CancelFunc。调用 CancelFunc 将取消子代,移除父代对子代的引用,并且停止所有定时器。未能调用 CancelFunc 将泄漏子代,直到父代被取消或定时器触发。go vet工具检查所有流程控制路径上使用 CancelFuncs。

遵循以下规则,以保持包之间的接口一致,并启用静态分析工具以检查上下文传播。

  • 不要将 Contexts 放入结构体,相反context应该作为第一个参数传入,命名为ctx。func DoSomething(ctx context.Context,arg Arg)error { // ... use ctx ... }
  • 即使函数允许,也不要传入nil的 Context。如果不知道用哪种 Context,可以使用context.TODO()
  • 使用context的Value相关方法只应该用于在程序和接口中传递的和请求相关的元数据,不要用它来传递一些可选的参数
  • 相同的 Context 可以传递给在不同的goroutine;Context 是并发安全的。

用法:

Done函数会返回一个channel,用来进行close通知(本质上还是之前的那一套):

gen := func(ctx context.Context) <-chan int {
	go func() {
		select {
		case <-ctx.Done():
			return // returning not to leak the goroutine
		//...
		}
	}
//...
}
//...
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel when we are finished consuming integers

此外,还有一篇老王的:Golang之Context的迷思,可以好好读下。

资料

链接

EOF

Published 2019/5/10

Some tech & personal blog posts