redis 网络协议
Redis Serialization Protocol
是 Redis
的网络协议,简称 RESP
它是一种文本协议,基于 TCP
协议,用于 Redis
服务器和客户端之间的通信
RESP
协议的设计目标是简单、易于实现,同时保证高效的网络传输,它的设计思想是将数据序列化为文本协议,以便于网络传输,同时保证序列化和反序列化的效率文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
RESP
协议的数据类型主要有以下几种:文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
- 正常消息
- 错误消息
- 整数
- 多行字符串
- 数组
正常消息
简单字符串是一个以 +
开头的字符串,rn
结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
例如:+OKrn
表示一个简单字符串,OK
是字符串的内容,rn
是字符串的结束符文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
错误消息
错误消息是一个以 -
开头的字符串,rn
结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
例如:-ERR unknown command 'foobar'rn
表示一个错误消息,ERR unknown command 'foobar'
是错误消息的内容,rn
是字符串的结束符文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
整数
整数是一个以 :
开头的字符串,rn
结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
例如::1000rn
表示一个整数,1000
是整数的内容,rn
是字符串的结束符文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
多行字符串
多行字符串是一个以 $
开头的字符串,后面跟实际发送字节数,rn
结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
例如:文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html
$6rnfoobarrn
表示一个多行字符串,6
是字符串的长度,foobar
是字符串的内容,rn
是字符串的结束符$0rnrn
表示一个空字符串$16rnfoobarrnfoobarrn
表示一个多行字符串,14
是字符串的长度,foobarrnfoobar
是字符串的内容,rn
是字符串的结束符$-1rn
表示null
数组
数组是一个以 *
开头的字符串,后面跟实际数组元素个数,rn
结尾
例如:
SET key value
:*3rn$3rnSETrn$3rnkeyrn$5rnvaluern
表示一个数组,*3
是数组的元素个数,$3
是表示后面成员的长度,SET
,key
,value
是数组的元素,rn
是字符串的结束符
实现常量正常回复
conn
在 redis
协议层代表是一个 redis
的连接,所以我们抽象出一个 Connection
接口,定义在 interface/resp/conn.go
文件中
定义接口的主要目的是未来可能会有不同的 redis
客户端实现
go
type Connection interface {
Write([]byte) error // 写入数据
GetDBIndex() int // 获取 db 的
SelectDB(int) // 切换 redis 数据库
}
我们再来定义一个服务端回复客户端的消息的接口,定义在 interface/resp/reply.go
文件中,作用是把回复的内容转成字节,因为 tcp
协议传输的就是字节流
go
type Reply interface {
ToBytes() []byte // 把回复的内容转成字节
}
接口定义完之后,我们需要实现这两个接口相关的方法,定义在 resp/reply
目录中
首先来实现一些固定的回复,定义在 resp/reply/consts.go
文件中
比如:
- 回复
PONG
- 回复
OK
- 回复
null
ping
ping
指令标识:*1rn$4rnpingrn
回复 PONG
实在用户发送 ping
指令的时候,redis
服务器回复的内容,我们会给用户的指令是 +PONGrn
,实现如下:
go
// pong
type PongReply struct{}
var pongBytes = []byte("+PONGrn")
func MakePongReply() *PongReply {
return &PongReply{}
}
func (p *PongReply) ToBytes() []byte {
return pongBytes
}
ok
回复 OK
是在用户发送一些命令的时候,redis
服务器正常响应的内容,我们会给用户的指令是 +OKrn
,实现如下:
go
// ok
type OkReply struct{}
var okBytes = []byte("+OKrn")
func MakeOkReply() *OkReply {
return &OkReply{}
}
func (r *OkReply) ToBytes() []byte {
return okBytes
}
nil
回复 nil
是在用户发送一些命令的时候,redis
服务器未找到对应的值,我们会给用户的指令是 $-1rn
,实现如下:
go
// nil
type NullBulkReply struct{}
var nullBulkBytes = []byte("$-1rn")
func MakeNullBulkReply() *NullBulkReply {
return &NullBulkReply{}
}
func (n NullBulkReply) ToBytes() []byte {
return nullBulkBytes
}
实现常量异常回复
我们先定义一个异常回复的接口,定义在 interface/resp/reply.go
文件中
go
type ErrorReply interface {
Error() string
ToBytes() []byte
}
异常回复的实现定义在 resp/reply/error.go
文件中,主要实现两种异常回复:
- 未知错误
- 参数错误
未知错误
未知错误是用户发送的命令不在 redis
服务器支持的命令中,我们会给用户的指令是 -ERR unknownrn
,实现如下:
go
// 未知错误
type UnknownErrReply struct{}
func MakeUnknownErrReply() *UnknownErrReply {
return &UnknownErrReply{}
}
var unknownErrBytes = []byte("-Err unknownrn")
func (u UnknownErrReply) Error() string {
return "Err unknown"
}
func (u UnknownErrReply) ToBytes() []byte {
return unknownErrBytes
}
参数错误
参数错误是用户在发送命令的时候,参数个数不对,我们会给用户的指令是 -ERR wrong number of arguments for 'command' commandrn
,实现如下:
go
// 参数错误
type ArgNumErrReply struct {
Cmd string
}
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
return &ArgNumErrReply{Cmd: cmd}
}
func (r *ArgNumErrReply) Error() string {
return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}
func (r *ArgNumErrReply) ToBytes() []byte {
return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' commandrn")
}
实现自定义回复
实现了常量回复和异常回复之后,我们再来实现自定义回复,定义在 resp/reply/reply.go
文件中
自定义错误回复可以更灵活的响应各种错误信息,主要有以下几种:
- 单个字符串回复
- 多个字符串回复
- 状态回复
- 数字回复
- 标准错误回复
单个字符串回复
在 redis
中,Bulk
代表一个块或者字符串的意思
比如要回复一个字符串 foobar
,我们回复给用户的指令是 $6rnfoobarrn
,所以 ToBytes
方法的实现如下:
go
type BulkReply struct {
Arg []byte
}
func MakeBulkReply(arg []byte) *BulkReply {
return &BulkReply{Arg: arg}
}
func (b *BulkReply) ToBytes() []byte {
// 如果是空,应该回复 $-1rn
if len(b.Arg) == 0 {
return nullBulkBytes
}
// $6rnfoobarrn
return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
}
多个字符串回复
多个字符串回复是一个数组,数组中的每个元素都是一个字符串
比如回复这个指令:*3rn$3rnSETrn$3rnkeyrn$5rnvaluern
Args
中保存的是 [SET KEY VALUE]
的字节,我们需要取出数组中的每一项,拼接成字符串,最后回复给用户
拼接字符串使用 bytes.Buffer
,因为 bytes.Buffer
是一个缓冲区,可以高效的拼接字符串
go
type MultiBulkReply struct {
Args [][]byte
}
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
return &MultiBulkReply{Args: args}
}
func (r *MultiBulkReply) ToBytes() []byte {
argLen := len(r.Args)
var buf bytes.Buffer
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern
// *3rn
buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
for _, arg := range r.Args {
// 如果是空,应该回复 $-1rn
if arg == nil {
buf.WriteString(string(nullBulkReplyBytes) + CRLF)
} else {
// $3rnSETrn
// $3rnkeyrn
// $5rnvaluern
buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
}
}
return buf.Bytes()
}
状态回复
状态回复是一个以 +
开头的字符串,rn
结尾
go
type StatusReply struct {
Status string
}
func MakeStatusReply(status string) *StatusReply {
return &StatusReply{Status: status}
}
func (r *StatusReply) ToBytes() []byte {
return []byte("+" + r.Status + CRLF)
}
数字回复
数字回复是一个以 :
开头的字符串,rn
结尾
go
type IntReply struct {
Code int64
}
func MakeIntReply(code int64) *IntReply {
return &IntReply{Code: code}
}
func (r *IntReply) ToBytes() []byte {
return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}
标准错误回复
标准错误回复是一个以 -
开头的字符串,rn
结尾
go
type StandardErrReply struct {
Status string
}
func MakeErrReply(status string) *StandardErrReply {
return &StandardErrReply{Status: status}
}
func (r *StandardErrReply) ToBytes() []byte {
return []byte("-" + r.Status + CRLF)
}
func (r *StandardErrReply) Error() string {
return r.Status
}
实现解析器 ParseStream
接下来实现用户发过来的指令,如何解析成 RESP
协议的数据类型
ParseStream
函数定义在 resp/parse/parse.go
文件中
首先定义一个结构体 Payload
,用来保存解析后的数据,因为用户发送的指令和回复给用户的指令格式一样,所以 Payload
中的 Data
类型可以使用 resp.Reply
go
type Payload struct {
Data resp.Reply
Err error
}
我们还要定义一个解析器的状态
readingMultiLine
:正在解析的数据是单行还是多行expectedArgsCount
:正在读取的指令有几个参数,比如SET KEY VALUE
指令,解析出来是3
个参数msgType
:传过来的消息类型,比如+
、-
、:
、$
、*
,也就是RESP
协议的数据类型args
:传过来的数据本身,比如SET KEY VALUE
,解析出来是[SET KEY VALUE]
的字节bulkLen
:数据块的长度
go
type readState struct {
readingMultiLine bool // 正在解析的数据是单行还是多行
expectedArgsCount int // 正在读取的指令有几个参数
msgType byte // 传过来的消息类型
args [][]byte // 传过来的数据本身
bulkLen int64 // 数据块的长度
}
判断解析是否完成,通过判断 expectedArgsCount
和 args
的长度是否相等,同时要排除掉参数为 0
的情况
go
func (s *readState) finished() bool {
return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}
ParseStream
函数是解析器的入口,解析器的核心是 parse0
函数
解析的过程应该是异步的,来一条指令解析一条指令,所以在 ParseStream
函数中调用 parse0
需要用 goroutine
ParseStream
函数接收一个 io.Reader
类型的参数(上层 tcp
协议读取的字节流是 io.Reader
类型),结果通过 chan
返回出去(异步返回,不会阻塞)
go
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}
解析器最核心部分是 parse0
函数,定义在 resp/parse/parse.go
文件中
接收两个参数:
io.Reader
:最上层tcp
读取的字节流是io.Reader
类型chan<- *Payload
:解析应该是异步的,来一条指令解析一条指令是一个并发的过程,所以使用chan<- *Payload
类型
go
func parse0(reader io.Reader, ch chan<- *Payload) {}
实现 readLine
在实现核心解析器 parse0
之前,我们先实现一个 readLine
函数,用来读取一行数据
readLine
接收两个参数:
*bufio.Reader
:bufio.Reader
是一个带缓冲的io.Reader
,可以提高读取效率*readState
:解析器的状态
返回三个值:
[]byte
:读取的数据bool
:有没有io
错误error
:错误本身,读取的数据有没问题
怎么读取一行数据呢?读取的数据,直到遇到 rn
算一行
但是这样读取数据会有问题
如果数据本身带有 rn
,那么这个数据就还没有读完,这时候应该按照数据的长度读取数据,数据长度是由 $数字
决定的
所以就要分情况读取
- 如果数据是
rn
结尾,直接读取 - 如果数据是
$数字
,按照数字读取数据
这两种情况如何分别呢?用 state.bulkLen == 0
,如果 bulkLen
是 0
,说明没有预设的个数,直接读取数据
解析指令:*3rn$3rnSETrn$3rnkeyrn$5rnvaluern
state.bulkLen == 0
解析的是 *3rn
,$3rn
这些数据,而 SETrn
,kernyrn
将会进入 else
分支中
go
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
// 1. rn 切分
// state.bulkLen == 0,表示没有预设的个数,直接读取数据
if state.bulkLen == 0 {
// 读到 n 结束
msg, err = bufReader.ReadBytes('n')
if err != nil {
return nil, true, err
}
// n 前面是不是 r,如果不是,说明数据有问题
// 数据没有长度,说明数据也有问题
if len(msg) == 0 || msg[len(msg)-2] != 'r' {
return nil, false, errors.New("protocol error:" + string(msg))
}
} else {
// 2. 如果有 $数字,按照数字读取字符
// 读取 SETrn,kernyrn 这样的内容
// 所以这个长度是 set 长度+ rn 长度
msg = make([]byte, state.bulkLen+2)
// io.ReadFull 读取指定长度的数据,塞满 msg
_, err = io.ReadFull(bufReader, msg)
if err != nil {
return nil, true, nil
}
// 判断数据是否有问题
// 数据长度不对
// 倒数第一个字符是不是 n,倒数第二个字符是不是 r
if len(msg) == 0 || msg[len(msg)-2] != 'r' || msg[len(msg)-1] != 'n' {
return nil, false, errors.New("protocol error:" + string(msg))
}
// 读完数据之后,把 bulkLen 设置为 0,下次读取数据的时候,就会直接读取数据
state.bulkLen = 0
}
return msg, false, nil
}
实现 parseMultiBulkHeader
readLine
函数作用只是将数据切出来,但数据本身的含义还不知道,所以我们需要一个函数来解析数据的含义
解析数据的含义分为两个部分:header
和 body
,header
是数据的长度,
数据的长度有两种:
$4rnPINGrn
:$4
*3rn$3rnSETrn$3rnkeyrn$5rnvaluern
:*3
parseMultiBulkHeader
函数是处理 *3
这种数据的长度,parseBulkHeader
函数是处理 $4
这种数据的长度
parseMultiBulkHeader
函数接收两个参数:
[]byte
:读取的数据*readState
:解析器的状态
go
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
// 将 $3rn 的 3 解析出来
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error:" + string(msg))
}
// 如果 expectedLine == 0,表示没有数据,直接返回
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// 如果 expectedLine > 0,表示有数据,设置状态
// 设置数据的长度
state.msgType = msg[0]
// 设置读取的数据是多行
state.readingMultiLine = true
// 设置数据的个数
state.expectedArgsCount = int(expectedLine)
// 设置数据的长度
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error:" + string(msg))
}
}
实现 parseBulkHeader
parseBulkHeader
函数和 parseMultiBulkHeader
函数类似,需要多处理一种 state.bulkLen == -1
的情况
parseBulkHeader
函数接收两个参数:
[]byte
:读取的数据*readState
:解析器的状态
go
// $4rnPINGrn
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error:" + string(msg))
}
// 处理 $-1rn
if state.bulkLen == -1 {
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
// 这种情况也是读取多行数据
// $4rnPINGrn,有两组 rn
state.readingMultiLine = true
// 只需要接收 ping 一个参数
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error:" + string(msg))
}
}
实现 parseSingleLineReply
客户端也有可能会发送 +OK
,-ERR
这样的数据,所以我们也需要一个解析器来解析这些数据
parseSingleLineReply
可以一次性把一条指令解析完,所以它的返回值是 resp.Reply
类型
go
// +OKrn -ERRrn :5rn
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
// 把 rn 去掉
str := strings.TrimSuffix(string(msg), "rn")
var result resp.Reply
// 拿到第一个字符,判断是什么类型的数据
switch msg[0] {
case '+':
// 拿到 + 后面的数据,返回一个状态回复
result = reply.MakeStatusReply(str[1:])
case '-':
// 拿到 - 后面的数据,返回一个标准错误回复
result = reply.MakeErrReply(str[1:])
case ':':
// 拿到 : 后面的数据,返回一个数字回复
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error:" + string(msg))
}
result = reply.MakeIntReply(val)
}
return result, nil
}
实现 readBody
readBody
函数是解析数据内容
数据的内容有两种:
$4rnPINGrn
:$4rn
已经被解析了,剩余未解析的PINGrn
*3rn$3rnSETrn$3rnkeyrn$5rnvaluern
:*3rn
已经被解析了,剩余未解析的$3rnSETrn$3rnkeyrn$5rnvaluern
go
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern,*3rn 已经被解析了,剩余未解析的 $3rnSETrn$3rnkeyrn$5rnvaluern
// $4rnPINGrn PINGrn,$4rn 已经被解析了,剩余未解析的 PINGrn
func readBody(msg []byte, state *readState) error {
// 切掉 rn
line := msg[0 : len(msg)-2]
var err error
// $3 这样的指令
// 如果第一个字符是 $,说明后面是描述数据的长度
if line[0] == '$' {
// 拿到 $ 后面的数据,解析数据的长度
state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return errors.New("protocol error:" + string(msg))
}
// 如果数据长度 <= 0,表示数据为空
if state.bulkLen <= 0 {
// 往 state.args 中添加一个空数据
state.args = append(state.args, []byte{})
state.bulkLen = 0
}
} else {
// SET 这样的指令
state.args = append(state.args, line)
// 这里不需要设置 bulkLen,因为在上面那个分支里已经设置了
}
return nil
}
实现 parse0
实现完解析器需要的工具函数之后,我们再来实现核心解析器 parse0
函数
handler
会调用 ParseStream
函数,ParseStream
函数会返回出去一个 chan
,parse0
会把解析的结果放到 chan
中
parse0
内部是个无限循环,不断的读取数据,解析数据,直到 io.EOF
,表示数据读取完毕,跳出循环
go
func parser0(reader io.Reader, ch chan<- *Payload) {
for {
msg, ioErr, err := readLine(bufReader, &state)
// 读到 io.EOF,表示数据读取完毨,跳出 for 循环
// 如果是 io.EOF,ioErr 变量值为 true
if err != nil {
if ioErr {
ch <- &Payload{Err: err}
close(ch)
return
}
// 如果 redis 协议解析出错,返回一个错误回复即可,无需结束程序
ch <- &Payload{Err: err}
state = readState{}
continue
}
}
}
但是如果某一条数据解析出现错误,出现了 panic
,那么整个程序就会挂掉,所以我们需要使用 recover
来捕获错误
go
func parser0(reader io.Reader, ch chan<- *Payload) {
defer func() {
if err := recover(); err != nil {
logger.Error(string(debug.Stack()))
}
}()
}
在处理完异常之后,就是处理正常的解析
正常解析数据需要判断是不是多行解析模式,根据 state.readingMultiLine
来判断
也就是说只有头信息不是多行,因为在 parseMultiBulkHeader
和 parseBulkHeader
两个解析头信息的函数中设置了 state.readingMultiLine = true
所以 state.readingMultiLine
为 true
的时候,是在解析 body
,state.readingMultiLine
为 false
的时候,是在解析 header
go
// 解析 * 开头的指令
if msg[0] == '*' {
err := parseMultiBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
state = readState{}
continue
}
if state.expectedArgsCount == 0 {
ch <- &Payload{Data: reply.MakeEmptyMultiBulkReply()}
state = readState{}
continue
}
} else if msg[0] == '$' {
// 解析 $ 开头的指令
err := parseBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
state = readState{}
continue
}
if state.bulkLen == -1 {
ch <- &Payload{Data: reply.MakeNullBulkReply()}
state = readState{}
continue
}
} else {
// 其他指令,比如 +OKrn -ERRrn 这种
result, err := parseSingleLineReply(msg)
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{}
continue
}
解析完 header
之后,就要解析 body
了,解析 body
就是 state.readingMultiLine
为 true
的情况
go
// 每一个循环都会进来,将 msg 交给 readBody 函数处理
// readBody 函数会将 msg 解析成一个个的数据,然后添加到 state.args 中
err := readBody(msg, &state)
if err != nil {
ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
state = readState{}
continue
}
// 当解析完成之后,将解析的数据返回给用户
if state.finished() {
var result resp.Reply
if state.msgType == '*' {
// * 表示要返回一个数组
result = reply.MakeMultiBulkReply(state.args)
} else if state.msgType == '$' {
// $ 表示要返回一个字符串
result = reply.MakeBulkReply(state.args[0])
}
// 将数据通过 chan 返回给上层
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{}
}
实现 connection
redis
解析协议实现后,我们再来实现 connection
connection
表示 redis
连接之后的记录的每一个用户信息
go
type Connection struct {
conn net.Conn // 连接信息
waitingReply wait.Wait // 在关掉 server 之前,需要等待回复给用户的指令完成
mu sync.Mutex // 在操作 conn 的时候,需要加锁
selectedDB int // 现在用户在操作哪个 db
}
然后这个结构体需要实现 resp.Connection
接口
go
// 给用户回复消息
func (c *Connection) Write(bytes []byte) error {
// 如果没有数据,直接返回
if len(bytes) == 0 {
return nil
}
// 写入数据的时候,需要加锁
c.mu.Lock()
// 每次写入数据,等待回复的指令 +1
c.waitingReply.Add(1)
defer func() {
// 写入数据完成之后,等待回复的指令 -1
c.waitingReply.Done()
// 解锁
c.mu.Unlock()
}()
// 写入数据
_, err := c.conn.Write(bytes)
return err
}
// 用户正在使用哪个 db
func (c *Connection) GetDBIndex() int {
return c.selectedDB
}
// 用户选择自己想用的 db
func (c *Connection) SelectDB(dbNum int) {
c.selectedDB = dbNum
}
以及需要实现一个 Close
方法,用来关闭连接
go
// 关闭连接直接调用 conn.Close() 即可
// 但是在关闭连接之前,需要等待回复给用户的指令完成
func (c *Connection) Close() error {
c.waitingReply.WaitWithTimeout(10 * time.Second)
_ = c.conn.Close()
return nil
}
实现 RespHandler
tcp
服务器需要一个 handler
来处理用户的请求
它的用途是调用 ParseStream
函数,解析用户的指令,然后根据用户的指令,返回给用户相应的回复
go
type RespHandler struct {
activeConn sync.Map // 记录用户的连接
db databaseface.Database // 核心数据库,操作 kev value 相关的逻辑
closing atomic.Boolean // 是否关闭
}
结构体定义好之后,就要实现 tcp.Handler
接口
这个接口有两个方法:
Handle
:处理用户的请求Close
:关闭连接
Close
Close
方法是关闭连接,关闭连接的时候,需要关闭所有的连接,关闭数据库
这里要注意的是 activeConn
是 sync.Map
类型,sync.Map
类型是线程安全的,所以在关闭连接的时候,需要遍历 activeConn
遍历的方法是调用 activeConn.Range
方法,接收一个匿名函数,在这个函数中做关闭连接的操作,并且最后需要返回 true
,表示继续遍历
go
func (r *RespHandler) Close() error {
logger.Info("handler shutting down")
// 将关闭连接的标志设置为 true
r.closing.Set(true)
// 使用 Range 遍历 sync.Map
// 接收一个匿名函数,在这个函数中做关闭连接的操作,并且最后需要返回 true,表示继续遍历
r.activeConn.Range(func(key, value any) bool {
// 需要将 key 断言成 *connection.Connection 类型
client := key.(*connection.Connection)
// 关闭连接
_ = client.Close()
// 返回 true,表示继续遍历,返回 false,表示停止遍历
return true
})
// 关闭数据库
r.db.Close()
return nil
}
Handle
在实现 Handle
方法之前,我们需要先实现一个 closeClient
方法,这个方法是关闭一个连接
它和 Close
方法的区别是,Close
方法是关闭所有的连接,而 closeClient
方法是关闭一个连接
它的用途是在连接出错的时候,关闭连接,释放资源
go
// 关闭一个客户端
func (r *RespHandler) closeClient(client *connection.Connection) {
// 关闭客户端
_ = client.Close()
// 关闭客户端后的善后工作
r.db.AfterClientClose(client)
// 从 sync.Map 中删除掉这个客户端
r.activeConn.Delete(client)
}
go
// 处理用户的指令
func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
//是不是在关闭中
if r.closing.Get() {
_ = conn.Close()
}
// 新建连接
client := connection.NewConn(conn)
// 将连接存储到 sync.Map 中
r.activeConn.Store(client, struct{}{})
// 调用 ParseStream 函数,解析用户的指令
// 通过 chan 拿到解析后的结果
ch := parser.ParseStream(conn)
for payload := range ch {
// 处理 payload 中的错误
if payload.Err != nil {
// io 错误
// io.EOF 表示已经解析到结尾了,连接可以正常关闭了
// io.ErrUnexpectedEOF 表示在预期的数据结束之前遇到了 EOF,可能发生了数据传输突然中断了
// use of close network connection 表示使用已经关闭的连接,这个时候也需要关闭连接
// 只要出现这三种情况,就需要关闭连接
if payload.Err == io.EOF || errors.Is(payload.Err, io.ErrUnexpectedEOF) || strings.Contains(payload.Err.Error(), "use of close network connection") {
r.closeClient(client)
logger.Infof("connection closed: " + client.RemoteAddr().String())
return
}
// 解析 resp 协议出错,MakeErrReply 作用是将错误信息前面加上 -ERR,后面加上 rn
errReply := reply.MakeErrReply(payload.Err.Error())
// 将解析 resp 协议出错的信息返回给用户
err := client.Write(errReply.ToBytes())
// 给用户写入数据可能会出错,如果出错,关闭连接
if err != nil {
r.closeClient(client)
logger.Infof("connection closed: " + client.RemoteAddr().String())
return
}
continue
}
// 没有数据
if payload.Data == nil {
continue
}
replyResult, ok := payload.Data.(*reply.MultiBulkReply)
if !ok {
logger.Infof("require multi bulk reply")
continue
}
// 调用数据库的 Exec 方法,执行用户的指令
result := r.db.Exec(client, replyResult.Args)
// 将执行结果返回给用户
if result != nil {
_ = client.Write(result.ToBytes())
} else {
// 如果执行结果为空,返回一个未知错误
_ = client.Write(unknownErrReplyBytes)
}
}
}
评论