接下来我们就需要给Zinx添加消息队列和多任务Worker机制了。
我们可以通过worker的数量来限定处理业务的固定goroutine数量,而不是无限制的开辟Goroutine,虽然我们知道go的调度算法已经做的很极致了,但是大数量的Goroutine依然会带来一些不必要的环境切换成本,这些本应该是服务器应该节省掉的成本。
我们可以用消息队列来缓冲worker工作的数据。
初步我们的设计结构如下图:
8.1 创建 消息队列
首先,处理消息队列的部分,我们应该集成到 MsgHandler模块下,因为属于我们消息管理模块范畴内:
zinx/znet/msghandler.go
type MsgHandle struct {
Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
WorkerPoolSize uint32 //业务工作Worker池的数量
//新增:消息队列 管道
TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
}
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
//一个worker对应一个queue消息队列管道
TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
}
}
这里添加两个成员:
WokerPoolSize:作为工作池的数量,因为TaskQueue中的每个队列应该是和一个Worker对应的,所以我们在创建TaskQueue中队列数量要和Worker的数量一致。
TaskQueue:是一个Request请求信息的channel集合。用来缓冲提供worker调用的Request请求信息,worker会从对应的队列中获取客户端的请求数据并且处理掉。
当然WorkerPoolSize最好也可以从GlobalObject获取,并且zinx.json配置文件可以手动配置。
zinx/utils/globalobj.go
/*
存储一切有关Zinx框架的全局参数,供其他模块使用
一些参数也可以通过 用户根据 zinx.json来配置
*/
type GlobalObj struct {
/*
Server
*/
TcpServer ziface.IServer //当前Zinx的全局Server对象
Host string //当前服务器主机IP
TcpPort int //当前服务器主机监听端口号
Name string //当前服务器名称
/*
Zinx
*/
Version string //当前Zinx版本号
MaxPacketSize uint32 //都需数据包的最大值
MaxConn int //当前服务器主机允许的最大链接个数
WorkerPoolSize uint32 //业务工作Worker池的数量
MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
/*
config file path
*/
ConfFilePath string
}
//...
//...
/*
提供init方法,默认加载
*/
func init() {
//初始化GlobalObject变量,设置一些默认值
GlobalObject = &GlobalObj{
Name: "ZinxServerApp",
Version: "V0.4",
TcpPort: 7777,
Host: "0.0.0.0",
MaxConn: 12000,
MaxPacketSize: 4096,
ConfFilePath: "conf/zinx.json",
WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024,
}
//从配置文件中加载一些用户配置的参数
GlobalObject.Reload()
}
8.2 创建及启动Worker工作池
现在添加Worker工作池,先定义一些启动工作池的接口
zinx/ziface/imsghandler.go
/*
消息管理抽象层
*/
type IMsgHandle interface{
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
//新增:
StartWorkerPool() //启动worker工作池
SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue管道,传递给对应的worker进行处理
}
zinx/znet/msghandler.go
//启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的取队列中的消息
for {
select {
//有消息的话,则取出队列的Request,并执行request绑定的业务方法
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}
//启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i:= 0; i < int(mh.WorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
//调用上面的方法:
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go mh.StartOneWorker(i, mh.TaskQueue[i])
}
}
StartWorkerPool()方法是启动Worker工作池,这里根据用户配置好的WorkerPoolSize的数量来启动,然后分别给每个Worker分配一个TaskQueue,然后用一个goroutine来承载一个Worker的工作业务。
StartOneWorker()方法就是一个Worker的工作业务,每个worker是不会退出的(目前没有设定worker的停止工作机制),会永久的从对应的TaskQueue中等待消息,并处理。
8.3 发送消息 给 消息队列
现在,worker工作池已经准备就绪了,那么就需要有一个给到worker工作池消息的入口,我们再定义一个方法
zinx/ziface/imsghandler.go
//将 request消息 交给 TaskQueue ,由 worker进行处理
func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
//将请求消息发送给任务队列
mh.TaskQueue[workerID] <- request
}
SendMsgToTaskQueue()作为工作池的数据入口,这里面采用的是轮询的分配机制,因为不同链接信息都会调用这个入口,那么到底应该由哪个worker处理该链接的请求处理,这里用的是一个简单的求模运算。用余数和workerID的匹配来进行分配。
最终将request请求数据发送给对应worker的TaskQueue,那么对应的worker的Goroutine就会处理该链接请求了。
8.4 Zinx-V0.8代码实现
好了,现在需要将消息队列和多任务worker机制集成到我们Zinx的中了。我们在Server的Start()方法中,在服务端Accept之前,启动Worker工作池。
zinx/znet/server.go
//开启网络服务
func (s *Server) Start() {
//...
//开启一个go去做服务端Linster业务
go func() {
//0 启动worker工作池机制
s.msgHandler.StartWorkerPool()
//1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
//...
//...
}
}()
}
其次,当我们已经得到客户端的连接请求过来数据的时候,我们应该将数据发送给Worker工作池进行处理。
所以应该在 Connection 的 StartReader()方法 中修改:
zinx/znet/connection.go
/*
读消息Goroutine,用于从客户端中读取数据
*/
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//创建拆包解包的对象...
//读取客户端的Msg head...
//拆包,得到msgid 和 datalen 放在msg中...
//根据 dataLen 读取 data,放在msg.Data中...
//得到当前客户端请求的Request数据
req := Request{
conn:c,
msg:msg,
}
//已经启动工作池机制,将消息交给Worker处理
if utils.GlobalObject.WorkerPoolSize > 0 {
c.MsgHandler.SendMsgToTaskQueue(&req)
} else { //如果没有配置工作池,就直接去执行
//从绑定好的消息和对应的处理方法中直接去执行对应的Handle方法
go c.MsgHandler.DoMsgHandler(&req)
}
}
}
这里并没有强制使用多任务Worker机制,而是判断用户配置WorkerPoolSize的个数,如果大于0,那么我就启动多任务机制处理链接请求消息,如果=0或者<0那么,我们依然只是之前的开启一个临时的Goroutine处理客户端请求消息。
8.5 使用Zinx-V0.8完成应用程序
测试代码和V0.6、V0.7的代码一样。因为Zinx框架对外接口没有发生改变。
我们分别启动Server、Client
$go run Server.go
$go run Client0.go
$go run Client1.go
$go run Client0.go
结果:
服务端:
$ go run Server.go
Add api msgId = 0
Add api msgId = 1
[START] Server name: zinx v-0.8 demoApp,listenner at IP: 127.0.0.1, Port 7777 is starting
[Zinx] Version: V0.4, MaxConn: 3, MaxPacketSize: 4096
Worker ID = 4 is started.
start Zinx server zinx v-0.8 demoApp succ, now listenning...
Worker ID = 9 is started.
Worker ID = 0 is started.
Worker ID = 5 is started.
Worker ID = 6 is started.
Worker ID = 1 is started.
Worker ID = 2 is started.
Worker ID = 7 is started.
Worker ID = 8 is started.
Worker ID = 3 is started.
Reader Goroutine is running
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Reader Goroutine is running
Add ConnID= 1 request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Reader Goroutine is running
Add ConnID= 2 request msgID= 0 to workerID= 2
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 1 request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 2 request msgID= 0 to workerID= 2
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
Add ConnID= 1 request msgID= 1 to workerID= 1
Call HelloZinxRouter Handle
recv from client : msgId= 1 , data= Zinx V0.8 Client1 Test Message
Add ConnID= 0 request msgID= 0 to workerID= 0
Call PingRouter Handle
recv from client : msgId= 0 , data= Zinx V0.8 Client0 Test Message
客户端0
$ go run Client0.go
Client Test ... start
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
客户端1
$ go run Client1.go
Client Test ... start
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
==> Recv Msg: ID= 1 , len= 22 , data= Hello Zinx Router V0.8
客户端2
$ go run Client0.go
Client Test ... start
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping
==> Recv Msg: ID= 0 , len= 18 , data= ping...ping...ping