首页 > 文章列表 > Spring Boot怎么使用SSE方式向前端推送数据

Spring Boot怎么使用SSE方式向前端推送数据

springboot SSE
497 2023-05-15

Spring Boot怎么使用SSE方式向前端推送数据

前言

SSE简单的来说就是服务器主动向前端推送数据的一种技术,它是单向的,也就是说前端是不能向服务器发送数据的。SSE适用于消息推送,监控等只需要服务器推送数据的场景中,下面是使用Spring Boot 来实现一个简单的模拟向前端推动进度数据,前端页面接受后展示进度条。

服务端

在Spring Boot中使用时需要注意,最好使用Spring Web 提供的SseEmitter这个类来进行操作,我在刚开始时使用网上说的将Content-Type设置为text-stream这种方式发现每次前端每次都会重新创建接。最后参考该文实现了最终想要的效果:

SSE工具类

SSEServer.java

package vip.huhailong.catchat.sse;



import lombok.extern.slf4j.Slf4j;

import org.springframework.http.MediaType;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.atomic.AtomicInteger;

import java.util.function.Consumer;



/**

 * @author Huhailong

 */

@Slf4j

public class SSEServer {



    /**

     * 当前连接数

     */

    private static AtomicInteger count = new AtomicInteger(0);



    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();



    public static SseEmitter connect(String userId){

        //设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常

        SseEmitter sseEmitter = new SseEmitter(0L);

        //注册回调

        sseEmitter.onCompletion(completionCallBack(userId));

        sseEmitter.onError(errorCallBack(userId));

        sseEmitter.onTimeout(timeOutCallBack(userId));

        sseEmitterMap.put(userId,sseEmitter);

        //数量+1

        count.getAndIncrement();

        log.info("create new sse connect ,current user:{}",userId);

        return sseEmitter;

    }

    /**

     * 给指定用户发消息

     */

    public static void sendMessage(String userId, String message){

        if(sseEmitterMap.containsKey(userId)){

            try{

                sseEmitterMap.get(userId).send(message);

            }catch (IOException e){

                log.error("user id:{}, send message error:{}",userId,e.getMessage());

                e.printStackTrace();

            }

        }

    }



    /**

     * 想多人发送消息,组播

     */

    public static void groupSendMessage(String groupId, String message){

        if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){

            sseEmitterMap.forEach((k,v) -> {

                try{

                    if(k.startsWith(groupId)){

                        v.send(message, MediaType.APPLICATION_JSON);

                    }

                }catch (IOException e){

                    log.error("user id:{}, send message error:{}",groupId,message);

                    removeUser(k);

                }

            });

        }

    }

    public static void batchSendMessage(String message) {

        sseEmitterMap.forEach((k,v)->{

            try{

                v.send(message,MediaType.APPLICATION_JSON);

            }catch (IOException e){

                log.error("user id:{}, send message error:{}",k,e.getMessage());

                removeUser(k);

            }

        });

    }

    /**

     * 群发消息

     */

    public static void batchSendMessage(String message, Set<String> userIds){

        userIds.forEach(userId->sendMessage(userId,message));

    }

    public static void removeUser(String userId){

        sseEmitterMap.remove(userId);

        //数量-1

        count.getAndDecrement();

        log.info("remove user id:{}",userId);

    }

    public static List<String> getIds(){

        return new ArrayList<>(sseEmitterMap.keySet());

    }

    public static int getUserCount(){

        return count.intValue();

    }

    private static Runnable completionCallBack(String userId) {

        return () -> {

            log.info("结束连接,{}",userId);

            removeUser(userId);

        };

    }

    private static Runnable timeOutCallBack(String userId){

        return ()->{

            log.info("连接超时,{}",userId);

            removeUser(userId);

        };

    }

    private static Consumer<Throwable> errorCallBack(String userId){

        return throwable -> {

            log.error("连接异常,{}",userId);

            removeUser(userId);

        };

    }

}

上面这个类可以把它当作一个SSE的工具类,下面我们使用一下它

在Controller层创建 SSEController.java

package vip.huhailong.catchat.controller;



import lombok.extern.slf4j.Slf4j;

import org.springframework.web.bind.annotation.*;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import vip.huhailong.catchat.sse.SSEServer;



/**

 * @author Huhailong

 */

@Slf4j

@RestController

@CrossOrigin

@RequestMapping("/sse")

public class SSEController {



    @GetMapping("/connect/{userId}")

    public SseEmitter connect(@PathVariable String userId){

        return SSEServer.connect(userId);

    }



    @GetMapping("/process")

    public void sendMessage() throws InterruptedException {

        for(int i=0; i<=100; i++){

            if(i>50&&i<70){

                Thread.sleep(500L);

            }else{

                Thread.sleep(100L);

            }

            SSEServer.batchSendMessage(String.valueOf(i));

        }

    }

}

上面的connect是用来连接sse的,它返回一个SseEmitter实例,这时候连接就已经创建了,然后下面的process接口是用来推送数据的,我这里是准备让前端实现一个进度条的效果,所以推送的是数字,为了效果明显,我在推送到50到70的时候速度放慢,其余都是100ms

前端代码

<!DOCTYPE html>

<html lang="en">

<head>

    <meta charset="UTF-8">

    <title>Home</title>

    <script>

        let data = new EventSource("/cat-chat/sse/connect/huhailong")

        data.onmessage = function(event){

            console.log("test=>",event)

            document.getElementById("result").innerText = event.data+'%';

            document.getElementById("my-progress").value = event.data;

        }

    </script>

</head>

<body>

    <div id="result"></div>

    <progress  id="my-progress" value="0" max="100"></progress>

</body>

</html>

最终效果: