WebSocket #
WebSocket - Web API 接口参考 |多核 (mozilla.org)
WebSocket API是一种先进的技术,可以在用户的浏览器和服务器之间打开双向交互通信会话。使用此 API,您可以向服务器发送消息并接收事件驱动的响应,而无需轮询服务器以获取答复。
官方示例 #
官方示例可参照synk项目结合gin框架
main.go #
package main
import (
"flag"
"log"
"net/http"
)
var addr = flag.String("addr", ":8080", "http service address")
func serveHome(w http.ResponseWriter, r *http.Request) {
log.Println(r.URL)
if r.URL.Path != "/" {
http.Error(w, "Not found", http.StatusNotFound)
return
}
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
http.ServeFile(w, r, "home.html")
}
func main() {
flag.Parse() // 把用户传递的命令行参数解析为对应变量的值
hub := newHub() //创建hub结构体
go hub.run() //启动
http.HandleFunc("/", serveHome)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
err := http.ListenAndServe(*addr, nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
client.go #
package main
import (
"bytes"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
writeWait = 10 * time.Second// 允许向peer写入消息的时间
pongWait = 60 * time.Second// 允许的时间读取来自peer的下一条pong消息
pingPeriod = (pongWait * 9) / 10 //发送 ping 以对此时间段进行对等。必须小于pongWait
maxMessageSize = 512//peer允许的最大message大小。
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
var upgrader = websocket.Upgrader{ //升级websocket协议
ReadBufferSize: 1024, //读写缓存大小
WriteBufferSize: 1024,
}
type Client struct {
hub *Hub
conn *websocket.Conn //连接信息
send chan []byte //往里面发送东西
}
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.broadcast <- message
}
}
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
// 将排队的聊天消息添加到当前 Websocket 消息
n := len(c.send)
for i := 0; i < n; i++ {
w.Write(newline)
w.Write(<-c.send)
}
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// 处理websocket请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil) //创建连接
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client //注册
//起协程,实时接收和回复数据
go client.writePump()
go client.readPump()
}
hub.go #
package main
type Hub struct {
clients map[*Client]bool// 注册客户
broadcast chan []byte// 来自客户端的消息
register chan *Client// 注册来自客户端的请求。
unregister chan *Client
}
func newHub() *Hub {
return &Hub{
broadcast: make(chan []byte),//广播
register: make(chan *Client),//监听
unregister: make(chan *Client),//不监听
clients: make(map[*Client]bool),//统计有多少个人在监听我
}
}
func (h *Hub) run() {
for {
select {
case client := <-h.register: //如果有人监听
h.clients[client] = true //添加进去
case client := <-h.unregister: //如果有人不监听
if _, ok := h.clients[client]; ok {
delete(h.clients, client) //把它删除
close(client.send)
}
case message := <-h.broadcast: //广播消息
for client := range h.clients { //遍历所有客户发给他们
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
home.html #
<!DOCTYPE html>
<html lang="en">
<head>
<title>Chat Example</title>
<script type="text/javascript">
window.onload = function () {
var conn;
var msg = document.getElementById("msg");
var log = document.getElementById("log");
function appendLog(item) {
var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
log.appendChild(item);
if (doScroll) {
log.scrollTop = log.scrollHeight - log.clientHeight;
}
}
document.getElementById("form").onsubmit = function () {
if (!conn) {
return false;
}
if (!msg.value) {
return false;
}
conn.send(msg.value);
msg.value = "";
return false;
};
if (window["WebSocket"]) {
conn = new WebSocket("ws://" + document.location.host + "/ws");
conn.onclose = function (evt) {
var item = document.createElement("div");
item.innerHTML = "<b>Connection closed.</b>";
appendLog(item);
};
conn.onmessage = function (evt) {
var messages = evt.data.split('\n');
for (var i = 0; i < messages.length; i++) {
var item = document.createElement("div");
item.innerText = messages[i];
appendLog(item);
}
};
} else {
var item = document.createElement("div");
item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
appendLog(item);
}
};
</script>
<style type="text/css">
html {
overflow: hidden;
}
body {
overflow: hidden;
padding: 0;
margin: 0;
width: 100%;
height: 100%;
background: gray;
}
#log {
background: white;
margin: 0;
padding: 0.5em 0.5em 0.5em 0.5em;
position: absolute;
top: 0.5em;
left: 0.5em;
right: 0.5em;
bottom: 3em;
overflow: auto;
}
#form {
padding: 0 0.5em 0 0.5em;
margin: 0;
position: absolute;
bottom: 1em;
left: 0px;
width: 100%;
overflow: hidden;
}
</style>
</head>
<body>
<div id="log"></div>
<form id="form">
<input type="submit" value="Send" />
<input type="text" id="msg" size="64" autofocus />
</form>
</body>
</html>
适合业务的修改版 #
结合gin框架
注册 #
客户端发送一次请求,服务端保留住这个请求(注册),使用心跳维持互通,这就需要服务端维护一个接口去接受客户端的连接请求
// 客户端连接
func WsClient(context *gin.Context) {
upGrande := websocket.Upgrader{
//设置允许跨域
CheckOrigin: func(r *http.Request) bool {
return true
},
//设置请求协议
Subprotocols: []string{context.GetHeader("Sec-WebSocket-Protocol")},
}
//创建连接
conn, err := upGrande.Upgrade(context.Writer, context.Request, nil)
if err != nil {
fmt.Println(err)
return
}
//生成唯一标识client_id
var uuid = uuid.NewV4().String()
client := &ws.Client{
Id: uuid,
Socket: conn,
Message: make(chan []byte, 1024),
}
//注册
ws.Manager.Register <- client
//起协程,实时接收和回复数据
go client.Read()
go client.Write()
}
//也可以这样去连接
conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/ws", nil)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
上面是一个客户端连接的入口(接口),需要在router路由中进行配置
r.GET("/ws", api.WsClient)
客户端的连接地址则可以是:ws://127.0.0.1:8080/ws
func main() {
go ws.Manager.Start() //这里要启动
portFlag := flag.Int("port", 8080, "the port to listen on")
flag.Parse()
r := gin.New()
r = routers.CollectRouter(r)
port := *portFlag
err := r.Run(fmt.Sprintf("0.0.0.0:%d", port))
if err != nil {
panic(err)
}
}
开启程序服务器后,后台开启一个协程去监听处理发送给客户端的消息,包括:客户端注册、客户端注销、回复客户端消息
package ws
import (
"encoding/json"
"github.com/gorilla/websocket"
)
type ClientManager struct {
Clients map[*Client]bool
Broadcast chan []byte
Register chan *Client
Unregister chan *Client
}
type Client struct {
ID string
Socket *websocket.Conn
Send chan []byte
}
type Message struct {
Sender string `json:"sender,omitempty"`
Recipient string `json:"recipient,omitempty"`
Content string `json:"content,omitempty"`
}
var Manager = ClientManager{
Broadcast: make(chan []byte),
Register: make(chan *Client),
Unregister: make(chan *Client),
Clients: make(map[*Client]bool),
}
func (manager *ClientManager) Start() {
for {
select {
case conn := <-manager.Register:
manager.Clients[conn] = true
jsonMessage, _ := json.Marshal(&Message{Content: "/A new socket has connected."})
manager.Send(jsonMessage, conn)
case conn := <-manager.Unregister:
if _, ok := manager.Clients[conn]; ok {
close(conn.Send)
delete(manager.Clients, conn)
jsonMessage, _ := json.Marshal(&Message{Content: "/A socket has disconnected."})
manager.Send(jsonMessage, conn)
}
case message := <-manager.Broadcast:
for conn := range manager.Clients {
select {
case conn.Send <- message:
default:
close(conn.Send)
delete(manager.Clients, conn)
}
}
}
}
}
func (manager *ClientManager) Send(message []byte, ignore *Client) {
for conn := range manager.Clients {
if conn != ignore {
conn.Send <- message
}
}
}
func (c *Client) Read() {
defer func() {
Manager.Unregister <- c
c.Socket.Close()
}()
for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
Manager.Unregister <- c
c.Socket.Close()
break
}
jsonMessage, _ := json.Marshal(&Message{Sender: c.ID, Content: string(message)})
Manager.Broadcast <- jsonMessage
}
}
func (c *Client) Write() {
defer func() {
c.Socket.Close()
}()
for {
select {
case message, ok := <-c.Send:
if !ok {
c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.Socket.WriteMessage(websocket.TextMessage, message)
}
}
}
Start():启动websocket服务
Send():向连接websocket的管道chan写入数据
Read():读取在websocket管道中的数据
Write():通过websocket协议向连接到ws的客户端发送数据
函数调用 #
func SendWebSocket(info string, progress int64) {
if wsapi.Conn == nil {
fmt.Errorf("WebSocket未连接")
}
wsapi.Conn.WriteMessage(websocket.TextMessage, []byte(info))
fmt.Println(info, progress)
}
挎包调用可将conn设置为全局变量
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
var Conn *websocket.Conn
脚本测试 #
(重新建一个文件夹启动main函数)
var addr = flag.String("addr", "127.0.0.1:8080", "http service address")
func main() {
u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}
var dialer *websocket.Dialer
conn, _, err := dialer.Dial(u.String(), nil)
if err != nil {
fmt.Println(err)
return
}
go timeWriter(conn)
for {
_, message, err := conn.ReadMessage()
if err != nil {
fmt.Println("read:", err)
return
}
fmt.Printf("received: %s\n", message)
}
}
func timeWriter(conn *websocket.Conn) {
for {
time.Sleep(time.Second * 2)
conn.WriteMessage(websocket.TextMessage, []byte(time.Now().Format("2006-01-02 15:04:05")))
}
}
升级版 #
for {
select {
case client := <-manager.Register:
//注册客户端
manager.Lock.Lock()
manager.Group[client.Id] = client
manager.clientCount += 1
log.WSLog(fmt.Sprintf("客户端注册: 客户端id为%s", client.Id))
manager.Lock.Unlock()
case client := <-manager.UnRegister:
//注销客户端
manager.Lock.Lock()
if _, ok := manager.Group[client.Id]; ok {
//关闭消息通道
close(client.Message)
//删除分组中客户
delete(manager.Group, client.Id)
//客户端数量减1
manager.clientCount -= 1
log.WSLog(fmt.Sprintf("客户端注销: 客户端id为%s", client.Id))
}
manager.Lock.Unlock()
case data := <-manager.BroadCastMessage:
//将数据广播给所有客户端
for _, conn := range manager.Group {
if data.IsBroadCast {
conn.Message <- data.Message
} else {
if function.InSliceStr(conn.Id, data.ClientIDs) {
conn.Message <- data.Message
}
}
}
}
}
单个websocket的client结构体 #
type Client struct {
ID string
Socket *websocket.Conn
Send chan []byte
}
服务端websocke的结构体 #
type Manager struct {
//client.id => Client
Group map[string]*Client
Lock sync.Mutex
Register, UnRegister chan *Client
BroadCastMessage chan *BroadCastMessageData
clientCount uint //分组及客户端数量
}
回复数据消息结构体 #
type BroadCastMessageData struct {
Message []byte
IsBroadCast bool
ClientIDs []string
}
数据通信 #
以下是在建立连接后的正常数据通信(发送数据,回复数据)的流程图
在处理客户端消息的逻辑处理中,封装了一个handle文件,接收客户端请求指令的函数方法处理
/**
* Description: websocket服务器接收数据指令调用对应函数
* author: shahao
* create on: 2021-04-16 18:05:21
*/
func (manager *Manager) ServerCodeToFunc(data ReadData) {
funcName := case2Camel(data.Actioncode)
vft := manager.serverReturnFunc()
params := make([]reflect.Value, 1)
params[0] = reflect.ValueOf(data)
if vft[funcName].IsValid() {
vft[funcName].Call(params)
}
}
复制代码
然后可以将处理逻辑集中放到serverInstructFunc处理,例如心跳回复函数
//心跳包
func (m *ServerMethod) HeartBeat(params ReadData) {
WebsocketManager.Success(params.Actioncode, true, params.IsBroadCast, params.ClientIDs)
}