首页 > 文章列表 > Java应用层协议WebSocket实现消息推送

Java应用层协议WebSocket实现消息推送

java
315 2023-03-17

前言

  大部分的web开发者,开发的业务都是基于Http协议的:前端请求后端接口,携带参数,后端执行业务代码,再返回结果给前端。作者参与开发的项目,有一个报警推送的功能,服务端实时推送报警信息给浏览器端;还有像抖音里面,如果有人关注、回复你的评论时,抖音就会推送相关消息给你了,你就会收到一条消息。

  有些同学会说了,基于Http协议也能实现啊:前端定时访问后端(每隔3s或者几秒),后端返回消息数据,前端拿到后弹出消息。这种方式太low了,而且每个浏览器都这样,使用系统的人一多,服务器的压力就太大了些。那到底用什么技术手段实现呢?我们的主角就登场了。

  WebSocket是在单个TCP连接上进行全双工通信的应用层协议(Http协议也是应用层),浏览器端和服务端都可主动发送数据给另一端。这样是不是比Http协议更适合消息推送这种场景。

浏览器端

  作者建了一个SpringBoot项目,Html放在src\main\resources\static下:

<!DOCTYPE html>

<html lang="zh" xmlns:th="http://www.thymeleaf.org">

<head>

<!--    解决中文乱码-->

    <meta charset="UTF-8"/>

    <title></title>

    <script type="text/javascript" src="./js/jquery.min.js"></script>

</head>

<body>

    <input id="input1" type="text" /><br/>

    <input type="button" value="浏览器发送服务端" onclick="btnClick()" />

    <input type="button" value="服务端发送浏览器" onclick="btnClick1()" />

    <input type="button" value="重新打开连接" onclick="btnClick2()" />

    <br/>

    <textarea id="textArea" style="height: 50px"></textarea>

<script>

    var ws;

    webSocketInit();

    function webSocketInit() {

        ws =new WebSocket('ws://localhost:8080/bootdemo/webSocket/10086');

        // 获取连接状态

        console.log('ws连接状态[初始]:' + ws.readyState);

        //监听是否连接成功

        ws.onopen = function () {

            console.log('ws连接状态[成功]:' + ws.readyState);

        };

        // 接听服务器发回的信息并处理展示

        ws.onmessage = function (obj) {

            console.log('接收到来自服务器的消息:');

            var txt = $("#textArea").val();

            $("#textArea").val(txt + "\n" + obj.data);

            $("#textArea").scrollTop($("#textArea")[0].scrollHeight);

            //完成通信后关闭WebSocket连接

            // ws.close();

        };

        // 监听连接关闭事件

        ws.onclose = function () {

            // 监听整个过程中websocket的状态

            console.log('ws连接状态[关闭]:' + ws.readyState);

        };

        // 监听并处理error事件

        ws.onerror = function (error) {

            console.log(error);

        };

    }

    function btnClick() {

        console.log("浏览器端发送消息:");

        //连接成功则发送一个数据

        ws.send($("#input1").val());

    }

    function btnClick1() {

        $.ajax({

            url: 'http://localhost:8080/bootdemo/pushWebSocket/publish?' +

            'userId=10086&message=' + $("#input1").val(),

            type: 'GET',

            success: function (data) {

                // console.log(data);

            }

        });

    }

    function btnClick2() {

        webSocketInit();

    }

</script>

</body>

</html>

服务器端

  先引入依赖:

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-thymeleaf</artifactId>

    </dependency>

    <dependency>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-websocket</artifactId>

    </dependency>

    <dependency>

        <groupId>org.projectlombok</groupId>

        <artifactId>lombok</artifactId>

        <scope>provided</scope>

    </dependency>

  bean上添加@ServerEndpoint,作为WebSocket的服务端。

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import javax.websocket.OnMessage;

import javax.websocket.OnOpen;

import javax.websocket.Session;

import javax.websocket.server.PathParam;

import javax.websocket.server.ServerEndpoint;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.CopyOnWriteArraySet;

@Component

@Slf4j

@ServerEndpoint("/webSocket/{userId}")

public class WebSocketServer {

    //与某个客户端的连接会话,需要通过它来给客户端发送数据

    private Session session;

    private static final CopyOnWriteArraySet<WebSocketServer> webSockets =

    new CopyOnWriteArraySet<>();

    // 用来存在线连接数

    private static final Map<String, Session> sessionPool = 

    new HashMap<String, Session>();

    /**

     * 连接成功调用的方法

     */

    @OnOpen

    public void onOpen(Session session, @PathParam(value = "userId") 

    String userId) {

        try {

            this.session = session;

            webSockets.add(this);

            sessionPool.put(userId, session);

        }

        catch (Exception e) {

        }

    }

    /**

     * 收到客户端消息后调用的方法

     */

    @OnMessage

    public void onMessage(String message) {

        log.info("websocket消息: 收到客户端消息:" + message);

    }

    public void sendOneMessage(String userId, String message) {

        Session session = sessionPool.get(userId);

        if (session != null && session.isOpen()) {

            try {

                log.info("服务端推送消息:" + message);

                session.getAsyncRemote().sendText(message);

            }

            catch (Exception e) {

                e.printStackTrace();

            }

        }

    }

}

  进行注册:

@Configuration

public class WebSocketConfigOne {

    /**

     * 这个bean会自动注册使用了@ServerEndpoint注解声明的对象

     * 没有的话会报404

     *

     * @return

     */

    @Bean

    public ServerEndpointExporter serverEndpointExporter() {

        return new ServerEndpointExporter();

    }

}

  推送消息的控制器:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import java.util.HashMap;

import java.util.Map;

@Controller

@RequestMapping("/pushWebSocket")

public class WebSocketController {

    @Autowired

    private WebSocketServer webSocketServer;

    @GetMapping("/publish")

    @ResponseBody

    public Map publish(String userId, String message) {

        webSocketServer.sendOneMessage(userId, message);

        HashMap<String, Object> map = new HashMap<>();

        map.put("code", 200);

        return map;

    }

}

  还有我的配置文件application.properties:

  # web port

  server.port=8080

  server.servlet.context-path=/bootdemo

  运行启动类后,访问html(localhost:8080/bootdemo/index.html)如下:

  有的同学一思索,点击图中的第2个按钮"服务端发送浏览器",你这好像也是前端先请求,再推送的消息;我们的WebSocketController#publish方法,在真实的场景下,可以在后端的定时任务中、消息中间件的消费者端调用,不用前端先发送请求。

  当然SpringBoot有专门构建WebSocket服务端的方式。

  核心配置类:

import lombok.extern.slf4j.Slf4j;

import org.springframework.context.annotation.Configuration;

import org.springframework.http.server.ServerHttpRequest;

import org.springframework.http.server.ServerHttpResponse;

import org.springframework.http.server.ServletServerHttpRequest;

import org.springframework.web.servlet.HandlerMapping;

import org.springframework.web.socket.WebSocketHandler;

import org.springframework.web.socket.config.annotation.EnableWebSocket;

import org.springframework.web.socket.config.annotation.WebSocketConfigurer;

import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpServletRequest;

import java.util.Map;

@Configuration

@EnableWebSocket

@Slf4j

public class WebSocketConfig1 implements WebSocketConfigurer {

    @Override

    public void registerWebSocketHandlers(WebSocketHandlerRegistry 

    registry) {

        registry.addHandler(new MyWebSocketHandler(), "/webSocket/{userId}")//设置连接路径和处理

                .setAllowedOrigins("*")

                .addInterceptors(new MyWebSocketInterceptor());//设置拦截器

    }

    class MyWebSocketInterceptor implements HandshakeInterceptor {

        //前置拦截一般用来注册用户信息,绑定 WebSocketSession

        @Override

        public boolean beforeHandshake(ServerHttpRequest request, 

        ServerHttpResponse response, WebSocketHandler wsHandler, 

        Map<String, Object> attributes) throws Exception {

            log.info("前置拦截~~");

            if (!(request instanceof ServletServerHttpRequest)) {

                return true;

            }

            HttpServletRequest servletRequest = 

            ((ServletServerHttpRequest)request).getServletRequest();

            Map map = (Map)servletRequest.getAttribute(HandlerMapping.

            URI_TEMPLATE_VARIABLES_ATTRIBUTE);

            String userId = (String)map.get("userId");

            attributes.put("userId", userId);

            return true;

        }

        @Override

        public void afterHandshake(ServerHttpRequest request, 

        ServerHttpResponse response, WebSocketHandler wsHandler, 

        Exception exception) {

            log.info("后置拦截~~");

        }

    }

}

  核心处理器:

import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

import org.springframework.web.socket.*;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

@Slf4j

@Component

public class MyWebSocketHandler implements WebSocketHandler {

    private static final Map<String, WebSocketSession> SESSIONS = 

    new ConcurrentHashMap<>();

	/**

	 * 建立新的socket连接后回调的方法

	 */

    @Override

    public void afterConnectionEstablished(WebSocketSession session) 

    throws Exception {

        String userId = (String) session.getAttributes().get("userId");

        SESSIONS.put(userId, session);

    }

	/**

	 * 接收到浏览器端的消息后回调的方法

	 */

    @Override

    public void handleMessage(WebSocketSession session, 

    WebSocketMessage<?> message) throws Exception {

        String msg = message.getPayload().toString();

        log.info("收到客户端消息:" + msg);

    }

	/**

	 * 连接出错时回调的方法

	 */

    @Override

    public void handleTransportError(WebSocketSession session, 

    Throwable exception) throws Exception {

        log.info("连接出错");

        if (session.isOpen()) {

            session.close();

        }

    }

	/**

	 * 连接关闭时回调的方法

	 */

    @Override

    public void afterConnectionClosed(WebSocketSession session, 

    CloseStatus closeStatus) throws Exception {

        log.info("连接关闭:status:" + closeStatus);

    }

	/**

	 * 是否处理部分消息,返回false就行

	 */

    @Override

    public boolean supportsPartialMessages() {

        return false;

    }

	/**

	 * 推送消息给浏览器端

	 */

    public void sendMessage(String userId, String message) {

        WebSocketSession webSocketSession = SESSIONS.get(userId);

        if (webSocketSession == null || !webSocketSession.isOpen()) {

            return;

        }

        try {

            webSocketSession.sendMessage(new TextMessage(message));

        }

        catch (Exception ex) {

            log.error("推送消息异常:" + ex);

        }

    }

}

  控制器也改造下:

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import java.util.HashMap;

import java.util.Map;

@Controller

@RequestMapping("/pushWebSocket")

public class WebSocketController {

    @Autowired

    private MyWebSocketHandler handler;

    @GetMapping("/publish")

    @ResponseBody

    public Map publish(String userId, String message) {

        handler.sendMessage(userId, message);

        HashMap<String, Object> map = new HashMap<>();

        map.put("code", 200);

        return map;

    }

}

  前端部分不用做修改,和之前一样的代码。