龙空技术网

利用websocket实现单机百万连接分布式聊天系统

青衫禅客 1106

前言:

当前各位老铁们对“nginx代理配置grpc协议”大致比较关心,兄弟们都需要剖析一些“nginx代理配置grpc协议”的相关文章。那么小编在网摘上网罗了一些对于“nginx代理配置grpc协议””的相关文章,希望我们能喜欢,咱们一起来学习一下吧!

目前搭建聊天消息系统,最主流的有基于XMPP和webSocket实现。文本介绍的是基于webSocket实现单机百万连接的分布式聊天系统。首先介绍webSocket的概念,然后开始介绍websocket项目,以及在Nginx中配置域名做webSocket的转发,最后介绍如何搭建一个分布式系统。

一、WebSocket基本概念介绍

1、WebSocket是什么?

WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。

它的最大特点就是,服务器能够主动向客户端推送信息,客户端也能够主动向服务器发送信息,是真正的双向平等对话,属于目前服务器推送技术的其中一种。

HTTP和WebSocket在通讯过程的比较HTTP和webSocket都支持配置证书,ws:// 无证书 wss:// 配置证书的协议标识一般项目中webSocket使用的架构图

2、WebSocket的兼容性

目前已经支持webSocket的浏览器版本服务端技术的支持情况

golang、java、php、node.js、python、nginx 都有不错的支持。

Android系统和IOS系统的支持

Android系统可以使用java-webSocket对webSocket来支持。

iOS 4.2及以上版本均具有WebSockets支持。

3、为什么要用WebSocket?

(1)从业务角度考虑,需要一个主动通达客户端的能力

目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,然后返回结果,不可以服务端主动向某一个客户端主动发送数据;

(2)大多数场景我们需要主动通知用户,如:聊天系统、用户完成任务主动告诉用户、一些运营活动需要通知到在线的用户;

(3)可以获取用户在线状态;

(4)在没有长连接的时候通过客户端主动轮询获取数据;

(5)可以通过一种方式实现,多种不同平台(H5/Android/IOS)去使用。

4、WebSocke的建立过程

(1)客户端发起升级协议的请求

客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息

Connection: Upgrade表明连接需要升级

Upgrade: websocket需要升级到 websocket协议

Sec-WebSocket-Version: 13 协议的版本为13

Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== 这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept对应。

# Request Headers

Connection: Upgrade

Host: im.91vh.com

Origin:

Pragma: no-cache

Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==

Sec-WebSocket-Version: 13

Upgrade: websocket

(2)服务器响应升级协议

服务端接收到升级协议的请求,如果服务端支持升级协议会做如下响应

返回:

Status Code: 101 Switching Protocols 表示支持切换协议

# Response Headers

Connection: upgrade

Date: Fri, 09 Aug 2019 07:36:59 GMT

Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=

Server: nginx/1.12.1

Upgrade: websocket

(3)升级协议完成以后,客户端和服务器就可以相互发送数据

二、如何基于webSocket实现长连接系统

1、使用go实现webSocket服务端

(1) 启动端口监听

websocket需要监听端口,因此需要在golang 成功的 main 函数中用协程的方式去启动程序main.go 实现启动

go websocket.StartWebSocket()

init_acc.go 启动程序

// 启动程序

func StartWebSocket() {

http.HandleFunc("/acc", wsPage)

http.ListenAndServe(":8089", nil)

}

(2) 升级的协议

客户端是通过http请求发送到服务端,我们需要对http协议进行升级为websocket协议;对http请求协议进行升级 golang 库gorilla/websocket 比较成熟,直接使用即可;在实际使用的时候,建议每个连接使用两个协程处理客户端请求数据和向客户端发送数据,虽然开启协程会占用一些内存,但是读取分离,减少收发数据堵塞的可能;init_acc.go

func wsPage(w http.ResponseWriter, req *http.Request) {

// 升级协议

conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {

fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])

return true

}}).Upgrade(w, req, nil)

if err != nil {

http.NotFound(w, req)

return

}

fmt.Println("webSocket 建立连接:", conn.RemoteAddr().String())

currentTime := uint64(time.Now().Unix())

client := NewClient(conn.RemoteAddr().String(), conn, currentTime)

go client.read()

go client.write()

// 用户连接事件

clientManager.Register <- client

}

(3) 客户端连接的管理

当前程序有多少用户连接,还需要对用户广播的需要,这里我们就需要一个管理者(clientManager),处理这些事件:记录全部的连接、登录用户的可以通过 appId+uuid 查到用户连接使用map存储,就涉及到多协程并发读写的问题,所以需要加读写锁定义四个channel ,分别处理客户端建立连接、用户登录、断开连接、全员广播事件

// 连接管理

type ClientManager struct {

Clients map[*Client]bool // 全部的连接

ClientsLock sync.RWMutex // 读写锁

Users map[string]*Client // 登录的用户 // appId+uuid

UserLock sync.RWMutex // 读写锁

Register chan *Client // 连接连接处理

Login chan *login // 用户登录处理

Unregister chan *Client // 断开连接处理程序

Broadcast chan []byte // 广播 向全部成员发送数据

}

// 初始化

func NewClientManager() (clientManager *ClientManager) {

clientManager = &ClientManager{

Clients: make(map[*Client]bool),

Users: make(map[string]*Client),

Register: make(chan *Client, 1000),

Login: make(chan *login, 1000),

Unregister: make(chan *Client, 1000),

Broadcast: make(chan []byte, 1000),

}

return

}

(4) 注册客户端的socket的写的异步处理程序

防止发生程序崩溃,所以需要捕获异常;为了显示异常崩溃位置这里使用string(debug.Stack())打印调用堆栈信息;如果写入数据失败了,可能连接有问题,就关闭连接。client.go

// 向客户端写数据

func (c *Client) write() {

defer func() {

if r := recover(); r != nil {

fmt.Println("write stop", string(debug.Stack()), r)

}

}

defer func() {

clientManager.Unregister <- c

c.Socket.Close()

fmt.Println("Client发送数据 defer", c)

}()

for {

select {

case message, ok := <-c.Send:

if !ok {

// 发送数据错误 关闭连接

fmt.Println("Client发送数据 关闭连接", c.Addr, "ok", ok)

return

}

c.Socket.WriteMessage(websocket.TextMessage, message)

}

}

}

(5)注册客户端的socket的读的异步处理程序

循环读取客户端发送的数据并处理;如果读取数据失败了,关闭channel。client.go

// 读取客户端数据

func (c *Client) read() {

defer func() {

if r := recover(); r != nil {

fmt.Println("write stop", string(debug.Stack()), r)

}

}

defer func() {

fmt.Println("读取客户端数据 关闭send", c)

close(c.Send)

}

for {

_, message, err := c.Socket.ReadMessage()

if err != nil {

fmt.Println("读取客户端数据 错误", c.Addr, err)

return

}

// 处理程序

fmt.Println("读取客户端数据 处理:", string(message))

ProcessData(c, message)

}

}

(6)接收客户端数据并处理

约定发送和接收请求数据格式,为了js处理方便,采用了json的数据格式发送和接收数据(人类可以阅读的格式在工作开发中使用是比较方便的)登录发送数据示例:

{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"马远","appId":101}}

登录响应数据示例:

{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}

websocket是双向的数据通讯,可以连续发送,如果发送的数据需要服务端回复,就需要一个seq来确定服务端的响应是回复哪一次的请求数据cmd 是用来确定动作,websocket没有类似于http的url,所以规定 cmd 是什么动作目前的动作有:login/heartbeat 用来发送登录请求和连接保活(长时间没有数据发送的长连接容易被浏览器、移动中间商、nginx、服务端程序断开)为什么需要AppId,UserId是表示用户的唯一字段,设计的时候为了做成通用性,设计AppId用来表示用户在哪个平台登录的(web、app、ios等),方便后续扩展request_model.go 约定的请求数据格式

/************************ 请求数据 **************************/

// 通用请求数据格式

type Request struct {

Seq string `json:"seq"` // 消息的唯一Id

Cmd string `json:"cmd"` // 请求命令字

Data interface{} `json:"data,omitempty"` // 数据 json

}

// 登录请求数据

type Login struct {

ServiceToken string `json:"serviceToken"` // 验证用户是否登录

AppId uint32 `json:"appId,omitempty"`

UserId string `json:"userId,omitempty"`

}

// 心跳请求数据

type HeartBeat struct {

UserId string `json:"userId,omitempty"`

}

response_model.go

/************************ 响应数据 **************************/

type Head struct {

Seq string `json:"seq"` // 消息的Id

Cmd string `json:"cmd"` // 消息的cmd 动作

Response *Response `json:"response"` // 消息体

}

type Response struct {

Code uint32 `json:"code"`

CodeMsg string `json:"codeMsg"`

Data interface{} `json:"data"` // 数据 json

}

(7)使用路由的方式处理客户端的请求数据

使用路由的方式处理由客户端发送过来的请求数据以后添加请求类型以后就可以用类是用http相类似的方式(router-controller)去处理acc_routers.go

// Websocket 路由

func WebsocketInit() {

websocket.Register("login", websocket.LoginController)

websocket.Register("heartbeat", websocket.HeartbeatController)

}

(8) 防止内存溢出和Goroutine不回收

a、定时任务清除超时连接 没有登录的连接和登录的连接6分钟没有心跳则断开连接

client_manager.go

// 定时清理超时连接

func ClearTimeoutConnections() {

currentTime := uint64(time.Now().Unix())

for client := range clientManager.Clients {

if client.IsHeartbeatTimeout(currentTime) {

fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)

client.Socket.Close()

}

}

}

b、读写的Goroutine有一个失败,则相互关闭 write()Goroutine写入数据失败,关闭c.Socket.Close()连接,会关闭read()Goroutine read()Goroutine读取数据失败,关闭close(c.Send)连接,会关闭write()Goroutine

c、客户端主动关闭 关闭读写的Goroutine 从ClientManager删除连接

d、监控用户连接、Goroutine数 十个内存溢出有九个和Goroutine有关 添加一个http的接口,可以查看系统的状态,防止Goroutine不回收 查看系统状态

e、Nginx 配置不活跃的连接释放时间,防止忘记关闭的连接

f、使用 pprof 分析性能、耗时

2、使用javaScript实现webSocket客户端

(1) 启动并注册监听程序

js 建立连接,并处理连接成功、收到数据、断开连接的事件处理

ws = new WebSocket("ws://127.0.0.1:8089/acc");

ws.onopen = function(evt) {

console.log("Connection open ...");

};

ws.onmessage = function(evt) {

console.log( "Received Message: " + evt.data);

data_array = JSON.parse(evt.data);

console.log( data_array);

};

ws.onclose = function(evt) {

console.log("Connection closed.");

};

(2) 发送数据

需要注意:连接建立成功以后才可以发送数据建立连接以后由客户端向服务器发送数据示例

登录:

ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}');

心跳:

ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');

关闭连接:

ws.close();

三、goWebSocket 项目

1、项目说明

本项目是基于webSocket实现的分布式聊天系统客户端随机分配用户名,所有人进入一个聊天室,实现群聊的功能单台机器(24核128G内存)支持百万客户端连接支持水平部署,部署的机器之间可以相互通讯项目架构图

2、项目依赖

本项目只需要使用 redis 和 golang;本项目使用govendor管理依赖,克隆本项目就可以直接使用。

# 主要使用到的包

github.com/gin-gonic/gin@v1.4.0

github.com/go-redis/redis

github.com/gorilla/websocket

github.com/spf13/viper

google.golang.org/grpc

github.com/golang/protobuf

3、项目启动

克隆项目

git clone git@github.com:link1st/gowebsocket.git

git clone

修改项目配置

cd gowebsocket

cd config

mv app.yaml.example app.yaml

# 修改项目监听端口,redis连接等(默认127.0.0.1:3306)

vim app.yaml

# 返回项目目录,为以后启动做准备

cd ..

配置文件说明

app:

logFile: log/gin.log # 日志文件位置

httpPort: 8080 # http端口

webSocketPort: 8089 # webSocket端口

rpcPort: 9001 # 分布式部署程序内部通讯端口

httpUrl: 127.0.0.1:8080

webSocketUrl: 127.0.0.1:8089

redis:

addr: "localhost:6379"

password: ""

DB: 0

poolSize: 30

minIdleConns: 30

启动项目

go run main.go

进入IM聊天地址 到这里,就可以体验到基于webSocket的IM系统

四、WebSocket项目的Nginx配置

1、为什么要配置Nginx?

使用nginx实现内外网分离,对外只暴露Nginx的Ip(一般的互联网企业会在nginx之前加一层LVS做负载均衡),减少入侵的可能;使用Nginx可以利用Nginx的负载功能,前端再使用的时候只需要连接固定的域名,通过Nginx将流量分发了到不同的机器;同时我们也可以使用Nginx的不同的负载策略(轮询、weight、ip_hash)。

2、nginx如何配置?

使用域名 im.91vh.com 为示例,参考配置;一级目录im.91vh.com/acc 是给webSocket使用,是用nginx stream转发功能(nginx 1.3.31 开始支持,使用Tengine配置也是相同的),转发到golang 8089 端口处理;其它目录是给HTTP使用,转发到golang 8080 端口处理;

upstream go-im

{

server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;

keepalive 16;

}

upstream go-acc

{

server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;

keepalive 16;

}

server {

listen 80 ;

server_name im.91vh.com;

index index.html index.htm ;

location /acc {

proxy_set_header Host $host;

proxy_pass ;

proxy_http_version 1.1;

proxy_set_header Upgrade $http_upgrade;

proxy_set_header Connection $connection_upgrade;

proxy_set_header Connection "";

proxy_redirect off;

proxy_intercept_errors on;

client_max_body_size 10m;

}

location {

proxy_set_header Host $host;

proxy_pass ;

proxy_http_version 1.1;

proxy_set_header Connection "";

proxy_redirect off;

proxy_intercept_errors on;

client_max_body_size 30m;

}

access_log /link/log/nginx/access/im.log;

error_log /link/log/nginx/access/im.error.log;

}

3、问题处理

运行nginx测试命令,查看配置文件是否正确

/link/server/tengine/sbin/nginx -t

如果出现错误

nginx: [emerg] unknown "connection_upgrade" variable

configuration file /link/server/tengine/conf/nginx.conf test failed

处理方法在nginx.com添加

http{

fastcgi_temp_file_write_size 128k;

..... # 需要添加的内容

#support websocket

map $http_upgrade $connection_upgrade {

default upgrade;

'' close;

}

.....

gzip on;

}

原因:Nginx代理webSocket的时候就会遇到Nginx的设计问题 End-to-end and Hop-by-hop Headers

五、压测

1、Linux内核参数的优化

设置文件打开句柄数

ulimit -n 1000000

设置sockets连接参数

vim /etc/sysctl.conf

net.ipv4.tcp_tw_reuse = 1

net.ipv4.tcp_tw_recycle = 0

2、压测准备

待压测,如果大家有压测的结果欢迎补充

3、 压测数据

项目在实际使用的时候,每个连接约占 24Kb内存,一个Goroutine 约占11kb支持百万连接需要22G内存

六、如何基于webSocket实现一个分布式聊天系统?

1、说明

参考本项目源码gowebsocket v1.0.0 单机版聊天系统;gowebsocket v1.0.0 分布式聊天系统;为了方便演示,IM系统和webSocket(acc)系统合并在一个系统中;IM系统接口: 获取全部在线的用户,查询单前服务的全部用户+集群中服务的全部用户 发送消息,这里采用的是http接口发送(微信网页版发送消息也是http接口),这里考虑主要是两点: 1.服务分离,让acc系统尽量的简单一点,不掺杂其它业务逻辑 2.发送消息是走http接口,不使用webSocket连接,才用收和发送数据分离的方式,可以加快收发数据的效率;

2、架构

项目启动注册和用户连接时序图

其它系统(IM、任务)向webSocket(acc)系统连接的用户发送消息时序图

七、总结与回顾

1、在其它业务系统上的应用

本系统设计的初衷就是:和客户端保持一个长连接、对外部系统两个接口(查询用户是否在线、给在线的用户推送消息),实现业务的分离;只有和业务分离开,才可以供多个业务使用,而不是每个业务都建立一个长链接。

2、 已经实现的功能

gin log日志(请求日志+debug日志)读取配置文件 完成定时脚本,清理过期未心跳连接 完成http接口,获取登录、连接数量 完成http接口,发送push、查询有多少人在线 完成grpc 程序内部通讯,发送消息 完成appIds 一个用户在多个平台登录界面,把所有在线的人拉倒一个群里面,发送消息 完成单聊、群聊 完成实现分布式,水平扩张 完成压测脚本文档整理架构图以及扩展。

IM实现细节:

定义文本消息结构 完成;html发送文本消息 完成;接口接收文本消息并发送给全体 完成;html接收到消息 显示到界面 完成;界面优化 需要持续优化;有人加入以后广播全体 完成;定义加入聊天室的消息结构 完成;引入机器人 待定。

3、还需要继续完善和优化的地方

登录,使用微信登录 获取昵称、头像等;有账号系统、资料系统;界面优化、适配手机端;消息 文本消息(支持表情)、图片、语音、视频消息;微服务注册、发现、熔断等;添加配置项,单台机器最大连接数量。

4、 总结

虽然实现了一个分布式聊天系统,但是有很多细节没有处理(登录没有鉴权、界面还待优化等),但是可以通过这个示例可以了解到:通过WebSocket解决很多业务上需求;本文虽然号称单台机器能有百万长连接(内存上能满足),但是实际在场景远比这个复杂(cpu有些压力),当然了如果你有这么大的业务量可以购买更多的机器更好的去支撑你的业务,本程序只是演示如何在实际工作用使用webSocket;参考本文章,你就可以实现出来符合你需要的聊天程序。

如果你喜欢本文章,请帮忙关注、转发、点赞,本人将定期更新一些技术“干货”分享给大家,希望能帮助大家解决工作中的一些问题,谢谢!

标签: #nginx代理配置grpc协议