Skip to content

Commit 851b6ee

Browse files
committed
update netpoll
1 parent 4c5d240 commit 851b6ee

File tree

1 file changed

+204
-22
lines changed

1 file changed

+204
-22
lines changed

netpoll.md

Lines changed: 204 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ type pollDesc struct {
144144

145145
## 流程
146146

147-
148147
### listen
149148

150149
```mermaid
@@ -406,6 +405,7 @@ func netpollopen(fd uintptr, pd *pollDesc) int32 {
406405

407406
pollDesc 初始化好之后,会当作 epoll event 的数据存储到 ev.data 中。 当有事件就续时,会取 ev.data,以判断是哪个 fd 可读/可写。
408407

408+
TODO,conn 是什么时候赋值给 Conn 类型的?
409409

410410
#### Read 流程
411411

@@ -552,20 +552,7 @@ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
552552
}
553553
```
554554

555-
gopark 会执行 netpollblockcommit,并将 gpp 挂起,netpollblockcommit 比较简单:
556-
557-
```go
558-
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
559-
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
560-
if r {
561-
// Bump the count of goroutines waiting for the poller.
562-
// The scheduler uses this to decide whether to block
563-
// waiting for the poller if there is nothing else to do.
564-
atomic.Xadd(&netpollWaiters, 1)
565-
}
566-
return r
567-
}
568-
```
555+
gopark 将当前 g 挂起,等待就绪事件到达之后再继续执行。
569556

570557
#### Write 流程
571558

@@ -592,7 +579,61 @@ func (fd *netFD) Write(buf []byte) (int, error) {
592579
}
593580
```
594581

595-
TODO
582+
```go
583+
// Write implements io.Writer.
584+
func (fd *FD) Write(p []byte) (int, error) {
585+
if err := fd.writeLock(); err != nil {
586+
return 0, err
587+
}
588+
defer fd.writeUnlock()
589+
if err := fd.pd.prepareWrite(fd.isFile); err != nil {
590+
return 0, err
591+
}
592+
var nn int
593+
for {
594+
max := len(p)
595+
if fd.IsStream && max-nn > maxRW {
596+
max = nn + maxRW
597+
}
598+
n, err := syscall.Write(fd.Sysfd, p[nn:max])
599+
if n > 0 {
600+
nn += n
601+
}
602+
if nn == len(p) {
603+
return nn, err
604+
}
605+
if err == syscall.EAGAIN && fd.pd.pollable() {
606+
if err = fd.pd.waitWrite(fd.isFile); err == nil {
607+
continue
608+
}
609+
}
610+
if err != nil {
611+
return nn, err
612+
}
613+
if n == 0 {
614+
return nn, io.ErrUnexpectedEOF
615+
}
616+
}
617+
}
618+
```
619+
620+
内核的写缓冲区满,这里的 syscall.Write 就会返回 EAGAIN。
621+
622+
```go
623+
func (pd *pollDesc) waitWrite(isFile bool) error {
624+
return pd.wait('w', isFile)
625+
}
626+
627+
func (pd *pollDesc) wait(mode int, isFile bool) error {
628+
if pd.runtimeCtx == 0 {
629+
return errors.New("waiting for unsupported file type")
630+
}
631+
res := runtime_pollWait(pd.runtimeCtx, mode)
632+
return convertErr(res, isFile)
633+
}
634+
```
635+
636+
后面的流程就和 Read 完全一致了。
596637

597638
#### 就续通知
598639

@@ -662,20 +703,161 @@ func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
662703
*gpp = wg
663704
}
664705
}
706+
707+
// 按照 mode 把 pollDesc 的 wg 或者 rg 捞出来,返回
708+
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
709+
gpp := &pd.rg
710+
if mode == 'w' {
711+
gpp = &pd.wg
712+
}
713+
714+
for {
715+
old := *gpp
716+
if old == pdReady {
717+
return nil
718+
}
719+
if old == 0 && !ioready {
720+
// Only set READY for ioready. runtime_pollWait
721+
// will check for timeout/cancel before waiting.
722+
return nil
723+
}
724+
var new uintptr
725+
if ioready {
726+
new = pdReady
727+
}
728+
if atomic.Casuintptr(gpp, old, new) {
729+
if old == pdReady || old == pdWait {
730+
old = 0
731+
}
732+
return (*g)(unsafe.Pointer(old))
733+
}
734+
}
735+
}
665736
```
666737

667-
注释中这里说,返回一个 goroutines 的列表。实际这个是有个前提的,那就是 epoll_wait 返回的例如读事件,多个读对应的是同一个 pollDesc(本质上就是同一个 file descriptor 的事件),那么这时候才会构成链表,因为 netpoll 函数中的 for 循环在 i > 0 的时候,才会让 gp != 0。
738+
三个函数配合完成就续后唤醒对应的 g 的工作,netpollunblock 从 pollDesc 中捞出 rg/wg,netpollready 然后再把所有的 rg/wg 通过 schedlink 串成一个链表。findrunnable 之类需要 g 的场景下,调度器会主动调用 netpoll 函数来寻找是否有已经就绪的网络事件对应的 g。
739+
740+
netpoll 这个函数是平台相关的,实现在对应的 netpoll_epoll、netpoll_kqueue 文件中。
668741

669-
如果所有 event 分别属于不同的 pollDesc,那么就不会是链表了。
742+
#### 读写 g 的挂起和恢复
670743

671-
### poll 就续流程
744+
在上面读写流程,syscall.Read 或者 syscall.Write 返回 EAGAIN 时,会挂起当前正在进行这个读/写操作的 g,具体是调用 gopark,并执行 netpollblockcommit,并将 gpp 挂起,netpollblockcommit 比较简单:
672745

673746
```go
674-
type conn struct {
675-
fd *netFD
747+
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
748+
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
749+
if r {
750+
// Bump the count of goroutines waiting for the poller.
751+
// The scheduler uses this to decide whether to block
752+
// waiting for the poller if there is nothing else to do.
753+
atomic.Xadd(&netpollWaiters, 1)
754+
}
755+
return r
676756
}
677757
```
678758

679-
TODO,conn 是什么时候赋值给 Conn 类型的?
759+
EAGAIN 的时候:
760+
761+
```go
762+
gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
763+
```
764+
765+
```go
766+
// Puts the current goroutine into a waiting state and calls unlockf.
767+
// If unlockf returns false, the goroutine is resumed.
768+
// unlockf must not access this G's stack, as it may be moved between
769+
// the call to gopark and the call to unlockf.
770+
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
771+
mp := acquirem()
772+
gp := mp.curg
773+
status := readgstatus(gp)
774+
if status != _Grunning && status != _Gscanrunning {
775+
throw("gopark: bad g status")
776+
}
777+
mp.waitlock = lock
778+
mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf)) // unlockf = netpollblockcommit
779+
gp.waitreason = reason
780+
mp.waittraceev = traceEv
781+
mp.waittraceskip = traceskip
782+
releasem(mp)
783+
// can't do anything that might move the G between Ms here.
784+
mcall(park_m)
785+
}
786+
```
787+
788+
```go
789+
// park continuation on g0.
790+
func park_m(gp *g) {
791+
_g_ := getg()
792+
793+
casgstatus(gp, _Grunning, _Gwaiting)
794+
dropg()
795+
796+
if _g_.m.waitunlockf != nil {
797+
// fn = netpollblockcommit
798+
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
799+
ok := fn(gp, _g_.m.waitlock)
800+
_g_.m.waitunlockf = nil
801+
_g_.m.waitlock = nil
802+
// 原子操作没成功,那还得先继续执行当前的逻辑
803+
// 回到最外面的读/写循环
804+
if !ok {
805+
casgstatus(gp, _Gwaiting, _Grunnable)
806+
execute(gp, true) // Schedule it back, never returns.
807+
}
808+
}
809+
// g 成功挂起,m 成功解绑,当前的 m 进入调度循环,去找其它可以执行的 g
810+
schedule()
811+
}
812+
813+
// 切换到 m->g0 的栈,然后调用 fn(g)
814+
// fn 必须不能返回
815+
// 这个 g 之后应该用 gogo(&g->sched) 来恢复执行
816+
TEXT runtime·mcall(SB), NOSPLIT, $0-8
817+
MOVQ fn+0(FP), DI
818+
819+
get_tls(CX)
820+
MOVQ g(CX), AX // save state in g->sched
821+
MOVQ 0(SP), BX // caller's PC
822+
// 把当前 g 的运行现场保存到 g.gobuf 中
823+
MOVQ BX, (g_sched+gobuf_pc)(AX)
824+
LEAQ fn+0(FP), BX // caller's SP
825+
MOVQ BX, (g_sched+gobuf_sp)(AX)
826+
MOVQ AX, (g_sched+gobuf_g)(AX)
827+
MOVQ BP, (g_sched+gobuf_bp)(AX)
828+
829+
// switch to m->g0 & its stack, call fn
830+
MOVQ g(CX), BX
831+
MOVQ g_m(BX), BX
832+
MOVQ m_g0(BX), SI
833+
CMPQ SI, AX // if g == m->g0 call badmcall
834+
JNE 3(PC)
835+
MOVQ $runtime·badmcall(SB), AX
836+
JMP AX
837+
MOVQ SI, g(CX) // g = m->g0
838+
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
839+
PUSHQ AX
840+
MOVQ DI, DX
841+
MOVQ 0(DI), DI
842+
CALL DI
843+
POPQ AX
844+
MOVQ $runtime·badmcall2(SB), AX
845+
JMP AX
846+
RET
847+
848+
// 把 g 当前所在的 m 的指针都清 0
849+
// 将 m 腾出来干别的事去
850+
func dropg() {
851+
_g_ := getg()
852+
setMNoWB(&_g_.m.curg.m, nil)
853+
setGNoWB(&_g_.m.curg, nil)
854+
}
855+
```
856+
857+
上面就是挂起的所有流程了,可见所谓的挂起,就是把 g 当前的运行状态: bp、sp、pc 寄存器,以及 g 的地址保存起来。
858+
859+
至于唤醒流程,当调度器在 findrunnable、startTheWorldWithSema 或者 sysmon 中调用 netpoll 函数时,会获取到上面说的就绪的 g 列表。把这些 g 的 bp/sp/pc 都从 g.gobuf 中恢复出来,就可以继续执行它们的 Read/Write 操作了。
860+
861+
因为调度中有讲,这里就不赘述了。
680862

681863
### 读写超时

0 commit comments

Comments
 (0)