0%

Context 与超时控制

Context 是 go 语言中监控与控制 goroutine 的一种方式,它可以在协程之间进行信息的传递,从而控制 goroutine 的行为。在一定程度上,可以将 context 看作包含一部分输入参数的 channel。与 channel 不同的是,channel 的传递并不是递归的,当一个 goroutine 调用的 goroutine 再次调用了 goroutine,简单使用 channel 进行退出通知是非常麻烦的;而使用 context 则能很轻松地使用递归的 goroutine 控制。

源码分析

context 在 go 语言中是一个接口,在此基础上可以衍生出不同的类型的 context,sdk 中提供了六种方法来获取不同类型的 context 以适用于不同的场景,可以概括为:

  • 两个接口:context、canceler
  • 四种实现:emptyCtx、cancelCtx、timerCtx、valueCtx
  • 六个方法:Background、TODO、WithCancel、WithDeadline、WithTimeout、WithValue

两个接口

在 go sdk 中,context 和 canceler 接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Context interface {
// 返回 context 的截止时间
Deadline() (deadline time.Time, ok bool)
// 返回只读 channel,用于接收 context 是否完成
Done() <-chan struct{}
// 返回 context 被取消的原因
Err() error
// 返回 key 对应的 value
Value(key interface{}) interface{}
}

type canceler interface {
// 取消方法
cancel(removeFromParent bool, err error)
// 返回只读 channel
Done() <-chan struct{}
}

在这两种接口的基础上,sdk 实现了四种不同的基础 context,用户同样可以利用这些 context 来自行实现。

四种实现

在 context 和 canceler 接口的基础上,go sdk 包含了四种基础 context 实现。

emptyCtx

emptyCtx 的所有方法均为空方法,主要用于作为基础上下文,并衍生出其他上下文。由于 goroutine 的调用是树状的,因此 context 的组织形式也是树状的,一颗 context 树,其根节点应当是一个 emptyCtx 类型。

sdk 提供了 Background 和 TODO 两种 emptyCtx,两者互为别名,只是在语义上有所不同:

  • context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生出来;
  • context.TODO 应该仅在不确定应该使用哪种上下文时使用;

一般情况下,会在一个模块的 main goroutine 使用 Background context;当想从一个叶 context 节点从派生出一个新的 context 叶时,可以选择使用 TODO。

cancelCtx

cancelCtx 是一个具备取消功能的 context,在父 goroutine 中可以使用 cancel 函数来通知 goroutine 进行取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 父 goroutine 中
func parent() {
// 获取 context
ctx, cancel := context.WithCancel(context.Background(), time.Millisecond*100)

go func(){
children(ctx)
}()

// 通知取消
cancel()
}

func children(c *context.Context) {
for {
select{
case <- c.Done():
case default:
}
}
}

cancelCtx 可以用于用户交互等场景的控制,自由度比较高。

timerCtx

timerCtx 主要通过两种方法来获得:WithDeadline、WithTimeout。不同之处就是一个是明确截止时间,一个是明确运行时间,使用方法与 cancelCtx 非常相似,也同样提供了 cancel 接口,用户可以手动进行取消,或等待设置的时间到期后自动取消。

timerCtx 是通过 timer.AfterFunc 来实现的,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {

...

if c.err == nil {
// 设置定时器,当定时器到期后,自动调用取消函数
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}

...
}

valueCtx

valueCtx 提供了在不同 goroutine 之间利用 context 进行值传递的功能,可以使用 context 来进行链路追踪。valueCtx 只能够将值传递给子 goroutine,而不能够实现反向操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 父 goroutine 中
func parent() {
// 获取 context
ctx := context.WithValue(context.Background(), "key","value")

go func(){
children(ctx)
}()
}

func children(c *context.Context) {
// 在子 goroutine 中获取值
println("context key:",c.Value("key").(string))
}

不同 context 的混合使用

用户可以将 sdk 中提供的六种方法混合使用,来构造较为复杂的 context,利用使用带有超时控制的值 context:

1
2
3
4
// 获取超时	
withTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*1)
// 加入值传递
ctx := context.WithValue(withTimeout, "traceId", "id12345")

Context 使用场景

context 可以很轻松地完成消息在 goroutine 中的传递,但是不能够直接完成对 goroutine 的控制,而是需要用户自行完成接收通知的逻辑,也就是这是一种侵入式比较强的消息传递方式。但由于这是在 go sdk 中提供的一种标准控制方式,所以其应用场景非常广,一些开源的工具包中基本上都提供了包含 context 的实现,使用起来会相对简单。

一般情况下,在下一级逻辑中,需要使用定时器和 select 来完成对 context 超时的读取,或者是使用 context 中的截止时间,不使用ctx.Done 函数完成超时控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// mongo-driver session.go
// 不使用 ctx.Done() 完成控制

for {
...

select {
// timeout 定时事件为 context deadline
case <-timeout.C:
return res, err
default:
// default 自旋,防止被阻塞
}

...
return res, err
}

无论哪一种方式,使用 context 来完成超时控制都是需要使用 for - select 循环来处理的,对于网络 IO、信号阻塞等对 CPU 占用比较少的情景,使用 context 是非常合适的。但是如果使用 context 来处理一些比较重型的任务,就显得不是特别合适,因为不能够直接在任务中去执行检查。这种情况下,可以考虑使用状态机来将任务进行分解,每一个阶段来完成后检查一次是否超时。

另一种超时控制

使用 context 进行超时控制是侵入式的,可能会破坏代码逻辑,增加维护的难度。最主要的是,当需要调用的代码并没有提供 context 接口时,无法使用 context 来完成控制逻辑。这种情况下,是无法完成对 goroutine 的调度的,只能够完成在逻辑上的退出,即上一层逻辑提前完成并放任下一层逻辑继续运行直至退出。这种情况下,可以使用 goroutine 来完成非侵入式的超时控制。gin-contrib 中的 timeout 中间件就是一个很好的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 代码截自 gin-contrib/timeout@v0.0.3/timeout.go
func New(opts ...Option) gin.HandlerFunc {

// 开辟缓冲区,存储下一层逻辑返回值
...

// 使用 goroutine 处理下一层逻辑
go func() {

// 处理下一层逻辑中抛出的 panic
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
// 处理下一层逻辑
t.handler(c)
// 等待下一层逻辑完成
finish <- struct{}{}
}()

select {
// 处理下一层逻辑抛出的 panic
case p := <-panicChan:
...
// 处理下一层逻辑完成
case <-finish:

// 使用 default 返回值和 error:timeout 返回
...

// 处理超时
case <-time.After(t.timeout):

// 使用下一层逻辑的返回值返回
...

}

...
}

可以看到,源码中并没有去调度下一层逻辑的 goroutine,而是在超时后直接使用 default 值并抛出一个错误来完成退出。

总结

这两种方法显然都不是特别完美,但是 goroutine 的调度机制决定了,一个 goroutine 只能够自行退出。这种调度方式是最符合 goroutine 的使用逻辑的,一个 goroutine 内必须是一个完整的状态机来处理逻辑,直接 abort 一个 goroutine 会大大增加状态的不确定性,这是一个非常危险的操作。

timeout 本身就是非常危险的,如果在操作一个有状态的程序,无法通过 timeout 来确定程序的状态。因此,需要避免在不需要进行用户交互的代码层级使用超时错误,并避免超时错误传递到数据库层级。

go 中的 IO 操作

go 语言的官方库中对所有的文件描述符都做了统一的封装,无论是 socket 文件还是普通文件,都包含了poll.FD数据结构,并被 pollDesc 统一管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 普通文件描述符
type file struct {
pfd poll.FD
name string
dirinfo *dirInfo // nil unless directory being read
nonblock bool // whether we set nonblocking mode
stdoutOrErr bool // whether this is stdout or stderr
appendMode bool // whether file is opened for appending
}

// socket 描述符
type netFD struct {
pfd poll.FD

// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}

这两种文件类型的读写操作都会最终调用 poll.FD的读写函数,以文件描述符的读操作为例,最终会调用以下函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// 检查、加锁、准备操作
...

for {
// 忽略 EINTR 错误,并尝试进行读取
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
// 如果读取失败,错误为 EAGAIN,且 fd 被注册到了多路复用管理器中
// 主动进入阻塞状态,并等待被唤醒
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

函数处理读取的主要逻辑为:尝试非阻塞读取——如果读取成功则返回/读取失败则进入阻塞。该函数的处理逻辑其实已经对普通文件和 socket 文件进行了分流,普通文件的读写通常是不会进入阻塞状态的,通常会在第一次读取时返回。而 socket 类型的文件描述符,在尝试读取失败后,会进入到 fd.pd.waitRead(fd.isFile),最终调用runtime_pollWait 函数,将当前 goroutine 的标识为更改为阻塞状态。

go 语言中,虽然 IO 底层使用了非阻塞的多路复用,但最终提交给用户是阻塞调用的模式。非阻塞多路复用只是为了给 goroutine 提供调度——当一个协程发现自身不能满足执行条件时,得以能够主动修改自身状态,使自身进入到协程的阻塞状态,而不是被操作系统进行线程调度,使线程进入阻塞。用户态的阻塞 IO 是为了配合 goroutine 的调度机制和 go 语言的编程逻辑,即采用多个 goroutine 来处理并行的请求。这样程序在编写过程中就只要考虑请求的执行顺序,如果执行过程中遇到条件不满足的情况,直接进入阻塞并等待被唤醒。

如果需要在 go 中使用非阻塞 IO,那么可能就需要抛弃官方库所构建出的高层次抽象,这其实更像是在写 C/C++程序。go 语言在 package unix 中提供了比较底层的 unix 标准函数封装,用户可以在此基础上自行搭建非阻塞 IO 模型来满足特定场景下的需求。

如果需要调用底层的 package unix,一定要将 fd 设置为非阻塞的方式。因为操作系统的是无法感知到 goroutine 的,如果使用阻塞操作,可能会导致 M 被操作系统调度,导致无法进行 G 调度,导致 goroutine 正常切换调度收到影响。fd 的非阻塞已经在 package unix 中封装完毕,为了与 unix 函数保持一致,unix 包中的函数都没有自动设置为非阻塞状态,需要用户手动设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 非常经典的设置非阻塞,简直跟 C++ 一模一样
func SetNonblock(fd int, nonblocking bool) (err error) {
flag, err := fcntl(fd, F_GETFL, 0)
if err != nil {
return err
}
if nonblocking {
flag |= O_NONBLOCK
} else {
flag &= ^O_NONBLOCK
}
_, err = fcntl(fd, F_SETFL, flag)
return err
}