字节开源的netPoll多路复用器源码解析

2023-12-17 17:32:00


引言

IO 有阻塞和非阻塞两种模式,在阻塞IO下,我们需要耗费一个线程去阻塞在read操作下,去等待有足够多的数据可读并返回。在非阻塞IO下,不停对所有fd集合进行轮询,筛选出所有可读fd进行处理。

阻塞IO浪费线程(会占用内存和上下文切换开销),非阻塞IO会浪费CPU做大量无效操作。而基于IO多路复用系统调用实现的poll的意义在于将可读/可写状态通知和实际文件操作分开,并支持多个文件描述符通过一个系统调用监听以提升性能。
在这里插入图片描述
网络库的核心功能就是去同时监听大量的文件描述符的状态变化(通过操作系统调用),并对于不同状态变更,高效,安全地进行对应的文件操作。

对于一个高效的网络库而言,它的设计需要考虑以下几个场景:

  • 连接数量密集型: 如长连接场景,每个连接请求并不多,但是需要一直维护着长连接
  • 连接创建/销毁密集型: 如短连接场景,会频繁创建销毁连接

这类场景下,对 listener fd 的压?很?,监听 listener fd 的系统调?会被频繁唤醒。??个 fd 只能被?个线程处理,这样的话创建连接的压?只能由单个 CPU 承担?法充分利?多核。Linux 后?增加SO_REUSEPORT 功能,可以对同?个 bind ip+port 创建多个 listener fd,内核提供负载均衡分发,这样来实现多核处理连接创建密集型场景。需要注意的是,即便是那些?连接场景下,如果遇到?些特殊业务场景(例如准点秒杀)也会出现瞬间创建?量连接的情况。

  • 请求密集型: 如支持连接多路复用的RPC服务,点对点的所有请求都可以基于一个长连接进行,此时单连接会频繁被唤醒处理事件

同时由于网络库不仅要管理监听文件事件,还需要管理用户业务逻辑层handler的执行 ,因此一个设计优秀的网络库,还应当具备以下指标:

  • QPS 尽量高

QPS 要?意味着要让单请求开销低(即平均 latency 低),?先要保证充分利?CPU ,其次是要让 CPU 尽可能少执?内存拷?等和我们?要?作?关的代码。简??之,CPU处于满负荷?作,且做有效的?作的状态。?效?作是指那些和业务?关的事情,例如GC,线程&& 协程上下?切换开销,锁竞争等。在 Golang 中,G 依赖 P 运?,? P ??有调度逻辑,所以需要尽可能充分利? P,不让 P 空转

  • P99 延迟尽量低

P99 ? Avg ?的根因是在运?中间遇到?些原因导致 CPU 腾出去进?了其他的?作,或是整个?作循环被暂停了(如 GC stop the world,或是 Goroutine 陷? syscall导致的暂时卡顿,?或是锁竞争)。


NetPoll

epoll API

在正式开始讲解NetPoll源码前,我们先来快速复习一下多路复用API实现,本文基于Linux系统进行展开,所有此处多路复用器实现基于epoll展开:

typedef union epoll_data {
   int fd;
   //...
} epoll_data_t;

struct epoll_event {
   uint32_t events; /* Epoll events */
   epoll_data_t data; /* User data variable */
};

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
  • epoll_create: 创建 epollfd 对象,后续 epoll 操作都围绕该对象。
  • epoll_ctl: 在 epollfd 对象上,对 fd 使?某个 op(ADD/DEL/...),并声明关?什么events(EPOLLIN/EPOLLOUT/...)
  • epoll_wait: 阻塞等待直到 epollfd 内有就绪事件便返回,返回值为有效事件数,并且有效事件会记录再传?的 events 地址中。
    • timeout > 0 时:超过 timeout ms 后返回,若?事件发?返回值为 0
    • timeout = 0 时:?阻塞,即便没有任何事件发?,也会?刻返回,返回值为 0
    • timeout = -1 时: 阻塞,直到有事件发?

Epoll 在使?上有两种模式:边缘触发(ET)和?平触发(LT)

  • 边缘触发只有在从?数据到有数据时通知?次,??平触发只要 fd 处于可读状态就会?直触发。
  • ?平触发的缺点在于,如果没有读完 fd 继续调? epoll_wait 还会再次触发 EPOLLOUT 导致会?直尝试进?读完所有数据。这导致如果包体积特别?的情况下,会占?更多内存开销。

原生网络库实现

golang 原生网络库基于epoll et模式开发,基本架构如下图所示:
在这里插入图片描述
在这里插入图片描述

  • 每个 fd 对应?个 goroutine,业务?对 conn 发起主动的读写,底层使??阻塞 IO,当事件未就
    绪,将 fd 注册(epoll_ctl)进 epoll fd,通过把 goroutine 设置(park)成 GWaiting 状态。当有就绪
    事件后,唤醒(ready) 对应 goroutine 成 GRunnable 状态。
  • ????的 Goroutine 负责进?实际的 read/write syscal,Go Net 负责事件监听,以及帮助 block/ready ?? Goroutine。

golang原生网络库的特点就是:

  • 从??视?来看 net.Conn 接?的函数都是阻塞的,即便底层 IO 是?阻塞的
  • Read 接?能够填充满缓冲区就填充,填充不满也会直接返回?度 n
  • 上层调??既可以控制从内核缓冲区中的读取速率,也可以控制读取块??
type Conn interface {
// Read reads data from the connection.
// Read can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetReadDeadline.
Read(b []byte) (n int, err error)

// Write writes data to the connection.
// Write can be made to time out and return an error after a fixed
// time limit; see SetDeadline and SetWriteDeadline.
Write(b []byte) (n int, err error)
}

由于go net采用ET模式,所以只会在数据就绪时通知一次,用户在自己的线程中调用read api不断读取数据,直到返回的n等于0,说明数据全部读取完毕了。

golang 原生网络库的优势如下:

  • 创建连接后,由调?? Goroutine ??决定是否要进?读写,何时进?读写,以多少 size 进?读写。
  • ??只需关? net.Conn 暴露的同步接?,使?上?常?便。

但是有利必有弊,golang原生网络库的设计会导致如下问题:

  • 1 Goroutine : 1 Connection 模型,在连接?常多时,Goroutine 数量会爆炸从?在调度上产?较?开销,进?影响到 P99 指标。
  • 不?持零拷?,??使? conn.Read(b []byte) 后,完成?次内核缓冲区到??缓冲区的复制。拿到 []byte 后再传递给上层进?协议解析(反序列化)时,往往还需要再进??次拷?

根本性原因还是协议反序列化时拿到的内存和内核缓冲区复制到??缓冲区的内存不是同?块导致的

  • net.Conn 是??主动调?才触发事件监听,所以如果?个连接对端已经关闭,此时需要下?次写/读时,才能根据返回 error 进?判断连接状态。如果该连接?直没被调?到,会?直存在,占?内存。

netpoll 设计思路

Netpoll 主要由两?部分构成:

  • EventLoop(Polls):?来监听?件描述符事件
  • FDOperators:?来根据不同事件进?不同操作

这里官方提供了一幅图画的很好:
在这里插入图片描述
整个流程要分为三部分来看:

  • netpoll 初始化:

    • netpoll 启动时,会初始化poll manager , 依次初始化池中每个poll对象
    • 首先调用EpollCreate api创建一个新的Epoll对象,然后将其与当前poll对象绑定,同时还会为当前poll对象分配linkbuffer等缓冲区
    • 为每个poll对象开启一个协程来不断轮询当前epoll上的可读可写等事件
  • server 端:

    • 启动后,从poll manager中获取一个空闲的poll ,将listener fd注册到poll中,监听accept事件
    • 当accept 到客户端连接后,从poll manager中获取一个空闲的poll ,将客户端socket fd注册到poll中,监听可读事件
    • 每个poll会关联一个LinkBuffer对象,当监听到客户端连接上的可读事件后,从linkbuffer中预定一块内存,将数据都读取到这块内存中来
    • 包装一个模版任务,用于不断轮询处理linkbuffer上剩余可读数据,同时每次轮询完后,都会回调用户设置好的OnRequest函数,就是上图的hanler函数
    • 包装的模版任务会被提交到协程池中执行,也就是上图中的gopool
    • 与内核的系统调?交互完全由?络库进?控制,??对 Conn 的读写都只是在操作?段 Buffer ?已
  • client 端:

    • 启动后,建立和server端的连接 , 从opCache对象池中获取一个空闲的FDOperator对象返回,然后等待直到client socket可写
    • 调用connection提供的相关写api如malloc,先分配一块内存用于写数据
    • 写完需要发送给server的数据,调用flush api进行数据提交
    • flush api会首先尝试将数据写入socket内核缓冲区中,如果一次没写完,说明socket缓冲区写满了,此时会在poll上注册对当前socket fd可写事件监听
    • 然后调用waitFlush api阻塞等待writeTrigger通道发送过来的可写通知
    • 当poll线程监听到当前socket fd上发生了可写事件的时候,会向writeTrigger通道发送消息,唤醒等待的客户端

netpoll 对比 go net

netpoll 实现思路和 golang 原生网络库的区别如下:

  • Go Net 使? Epoll ET ,Netpoll 使? LT。
  • Netpoll 在?包场景下会占?更多的内存。Go Net 只有?个 Epoll 事件循环(因为 ET 模式被唤醒的少,且事件循环内?需负责读写,所以?的活少),? Netpoll 允许有多个事件循环(循环内需要负责读写,?的活多,读写越重,越需要开更多 Loops)。
  • Go Net ?个连接?个 Goroutine,Netpoll 连接数和 Goroutine 数量没有关系,和请求数有?定
    关系,但是有 Gopool 重?。
  • Go Net 不?持 Zero Copy,甚?于如果??想要实现 BufferdConnection 这类缓存读取,还会产
    ??次拷?。Netpoll ?持管理?个 Buffer 池直接交给??,且上层??可以不使? Read(p []byte) 接??使?特定零拷?读取接?对 Buffer 进?管理,实现零拷?能?的传递。

在这里插入图片描述
ET模式在高并发下调度压力比较大,因为 EventLoop 本?只是监听事件,真正的读写操作都在????的 Goroutine 函数中执?,不由?络库控制;因此每次 EventLoop监听到事件发生后,都需要唤醒对应的线程去读写数据,这里存在上下文切换开销。

而 LT 单线程轮询对 cache/计算类业务更友好,因为 Cache 的特点是业务逻辑执?的?常快,所以在 readv 完了后可以?刻执? handler 同时执?write,整个过程都不需要进?线程调度。对于计算类任务??,越少协程切换能够让 CPU 尽可能少的做?效?作。


数据结构

此处只列举核心的几个对象:

  • server 服务端对象:
type server struct {
	operator    FDOperator // ?来根据不同事件进?不同操作
	ln          Listener. 
	opts        *options // 配置相关回调接口
	onQuit      func(err error) // 退出回调
	connections sync.Map // 记录当前server上accept得到的活跃客户端连接: key=fd, value=connection
}
  • 封装多路复用器操作的对象:
// FDOperator is a collection of operations on file descriptors.
type FDOperator struct {
	// epoll 监听的 fd
	FD int
    // 监听到读写事件后的回调函数
	OnRead  func(p Poll) error // accept 事件回调 
	OnWrite func(p Poll) error // 客户端 socket 写回调
	OnHup   func(p Poll) error
    
    // linkbuffer 与 socket 缓冲区之间的读写API
	Inputs   func(vs [][]byte) (rs [][]byte)
	InputAck func(n int) (err error)

	Outputs   func(vs [][]byte) (rs [][]byte, supportZeroCopy bool)
	OutputAck func(n int) (err error)

	// epoll 对象
	poll Poll
	...
}
  • 封装多路复用器的对象
type Poll interface {
	Wait() error
	Close() error
	Trigger() error
	Control(operator *FDOperator, event PollEvent) error
	Alloc() (operator *FDOperator)
	Free(operator *FDOperator)
}

type defaultPoll struct {
	pollArgs
	fd      int            // 监听的fd
	wop     *FDOperator    // eventfd(轻量级进程通信机制), wake epoll_wait -- 用于唤醒wait上阻塞的线程
	...
	Handler func(events []epollevent) (closed bool) // 发生感兴趣事件时,回调该接口处理这些事件
}

type pollArgs struct {
    ...
	events   []epollevent // 发送/接收感兴趣事件
	barriers []barrier。 // 用于实现分散读/集中写的向量缓冲区
}

type epollevent struct {
	events uint32  // 事件位图
	_      int32
	data   [8]byte // 注册感兴趣事件时,可以携带用户数据的指针
}

源码解析

多路复用池初始化

netpoll使用pollmanager维护着一组epoll对象池,以此来实现对象复用,每次有客户端新连接被Accept时,都会从epoll池中按照对应的负载均衡策略,pick出一个空闲的epoll对象来监听客户端连接上后续的读写事件。

netpoll多路复用池初始化的流程图如下所示:

在这里插入图片描述
具体源码如下:

// poll_manager.go
var pollmanager *manager // 多路复用器池子管理器

func init() {
	var loops = runtime.GOMAXPROCS(0)/20 + 1
	pollmanager = &manager{}
	// 设置负载均衡器,默认采用轮询策略从epoll池中挑选空闲epoll
	pollmanager.SetLoadBalance(RoundRobin)
	// 设置epoll池的大小,同时会初始化池中的epoll对象
	pollmanager.SetNumLoops(loops)
	...
}

golang 程序启动时,会去自动调用每个go文件的init方法,所以pollmanager会在程序启动时被初始化。

真正初始化epoll池中epoll对象的逻辑是在设置eventLoopNum时完成的:

// poll_manager.go
func (m *manager) SetNumLoops(numLoops int) error {
	..
	// netpoll支持运行时动态调整epoll池大小,所以此处存在该分支
	// 如果我们打算缩小epoll池大小,则进入下面这个分支
	if numLoops < m.NumLoops {
		// 创建一个新的epoll池
		var polls = make([]Poll, numLoops)
		for idx := 0; idx < m.NumLoops; idx++ {
		    // 对于无需缩减的部分,直接重新指向即可
			if idx < numLoops {
				polls[idx] = m.polls[idx]
			} else {
			// 对于需要缩减的部分,直接Close关闭该多路复用器
				if err := m.polls[idx].Close(); err != nil {
					logger.Printf("NETPOLL: poller close failed: %v\n", err)
				}
			}
		}
		// 更新多路复用池管理器的相关状态
		m.NumLoops = numLoops
		m.polls = polls
		m.balance.Rebalance(m.polls)
		// 如果是动态缩容,缩容完毕后,直接返回
		return nil
	}
    
    // 进入初始化或者扩容逻辑
	m.NumLoops = numLoops
	return m.Run()
}

从上面代码可以看出,netpoll支持在运行时动态调整池子的大小,下面我们看看初始化和扩容逻辑是如何完成的:

// poll_manager.go
// 扩容或者初始化epoll池
func (m *manager) Run() (err error) {
	defer func() {
		if err != nil {
			_ = m.Close()
		}
	}()

	// 如果是初始化epoll池,此处的polls大小应该为0
	// 如果时扩容逻辑,此处的polls大小为当前池中已有的多路复用器个数
	for idx := len(m.polls); idx < m.NumLoops; idx++ {
		var poll Poll
		// 创建一个新的多路复用器
		poll, err = openPoll()
		if err != nil {
			return
		}
		// 新创建的多路复用器追加到polls集合 
		m.polls = append(m.polls, poll)
		// 每个多路复用器绑定一个协程,不断轮询注册到该epoll上的fd事件
		go poll.Wait()
	}

	// 更新多路复用池管理器的相关状态
	m.balance.Rebalance(m.polls)
	return nil
}

初始化epoll池时,首先是将池中每个epoll对象创建出来:

// poll_default_linux.go

// 打开多路复用器
func openPoll() (Poll, error) {
	return openDefaultPoll()
}

func openDefaultPoll() (*defaultPoll, error) {
	var poll = new(defaultPoll) 
	poll.buf = make([]byte, 8)
	// 创建Epoll对象
	var p, err = EpollCreate(0)
	...
	// 保存epoll的fd
	poll.fd = p
	// eventfd是一种进程/线程通信的机制,他类似信号,不过eventfd只是一种通知机制
	// 无法承载数据(eventfd承载的数据是8个字节),他的好处是简单并且只消耗一个fd
	// 进程间通信机制: https://zhuanlan.zhihu.com/p/383395277
	var r0, _, e0 = syscall.Syscall(syscall.SYS_EVENTFD2, 0, 0, 0)
	...
	// TODO: 这几个回调接口干啥的 ?
	poll.Reset = poll.reset
	// 处理当前epoll fd上所发生的感兴趣的事件
	poll.Handler = poll.handler
	// eventFd 通信机制
	poll.wop = &FDOperator{FD: int(r0)}

	// 在epoll上注册并监听eventFd的可读事件 -- 监听r0上的可读事件
	if err = poll.Control(poll.wop, PollReadable); err != nil {
		_ = syscall.Close(poll.wop.FD)
		_ = syscall.Close(poll.fd)
		return nil, err
	}
	// 初始化FDOperator缓存
	poll.opcache = newOperatorCache()
	return poll, nil
}

这里使用defaultPoll保存多路复用器上下文信息,同时还为每个多路复用器创建出了一个eventFD用于实现进程间通信,同时在当前epoll上注册监听eventFD的可读事件。

此处使用eventFD是为了epoll池关闭的时候,通知那些阻塞在epoll_wait系统调用上的线程可以醒过来,然后结束自己。

当创建出来多路复用器后,下一步便是将其加入epoll池中,最后为每个多路复用器绑定一个协程,然后不断轮询注册到该epoll上的fd事件:

// poll_default_linux.go
func (p *defaultPoll) Wait() (err error) {
	// init
	var caps, msec, n = barriercap, -1, 0
	p.Reset(128, caps)
	// wait
	for {
		if n == p.size && p.size < 128*1024 {
			p.Reset(p.size<<1, caps)
		}
		// p.fd 就是 epoll fd
		// events 就是挂载到epoll tree上的epoll item
		// mesc 用于指定阻塞时间,是永久阻塞,还是阻塞一段时间,还是非阻塞IO
		// 等待当前epoll上发生感兴趣的事件
		n, err = EpollWait(p.fd, p.events, msec)
		if err != nil && err != syscall.EINTR {
			return err
		}
		// 如果没有发生感兴趣的事件,则将msec设置为-1,表示下一次采用永久阻塞策略来等待感兴趣的事件发生
		// 然后调用Gosched完成协程调度
		if n <= 0 {
			msec = -1
			runtime.Gosched()
			continue
		}
		msec = 0
		// 处理感兴趣的事件
		if p.Handler(p.events[:n]) {
			return nil
		}
		// we can make sure that there is no op remaining if Handler finished
		p.opcache.free()
	}
}

defaultPoll的Handler回调接口是在openDefaultPoll函数中被赋值的,实际调用的是poll_default_linux.go文件中的handler函数:

// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
	var triggerRead, triggerWrite, triggerHup, triggerError bool
	var err error
	// 遍历所有感兴趣的事件
	for i := range events {
		// epollevent.data保存的是与之关联的FDOperator对象
		operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
		if operator == nil || !operator.do() {
			continue
		}

		var totalRead int
		// 判断当前发生了什么事件
		evt := events[i].events
		triggerRead = evt&syscall.EPOLLIN != 0
		triggerWrite = evt&syscall.EPOLLOUT != 0
		triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
		triggerError = evt&syscall.EPOLLERR != 0

		// trigger or exit gracefully
		// 是否是eventFD可读事件发生了
		if operator.FD == p.wop.FD {
			// must clean trigger first
			// 从eventFD中读取数据到buf中
			syscall.Read(p.wop.FD, p.buf)
			atomic.StoreUint32(&p.trigger, 0)
			// if closed & exit
			// 说明接收到了关闭信号,那么就关闭当前epoll
			if p.buf[0] > 0 {
			    // 关闭eventFD
				syscall.Close(p.wop.FD)
				// 关闭epoll fd
				syscall.Close(p.fd)
				operator.done()
				return true
			}
			operator.done()
			continue
		}
		// 发生了可读事件
		if triggerRead {
			// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
			if operator.OnRead != nil {
				// 调用OnRead来接收并处理客户端连接
				operator.OnRead(p)
		    // 否则说明发生的是某个客户端连接上的可读事件		
			} else if operator.Inputs != nil {
				// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
				// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
				// 此处是从LinkBuffer中分配出一块空闲内存
				var bs = operator.Inputs(p.barriers[i].bs)
				if len(bs) > 0 {
					// 读取数据到bs缓存区中
					var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
					// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
					operator.InputAck(n)
					...
				}
			} else {
				logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
			}
		}
		// 其他感兴趣事件的触发此处暂时不展开
		...
}

Epoll相关API

本节介绍一下netpoll为Linux下的epoll系统调用封装的API接口:

Linux 底层的epoll系统调用由红黑树实现,netpoll 给红黑树上每个节点都关联一个epollevent类型,该类型由一个事件位图和用户数据指针组成:

// sys_epoll_linux_arm64.go
type epollevent struct {
	events uint32  // events:表示要监听的事件类型,如可读、可写等。这是一个位掩码,可以设置多个事件类型,例如 EPOLLIN 表示可读事件,EPOLLOUT 表示可写事件。
	_      int32
	data   [8]byte // 可以携带用户数据。这里的用户数据通常是一个指针,指向与文件描述符关联的对象或其他相关数据。
}

netpoll 还提供了对epoll对象创建,感兴趣事件监听,等待感兴趣事件发生等操作的API封装:

  • 创建epoll对象
func EpollCreate(flag int) (fd int, err error) {
	var r0 uintptr
	// 执行epoll_create系统调用
	r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0)
	if err == syscall.Errno(0) {
		err = nil
	}
	return int(r0), err
}
  • 注册感兴趣事件
func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) {
	_, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0)
	if err == syscall.Errno(0) {
		err = nil
	}
	return err
}
  • 等待感兴趣事件发生
func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) {
	var r0 uintptr
	var _p0 = unsafe.Pointer(&events[0])
	if msec == 0 {
		r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0)
	} else {
		r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0)
	}
	if err == syscall.Errno(0) {
		err = nil
	}
	return int(r0), err
}

关于注册感兴趣的事件,netpoll在此基础之上又封装了一层:

// Control implements Poll.
func (p *defaultPoll) Control(operator *FDOperator, event PollEvent) error {
	var op int
	// TODO: evt.data = operator
	var evt epollevent
	// 将epollevent对象的data指针指向传入的FDOperator对象
	p.setOperator(unsafe.Pointer(&evt.data), operator)
	// 根据监听的事件类型,更新事件位图
	switch event {
	case PollReadable: // server accept a new connection and wait read
		operator.inuse()
		op, evt.events = syscall.EPOLL_CTL_ADD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollWritable: // client create a new connection and wait connect finished
		operator.inuse()
		op, evt.events = syscall.EPOLL_CTL_ADD, EPOLLET|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollDetach: // deregister
		p.delOperator(operator)
		op, evt.events = syscall.EPOLL_CTL_DEL, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollR2RW: // connection wait read/write
		op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLOUT|syscall.EPOLLRDHUP|syscall.EPOLLERR
	case PollRW2R: // connection wait read
		op, evt.events = syscall.EPOLL_CTL_MOD, syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLERR
	}
	// 完成监听事件信息注册
	return EpollCtl(p.fd, op, operator.FD, &evt)
}

从Control函数中可以看出来,netpoll会在epollevent的data字段中保存监听的fd对象信息,这里fd对象是netpoll经过封装后的FDOperator对象。

FDOperator对象中又保存了对当前fd对象的读写API封装。


可读事件处理

可读事件有两类,一种是accept事件,另一种是readable事件:
在这里插入图片描述

accept事件针对的是server端

下面给出整个读事件处理的流程图,大家可以时不时回看本图:

在这里插入图片描述


server启动

服务提供方server在启动时,会创建一个新的server端套接字,然后在该套接字上打开并监听对应的端口,随后向poll manager获取一个空闲poller对象 , 并在该对象上监听server端套接字的可读事件,这里实际是客户端的accept事件:

netpoll server 方代码模版写法如下:

// 1. OnRequest: 有可读数据时回调该接口
var OnRequest = func(ctx context.Context, connection netpoll.Connection) error { return nil }

// 2. OnPrepare: 客户端连接建立完毕后,回调该接口
var OnPrepare = func(connection netpoll.Connection) context.Context { return nil }

func main() {
	// 1. 建立连接
	listen, err := net.Listen("tcp", ":1234")
	if err != nil {
		return
	}

	// 2. 创建eventLoop
	eventLoop, _ := netpoll.NewEventLoop(OnRequest, netpoll.WithOnPrepare(OnPrepare), netpoll.WithReadTimeout(time.Second))

	// 3. 启动服务
	eventLoop.Serve(listen)
}

eventLoop的Serve方法会创建一个新的Server对象,并启动netpoll服务端:

// Serve implements EventLoop.
func (evl *eventLoop) Serve(ln net.Listener) error {
	// 将原生的listener对象转换为netpoll包装后的Listener对象
	npln, err := ConvertListener(ln)
	if err != nil {
		return err
	}
	evl.Lock()
	// 创建新的server对象
	evl.svr = newServer(npln,
		// opts对象保存了相关事件回调
		evl.opts,
		// 退出事件回调监听函数
		evl.quit)
	// 启动回调
	evl.svr.Run()
	evl.Unlock()
    
    // 监听到停止信号后,从此处返回
	err = evl.waitQuit()
	// ensure evl will not be finalized until Serve returns
	runtime.SetFinalizer(evl, nil)
	return err
}

创建完server对象后,会调用server对象的Run方法启动服务:

// Run this server.
func (s *server) Run() (err error) {
	// 当前FDOperator对象封装的是server socket套接字对象
	s.operator = FDOperator{
		FD:     s.ln.Fd(), //  服务端Socket监听器
		OnRead: s.OnRead,  //  可读事件发生
		OnHup:  s.OnHup,   //   挂断事件发生
	}
	// 挑选一个空闲的多路复用器
	s.operator.poll = pollmanager.Pick()
	// 监听服务端套接字上的可读事件
	err = s.operator.Control(PollReadable)
	if err != nil {
		// 错误退出时,回调该方法
		s.onQuit(err)
	}
	return err
}

netpoll 在对server socket执行事件注册时,会设置FDOperator的OnRead接口,用于处理服务端套接字上的可读事件。

netpoll 也是通过FDOperator的OnRead接口是否为nil来判断当前发生的事件是accept还是readable事件。

只有server端启动时才会对服务端套接字设置OnRead回调接口,client端是不会设置的。


accept 事件

在defaultPoll的handler函数中,我们暂时只关心读事件是如何被处理的,而关于可读事件,本节我们来看看客户端accept事件是如何处理的:

// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
	var triggerRead, triggerWrite, triggerHup, triggerError bool
	var err error
	// 遍历所有感兴趣的事件
	for i := range events {
		// epollevent.data保存的是与之关联的FDOperator对象
		operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
		...
		var totalRead int
		// 判断当前发生了什么事件
		evt := events[i].events
		triggerRead = evt&syscall.EPOLLIN != 0
		triggerWrite = evt&syscall.EPOLLOUT != 0
		triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
		triggerError = evt&syscall.EPOLLERR != 0
		...
		// 发生了可读事件
		if triggerRead {
			// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
			if operator.OnRead != nil {
				// 调用OnRead来接收并处理客户端连接
				operator.OnRead(p)
		    // 否则说明发生的是某个客户端连接上的可读事件		
			} else if operator.Inputs != nil {
				// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
				// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
				// 此处是从LinkBuffer中分配出一块空闲内存
				var bs = operator.Inputs(p.barriers[i].bs)
				if len(bs) > 0 {
					// 读取数据到bs缓存区中
					var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
					// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
					operator.InputAck(n)
					...
				}
			} else {
				logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
			}
		}
		// 其他感兴趣事件的触发此处暂时不展开
		...
}

回顾上面给出的handler函数可知,netpoll会依次遍历感兴趣的事件集合中每个事件,然后获取与当前事件绑定的FDOperator对象;首先判断当前发生的是否死可读事件,再根据FDOperator的OnRead接口是否为空,来判断发生的是accept事件,还是readable事件。

在server启动一节我们已经知道了,如果FDOperator的OnRead接口不为空,那么说明发生的是客户端的accept事件,此时会调用FDOperator的OnRead回调来处理客户端的连接事件;此处实际调用的是server的OnRead的方法;

// OnRead implements FDOperator.
func (s *server) OnRead(p Poll) error {
	// 获取客户端连接
	conn, err := s.ln.Accept()
	...
	// 包装一下原生的conn连接
	var connection = &connection{}
	// 初始化一下连接
	connection.init(conn.(Conn),
		// 初始化完毕后,回调用户注册进来的prepare接口
		s.opts)
	// 连接不活跃,直接返回
	if !connection.IsActive() {
		return nil
	}
	// 返回客户端连接套接字对应的文件描述符
	var fd = conn.(Conn).Fd()
	// 添加关闭回调接口 --- netpoll回调接口这里采用的是回调链的形式,可以添加多个回调接口
	connection.AddCloseCallback(func(connection Connection) error {
		// 当前连接关闭时,将自己从server连接集合中移除
		s.connections.Delete(fd)
		return nil
	})
	// 在server对象中保存 < fd , 已打开连接 >
	s.connections.Store(fd, connection)

	// 调用连接建立接口
	connection.onConnect()
	return nil
}

处理客户端accept事件的过程主要分为三步:

  1. 获取原生conn连接对象,对其进行包装,然后为当前连接初始化相关数据结构和回调接口
  2. 从poller池中挑选出一个poll对象与当前连接进行绑定,并在该poll上注册对当前连接可读事件的监听
  3. 为当前连接包装一个任务对象,然后丢入协程池中之行,该任务负责死循环轮询,发现可读数据立马回调用户提供的OnRequest接口进行处理

客户端连接初始化

server.OnRead函数中调用的connection.init函数主要是用来为当前连接初始化相关数据结构,回调接口,以及在poll上注册对当前connection可读事件的监听

// init initialize the connection with options
func (c *connection) init(conn Conn, opts *options) (err error) {
	// init buffer, barrier, finalizer
	c.readTrigger = make(chan error, 1)
	c.writeTrigger = make(chan error, 1)
	// 初始化LinkBuffer相关数据结构
	c.bookSize, c.maxSize = pagesize, pagesize
	c.inputBuffer, c.outputBuffer = NewLinkBuffer(pagesize), NewLinkBuffer()
	c.outputBarrier = barrierPool.Get().(*barrier) // 用于聚集读写的缓冲区

	// 初始化
	c.initNetFD(conn)  // 确保conn是被netpoll包装后的netFD类型
	c.initFDOperator() // 初始化FDOperator
	c.initFinalizer()  // 添加close回调函数

	// 将客户端连接套接字设置为非阻塞模式
	syscall.SetNonblock(c.fd, true)
	// enable TCP_NODELAY by default
	switch c.network {
	case "tcp", "tcp4", "tcp6":
		// 禁用 Nagle 算法
		setTCPNoDelay(c.fd, true)
	}
	// 启用零拷贝传输的 TCP Socket 选项 和 阻塞超时时间
	if setZeroCopy(c.fd) == nil && setBlockZeroCopySend(c.fd, defaultZeroCopyTimeoutSec, 0) == nil {
		c.supportZeroCopy = true
	}

	// connection initialized and prepare options
	// 设置相关回调接口,在poll上注册对当前connection可读事件的监听
	return c.onPrepare(opts)
}

netpoll 里面为原生的Listener,Connection,Epoll,Fd等对象都进行了一层自己的封装,initNetFD函数便是对原生客户端套接字文件描述符的封装:

func (c *connection) initNetFD(conn Conn) {
	if nfd, ok := conn.(*netFD); ok {
		c.netFD = *nfd
		return
	}
	c.netFD = netFD{
		fd:         conn.Fd(),
		localAddr:  conn.LocalAddr(),
		remoteAddr: conn.RemoteAddr(),
	}
}

FDOperator 是对需要注册在epoll上进行监听的fd的封装,其是netpoll中的一个核心对象,内部持有被监听的fd和poll对象,同时对外提供fd数据读写回调接口 , 当fd上发生可读可写事件时,便会回调FDOperator上注册好的回调接口进行处理:

func (c *connection) initFDOperator() {
	// 通过负载均衡器挑选一个可用的poll
	poll := pollmanager.Pick()
	// 从opcache中分配一个可用的poll对象
	op := poll.Alloc()
	// 拿到当前客户端连接对应的socket文件描述符
	op.FD = c.fd
	// 回调接口初始化 -- 注意OnRead回调被设置为了nil
	op.OnRead, op.OnWrite, op.OnHup = nil, nil, c.onHup
	op.Inputs, op.InputAck = c.inputs, c.inputAck
	op.Outputs, op.OutputAck = c.outputs, c.outputAck
	c.operator = op
}

connection 的 onPrepare 函数主要用来将用户提供的相关回调接口设置到当前connection对象上,以及相关读写超时参数等;如果用户提供了OnPrepare接口,此处会进行回调通知。

该函数最后会在当前poll上注册对当前客户端connection的读事件监听:

// onPrepare supports close connection, but not read/write data.
// connection will be registered by this call after preparing.
func (c *connection) onPrepare(opts *options) (err error) {
	if opts != nil {
		// 将用户通过options设置的回调接口都赋值给当前accept得到的客户端连接
		c.SetOnConnect(opts.onConnect)
		c.SetOnRequest(opts.onRequest)
		c.SetReadTimeout(opts.readTimeout)
		c.SetWriteTimeout(opts.writeTimeout)
		c.SetIdleTimeout(opts.idleTimeout)

		// calling prepare first and then register.
		// 如果我们指定了onPrepare回调,此处会执行回调
		if opts.onPrepare != nil {
			c.ctx = opts.onPrepare(c)
		}
	}
	// 初始化连接上下文
	if c.ctx == nil {
		c.ctx = context.Background()
	}
	// prepare may close the connection.
	if c.IsActive() {
		// 在当前poll上注册对当前客户端connection的读事件监听
		return c.register()
	}
	return nil
}

connection的register函数负责在poll上注册对当前客户端connection的读事件监听:

// register only use for connection register into poll.
func (c *connection) register() (err error) {
	err = c.operator.Control(PollReadable)
	... 
	return nil
}

客户端连接建立

当accept得到的客户端连接初始化完毕后,会调用onConnect函数对客户端连接进行任务包装,然后提交到协程池执行任务:

// onConnect is responsible for executing onRequest if there is new data coming after onConnect callback finished.
func (c *connection) onConnect() {
	// 获取用户设置的OnConnect回调和OnRequest回调接口 --- 如果没有设置OnConnect回调,此处直接返回
	var onConnect, _ = c.onConnectCallback.Load().(OnConnect)
	if onConnect == nil {
		return
	}
	var onRequest, _ = c.onRequestCallback.Load().(OnRequest)
	var connected int32
	c.onProcess(
		// 第一个回调函数用于判断当前是否连接此刻是否可被处理
		func(c *connection) bool {
			// 在当前客户端连接初始化完毕后,会在onConnect函数中回调一次客户端提供的OnConnect接口
			// 此处通过标记确保只会调用一次OnConnect函数
			if atomic.LoadInt32(&connected) == 0 {
				return true
			}
			// check for onRequest
			return onRequest != nil &&
				// 存在可读数据
				c.Reader().Len() > 0
		},
		// 第二个回调函数会在第一个回调函数返回true的前提下,进行处理
		func(c *connection) {
			// 回调OnConnect函数
			if atomic.CompareAndSwapInt32(&connected, 0, 1) {
				c.ctx = onConnect(c.ctx, c)
				return
			}
			// 处理可读数据,回调用户提供的回调函数
			if onRequest != nil {
				_ = onRequest(c.ctx, c)
			}
		},
	)
}

onProcess 函数内部负责实现一套模版方法,用于不断轮询连接状态,如果可处理,则调用执行处理,直到接收到停止信号或连接不可处理时,才会退出循环:

// onProcess is responsible for executing the process function serially,
// and make sure the connection has been closed correctly if user call c.Close() in process function.
func (c *connection) onProcess(isProcessable func(c *connection) bool, process func(c *connection)) (processed bool) {
	...
	// 准备任务
	var task = func() {
		// 如果当前任务可执行,确保至少被执行过一次
		if isProcessable(c) {
			process(c)
		}
		// 死循环处理任务,直到接收到关闭信号或者任务不再可处理
		var closedBy who
		for {
			closedBy = c.status(closing)
			// close by user or no processable
			if closedBy == user || !isProcessable(c) {
				break
			}
			process(c)
		}
		...
		return
	}

	// 异步跑这个任务 --- gopool.CtxGo 字节开源的协程池
	runTask(c.ctx, task)
	return true
}

但是这里要注意的是,如果连接上一段时间都没有可读数据,那么与当前连接绑定的协程在发现无数据可读时,会退出返回,也就是说当前协程就与当前连接解绑,并重新放回了协程池中。

大家要注意此处netpoll的实现思路:

  • 连接初始化完毕的最后,会调用onConnect函数,该函数主要作用是调用用户设置好的onConnect回调,通知用户连接已经建立完毕了;而还需要OnRequest回调,只是为了顺道检查是否有可读数据准备就绪,如果准备就绪了,那么就顺道处理一波。
  • onProcess函数主要做的事情就是不断轮询处理当前连接上的可读数据,直到接送到停止信号或者当前连接此刻没有可读数据了,则结束轮询,释放当前协程。

netpoll 通过一个单独的协程来监听fd上的可读可写事件,当监听到可读可写事件时,不是在当前协程内进行同步处理,而是将可读可写事件包装为一个任务,然后从协程池中取出一个空闲协程进行处理,这是典型的Reactor模式实现思路。


可读事件

当netpoll accept到一个连接后,会从poller池中挑选一个空闲poll,然后在当前poll上执行对当前conn可读事件的监听。

后续当conn上发生可读事件时,便会被与该conn绑定的poll感知到,然后通过判断FDOperator的OnRead接口为nil,知道当前发生的是可读事件,而非accept事件。

此时我们再来回看defaultPoll的handler,看看当发生可读事件时,netpoll是如何处理的:

// poll_default_linux.go
// 当epoll上有感兴趣的事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
	var triggerRead, triggerWrite, triggerHup, triggerError bool
	var err error
	// 遍历所有感兴趣的事件
	for i := range events {
		// epollevent.data保存的是与之关联的FDOperator对象
		operator := p.getOperator(0, unsafe.Pointer(&events[i].data))
		...
		var totalRead int
		// 判断当前发生了什么事件
		evt := events[i].events
		triggerRead = evt&syscall.EPOLLIN != 0
		triggerWrite = evt&syscall.EPOLLOUT != 0
		triggerHup = evt&(syscall.EPOLLHUP|syscall.EPOLLRDHUP) != 0
		triggerError = evt&syscall.EPOLLERR != 0
		...
		// 发生了可读事件
		if triggerRead {
			// 如果FDOperator上的OnRead回调接口不为空,说明发生的是客户端的accept事件
			if operator.OnRead != nil {
				// 调用OnRead来接收并处理客户端连接
				operator.OnRead(p)
		    // 否则说明发生的是某个客户端连接上的可读事件		
			} else if operator.Inputs != nil {
				// 每个poll对象会关联一个barriers结构,该结构用于实现分散读取与集中写入的系统调用
				// 每个poll对象还会关联一个LinkBuffer对象,作为读写数据缓冲区
				// 此处是从LinkBuffer中分配出一块空闲内存
				var bs = operator.Inputs(p.barriers[i].bs)
				if len(bs) > 0 {
					// 读取数据到bs缓存区中
					var n, err = ioread(operator.FD, bs, p.barriers[i].ivs)
					// 推动读指针,让写入缓冲区的数据对消费者可见,同时调用用户注册的OnRequest回调接口,处理读数据
					operator.InputAck(n)
					...
				}
			} else {
				logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
			}
		}
		// 其他感兴趣事件的触发此处暂时不展开
		...
}

关于LinkBuffer的源码解析本文就不过多展开了,感兴趣的小伙伴可以阅读我之前写的这篇文章:

FDOperator的Inputs和InputAck回调接口都是在客户端连接初始化时,在initFDOperator方法中被设置的:

func (c *connection) initFDOperator() {
    ...
	op.Inputs, op.InputAck = c.inputs, c.inputAck
	op.Outputs, op.OutputAck = c.outputs, c.outputAck
	...
}

connection的inputs函数就是调用linkbuffer提供的book方法预定一块内存用于接收socket缓冲区中的可读数据:

// inputs implements FDOperator.
func (c *connection) inputs(vs [][]byte) (rs [][]byte) {
	vs[0] = c.inputBuffer.book(c.bookSize, c.maxSize)
	return vs[:1]
}

inputAck则复杂一些,首先会调用linkbuffer的bookAck函数完成预留内存的提交,这样已经从socket缓冲区写入linkbuffer的数据就对用户可见了:

// inputAck implements FDOperator.
func (c *connection) inputAck(n int) (err error) {
    ...
    // 提交预留内存,提交后,用户便可以读取这部分内存数据了
	length, _ := c.inputBuffer.bookAck(n)
	...
	var needTrigger = true
	// 从协程池中取出一个空闲协程来处理当前连接上的可读数据
	if length == n { // first start onRequest
		needTrigger = c.onRequest() // 返回值表示是否读取完毕了所有需要的数据, 如果返回false,说明读完了,否则说明没有读完
	}
	// 单开协程处理客户端连接上的可读数据时,可能在回调用户OnRequest接口时,调用读数据接口从而阻塞等待数据准备就绪
	// 此处当数据就绪时,会唤醒对应的协程
	if needTrigger && length >= int(atomic.LoadInt64(&c.waitReadSize)) {
		c.triggerRead(nil)
	}
	return nil
}

此处调用connection的onRequest方法,并非直接就是调用的用户提供的回调接口,而是和OnConnect方法一样,创建一个读数据任务去处理当前连接上的可读数据:

// onRequest is responsible for executing the closeCallbacks after the connection has been closed.
func (c *connection) onRequest() (needTrigger bool) {
    // 加载用户设置的回调接口
	var onRequest, ok = c.onRequestCallback.Load().(OnRequest)
	if !ok {
		return true
	}
	// 处理请求
	processed := c.onProcess(
		// 第一个回调函数用于判断当前连接是否活跃并且还有未读取数据
		func(c *connection) bool {
			return c.Reader().Len() > 0
		},
		// 第二个回调才是真正将请求交给用户回调来处理
		func(c *connection) {
			_ = onRequest(c.ctx, c)
		},
	)
	// if not processed, should trigger read
	return !processed
}

connection的onProcess方法上文已经说过,就是从协程池中捞取一个空闲协程来处理当前连接上的可读数据;

如果当前连接上一直有数据可读,便会一直处理,如果当前协程上没有数据可读了,协程便会被释放,重新返回池中。


等待读取数据

上文说到,当poll线程监听到可读可写数据的时候,会单开一个线程去处理当前连接上的可读可写数据;如果此时发生的是可读事件,那么最终会回调到用户提供的OnRequest接口。

而用户可以在OnRequest接口中去调用connection相关读API去读取数据:

// ReadString implements Connection.
func (c *connection) ReadString(n int) (s string, err error) {
	if err = c.waitRead(n); err != nil {
		return s, err
	}
	return c.inputBuffer.ReadString(n)
}

// ReadBinary implements Connection.
func (c *connection) ReadBinary(n int) (p []byte, err error) {
	if err = c.waitRead(n); err != nil {
		return p, err
	}
	return c.inputBuffer.ReadBinary(n)
}

// Next implements Connection.
func (c *connection) Next(n int) (p []byte, err error) {
	if err = c.waitRead(n); err != nil {
		return p, err
	}
	return c.inputBuffer.Next(n)
}

这些读API在方法开头都会调用waitRead等待所读数据量就绪后或者读超时后,才会进行数据读取或者超时返回:

// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
	// 如果当前可读数据大于需要的了,直接返回,无需等待
	if n <= c.inputBuffer.Len() {
		return nil
	}
	// 存储自己希望读取的数据量
	atomic.StoreInt64(&c.waitReadSize, int64(n))
	// 返回时,清空变量
	defer atomic.StoreInt64(&c.waitReadSize, 0)
	// 如果设置了读超时属性,就有限期等待,直到数据就绪
	if c.readTimeout > 0 {
		return c.waitReadWithTimeout(n)
	}
	// wait full n
	// 否则无限期等待,直到数据准备就绪
	for c.inputBuffer.Len() < n {
		switch c.status(closing) {
		case poller:
			return Exception(ErrEOF, "wait read")
		case user:
			return Exception(ErrConnClosed, "wait read")
		default:
		    // 等待接收数据就绪的信号
			err = <-c.readTrigger
			if err != nil {
				return err
			}
		}
	}
	return nil
}

// waitReadWithTimeout will wait full n bytes or until timeout.
// 有限期等待
func (c *connection) waitReadWithTimeout(n int) (err error) {
	// set read timeout
	if c.readTimer == nil {
		c.readTimer = time.NewTimer(c.readTimeout)
	} else {
		c.readTimer.Reset(c.readTimeout)
	}

	for c.inputBuffer.Len() < n {
		switch c.status(closing) {
		case poller:
			// cannot return directly, stop timer first!
			err = Exception(ErrEOF, "wait read")
			goto RET
		case user:
			// cannot return directly, stop timer first!
			err = Exception(ErrConnClosed, "wait read")
			goto RET
		default:
			select {
			case <-c.readTimer.C:
				// double check if there is enough data to be read
				if c.inputBuffer.Len() >= n {
					return nil
				}
				return Exception(ErrReadTimeout, c.remoteAddr.String())
			case err = <-c.readTrigger:
				if err != nil {
					return err
				}
				continue
			}
		}
	}
RET:
	// clean timer.C
	if !c.readTimer.Stop() {
		<-c.readTimer.C
	}
	return err
}

可写事件处理

可写事件有两类,一种是client端socket套接字可写事件,另一种是server端socket套接字可写事件:

注意区分server socket和socket套接字的区别 , 前者是server端启动绑定并监听的套接字,用于accept客户端连接,后者是accept得到的客户端socket连接套接字 和 客户端connect 服务端成功后得到的 socket套接字。

在这里插入图片描述
下面还是给出一幅写数据流程图:
在这里插入图片描述


客户端启动

客户端代码典型写法如下:

func main() {
	// 1. 建立连接
	dialer := netpoll.NewDialer()
	conn, _ := dialer.DialConnection("tcp", ":1234", time.Second)
	var reader, writer = conn.Reader(), conn.Writer()

	// 2. 写数据
	write_data := []byte("hello world")
	alloc, _ := writer.Malloc(len(write_data))
	copy(alloc, write_data) // write data
	writer.Flush()

	// 3. 读数据
	buf, _ := reader.Next(reader.Len())
	fmt.Println("服务端响应的数据:" + string(buf))
	reader.Release()
}

我们下面来看一下客户端启动过程:

  1. 建立连接
func (d *dialer) DialConnection(network, address string, timeout time.Duration) (connection Connection, err error) {
    ... 
	switch network {
	case "tcp", "tcp4", "tcp6":
	     // 走tcp连接
		return d.dialTCP(ctx, network, address)
    ...
	}
}

func (d *dialer) dialTCP(ctx context.Context, network, address string) (connection *TCPConnection, err error) {
    ...
    connection, err = DialTCP(ctx, "tcp", nil, tcpAddr)
	...
	return nil, firstErr
}

func DialTCP(ctx context.Context, network string, laddr, raddr *TCPAddr) (*TCPConnection, error) {
    ...
	sd := &sysDialer{network: network, address: raddr.String()}
	c, err := sd.dialTCP(ctx, laddr, raddr)
	...
	return c, nil
}
  1. 连接建立成功后,返回对应的客户端连接socket
func (sd *sysDialer) dialTCP(ctx context.Context, laddr, raddr *TCPAddr) (*TCPConnection, error) {
	conn, err := internetSocket(ctx, sd.network, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
	...
	return newTCPConnection(conn)
}

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (conn *netFD, err error) {
    ... 
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (netfd *netFD, err error) {
	// syscall.Socket & set socket options
	var fd int
	// 创建客户端socket对象,同时设置为非阻塞模式
	fd, err = sysSocket(family, sotype, proto)
	if err != nil {
		return nil, err
	}
	...
	// 包装客户端socket fd 为netFD
	netfd = newNetFD(fd, family, sotype, net)
	// 建立与server的连接
	err = netfd.dial(ctx, laddr, raddr)
	...
	return netfd, nil
}

func (c *netFD) dial(ctx context.Context, laddr, raddr sockaddr) (err error) {
    ...
	// 连接server
	if crsa, err = c.connect(ctx, lsa, rsa); err != nil {
		return err
	}
	...
	return nil
}
  1. 通过connect系统调用真正完成连接建立,然后为当前client socket创建一个FDOperator
func (c *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (rsa syscall.Sockaddr, retErr error) {
	// 系统调用connect,连接server
	syscall.Connect(c.fd, ra)
	...
	// 为当前client socket创建一个新的FDOperator
	c.pd = newPollDesc(c.fd)
	for {
		// 等待直到当前client socket可写为止
		if err := c.pd.WaitWrite(ctx); err != nil {
			return nil, err
		}
		switch err := syscall.Errno(nerr); err {
		...
		// 如果没有错误发生,直接返回Sockaddr
		case syscall.Errno(0):
			// The runtime poller can wake us up spuriously;
			// see issues 14548 and 19289. Check that we are
			// really connected; if not, wait again.
			if rsa, err := syscall.Getpeername(c.fd); err == nil {
				return rsa, nil
			}
		...
		}
	}
}
  1. 为当前client socket创建一个FDOperator,同时设置其OnWrite回调接口
func newPollDesc(fd int) *pollDesc {
	pd := &pollDesc{}
	poll := pollmanager.Pick()
	pd.operator = &FDOperator{
		poll:    poll,
		FD:      fd,
		OnWrite: pd.onwrite, // 设置OnWrite回调接口
		OnHup:   pd.onhup,
	}
	pd.writeTrigger = make(chan struct{})
	pd.closeTrigger = make(chan struct{})
	return pd
}
  1. 注册可写事件,同时等待直到client socket可写
// WaitWrite .
func (pd *pollDesc) WaitWrite(ctx context.Context) (err error) {
	if pd.operator.isUnused() {
		// add ET|Write|Hup
		// 在当前连接绑定的poll上注册等待可写事件
		if err = pd.operator.Control(PollWritable); err != nil {
			logger.Printf("NETPOLL: pollDesc register operator failed: %v", err)
			return err
		}
	}
    // 等待直到接收到终止信号或者可写事件(当前client socket缓冲区为空)  
	select {
	case <-pd.closeTrigger: // triggered by poller
		// no need to detach, since poller has done it in OnHup.
		return Exception(ErrConnClosed, "by peer")
	case <-pd.writeTrigger: // triggered by poller
		err = nil
	case <-ctx.Done(): // triggered by ctx
		pd.detach()
		pd.operator.unused()
		err = mapErr(ctx.Err())
	}
	// double check close trigger
	// 如果没有接收到停止信号,此处就直接返回
	select {
	case <-pd.closeTrigger:
		return Exception(ErrConnClosed, "by peer")
	default:
		return err
	}
}

写数据

当我们需要写数据时,通常都会先调用connection的malloc方法分配一块写缓冲区:

func (c *connection) Malloc(n int) (buf []byte, err error) {
	return c.outputBuffer.Malloc(n)
}

然后调用connection的flush方法刷新写缓冲区中的数据:

func (c *connection) Flush() error {
    ...
    // 刷新写缓冲区中的数据,让其对外可见 
	c.outputBuffer.Flush()
	// 将数据写入内核socket缓冲区
	return c.flush()
}

最终调用connection的flush方法,将linkbuffer中的数据写入socket内核缓冲区中:

func (c *connection) flush() error {
	...
	// netpoll采用聚集写,所以第一步是将写缓冲区中的数据都读取到写缓冲区向量数组中
	var bs = c.outputBuffer.GetBytes(c.outputBarrier.bs)
	// 将数据都写入内核socket缓冲区中
	var n, err = sendmsg(c.fd, bs, c.outputBarrier.ivs, false && c.supportZeroCopy)
	...
	// 如果写入了部分数据,则释放掉这部分内存中间
	if n > 0 {
		err = c.outputBuffer.Skip(n)
		c.outputBuffer.Release()
		if err != nil {
			return Exception(err, "when flush")
		}
	}
	// 如果所有数据都已经成功写入内核socket缓冲区中,则直接返回
	if c.outputBuffer.IsEmpty() {
		return nil
	}
	// 可能是因为socket缓冲区满了,导致还有一部分数据没写完
	// 此处注册对可写事件的监听
	err = c.operator.Control(PollR2RW)
	...
	// 等待直到可写才会返回
	return c.waitFlush()
}

如果socket内核缓冲区被写满了,则进行等待,具体是进行无限期等待,还是有限期等待,取决于我们是否设置了写超时时间:

func (c *connection) waitFlush() (err error) {
    // 如果我们没有设置写超时事件,则进行无限期等待
	if c.writeTimeout == 0 {
		select {
		case err = <-c.writeTrigger:
		}
		return err
	}

	// 如果我们设置了写超时事件,则执行有限期等待
	if c.writeTimer == nil {
		c.writeTimer = time.NewTimer(c.writeTimeout)
	} else {
		c.writeTimer.Reset(c.writeTimeout)
	}

	select {
	case err = <-c.writeTrigger:
		if !c.writeTimer.Stop() { // clean timer
			<-c.writeTimer.C
		}
		return err
	case <-c.writeTimer.C:
		select {
		// try fetch writeTrigger if both cases fires
		case err = <-c.writeTrigger:
			return err
		default:
		}
		// if timeout, remove write event from poller
		// we cannot flush it again, since we don't if the poller is still process outputBuffer
		c.operator.Control(PollRW2R)
		return Exception(ErrWriteTimeout, c.remoteAddr.String())
	}
}

可写事件

可写事件分为两类,一类是客户端socket可写,一类是服务端socket可写,本节我们来分别看看这两类可写事件都是如何处理的:

// 当感兴趣事件发生的时候,调用该函数进行处理
func (p *defaultPoll) handler(events []epollevent) (closed bool) {
	var triggerRead, triggerWrite, triggerHup, triggerError bool
	var err error
	// 遍历所有感兴趣的事件
	for i := range events {
	    ...
		// 触发写事件
		if triggerWrite {
			// 处理client socket可写事件
			if operator.OnWrite != nil {
				operator.OnWrite(p)
			} else if operator.Outputs != nil {
				// 处理服务端socket可写事件
				var bs, supportZeroCopy = operator.Outputs(p.barriers[i].bs)
				if len(bs) > 0 {
					// TODO: Let the upper layer pass in whether to use ZeroCopy.
					var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)
					operator.OutputAck(n)
					if err != nil {
						p.appendHup(operator)
						continue
					}
				}
			} else {
				logger.Printf("NETPOLL: operator has critical problem! event=%d operator=%v", evt, operator)
			}
		}
		operator.done()
	}
	...
}

客户端socket可写事件

当客户端socket可写事件发生时,也就是客户端socket内核缓冲区有空闲空间可写时,会调用FDOperator的onwrite回调方法进行处理。

onwrite回调中会向writeTrigger通道写入消息,唤醒阻塞等待可写事件的线程:

func (pd *pollDesc) onwrite(p Poll) error {
	select {
	case <-pd.writeTrigger:
	default:
		pd.detach()
		close(pd.writeTrigger)
	}
	return nil
}

服务端socket可写事件

当服务端socket可写事件发生时,也就是在server accept到客户端连接后,发现客户端连接对应的socket可写时,会经历下面三步:

  1. 如果写缓冲区数据为空,那么就移除对当前fd上可写事件的监听,否则读取数据到传入的vs缓冲区中
// outputs implements FDOperator.
func (c *connection) outputs(vs [][]byte) (rs [][]byte, supportZeroCopy bool) {
	// 如果写缓冲区为空
	if c.outputBuffer.IsEmpty() {
		// 移除对当前fd上可写事件的监听
		c.rw2r()
		return rs, c.supportZeroCopy
	}
	// 读取数据到rs中
	rs = c.outputBuffer.GetBytes(vs)
	return rs, c.supportZeroCopy
}

// 不再监听当前FD上的可写事件
func (c *connection) rw2r() {
	c.operator.Control(PollRW2R)
	c.triggerWrite(nil)  // 唤醒等到可写事件的线程
}
  1. 采用分散写技术,将bs向量中所有数据写入socket内核缓冲区中
	var n, err = iosend(operator.FD, bs, p.barriers[i].ivs, false && supportZeroCopy)

3,释放掉已经用完的写缓冲区空间,同时移除对当前fd上可写事件的监听

func (c *connection) outputAck(n int) (err error) {
    // 将已经用完的部分内存回收掉 
	if n > 0 {
		c.outputBuffer.Skip(n)
		c.outputBuffer.Release()
	}
	// 如果此时发现所有待写入数据都写入完毕了,那么就移除对当前fd上可写事件的监听
	if c.outputBuffer.IsEmpty() {
		c.rw2r()
	}
	return nil
}

文章来源:https://blog.csdn.net/m0_53157173/article/details/134767669
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。