这种模型之所以被称为两级线程模型,是因为该模型中线程既不是完全靠自己调度,也不是完全靠内核去调度,而是二者协调调度。 用户 Runtime 负责调度用户线程到内核线程的调度,而内核负责内核线程到 CPU 上的调度,故称为二级线程模型。Go 语言采用的就是这种模型。
G-P-M 模型
GPM 结构
G(goroutine)
表示 goroutine,参与调度的最小单位。每个 Goroutine 对应一个 G 结构体,G 存储 goroutine 的运行堆栈、状态以及任务函数,因为 goroutine 在执行过程中可能因为各种原因被暂停,这时需要保存 PC 和堆栈信息,以便恢复后时继续执行。每个 G 需要绑定到 P 才能被调度执行。
在函数前加 go 关键字创建一个协程,其实是调用 newproc 函数,fn 就是 go 关键字后面函数地址
type g struct { goid int64// goroutine id atomicstatus uint32// 当前状态 stack stack // g 栈区间 m *m // 当前 m sched gobuf // 运行时信息,包含 PC 以及运行时的堆栈信息 stackguard0 uintptr// stackguard0 = stack.lo + StackGuard,如果要抢占当前 g 会把字段值设为 stackPreempt preempt bool// preemption signal, duplicates stackguard0 = stackpreempt // ... 省略部分字段。.. } type stack struct { //栈从高地址往低地址增长 lo uintptr hi uintptr } type gobuf struct { sp uintptr// 堆栈指针 pc uintptr// 计数器指针 g guintptr ctxt unsafe.Pointer ret uintptr lr uintptr bp uintptr// for framepointer-enabled architectures }
逻辑处理器,对于 G 来说,只有 G 绑定到 P(进入 P 的任务队列中),才能被调度。对于 M 来说,P 提供了相关的执行环境(Context 上下文),内存分配状态,任务队列等。P 的初始化在 schedinit 中
Go1.0 时期并没有 P,所以所有的 G 的创建和调度都需要加全局的锁,性能损耗很大,所以早期 Go 并发性能并不好
// go1.18 darwin/amd64 runtime/runtime2.go type p struct { id int32//P 的 id status uint32//当前状态 mcache *mcache //内存分配器 runqhead uint32// 队列头 runqtail uint32// 队列尾 runq [256]guintptr //等执行的 goroutine 队列,访问时不需要加锁 m muintptr //P 绑定的 m // Available G's (status == Gdead) gFree struct { //当 G 运行结束后,清除数据放入列表以便复用 gList n int32 } runnext guintptr //下个运行的 g,如果不为 nil, 则当前 g 执行完后,优先执行它 // ... 省略部分字段。.. }
P 的状态
_Pidle //当 M 没有 G 可执行时,P 进入空闲列表 _Prunning //P 和 M 绑定,正在执行 G 或在寻找可执行的 G _Psyscall //与之关联中的 M 进入系统调用 _Pgcstop //GC STW _Pdead //调小 GOMAXPROCS 数量后多余的 p 置为此状态
M(Machine)
系统线程抽象,代表着真正执行计算的资源。M 会从 P 的队列(或者 Global 队列)中取 G 来执行,切换到 G 的执行栈上并执行 G 的函数,调用 goexit 做清理工作并回到 M,当 G 被暂停 M 会把上下文信息写回 G,并取下一个 G 继续执行。M 并不保留 G 状态,这也是 G 可以跨 M 调度的核心。
type m struct { id int64 g0 *g // 用于执行调度任务的 g,使用系统栈,不受 gc 影响 tls [6]uintptr// 线程本地存储空间 curg *g // 当前正在被执行的 goroutine nextp puintptr // M 被唤醒需要立即绑定的 P p puintptr // 与 M 绑定的 P spinning bool// true 表示 M 处于自旋转状态(当前没有 g 执行,正在寻找可执行的 g) mcache *mcache //当 M 与 P 绑定后,跟 P 的 mcache 指向同一个内存分配器 }
G0 和 M0
m0: 表示进程启动的第一个线程,也叫主线程。它和其他的 m 没有什么区别,要说区别的话,它是进程启动通过汇编直接复制给 m0 的,m0 是个全局变量,而其他的 m 都是 runtime 内自己创建的。 一个 go 进程只有一个 m0。
g0: 每个 m 都有一个 g0,因为每个线程有一个系统堆栈,g0 虽然也是 g 的结构,但和普通的 g 还是有差别的,最重要的差别就是栈的差别。g0 上的栈是系统分配的栈,在 linux 上栈大小默认固定 8MB,不能扩展,也不能缩小。 而普通 g 一开始只有 2KB 大小,可扩展。在 g0 上也没有任何任务函数,也没有任何状态,并且它不能被调度程序抢占,因为调度就是在 g0 上跑的(参考 allocm)。
// 2. call schedinit funcschedinit() { sched.maxmcount = 10000//m 的最大数量 mallocinit() //内存分配相关 //根据环境变量,创建 GOMAXPROCS 个 G sched.lastpoll = uint64(nanotime()) procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } } // 3. 启动 main goroutine funcmain() { g := getg() // 标记 main 已经启动 mainStarted = true //在系统栈上(也就是通过 g0) 创建 m,并执行 sysmon,所以 symon 是在单独的 m 上运行,不受 gc 影响 if GOARCH != "wasm" { systemstack(func() { newm(sysmon, nil) }) } //如果 main goroutine 不在 m0 上运行,肯定 bug 了 if g.m != &m0 { throw("runtime.main not on m0") } //执行 runtime 的 init 和用户包中 init 函数 doInit(&runtime_inittask) doInit(&main_inittask) //调用用户自定义的 main 函数 //从这里可以看出在 golang 中,init 函数先于 main 函数执行 fn := main_main fn() //退出主进程 exit(0) for { var x *int32 *x = 0 } } //4. call runtime·mstart(m0 启动) //M0 在这里调用的 https://github.com/golang/go/blob/master/src/runtime/asm_amd64.s#L225 // mstart 是一个新 M 的入口函数。 funcmstart() { _g_ := getg() osStack := _g_.stack.lo == 0 if osStack { // 初始化 g0 栈大小 size := _g_.stack.hi if size == 0 { size = 8192 * sys.StackGuardMultiplier } _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size))) _g_.stack.lo = _g_.stack.hi - size + 1024 } _g_.stackguard0 = _g_.stack.lo + _StackGuard _g_.stackguard1 = _g_.stackguard0 mstart1() if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" { osStack = true } // m 退出 mexit(osStack) } funcmstart1() { _g_ := getg() if _g_ != _g_.m.g0 {//调用这个函数只,m 上只可能有 g0 throw("bad runtime·mstart") } //初始化 m asminit() minit() //执行 mspinning、sysmon 等 if fn := _g_.m.mstartfn; fn != nil { fn() } //_g_所在的 m 不是 m0, 则关联 p 和 m if _g_.m != &m0 { acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } schedule()//开始调度 }
M 启动后进入 schedule() 方法,进入调度,调度的本质其实就是查找可以运行的 G,然后去运行 G 上面的任务函数
//需要注意的是:schedule 函数及子函数中调用的 getg() 返回的都是 g0,因为 schedule 是运行在 g0 上的 funcschedule() { _g_ := getg() //获取当前 g var gp *g var inheritTime bool // ..... if gp == nil { // 为了保证公平调度,schedule 每执行 61 次就会去全局队列拿一批 g 到 p 的本地队列,避免全局队列中的 g 饥饿 // 否则可能出现 2 个 g 永久占用本地队列(因为被暂停的 goroutine 唤醒后优先放入本地队列) if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } // ..... if gp == nil { //从本地队列中取 g gp, inheritTime = runqget(_g_.m.p.ptr()) if gp != nil && _g_.m.spinning { throw("schedule: spinning with local work") } } //如果本地队列中没有可执行的 g,则调用 findrunnable 直到有可运行的 g 为止 if gp == nil { gp, inheritTime = findrunnable() } // 到这里,说明已经找到可运行的 g,如果 m 还处于自旋转状态,则置回正常状态 // 并唤醒 p 与之绑定 if _g_.m.spinning { resetspinning() } //直接在当前 m 上执行 g execute(gp, inheritTime) }
findrunnable 查找可运行的 G
//阻塞获取可执行的 G,findrunnable 会从全局队列、其它 P 队列、netpoll 中去轮询 funcfindrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() // 本地队列中有 g,则直接返回 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // 全局队列中如果有 g, 则从全局队列中取一批到本地队列 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // 从 netpoll 中获取 g(非阻塞轮询已经完成的网络 io) if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(false); !list.empty() { //从 netpoll 中取出 i/o 读写完成的 g 列表 gp := list.pop() //先取出一个让当前 m 执行,这样能提高响应速度 injectglist(&list) //再把剩余的 g 列表放入队列 casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } } // 检查是否可以从其它 p 中偷一部分 g procs := uint32(gomaxprocs) if atomic.Load(&sched.npidle) == procs-1 { //如果其它 p 全部都是 idle 的,那肯定没地方偷 goto stop } //如果当前 m 没有处于自旋转且自旋转中的 p 数量 < running 中的 p 数量/2, 则让当前 m 进入自旋转 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) }
//随机从一个 p 中偷,最多偿试 4 次 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: allpSnapshot := allp //再次检查全局队列,如果有 g,则取出执行,否则把 p 与当前 m 解绑 if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { //p 与当前 m 解绑 throw("findrunnable: wrong p") } pidleput(_p_) //解绑的 p 放入 idle 队列 unlock(&sched.lock)
// 再次检查所有 p 的队列 for _, _p_ := range allpSnapshot { if !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() //如果某个 p 的队列不为空,则从 idle 列表中取出一个 p unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { //如果解绑以前 m 是自旋转的,则还是让它保持自旋转 _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } // 再次从 netpoll 中阻塞的取 g if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if _g_.m.p != 0 { throw("findrunnable: netpoll with p") } if _g_.m.spinning { throw("findrunnable: netpoll with spinning") } list := netpoll(true) atomic.Store64(&sched.lastpoll, uint64(nanotime())) if !list.empty() { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } injectglist(&list) } } //如果始终找到不,就让 m 停止 stopm() goto top }
//把 g 列表放入全局队列,调用 startm 检测是否有 idle 的 p,将 idle 的 m 与之绑定或 new 一个 m funcinjectglist(glist *gList) { if glist.empty() { return } if trace.enabled { for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() { traceGoUnpark(gp, 0) } } lock(&sched.lock) var n int for n = 0; !glist.empty(); n++ { gp := glist.pop() //队列中的 g 状态必须为_Grunnable casgstatus(gp, _Gwaiting, _Grunnable) globrunqput(gp) } unlock(&sched.lock) for ; n != 0 && sched.npidle != 0; n-- { startm(nil, false) } *glist = gList{} }
//重置自旋转状态 funcresetspinning() { _g_ := getg() if !_g_.m.spinning { throw("resetspinning: not a spinning m") } _g_.m.spinning = false nmspinning := atomic.Xadd(&sched.nmspinning, -1) ifint32(nmspinning) < 0 { throw("findrunnable: negative nmspinning") } //除当前 m 外没有其它 m 处于自旋转状态且还有 p 处于 idle,则唤醒一个 p,这样能让等执行的 g 尽量早的被处理。 if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 { wakep() } } funcwakep() { // 如果已有 m 处于自旋转状态,则直接返回 (g 一定会被处于自旋转状态的 m 执行 [结合 findrunable 函数看]) if !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) }
//startm 检测是否有 idle 的 p, 并将 idle 的 m 与 p 绑定或 new 一个 m funcstartm(_p_ *p, spinning bool) { lock(&sched.lock) if _p_ == nil { _p_ = pidleget() //获取一个 idle 的 p if _p_ == nil { unlock(&sched.lock) if spinning { // spinning 为 true 说明 startm 的调用方对 nmspinning 加了 1,但是没发现 idle 的 p,所以要回滚 nmspinning ifint32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } return } } //成功获取到 p, 再获取 idle 的 m mp := mget() unlock(&sched.lock) if mp == nil { //获取 idle 的 m 失败,则创建一个 m var fn func() if spinning { // 如果 spinning 为 true, 则标记新创建的 m 为 spinning fn = mspinning } newm(fn, _p_) //创建新 m return } mp.spinning = spinning mp.nextp.set(_p_) // 把 p 设置为即将与 m 绑定的 p notewakeup(&mp.park) } funcnewm(fn func(), _p_ *p) { mp := allocm(_p_, fn) //new 一个 m 结构,并初始化 mp.nextp.set(_p_) //把 p 设置为即将与 m 绑定的 p mp.sigmask = initSigmask newm1(mp) } funcnewm1(mp *m) { execLock.rlock() newosproc(mp) execLock.runlock() }
//newosproc 创建 OS 线程,不同的 OS 接口不一样,linux 用的 clone, windows 为_CreateThread funcnewosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) // clone 期间禁用信号,clone 完成再启用 var oset sigset sigprocmask(_SIG_SETMASK, &sigset_all, &oset) //调用 clone 创建 os 线程, mstart 为线程起始函数 ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) sigprocmask(_SIG_SETMASK, &oset, nil) }