登录
    Technology changes quickly but people's minds change slowly.

spring boot websocket 通知实践

技术宅 破玉 1366次浏览 0个评论

  最近在做大屏项目的实时数据展示功能,于是想到了websocket,今天做个小小的笔记。

引入相关依赖,编写配置

  spring-boot-starter-websocket jar 包加入到pom文件中。
增加如下代码,提供 webscocket 和 sockjs 两种方式,并指定对应的路径

@Configuration
@EnableWebSocket
public class WebSocketAutoConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // webSocket通道
        // 指定处理器和路径
        registry.addHandler(new MyWebSocketHandler(), "/websocket")
                // 指定自定义拦截器
                .addInterceptors(new WebSocketInterceptor())
                // 允许跨域
                .setAllowedOrigins("*");
        // sockJs通道
        registry.addHandler(new MyWebSocketHandler(), "/sock-js")
                .addInterceptors(new WebSocketInterceptor())
                .setAllowedOrigins("*")
                // 开启sockJs支持
                .withSockJS();
    }
}

实现自定义拦截器

此处,从请求中取出相关参数,放入 session 中

public class WebSocketInterceptor implements HandshakeInterceptor {

    /**
     * handler处理前调用,attributes属性最终在WebSocketSession里,可能通过webSocketSession.getAttributes().get(key值)获得
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
            // 获取请求路径携带的参数
            String user = serverHttpRequest.getServletRequest().getParameter("user");
            attributes.put("user", user);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

    }


}

实现自定义消息处理器

发送消息可以选择 sendMessage 和 sendAllMessage 两种,一种是发给某个特定会话,一种是通知到所有会话

public class MyWebSocketHandler extends AbstractWebSocketHandler {

    /**
     *  存储sessionId和webSocketSession
     *  需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储
     *  在分布式系统中,要想别的办法实现webSocketSession共享
     */
    private static Map sessionMap = new ConcurrentHashMap();
    private static Map userMap = new ConcurrentHashMap();

    /**
     * webSocket连接创建后调用
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        // 获取参数
        String user = String.valueOf(session.getAttributes().get("user"));
        userMap.put(user, session.getId());
        sessionMap.put(session.getId(), session);
    }

    /**
     * 接收到消息会调用
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception {
        if (message instanceof TextMessage) {

        } else if (message instanceof BinaryMessage) {

        } else if (message instanceof PongMessage) {

        } else {
            System.out.println("Unexpected WebSocket message type: " + message);
        }
    }

    /**
     * 连接出错会调用
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) {
        sessionMap.remove(session.getId());
    }

    /**
     * 连接关闭会调用
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessionMap.remove(session.getId());
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 后端发送消息
     */
    public void sendMessage(String user, String message) throws Exception{
        String sessionId=userMap.get(user);
        WebSocketSession session = sessionMap.get(sessionId);
        session.sendMessage(new TextMessage(message));
    }

    /**
     * 全部通知到
     */
    public void sendAllMessage(String message){
            sessionMap.forEach((k,v)-> {
                try {
                    v.sendMessage(new TextMessage(message));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
    }

定时任务,自动发送消息

利用 springboot 自带定时,每隔一分钟给某个会话,发送消息

@Component
@Slf4j
public class WebTask {
    private MyWebSocketHandler myWebSocketHandler=new MyWebSocketHandler();
    @Scheduled(cron = "0 0/1 * * * ? ")
    public void taskWeb(){
        try {
            log.info("发送消息:{}");
            myWebSocketHandler.sendMessage("tony","hello");
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

前端测试可以使用下面的网址测试:http://www.websocket-test.com/
如果在分布式系统中使用,可以采用服务调用的方式实现,以及完成 session 共享。或者借助消息队列完成通知,之后我们再研究这一块。


华裳绕指柔, 版权所有丨如未注明 , 均为原创|转载请注明spring boot websocket 通知实践
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址