Web Socket

WebSocket #

WebSocket - Web API 接口参考 | MDN (mozilla.org)

WebSocket API是一种先进的技术,可以在用户的浏览器和服务器之间打开双向交互通信会话。使用此 API,您可以向服务器发送消息并接收事件驱动的响应,而无需轮询服务器以获取答复。

官方示例 #

Chat Example

官方示例可参照synk项目结合gin框架

官方介绍

main.go #

package main
import (
	"flag"
	"log"
	"net/http"
)
var addr = flag.String("addr", ":8080", "http service address")
// serveHome logs the requested URL and serves home.html. Only GET requests for
// the exact path "/" are served; any other path gets a 404 and any other
// method gets a 405.
func serveHome(w http.ResponseWriter, r *http.Request) {
	log.Println(r.URL)
	switch {
	case r.URL.Path != "/":
		http.Error(w, "Not found", http.StatusNotFound)
	case r.Method != http.MethodGet:
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	default:
		http.ServeFile(w, r, "home.html")
	}
}

// main parses the command-line flags, starts the hub in its own goroutine,
// registers the page and websocket handlers, and serves HTTP on *addr. It
// exits through log.Fatal if the server stops with an error.
func main() {
	flag.Parse() // fill the flag variables from the command-line arguments

	hub := newHub()
	go hub.run()

	wsHandler := func(w http.ResponseWriter, r *http.Request) {
		serveWs(hub, w, r)
	}
	http.HandleFunc("/", serveHome)
	http.HandleFunc("/ws", wsHandler)

	if err := http.ListenAndServe(*addr, nil); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}

client.go #

package main
import (
	"bytes"
	"log"
	"net/http"
	"time"
	"github.com/gorilla/websocket"
)
const (
	// Time allowed to write a message to the peer.
	writeWait = 10 * time.Second
	// Time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second
	// Interval at which pings are sent to the peer. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10
	// Maximum message size allowed from the peer.
	maxMessageSize = 512
)
var (
	newline = []byte{'\n'}
	space   = []byte{' '}
)
// upgrader upgrades an HTTP connection to the websocket protocol.
var upgrader = websocket.Upgrader{
	// Sizes of the read and write buffers.
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

// Client ties one websocket connection to the hub it belongs to.
type Client struct {
	// hub is the Hub this client registers with and broadcasts through.
	hub *Hub
	// conn is the underlying websocket connection.
	conn *websocket.Conn
	// send queues outbound messages; writePump drains it onto conn.
	send chan []byte
}

// readPump pumps messages from the websocket connection to the hub. Each
// message has its newlines replaced by spaces and is trimmed before it is sent
// to the hub's broadcast channel. The read deadline is pongWait and is renewed
// whenever a pong arrives. When reading fails the client is unregistered from
// the hub and the connection is closed.
func (c *Client) readPump() {
	// Deferred calls run last-in first-out: unregister, then close.
	defer c.conn.Close()
	defer func() { c.hub.unregister <- c }()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	onPong := func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	}
	c.conn.SetPongHandler(onPong)

	for {
		_, payload, readErr := c.conn.ReadMessage()
		if readErr != nil {
			// Only log closes that are neither "going away" nor abnormal.
			if websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", readErr)
			}
			return
		}
		payload = bytes.Replace(payload, newline, space, -1)
		c.hub.broadcast <- bytes.TrimSpace(payload)
	}
}

// writePump pumps messages from the client's send channel to the websocket
// connection and sends a ping every pingPeriod. Every write is given a
// writeWait deadline. It returns, stopping the ticker and closing the
// connection, when the send channel is closed or any write fails.
func (c *Client) writePump() {
	pinger := time.NewTicker(pingPeriod)
	// Deferred calls run last-in first-out: stop the ticker, then close.
	defer c.conn.Close()
	defer pinger.Stop()

	for {
		select {
		case first, open := <-c.send:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !open {
				// The send channel was closed: tell the peer we are done.
				c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			frame, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			frame.Write(first)

			// Append the chat messages already queued to the current
			// websocket message, separated by newlines.
			for pending := len(c.send); pending > 0; pending-- {
				frame.Write(newline)
				frame.Write(<-c.send)
			}

			if frame.Close() != nil {
				return
			}
		case <-pinger.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if c.conn.WriteMessage(websocket.PingMessage, nil) != nil {
				return
			}
		}
	}
}
// serveWs handles a websocket request: it upgrades the HTTP connection,
// registers a new Client with hub and starts the client's write and read
// pumps in their own goroutines. A failed upgrade is logged and the request is
// abandoned.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println(err)
		return
	}

	c := &Client{
		hub:  hub,
		conn: conn,
		send: make(chan []byte, 256),
	}
	c.hub.register <- c

	// One goroutine per direction so sending and receiving never wait on
	// each other.
	go c.writePump()
	go c.readPump()
}

hub.go #

package main

// Hub keeps the set of registered clients and the channels its run loop
// listens on.
type Hub struct {
	// clients is the set of registered clients.
	clients map[*Client]bool
	// broadcast carries inbound messages from the clients.
	broadcast chan []byte
	// register carries register requests from the clients.
	register chan *Client
	// unregister carries unregister requests from the clients.
	unregister chan *Client
}

// newHub returns a Hub with an empty client set and unbuffered broadcast,
// register and unregister channels.
func newHub() *Hub {
	h := new(Hub)
	h.clients = make(map[*Client]bool)
	h.broadcast = make(chan []byte)
	h.register = make(chan *Client)
	h.unregister = make(chan *Client)
	return h
}

// run is the hub's event loop and never returns. It adds clients arriving on
// register, removes clients arriving on unregister (closing their send
// channel), and fans each broadcast message out to every registered client.
// A client whose send channel cannot accept the message immediately is closed
// and removed.
func (h *Hub) run() {
	for {
		select {
		case joining := <-h.register:
			h.clients[joining] = true
		case leaving := <-h.unregister:
			if _, known := h.clients[leaving]; !known {
				continue // not registered: nothing to clean up
			}
			delete(h.clients, leaving)
			close(leaving.send)
		case msg := <-h.broadcast:
			for peer := range h.clients {
				select {
				case peer.send <- msg:
				default:
					// The send would block: drop this client.
					close(peer.send)
					delete(h.clients, peer)
				}
			}
		}
	}
}

home.html #

<!DOCTYPE html>
<html lang="en">
<head>
<title>Chat Example</title>
<script type="text/javascript">
// Wire up the chat page once it has loaded: open the websocket, show incoming
// messages in the log and send the text field's content on form submit.
window.onload = function () {
    var conn;
    var msg = document.getElementById("msg");
    var log = document.getElementById("log");

    // Append item to the log. The view is scrolled to the bottom afterwards
    // only if it was already at the bottom before the append.
    function appendLog(item) {
        var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
        log.appendChild(item);
        if (doScroll) {
            log.scrollTop = log.scrollHeight - log.clientHeight;
        }
    }

    // Send the typed message, if there is a connection and the field is not
    // empty. Always return false so the browser does not submit the form.
    document.getElementById("form").onsubmit = function () {
        if (!conn) {
            return false;
        }
        if (!msg.value) {
            return false;
        }
        conn.send(msg.value);
        msg.value = "";
        return false;
    };

    if (window["WebSocket"]) {
        // Use wss:// when the page itself was loaded over HTTPS; browsers
        // refuse insecure ws:// connections from secure pages.
        var scheme = document.location.protocol === "https:" ? "wss://" : "ws://";
        conn = new WebSocket(scheme + document.location.host + "/ws");
        conn.onclose = function (evt) {
            var item = document.createElement("div");
            item.innerHTML = "<b>Connection closed.</b>";
            appendLog(item);
        };
        conn.onmessage = function (evt) {
            // One websocket message may carry several chat messages
            // separated by newlines; show each on its own line.
            var messages = evt.data.split('\n');
            for (var i = 0; i < messages.length; i++) {
                var item = document.createElement("div");
                // innerText keeps message content from being parsed as HTML.
                item.innerText = messages[i];
                appendLog(item);
            }
        };
    } else {
        var item = document.createElement("div");
        item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
        appendLog(item);
    }
};
</script>
<style type="text/css">
html {
    overflow: hidden;
}
body {
    overflow: hidden;
    padding: 0;
    margin: 0;
    width: 100%;
    height: 100%;
    background: gray;
}
#log {
    background: white;
    margin: 0;
    padding: 0.5em 0.5em 0.5em 0.5em;
    position: absolute;
    top: 0.5em;
    left: 0.5em;
    right: 0.5em;
    bottom: 3em;
    overflow: auto;
}
#form {
    padding: 0 0.5em 0 0.5em;
    margin: 0;
    position: absolute;
    bottom: 1em;
    left: 0px;
    width: 100%;
    overflow: hidden;
}
</style>
</head>
<body>
<div id="log"></div>
<form id="form">
    <input type="submit" value="Send" />
    <input type="text" id="msg" size="64" autofocus />
</form>
</body>
</html>

适合业务的修改版 #

结合gin框架

注册 #

客户端发送一次请求,服务端保留住这个请求(注册),使用心跳维持互通,这就需要服务端维护一个接口去接受客户端的连接请求

// WsClient is the gin handler that accepts a websocket client. It upgrades the
// HTTP request, wraps the connection in a ws.Client with a freshly generated
// ID, registers the client with ws.Manager and starts the client's Read and
// Write goroutines. A failed upgrade is printed and the request is abandoned.
func WsClient(context *gin.Context) {
	upGrande := websocket.Upgrader{
		// Allow cross-origin requests.
		// NOTE(review): this accepts every origin; restrict it if the
		// endpoint must not be reachable from arbitrary web pages.
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
		// Offer back the subprotocols the client asked for. The header may
		// hold a comma-separated list, so it is parsed into individual
		// entries instead of being passed through as one string.
		Subprotocols: websocket.Subprotocols(context.Request),
	}

	// Create the connection.
	conn, err := upGrande.Upgrade(context.Writer, context.Request, nil)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Generate the unique client ID. The local name avoids shadowing the
	// uuid package.
	clientID := uuid.NewV4().String()
	client := &ws.Client{
		ID:     clientID,
		Socket: conn,
		Send:   make(chan []byte, 1024),
	}

	// Register with the manager.
	ws.Manager.Register <- client

	// Receive and reply in separate goroutines.
	go client.Read()
	go client.Write()
}
// Alternatively, a connection can be opened by dialing the server directly.
conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/ws", nil) 
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

上面是一个客户端连接的入口(接口),需要在router路由中进行配置

r.GET("/ws", api.WsClient)

客户端的连接地址则可以是:ws://127.0.0.1:8080/ws

// main starts the websocket manager loop in a goroutine, then serves the gin
// router on 0.0.0.0 at the port given by the -port flag (default 8080). It
// panics if the server fails to run.
func main() {
	go ws.Manager.Start() // the manager loop has to be started here

	listenPort := flag.Int("port", 8080, "the port to listen on")
	flag.Parse()

	engine := routers.CollectRouter(gin.New())
	if err := engine.Run(fmt.Sprintf("0.0.0.0:%d", *listenPort)); err != nil {
		panic(err)
	}
}

开启程序服务器后,后台开启一个协程去监听处理发送给客户端的消息,包括:客户端注册、客户端注销、回复客户端消息

package ws
import (
	"encoding/json"
	"github.com/gorilla/websocket"
)
// ClientManager holds the set of connected clients and the channels that the
// Start loop listens on.
type ClientManager struct {
	// Clients is the set of registered clients.
	Clients    map[*Client]bool
	// Broadcast carries messages to deliver to every registered client.
	Broadcast  chan []byte
	// Register carries clients to add to Clients.
	Register   chan *Client
	// Unregister carries clients to remove from Clients.
	Unregister chan *Client
}
// Client is one websocket connection together with its outbound queue.
type Client struct {
	// ID identifies the client; Read uses it as the Sender of forwarded messages.
	ID     string
	// Socket is the underlying websocket connection.
	Socket *websocket.Conn
	// Send queues outbound messages; Write drains it onto Socket.
	Send   chan []byte
}
// Message is the JSON envelope exchanged with clients. Empty fields are
// omitted from the encoded form.
type Message struct {
	Sender    string `json:"sender,omitempty"`
	Recipient string `json:"recipient,omitempty"`
	Content   string `json:"content,omitempty"`
}
// Manager is the package-level ClientManager, created with an empty client
// set and unbuffered channels.
var Manager = ClientManager{
	Broadcast:  make(chan []byte),
	Register:   make(chan *Client),
	Unregister: make(chan *Client),
	Clients:    make(map[*Client]bool),
}
// Start runs the manager's event loop and never returns. A client arriving on
// Register is added and the other clients are told a socket connected; a
// client arriving on Unregister has its Send channel closed, is removed, and
// the remaining clients are told a socket disconnected. A message arriving on
// Broadcast is offered to every client; a client whose Send channel cannot
// accept it immediately is closed and removed.
func (manager *ClientManager) Start() {
	for {
		select {
		case joined := <-manager.Register:
			manager.Clients[joined] = true
			notice, _ := json.Marshal(&Message{Content: "/A new socket has connected."})
			manager.Send(notice, joined)
		case left := <-manager.Unregister:
			if _, known := manager.Clients[left]; !known {
				continue // not registered: nothing to clean up
			}
			close(left.Send)
			delete(manager.Clients, left)
			notice, _ := json.Marshal(&Message{Content: "/A socket has disconnected."})
			manager.Send(notice, left)
		case payload := <-manager.Broadcast:
			for peer := range manager.Clients {
				select {
				case peer.Send <- payload:
				default:
					// The send would block: drop this client.
					close(peer.Send)
					delete(manager.Clients, peer)
				}
			}
		}
	}
}
// Send queues message on the Send channel of every registered client except
// ignore.
//
// Delivery never blocks. Send is called from the Start loop, so a blocking
// send to one client with a full buffer would stall registration,
// unregistration and broadcasting for everyone. A client that cannot accept
// the message immediately is closed and removed instead, which is how the
// Broadcast case in Start treats slow clients.
func (manager *ClientManager) Send(message []byte, ignore *Client) {
	for conn := range manager.Clients {
		if conn == ignore {
			continue
		}
		select {
		case conn.Send <- message:
		default:
			close(conn.Send)
			delete(manager.Clients, conn)
		}
	}
}
// Read forwards every message received on the client's socket to
// Manager.Broadcast, wrapped in a JSON Message whose Sender is the client's ID.
// When reading fails the client is unregistered from Manager and the socket is
// closed, once, by the deferred cleanup.
func (c *Client) Read() {
	defer func() {
		Manager.Unregister <- c
		c.Socket.Close()
	}()
	for {
		_, message, err := c.Socket.ReadMessage()
		if err != nil {
			// The deferred function does the cleanup; repeating it here
			// would unregister and close twice.
			return
		}
		// Marshal cannot fail for a struct that holds only strings, so the
		// error is deliberately ignored.
		jsonMessage, _ := json.Marshal(&Message{Sender: c.ID, Content: string(message)})
		Manager.Broadcast <- jsonMessage
	}
}
// Write sends every message queued on c.Send to the client's socket as a text
// message. When c.Send is closed it sends a close message and returns. The
// socket is closed on return.
//
// If a write fails the socket is closed right away, so that a pending read on
// it fails too, and the rest of c.Send is drained without writing until the
// channel is closed. Draining keeps anything that sends on c.Send from
// blocking on a dead client.
func (c *Client) Write() {
	defer c.Socket.Close()
	for message := range c.Send {
		if err := c.Socket.WriteMessage(websocket.TextMessage, message); err != nil {
			c.Socket.Close()
			for range c.Send {
				// Discard: the connection can no longer be written to.
			}
			return
		}
	}
	// c.Send was closed: tell the peer the connection is ending.
	c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
}
Start():启动websocket服务
Send():向连接websocket的管道chan写入数据
Read():读取在websocket管道中的数据
Write():通过websocket协议向连接到ws的客户端发送数据

函数调用 #

// SendWebSocket writes info as a text message on the shared wsapi.Conn
// connection and prints info together with progress.
//
// If no connection has been established it prints a notice and returns without
// writing. The original built an error with fmt.Errorf, discarded it and went
// on to dereference the nil connection. A failed write is printed rather than
// silently ignored.
//
// NOTE(review): wsapi.Conn is a single shared connection; confirm that callers
// do not invoke this function concurrently.
func SendWebSocket(info string, progress int64) {
	if wsapi.Conn == nil {
		fmt.Println("WebSocket未连接")
		return
	}
	if err := wsapi.Conn.WriteMessage(websocket.TextMessage, []byte(info)); err != nil {
		fmt.Println(err)
	}
	fmt.Println(info, progress)
}

跨包调用时,可将 Conn 设置为全局变量

// upgrader upgrades HTTP requests to websocket connections. Its CheckOrigin
// always returns true, so requests from every origin are accepted.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}
// Conn is exported at package level so that code in other packages can use
// the connection.
// NOTE(review): presumably assigned where the connection is upgraded — confirm.
var Conn *websocket.Conn

脚本测试 #

(重新建一个文件夹启动main函数)

var addr = flag.String("addr", "127.0.0.1:8080", "http service address")
// main is a small test client: it dials ws://<addr>/ws, starts timeWriter to
// send a timestamp periodically, and prints every message it receives until
// reading fails.
func main() {
	// Without Parse the -addr flag is never read and *addr always keeps its
	// default value.
	flag.Parse()

	u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws"}
	// Dial through the package's default dialer instead of a nil *Dialer.
	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the connection when the read loop ends.
	defer conn.Close()

	go timeWriter(conn)
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			fmt.Println("read:", err)
			return
		}
		fmt.Printf("received: %s\n", message)
	}
}
// timeWriter sends the current time, formatted as "2006-01-02 15:04:05", as a
// text message on conn every two seconds. It returns when a write fails, so
// the goroutine running it does not loop forever on a dead connection.
func timeWriter(conn *websocket.Conn) {
	for {
		time.Sleep(2 * time.Second)
		now := time.Now().Format("2006-01-02 15:04:05")
		if err := conn.WriteMessage(websocket.TextMessage, []byte(now)); err != nil {
			fmt.Println("write:", err)
			return
		}
	}
}

升级版 #

// Manager event loop: handles client registration, client unregistration and
// delivery of outbound messages. Field names follow the Client struct of this
// section (ID, Send).
for {
	select {
	case client := <-manager.Register:
		// Register the client.
		manager.Lock.Lock()
		manager.Group[client.ID] = client
		manager.clientCount++
		log.WSLog(fmt.Sprintf("客户端注册: 客户端id为%s", client.ID))
		manager.Lock.Unlock()
	case client := <-manager.UnRegister:
		// Unregister the client.
		manager.Lock.Lock()
		if _, ok := manager.Group[client.ID]; ok {
			// Close the message channel.
			close(client.Send)
			// Remove the client from the group.
			delete(manager.Group, client.ID)
			// One client fewer.
			manager.clientCount--
			log.WSLog(fmt.Sprintf("客户端注销: 客户端id为%s", client.ID))
		}
		manager.Lock.Unlock()
	case data := <-manager.BroadCastMessage:
		// Deliver to every client when broadcasting, otherwise only to the
		// clients whose ID is listed in data.ClientIDs.
		for _, conn := range manager.Group {
			if data.IsBroadCast || function.InSliceStr(conn.ID, data.ClientIDs) {
				conn.Send <- data.Message
			}
		}
	}
}

单个websocket的client结构体 #

// Client represents a single websocket client.
type Client struct {
	// ID identifies the client.
	ID     string
	// Socket is the underlying websocket connection.
	Socket *websocket.Conn
	// Send is the channel of outbound messages for this client.
	Send   chan []byte
}

服务端websocket的结构体 #

// Manager is the server-side websocket manager.
type Manager struct {
  // Group maps client.id => Client.
  Group                map[string]*Client
  // Lock is the mutex held by the manager loop while it changes Group and clientCount.
  Lock                 sync.Mutex
  // Register and UnRegister carry clients to add to or remove from Group.
  Register, UnRegister chan *Client
  // BroadCastMessage carries outbound messages to deliver to clients.
  BroadCastMessage     chan *BroadCastMessageData
  // clientCount is the number of groups and clients (translated from the
  // original comment); the manager loop adjusts it on register/unregister.
  clientCount          uint
}

回复数据消息结构体 #

// BroadCastMessageData describes one outbound message and who should get it.
type BroadCastMessageData struct {
  // Message is the payload to deliver.
  Message     []byte
  // IsBroadCast, when true, delivers Message to every client.
  IsBroadCast bool
  // ClientIDs lists the recipients when IsBroadCast is false.
  ClientIDs   []string
}

数据通信 #

以下是在建立连接后的正常数据通信(发送数据,回复数据)的流程图

在处理客户端消息的逻辑处理中,封装了一个handle文件,接收客户端请求指令的函数方法处理

/**
* Description: the websocket server receives an instruction and calls the matching handler function
* author: 	shahao
* create on:	2021-04-16 18:05:21
*/
func (manager *Manager) ServerCodeToFunc(data ReadData) {
  // Derive the handler name from the action code, then look it up once.
  handlerName := case2Camel(data.Actioncode)
  handler := manager.serverReturnFunc()[handlerName]
  args := []reflect.Value{reflect.ValueOf(data)}
  if !handler.IsValid() {
  	// No handler for this action code: nothing is called.
  	return
  }
  handler.Call(args)
}

复制代码

然后可以将处理逻辑集中放到serverInstructFunc处理,例如心跳回复函数

// HeartBeat handles a heartbeat packet: it calls WebsocketManager.Success with
// the request's action code, true, and the request's IsBroadCast flag and
// ClientIDs.
func (m *ServerMethod) HeartBeat(params ReadData) {
  WebsocketManager.Success(params.Actioncode, true, params.IsBroadCast, params.ClientIDs)
}