什么是非阻塞 I/O
下图是四层网络分层,其中数据链路层和网络层都是不可靠的,到了传输层就是可靠的了,机器和机器才能进行可靠的传输,RESP
协议是属于应用层的
文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
TCP
通信过程也就是通过三次握手建立连接,建立连接之后就可以进行数据传输,之后就可以通过四次挥手来通信文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
操作系统给我们提供了 socket
,作为开发者不需要关心底层的网络通信,只需要关心应用层的通信文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
在 linux
中有一个 Internet domain socket
,它的底层是 SOCK_STREAM
文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
在 linux
中每个 socket
都有一个 id
,这个 id
叫做“文件描述符”,用 FD
作为标识文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
我们处理 TCP
实际上就是在处理 socket
,socket
内部的过程如下图所示:文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
server
会新建一个 socket
,用来监听心的连接,这是 server socket
处理 listen
状态文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
这时如果 client1
建立了一个 socket
,它会连接 listen
状态的 server socket
,然后进行三次握手,握手结束后 server
会再次新建一个 socket
用于和 client1 socket
进行通信,这时这两个 socket
状态是 established
,也就是说此时 server
会有两个 socket
,一个用于监听新的连接,一个用于和 client
通信文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/17285.html
如果又来了一个 client2
,server
会再次新建一个 socket
用于和 client2
进行通信
这时 server
会有 3
个 socket
,当 client
越来越多时,server
会有越来越多的 socket
这时就诞生了 I/O
模型,I/O
指的是同时操作 socket
的方案,那 I/O
用什么方案同时操作多个 socket
呢?
三个方案:
- 阻塞
- 非阻塞
- 多路复用
阻塞 I/O
阻塞方案是建立三个线程,这三个线程的业务都是类似的,调用 socket
是阻塞式,如果没有新的数据会卡住,也就是说线程会卡住,直到外面有新的数据过来,才会返回新的数据,业务会根据新的数据进行处理,然后把药返回的数据在写回 socket
中
阻塞 I/O
的特点是:
- 同步读取
socket
时,线程陷入内核态 - 当读写成功后,切换会用户态,继续执行
- 优点:开发难度小,代码简单
- 缺点:内核态切换开销大
非阻塞 I/O
业务要处理三个 socket
,目的是要不断是去这三个 socket
中有没有新数据过来,有新数据就处理新数据,处理后再把数据写回去
非阻塞的意思是,不会卡住,而是会去询问,你有没有新数据,没有新数据就直接返回,在去询问下一个 socket
,一直循环下去,不会卡在任何一个 socket
中
那么理论上来说所有的 socket
可以通过一个线程来处理
非阻塞 I/O
的特点是:
- 如何暂时无法收发数据,会返回错误
- 应用会不断的轮询,直到某个
socket
可以读写 - 优点:不会陷入内核态,自由度高
- 缺点:需要自旋轮询
多路复用
在 linux
中多路复用叫做 epoll
,全称 event poll
,是事件池的意思
event poll
中的事件是 各个 scoket
可读/可写事件,就可以让操作系统来监控各个 socket
是否可以操作
还是和之前一样,一个 socket
负责监听,两个 socket
负责和 client
通信,然后把这三个 socket
可读事件注册到 linux epoll
中
然后非阻塞的去调用 linux epoll
,去询问这三个事件发生了哪些,如果某个 socket
事件发生了,epoll
会返回一个发生事件的列表,然后业务直接调用对应 socket
背后的业务处理
多路复用 epoll
的特点:
- 注册多个
socket
事件 - 调用
epoll
,当有时间发生时,返回发生事件的列表 - 优点:提供了事件列表,不需要查询各个
socket
- 缺点:开发难度大,逻辑复杂
- 不同操作系统的多路复用实现不同:
linux
:epoll
mac
:kqueue
windows
:IOCP
go 是如何抽象 epoll
- 在底层使用使用操作系统的多路复用
I/O
- 在协程层次使用阻塞模型
- 阻塞协程时,休眠协程
go
将各个系统对 epoll
的操作做了一层封装,抹平各平台的差异
多路复用器在各个系统都有以下功能:
- 新建多路复用器
epoll_create()
- 往多路复用器里插入需要监听的事件
epoll_ctl()
- 查询发生了什么事件
epoll_wait()
在 go sdk
中搜索 epoll_create
,会出现很多结果,我们选择 /cmd/vendor/golang.org/x/sys/unix/zsysnum_linux_amd64.go
在 go sdk
中有一个 netpoll
的文件
它一进来就写了很多注释,第一行注释写了这个工具叫做 network poller
,下面有很多被注释掉的方法,这些方法是在不同平台的文件中单独去实现的,netpoll
这个文件是一个总的文件
比如 linux
平台是在 netpoll_epoll
中实现,mac
平台是在 netpoll_kqueue
中实现,windows
平台是在 netpoll_windows
中实现
newtpollinit
:初始化多路复用器netpollopen
:往多路复用器里插入需要监听的事件netpoll
:看下有什么事件发生
go
// Integrated network poller (platform-independent part).
// A particular implementation (epoll/kqueue/port/AIX/Windows)
// must define the following functions:
//
// func netpollinit()
// Initialize the poller. Only called once.
//
// func netpollopen(fd uintptr, pd *pollDesc) int32
// Arm edge-triggered notifications for fd. The pd argument is to pass
// back to netpollready when fd is ready. Return an errno value.
//
// func netpollclose(fd uintptr) int32
// Disable notifications for fd. Return an errno value.
//
// func netpoll(delta int64) gList
// Poll the network. If delta < 0, block indefinitely. If delta == 0,
// poll without blocking. If delta > 0, block for up to delta nanoseconds.
// Return a list of goroutines built by calling netpollready.
//
// func netpollBreak()
// Wake up the network poller, assumed to be blocked in netpoll.
//
// func netpollIsPollDescriptor(fd uintptr) bool
// Reports whether fd is a file descriptor used by the poller.
也就说 linux
中的 epoll
对应 go
中的 netpoll
epoll_create
->netpollinit
epoll_ctl
->netpollopen
epoll_wait
->netpoll
netpollinit
作用是创建多路复用器:
- 新建
epoll
- 新建一个
pipe
管道,用于中断epoll
- 将“管道有数据到达”注册在
epoll
中
go
func netpollinit() {
var errno uintptr
// 新建 epoll
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
if errno != 0 {
println("runtime: epollcreate failed with", errno)
throw("runtime: netpollinit failed")
}
// 创建一个管道,linux 的管道,用来关闭 epoll
r, w, errpipe := nonblockingPipe()
if errpipe != 0 {
println("runtime: pipe failed with", -errpipe)
throw("runtime: pipe failed")
}
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
// 将管道的读事件加入 epoll
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}
netpollopen
作用是插入事件
- 传入一个
socket
的fd
和pollDesc
指针 pollDesc
指针是socket
相关详细信息pollDesc
中记录了哪个协程休眠在等待此socket
- 将
socket
可读,可写,端开事件注册到epoll
中
go
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
// 设置事件
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
// 调用底层的 epoll_ctl
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
pollDesc
是 go
网络层对 socket
的描述
fd
:socket id
rg
:等待读取的协程wg
:等待写入的协程
go
type pollDesc struct {
fd uintptr // constant for pollDesc usage lifetime
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}
netpoll
查询发生了什么事件
- 调用
epoll_wait
查询有哪些事件发生 - 根据
socket
相关的pollDesc
信息,返回哪些协程可以唤醒
go
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
var events [128]syscall.EpollEvent
retry:
// epfd 是 epoll 的 id
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
if errno != 0 {
if errno != _EINTR {
println("runtime: epollwait on fd", epfd, "failed with", errno)
throw("runtime: netpoll failed")
}
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
// 多少个事件就有多少 n
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.Events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
if delay != 0 {
// netpollBreak could be picked up by a
// nonblocking poll. Only read the byte
// if blocking.
var tmp [16]byte
read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
netpollWakeSig.Store(0)
}
continue
}
var mode int32
// EPOLLIN:可读事件
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
// EPOLLOUT:可写事件
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer())
tag := tp.tag()
if pd.fdseq.Load() == tag {
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
netpollready(&toRun, pd, mode)
}
}
}
// 返回需要唤醒的协程列表
return toRun
}
Network Poller 是如何工作的
Network Poller
底层有一个多路复用抽象层,用来屏蔽不同平台对多路复用器的实现
Network Poller 初始化
Network Poller
初始化是 poll_runtime_pollServerInit
,使用原子操作,保证一个 go
程序只会初始化一次 netpoll
go
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 用原子操作初始化,保证只初始化一次
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
// 初始化多路复用器
netpollinit()
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}
pollCache
和 pollDesc
pollCache
:是一个带锁的链表头pollDesc
:链表的成员pollDesc
是runtime
包对socket
的详细描述
pollCache
结构体如下
pollCache
实际上是一个链表头,它记录了 pollDesc
中第一个成员的指针,pollCache
的作用是为了放置一个锁,用来锁住 pollDesc
go
type pollCache struct {
lock mutex // 锁
first *pollDesc
// PollDesc objects must be type-stable,
// because we can get ready notification from epoll/kqueue
// after the descriptor is closed/reused.
// Stale notifications are detected using seq variable,
// seq is incremented when deadlines are changed or descriptor is reused.
}
pollDesc
是一个链表成员,link
属性指向了下一个 pollDesc
rg
:可能是pdReady
、pdWait
、等待读取这个socket
的地址wg
:可能是pdReady
、pdWait
、等待写这个socket
的地址- 写为什么也需要等待,是因为底层的网卡是有发送能力的,写数据时可能需要等待前面的数据发送完毕
go
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
fd uintptr // constant for pollDesc usage lifetime
// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or pdNil
}
Network Poller 新增监听 socket
Network Poller
新增监听 socket
是 poll_runtime_pollOpen
方法,在 pollCache
中分配一个新的 pollDesc
,初始化 pollDesc
(rg
、wg
为 0
),调用 netpollopen
将 fd
和 pd
插入 epoll
go:linkname
是一个go
语言的关键字,用来将一个函数的实现指向另一个函数poll_runtime_pollOpen
和internal/poll.runtime_pollOpen
是同一个函数
go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// 分配一个新的 pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
if pd.fdseq.Load() == 0 {
// The value 0 is special in setEventErr, so don't use it.
pd.fdseq.Store(1)
}
// 初始化 pollDesc
pd.closing = false
pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
pd.wseq++
pd.wg.Store(pdNil)
pd.wd = 0
pd.self = pd
pd.publishInfo()
unlock(&pd.lock)
// 将 fd 和 pd 插入 epoll
errno := netpollopen(fd, pd)
if errno != 0 {
pollcache.free(pd)
return nil, int(errno)
}
return pd, 0
}
Network Poller 收发数据
Network Poller
是怎么做到一个协程对应一个 socket
收发数据分为两个场景:
- 协程需要收发数据时,
socket
已经可读可写runtime
循环调用netpoll
方法(g0
协程)netpoll
被startTheWorldWithSema
调用,startTheWorldWithSema
被gcStart
调用。为什么事件循环会被gcStart
调用呢?因为只是gcStart
是周期性不断的在调用
- 发现
socket
可读可写时,给对应的rg
或者wg
设置为pdReady(1)
- 协程调用
poll_runtime_pollWait
方法 - 判断
rg
或者wg
已经被置为pdReady(1)
,返回0
- 协程需要收发数据时,
socket
暂时无法读写runtime
循环调用netpoll
方法(g0
协程)- 协程调用
poll_runtime_pollWait
方法 - 发现对应的
rg
或者wg
为0
- 给对应的
rg
或者wg
置为协程地址 - 休眠等待
发现 socket
可读可写时,查看对应的 rg
和 wg
,如果rg
或者 wg
是协程地址的话(不是 0
、1
、2
),说明有协程在休眠监听,将协程地址返回给 runtime
,然后调度器开始调度对应的协程
go 是如何抽象 socket
go
中 socket
是 net
提供的
net
包抽象了TCP
网络操作- 使用
net.Listen()
得到TCPListener
(listen
状态的socket
) - 使用
listen.Accept()
得到TCPConn
(established
状态的socket
) TCPConn.Read/Write
进行读写socket
操作Network poll
作为上述功能的底层支撑
我们在前面知道了 server
监听新连接,会新建一个 socket
,新建新连接是由 net.listen
方法完成的
net.listen
方法返回的是一个 net.Listener
接口,net.Listener
接口中有一个 Accept
方法,用来接收新的连接
go
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
Listen
方法中调用了 ListenConfig
的 Listen
方法,ListenConfig
是一个结构体,它的 Listen
方法中根据不同的网络类型,调用不同的 listenTCP
、listenUnix
方法
go
// Listen announces on the local network address.
//
// See func Listen for a description of the network and address
// parameters.
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// 解析地址
addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
if err != nil {
return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
}
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
if sl.MultipathTCP() {
l, err = sl.listenMPTCP(ctx, la)
} else {
// 调用 listenTCP 方法
l, err = sl.listenTCP(ctx, la)
}
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
}
if err != nil {
return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
}
return l, nil
}
listenTCP
方法内部会调用 socket
方法
net
包中对 socket
详细的描述是 netFD
netFD
中有一个 pfd
属性,pfd
是 poll.FD
类型,poll.FD
就是上面说的 pollDesc
go
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
net.Listen
:
- 新建
socket
,并执行bind
操作 - 新建一个
FD
(net
包对socket
详细描述) - 返回一个
TCPListener
对象 - 将
TCPListener
的FD
信息加入监听 TCPListener
对象本质上是一个LISTEN
状态的socket
ls.Accept
方法是用来接收新连接的,返回的时候 Conn
接口`
go
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
- 直接调用
socket
的accept
方法 - 如果失败,休眠等待新的连接
- 将新的
socket
包装为TCPConn
变量返回 - 将
TCPConn
的FD
信息加入监听 TCPConn
本质上是一个ESTABLISHED
状态的socket
conn.Read
方法是用来读取数据
go
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
conn.Write
方法是用来写入数据
go
// Write implements the Conn Write method.
func (c *conn) Write(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Write(b)
if err != nil {
err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
TCPConn.Read
和 TCPConn.Write
方法内部调用了 netFD.Read
和 netFD.Write
方法
netFD.Read
和 netFD.Write
方法内部调用了 poll.FD.Read
和 poll.FD.Write
方法
poll.FD.Read
和 poll.FD.Write
方法内部调用了 netpoll
方法
- 调用
socket
原生读写方法 - 如果调用失败,休眠等待可读/可写
- 被唤醒后调用系统
socket
评论