用 go 实现 redis resp 协议解析器

零 Golang教程评论69字数 15771阅读52分34秒阅读模式

用 go 实现 redis resp 协议解析器

redis 网络协议

Redis Serialization Protocol 是 Redis 的网络协议,简称 RESP 它是一种文本协议,基于 TCP 协议,用于 Redis 服务器和客户端之间的通信

RESP 协议的设计目标是简单、易于实现,同时保证高效的网络传输,它的设计思想是将数据序列化为文本协议,以便于网络传输,同时保证序列化和反序列化的效率文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

RESP 协议的数据类型主要有以下几种:文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

  • 正常消息
  • 错误消息
  • 整数
  • 多行字符串
  • 数组

正常消息

简单字符串是一个以 + 开头的字符串,rn 结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

例如:+OKrn 表示一个简单字符串,OK 是字符串的内容,rn 是字符串的结束符文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

错误消息

错误消息是一个以 - 开头的字符串,rn 结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

例如:-ERR unknown command 'foobar'rn 表示一个错误消息,ERR unknown command 'foobar' 是错误消息的内容,rn 是字符串的结束符文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

整数

整数是一个以 : 开头的字符串,rn 结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

例如::1000rn 表示一个整数,1000 是整数的内容,rn 是字符串的结束符文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

多行字符串

多行字符串是一个以 $ 开头的字符串,后面跟实际发送字节数,rn 结尾文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

例如:文章源自灵鲨社区-https://www.0s52.com/bcjc/golangjc/16660.html

  • $6rnfoobarrn 表示一个多行字符串,6 是字符串的长度,foobar 是字符串的内容,rn 是字符串的结束符
  • $0rnrn 表示一个空字符串
  • $16rnfoobarrnfoobarrn 表示一个多行字符串,14 是字符串的长度,foobarrnfoobar 是字符串的内容,rn 是字符串的结束符
  • $-1rn 表示 null

数组

数组是一个以 * 开头的字符串,后面跟实际数组元素个数,rn 结尾

例如:

  • SET key value*3rn$3rnSETrn$3rnkeyrn$5rnvaluern 表示一个数组,*3 是数组的元素个数,$3 是表示后面成员的长度,SETkeyvalue 是数组的元素,rn 是字符串的结束符

实现常量正常回复

conn 在 redis 协议层代表是一个 redis 的连接,所以我们抽象出一个 Connection 接口,定义在 interface/resp/conn.go 文件中

定义接口的主要目的是未来可能会有不同的 redis 客户端实现

go

复制代码
type Connection interface {
  Write([]byte) error // 写入数据
  GetDBIndex() int    // 获取 db 的
  SelectDB(int)       // 切换 redis 数据库
}

我们再来定义一个服务端回复客户端的消息的接口,定义在 interface/resp/reply.go 文件中,作用是把回复的内容转成字节,因为 tcp 协议传输的就是字节流

go

复制代码
type Reply interface {
  ToBytes() []byte // 把回复的内容转成字节
}

接口定义完之后,我们需要实现这两个接口相关的方法,定义在 resp/reply 目录中

首先来实现一些固定的回复,定义在 resp/reply/consts.go 文件中

比如:

  • 回复 PONG
  • 回复 OK
  • 回复 null

ping

ping 指令标识:*1rn$4rnpingrn

回复 PONG 实在用户发送 ping 指令的时候,redis 服务器回复的内容,我们会给用户的指令是 +PONGrn,实现如下:

go

复制代码
// pong
type PongReply struct{}
var pongBytes = []byte("+PONGrn")
func MakePongReply() *PongReply {
  return &PongReply{}
}
func (p *PongReply) ToBytes() []byte {
  return pongBytes
}

ok

回复 OK 是在用户发送一些命令的时候,redis 服务器正常响应的内容,我们会给用户的指令是 +OKrn,实现如下:

go

复制代码
// ok
type OkReply struct{}
var okBytes = []byte("+OKrn")
func MakeOkReply() *OkReply {
  return &OkReply{}
}
func (r *OkReply) ToBytes() []byte {
  return okBytes
}

nil

回复 nil 是在用户发送一些命令的时候,redis 服务器未找到对应的值,我们会给用户的指令是 $-1rn,实现如下:

go

复制代码
// nil
type NullBulkReply struct{}
var nullBulkBytes = []byte("$-1rn")
func MakeNullBulkReply() *NullBulkReply {
  return &NullBulkReply{}
}
func (n NullBulkReply) ToBytes() []byte {
  return nullBulkBytes
}

实现常量异常回复

我们先定义一个异常回复的接口,定义在 interface/resp/reply.go 文件中

go

复制代码
type ErrorReply interface {
  Error() string
  ToBytes() []byte
}

异常回复的实现定义在 resp/reply/error.go 文件中,主要实现两种异常回复:

  • 未知错误
  • 参数错误

未知错误

未知错误是用户发送的命令不在 redis 服务器支持的命令中,我们会给用户的指令是 -ERR unknownrn,实现如下:

go

复制代码
// 未知错误
type UnknownErrReply struct{}
func MakeUnknownErrReply() *UnknownErrReply {
  return &UnknownErrReply{}
}
var unknownErrBytes = []byte("-Err unknownrn")
func (u UnknownErrReply) Error() string {
  return "Err unknown"
}
func (u UnknownErrReply) ToBytes() []byte {
  return unknownErrBytes
}

参数错误

参数错误是用户在发送命令的时候,参数个数不对,我们会给用户的指令是 -ERR wrong number of arguments for 'command' commandrn,实现如下:

go

复制代码
// 参数错误
type ArgNumErrReply struct {
  Cmd string
}
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
  return &ArgNumErrReply{Cmd: cmd}
}
func (r *ArgNumErrReply) Error() string {
  return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}
func (r *ArgNumErrReply) ToBytes() []byte {
  return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' commandrn")
}

实现自定义回复

实现了常量回复和异常回复之后,我们再来实现自定义回复,定义在 resp/reply/reply.go 文件中

自定义错误回复可以更灵活的响应各种错误信息,主要有以下几种:

  • 单个字符串回复
  • 多个字符串回复
  • 状态回复
  • 数字回复
  • 标准错误回复

单个字符串回复

在 redis 中,Bulk 代表一个块或者字符串的意思

比如要回复一个字符串 foobar,我们回复给用户的指令是 $6rnfoobarrn,所以 ToBytes 方法的实现如下:

go

复制代码
type BulkReply struct {
  Arg []byte
}
func MakeBulkReply(arg []byte) *BulkReply {
  return &BulkReply{Arg: arg}
}
func (b *BulkReply) ToBytes() []byte {
  // 如果是空,应该回复 $-1rn
  if len(b.Arg) == 0 {
    return nullBulkBytes
  }
  // $6rnfoobarrn
  return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
}

多个字符串回复

多个字符串回复是一个数组,数组中的每个元素都是一个字符串

比如回复这个指令:*3rn$3rnSETrn$3rnkeyrn$5rnvaluern

Args 中保存的是 [SET KEY VALUE] 的字节,我们需要取出数组中的每一项,拼接成字符串,最后回复给用户

拼接字符串使用 bytes.Buffer,因为 bytes.Buffer 是一个缓冲区,可以高效的拼接字符串

go

复制代码
type MultiBulkReply struct {
  Args [][]byte
}
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
  return &MultiBulkReply{Args: args}
}
func (r *MultiBulkReply) ToBytes() []byte {
  argLen := len(r.Args)
  var buf bytes.Buffer
  // *3rn$3rnSETrn$3rnkeyrn$5rnvaluern
  // *3rn
  buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
  for _, arg := range r.Args {
    // 如果是空,应该回复 $-1rn
    if arg == nil {
      buf.WriteString(string(nullBulkReplyBytes) + CRLF)
    } else {
      // $3rnSETrn
      // $3rnkeyrn
      // $5rnvaluern
      buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
    }
  }
  return buf.Bytes()
}

状态回复

状态回复是一个以 + 开头的字符串,rn 结尾

go

复制代码
type StatusReply struct {
  Status string
}
func MakeStatusReply(status string) *StatusReply {
  return &StatusReply{Status: status}
}
func (r *StatusReply) ToBytes() []byte {
  return []byte("+" + r.Status + CRLF)
}

数字回复

数字回复是一个以 : 开头的字符串,rn 结尾

go

复制代码
type IntReply struct {
  Code int64
}
func MakeIntReply(code int64) *IntReply {
  return &IntReply{Code: code}
}
func (r *IntReply) ToBytes() []byte {
  return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}

标准错误回复

标准错误回复是一个以 - 开头的字符串,rn 结尾

go

复制代码
type StandardErrReply struct {
  Status string
}
func MakeErrReply(status string) *StandardErrReply {
  return &StandardErrReply{Status: status}
}
func (r *StandardErrReply) ToBytes() []byte {
  return []byte("-" + r.Status + CRLF)
}
func (r *StandardErrReply) Error() string {
  return r.Status
}

实现解析器 ParseStream

接下来实现用户发过来的指令,如何解析成 RESP 协议的数据类型

ParseStream 函数定义在 resp/parse/parse.go 文件中

首先定义一个结构体 Payload,用来保存解析后的数据,因为用户发送的指令和回复给用户的指令格式一样,所以 Payload 中的 Data 类型可以使用 resp.Reply

go

复制代码
type Payload struct {
  Data resp.Reply
  Err  error
}

我们还要定义一个解析器的状态

  • readingMultiLine:正在解析的数据是单行还是多行
  • expectedArgsCount:正在读取的指令有几个参数,比如 SET KEY VALUE 指令,解析出来是 3 个参数
  • msgType:传过来的消息类型,比如 +-:$*,也就是 RESP 协议的数据类型
  • args:传过来的数据本身,比如 SET KEY VALUE,解析出来是 [SET KEY VALUE] 的字节
  • bulkLen:数据块的长度

go

复制代码
type readState struct {
  readingMultiLine  bool     // 正在解析的数据是单行还是多行
  expectedArgsCount int      // 正在读取的指令有几个参数
  msgType           byte     // 传过来的消息类型
  args              [][]byte // 传过来的数据本身
  bulkLen           int64    // 数据块的长度
}

判断解析是否完成,通过判断 expectedArgsCount 和 args 的长度是否相等,同时要排除掉参数为 0 的情况

go

复制代码
func (s *readState) finished() bool {
  return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}

ParseStream 函数是解析器的入口,解析器的核心是 parse0 函数

解析的过程应该是异步的,来一条指令解析一条指令,所以在 ParseStream 函数中调用 parse0 需要用 goroutine

ParseStream 函数接收一个 io.Reader 类型的参数(上层 tcp 协议读取的字节流是 io.Reader 类型),结果通过 chan 返回出去(异步返回,不会阻塞)

go

复制代码
func ParseStream(reader io.Reader) <-chan *Payload {
  ch := make(chan *Payload)
  go parse0(reader, ch)
  return ch
}

解析器最核心部分是 parse0 函数,定义在 resp/parse/parse.go 文件中

接收两个参数:

  • io.Reader:最上层 tcp 读取的字节流是 io.Reader 类型
  • chan<- *Payload:解析应该是异步的,来一条指令解析一条指令是一个并发的过程,所以使用 chan<- *Payload 类型

go

复制代码
func parse0(reader io.Reader, ch chan<- *Payload) {}

实现 readLine

在实现核心解析器 parse0 之前,我们先实现一个 readLine 函数,用来读取一行数据

readLine 接收两个参数:

  • *bufio.Readerbufio.Reader 是一个带缓冲的 io.Reader,可以提高读取效率
  • *readState:解析器的状态

返回三个值:

  • []byte:读取的数据
  • bool:有没有 io 错误
  • error:错误本身,读取的数据有没问题

怎么读取一行数据呢?读取的数据,直到遇到 rn 算一行

但是这样读取数据会有问题

如果数据本身带有 rn,那么这个数据就还没有读完,这时候应该按照数据的长度读取数据,数据长度是由 $数字 决定的

所以就要分情况读取

  1. 如果数据是 rn 结尾,直接读取
  2. 如果数据是 $数字,按照数字读取数据

这两种情况如何分别呢?用 state.bulkLen == 0,如果 bulkLen 是 0,说明没有预设的个数,直接读取数据

解析指令:*3rn$3rnSETrn$3rnkeyrn$5rnvaluern

state.bulkLen == 0 解析的是 *3rn$3rn 这些数据,而 SETrnkernyrn 将会进入 else 分支中

go

复制代码
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
  var msg []byte
  var err error
  // 1. rn 切分
  // state.bulkLen == 0,表示没有预设的个数,直接读取数据
  if state.bulkLen == 0 {
    // 读到 n 结束
    msg, err = bufReader.ReadBytes('n')
    if err != nil {
      return nil, true, err
    }
    // n 前面是不是 r,如果不是,说明数据有问题
    // 数据没有长度,说明数据也有问题
    if len(msg) == 0 || msg[len(msg)-2] != 'r' {
      return nil, false, errors.New("protocol error:" + string(msg))
    }
  } else {
    // 2. 如果有 $数字,按照数字读取字符
    // 读取 SETrn,kernyrn 这样的内容
    // 所以这个长度是 set 长度+ rn 长度
    msg = make([]byte, state.bulkLen+2)
    // io.ReadFull 读取指定长度的数据,塞满 msg
    _, err = io.ReadFull(bufReader, msg)
    if err != nil {
      return nil, true, nil
    }
    // 判断数据是否有问题
    // 数据长度不对
    // 倒数第一个字符是不是 n,倒数第二个字符是不是 r
    if len(msg) == 0 || msg[len(msg)-2] != 'r' || msg[len(msg)-1] != 'n' {
      return nil, false, errors.New("protocol error:" + string(msg))
    }
    // 读完数据之后,把 bulkLen 设置为 0,下次读取数据的时候,就会直接读取数据
    state.bulkLen = 0
  }
  return msg, false, nil
}

实现 parseMultiBulkHeader

readLine 函数作用只是将数据切出来,但数据本身的含义还不知道,所以我们需要一个函数来解析数据的含义

解析数据的含义分为两个部分:header 和 bodyheader 是数据的长度,

数据的长度有两种:

  • $4rnPINGrn$4
  • *3rn$3rnSETrn$3rnkeyrn$5rnvaluern*3

parseMultiBulkHeader 函数是处理 *3 这种数据的长度,parseBulkHeader 函数是处理 $4 这种数据的长度

parseMultiBulkHeader 函数接收两个参数:

  • []byte:读取的数据
  • *readState:解析器的状态

go

复制代码
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern
func parseMultiBulkHeader(msg []byte, state *readState) error {
  var err error
  var expectedLine uint64
  // 将 $3rn 的 3 解析出来
  expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
  if err != nil {
    return errors.New("protocol error:" + string(msg))
  }
  // 如果 expectedLine == 0,表示没有数据,直接返回
  if expectedLine == 0 {
    state.expectedArgsCount = 0
    return nil
  } else if expectedLine > 0 {
    // 如果 expectedLine > 0,表示有数据,设置状态
    // 设置数据的长度
    state.msgType = msg[0]
    // 设置读取的数据是多行
    state.readingMultiLine = true
    // 设置数据的个数
    state.expectedArgsCount = int(expectedLine)
    // 设置数据的长度
    state.args = make([][]byte, 0, expectedLine)
    return nil
  } else {
    return errors.New("protocol error:" + string(msg))
  }
}

实现 parseBulkHeader

parseBulkHeader 函数和 parseMultiBulkHeader 函数类似,需要多处理一种 state.bulkLen == -1 的情况

parseBulkHeader 函数接收两个参数:

  • []byte:读取的数据
  • *readState:解析器的状态

go

复制代码
// $4rnPINGrn
func parseBulkHeader(msg []byte, state *readState) error {
  var err error
  state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
  if err != nil {
    return errors.New("protocol error:" + string(msg))
  }
  // 处理 $-1rn
  if state.bulkLen == -1 {
    return nil
  } else if state.bulkLen > 0 {
    state.msgType = msg[0]
    // 这种情况也是读取多行数据
    // $4rnPINGrn,有两组 rn
    state.readingMultiLine = true
    // 只需要接收 ping 一个参数
    state.expectedArgsCount = 1
    state.args = make([][]byte, 0, 1)
    return nil
  } else {
    return errors.New("protocol error:" + string(msg))
  }
}

实现 parseSingleLineReply

客户端也有可能会发送 +OK-ERR 这样的数据,所以我们也需要一个解析器来解析这些数据

parseSingleLineReply 可以一次性把一条指令解析完,所以它的返回值是 resp.Reply 类型

go

复制代码
// +OKrn -ERRrn :5rn
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
  // 把 rn 去掉
  str := strings.TrimSuffix(string(msg), "rn")
  var result resp.Reply
  // 拿到第一个字符,判断是什么类型的数据
  switch msg[0] {
  case '+':
    // 拿到 + 后面的数据,返回一个状态回复
    result = reply.MakeStatusReply(str[1:])
  case '-':
    // 拿到 - 后面的数据,返回一个标准错误回复
    result = reply.MakeErrReply(str[1:])
  case ':':
    // 拿到 : 后面的数据,返回一个数字回复
    val, err := strconv.ParseInt(str[1:], 10, 64)
    if err != nil {
      return nil, errors.New("protocol error:" + string(msg))
    }
    result = reply.MakeIntReply(val)
  }
  return result, nil
}

实现 readBody

readBody 函数是解析数据内容

数据的内容有两种:

  • $4rnPINGrn$4rn 已经被解析了,剩余未解析的 PINGrn
  • *3rn$3rnSETrn$3rnkeyrn$5rnvaluern*3rn 已经被解析了,剩余未解析的 $3rnSETrn$3rnkeyrn$5rnvaluern

go

复制代码
// *3rn$3rnSETrn$3rnkeyrn$5rnvaluern,*3rn 已经被解析了,剩余未解析的 $3rnSETrn$3rnkeyrn$5rnvaluern
// $4rnPINGrn PINGrn,$4rn 已经被解析了,剩余未解析的 PINGrn
func readBody(msg []byte, state *readState) error {
  // 切掉 rn
  line := msg[0 : len(msg)-2]
  var err error
  // $3 这样的指令
  // 如果第一个字符是 $,说明后面是描述数据的长度
  if line[0] == '$' {
    // 拿到 $ 后面的数据,解析数据的长度
    state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
    if err != nil {
      return errors.New("protocol error:" + string(msg))
    }
    // 如果数据长度 <= 0,表示数据为空
    if state.bulkLen <= 0 {
      // 往 state.args 中添加一个空数据
      state.args = append(state.args, []byte{})
      state.bulkLen = 0
    }
  } else {
    // SET 这样的指令
    state.args = append(state.args, line)
    // 这里不需要设置 bulkLen,因为在上面那个分支里已经设置了
  }
  return nil
}

实现 parse0

实现完解析器需要的工具函数之后,我们再来实现核心解析器 parse0 函数

handler 会调用 ParseStream 函数,ParseStream 函数会返回出去一个 chanparse0 会把解析的结果放到 chan 中

parse0 内部是个无限循环,不断的读取数据,解析数据,直到 io.EOF,表示数据读取完毕,跳出循环

go

复制代码
func parser0(reader io.Reader, ch chan<- *Payload) {
  for {
    msg, ioErr, err := readLine(bufReader, &state)
    // 读到 io.EOF,表示数据读取完毨,跳出 for 循环
    // 如果是 io.EOF,ioErr 变量值为 true
    if err != nil {
      if ioErr {
        ch <- &Payload{Err: err}
        close(ch)
        return
      }
      // 如果 redis 协议解析出错,返回一个错误回复即可,无需结束程序
      ch <- &Payload{Err: err}
      state = readState{}
      continue
    }
  }
}

但是如果某一条数据解析出现错误,出现了 panic,那么整个程序就会挂掉,所以我们需要使用 recover 来捕获错误

go

复制代码
func parser0(reader io.Reader, ch chan<- *Payload) {
  defer func() {
    if err := recover(); err != nil {
      logger.Error(string(debug.Stack()))
    }
  }()
}

在处理完异常之后,就是处理正常的解析

正常解析数据需要判断是不是多行解析模式,根据 state.readingMultiLine 来判断

也就是说只有头信息不是多行,因为在 parseMultiBulkHeader 和 parseBulkHeader 两个解析头信息的函数中设置了 state.readingMultiLine = true

所以 state.readingMultiLine 为 true 的时候,是在解析 bodystate.readingMultiLine 为 false 的时候,是在解析 header

go

复制代码
// 解析 * 开头的指令
if msg[0] == '*' {
  err := parseMultiBulkHeader(msg, &state)
  if err != nil {
    ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
    state = readState{}
    continue
  }
  if state.expectedArgsCount == 0 {
    ch <- &Payload{Data: reply.MakeEmptyMultiBulkReply()}
    state = readState{}
    continue
  }
} else if msg[0] == '$' {
  // 解析 $ 开头的指令
  err := parseBulkHeader(msg, &state)
  if err != nil {
    ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
    state = readState{}
    continue
  }
  if state.bulkLen == -1 {
    ch <- &Payload{Data: reply.MakeNullBulkReply()}
    state = readState{}
    continue
  }
} else {
  // 其他指令,比如 +OKrn -ERRrn 这种
  result, err := parseSingleLineReply(msg)
  ch <- &Payload{
    Data: result,
    Err:  err,
  }
  state = readState{}
  continue
}

解析完 header 之后,就要解析 body 了,解析 body 就是 state.readingMultiLine 为 true 的情况

go

复制代码
// 每一个循环都会进来,将 msg 交给 readBody 函数处理
// readBody 函数会将 msg 解析成一个个的数据,然后添加到 state.args 中
err := readBody(msg, &state)
if err != nil {
  ch <- &Payload{Err: errors.New("protocol error:" + string(msg))}
  state = readState{}
  continue
}
// 当解析完成之后,将解析的数据返回给用户
if state.finished() {
  var result resp.Reply
  if state.msgType == '*' {
    // * 表示要返回一个数组
    result = reply.MakeMultiBulkReply(state.args)
  } else if state.msgType == '$' {
    // $ 表示要返回一个字符串
    result = reply.MakeBulkReply(state.args[0])
  }
  // 将数据通过 chan 返回给上层
  ch <- &Payload{
    Data: result,
    Err:  err,
  }
  state = readState{}
}

实现 connection

redis 解析协议实现后,我们再来实现 connection

connection 表示 redis 连接之后的记录的每一个用户信息

go

复制代码
type Connection struct {
  conn         net.Conn   // 连接信息
  waitingReply wait.Wait  // 在关掉 server 之前,需要等待回复给用户的指令完成
  mu           sync.Mutex // 在操作 conn 的时候,需要加锁
  selectedDB   int        // 现在用户在操作哪个 db
}

然后这个结构体需要实现 resp.Connection 接口

go

复制代码
// 给用户回复消息
func (c *Connection) Write(bytes []byte) error {
  // 如果没有数据,直接返回
  if len(bytes) == 0 {
    return nil
  }
  // 写入数据的时候,需要加锁
  c.mu.Lock()
  // 每次写入数据,等待回复的指令 +1
  c.waitingReply.Add(1)
  defer func() {
    // 写入数据完成之后,等待回复的指令 -1
    c.waitingReply.Done()
    // 解锁
    c.mu.Unlock()
  }()
  // 写入数据
  _, err := c.conn.Write(bytes)
  return err
}
// 用户正在使用哪个 db
func (c *Connection) GetDBIndex() int {
  return c.selectedDB
}
// 用户选择自己想用的 db
func (c *Connection) SelectDB(dbNum int) {
  c.selectedDB = dbNum
}

以及需要实现一个 Close 方法,用来关闭连接

go

复制代码
// 关闭连接直接调用 conn.Close() 即可
// 但是在关闭连接之前,需要等待回复给用户的指令完成
func (c *Connection) Close() error {
  c.waitingReply.WaitWithTimeout(10 * time.Second)
  _ = c.conn.Close()
  return nil
}

实现 RespHandler

tcp 服务器需要一个 handler 来处理用户的请求

它的用途是调用 ParseStream 函数,解析用户的指令,然后根据用户的指令,返回给用户相应的回复

go

复制代码
type RespHandler struct {
  activeConn sync.Map              // 记录用户的连接
  db         databaseface.Database // 核心数据库,操作 kev value 相关的逻辑
  closing    atomic.Boolean        // 是否关闭
}

结构体定义好之后,就要实现 tcp.Handler 接口

这个接口有两个方法:

  • Handle:处理用户的请求
  • Close:关闭连接

Close

Close 方法是关闭连接,关闭连接的时候,需要关闭所有的连接,关闭数据库

这里要注意的是 activeConn 是 sync.Map 类型,sync.Map 类型是线程安全的,所以在关闭连接的时候,需要遍历 activeConn

遍历的方法是调用 activeConn.Range 方法,接收一个匿名函数,在这个函数中做关闭连接的操作,并且最后需要返回 true,表示继续遍历

go

复制代码
func (r *RespHandler) Close() error {
  logger.Info("handler shutting down")
  // 将关闭连接的标志设置为 true
  r.closing.Set(true)
  // 使用 Range 遍历 sync.Map
  // 接收一个匿名函数,在这个函数中做关闭连接的操作,并且最后需要返回 true,表示继续遍历
  r.activeConn.Range(func(key, value any) bool {
    // 需要将 key 断言成 *connection.Connection 类型
    client := key.(*connection.Connection)
    // 关闭连接
    _ = client.Close()
    // 返回 true,表示继续遍历,返回 false,表示停止遍历
    return true
  })
  // 关闭数据库
  r.db.Close()
  return nil
}

Handle

在实现 Handle 方法之前,我们需要先实现一个 closeClient 方法,这个方法是关闭一个连接

它和 Close 方法的区别是,Close 方法是关闭所有的连接,而 closeClient 方法是关闭一个连接

它的用途是在连接出错的时候,关闭连接,释放资源

go

复制代码
// 关闭一个客户端
func (r *RespHandler) closeClient(client *connection.Connection) {
  // 关闭客户端
  _ = client.Close()
  // 关闭客户端后的善后工作
  r.db.AfterClientClose(client)
  // 从 sync.Map 中删除掉这个客户端
  r.activeConn.Delete(client)
}

go

复制代码
// 处理用户的指令
func (r *RespHandler) Handle(ctx context.Context, conn net.Conn) {
  //是不是在关闭中
  if r.closing.Get() {
    _ = conn.Close()
  }
  // 新建连接
  client := connection.NewConn(conn)
  // 将连接存储到 sync.Map 中
  r.activeConn.Store(client, struct{}{})
  // 调用 ParseStream 函数,解析用户的指令
  // 通过 chan 拿到解析后的结果
  ch := parser.ParseStream(conn)
  for payload := range ch {
    // 处理 payload 中的错误
    if payload.Err != nil {
      // io 错误
      // io.EOF 表示已经解析到结尾了,连接可以正常关闭了
      // io.ErrUnexpectedEOF 表示在预期的数据结束之前遇到了 EOF,可能发生了数据传输突然中断了
      // use of close network connection 表示使用已经关闭的连接,这个时候也需要关闭连接
      // 只要出现这三种情况,就需要关闭连接
      if payload.Err == io.EOF || errors.Is(payload.Err, io.ErrUnexpectedEOF) || strings.Contains(payload.Err.Error(), "use of close network connection") {
        r.closeClient(client)
        logger.Infof("connection closed: " + client.RemoteAddr().String())
        return
      }
      // 解析 resp 协议出错,MakeErrReply 作用是将错误信息前面加上 -ERR,后面加上 rn
      errReply := reply.MakeErrReply(payload.Err.Error())
      // 将解析 resp 协议出错的信息返回给用户
      err := client.Write(errReply.ToBytes())
      // 给用户写入数据可能会出错,如果出错,关闭连接
      if err != nil {
        r.closeClient(client)
        logger.Infof("connection closed: " + client.RemoteAddr().String())
        return
      }
      continue
    }
    // 没有数据
    if payload.Data == nil {
      continue
    }
    replyResult, ok := payload.Data.(*reply.MultiBulkReply)
    if !ok {
      logger.Infof("require multi bulk reply")
      continue
    }
    // 调用数据库的 Exec 方法,执行用户的指令
    result := r.db.Exec(client, replyResult.Args)
    // 将执行结果返回给用户
    if result != nil {
      _ = client.Write(result.ToBytes())
    } else {
      // 如果执行结果为空,返回一个未知错误
      _ = client.Write(unknownErrReplyBytes)
    }
  }
}

零
  • 转载请务必保留本文链接:https://www.0s52.com/bcjc/golangjc/16660.html
    本社区资源仅供用于学习和交流,请勿用于商业用途
    未经允许不得进行转载/复制/分享

发表评论