最近在做大屏项目的实时数据展示功能,于是想到了websocket,今天做个小小的笔记。
引入相关依赖,编写配置
spring-boot-starter-websocket jar 包加入到pom文件中。
增加如下代码,提供 webscocket 和 sockjs 两种方式,并指定对应的路径
@Configuration @EnableWebSocket public class WebSocketAutoConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // webSocket通道 // 指定处理器和路径 registry.addHandler(new MyWebSocketHandler(), "/websocket") // 指定自定义拦截器 .addInterceptors(new WebSocketInterceptor()) // 允许跨域 .setAllowedOrigins("*"); // sockJs通道 registry.addHandler(new MyWebSocketHandler(), "/sock-js") .addInterceptors(new WebSocketInterceptor()) .setAllowedOrigins("*") // 开启sockJs支持 .withSockJS(); } }
实现自定义拦截器
此处,从请求中取出相关参数,放入 session 中
public class WebSocketInterceptor implements HandshakeInterceptor { /** * handler处理前调用,attributes属性最终在WebSocketSession里,可能通过webSocketSession.getAttributes().get(key值)获得 */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request; // 获取请求路径携带的参数 String user = serverHttpRequest.getServletRequest().getParameter("user"); attributes.put("user", user); return true; } else { return false; } } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } }
实现自定义消息处理器
发送消息可以选择 sendMessage 和 sendAllMessage 两种,一种是发给某个特定会话,一种是通知到所有会话
public class MyWebSocketHandler extends AbstractWebSocketHandler { /** * 存储sessionId和webSocketSession * 需要注意的是,webSocketSession没有提供无参构造,不能进行序列化,也就不能通过redis存储 * 在分布式系统中,要想别的办法实现webSocketSession共享 */ private static Map sessionMap = new ConcurrentHashMap(); private static Map userMap = new ConcurrentHashMap(); /** * webSocket连接创建后调用 */ @Override public void afterConnectionEstablished(WebSocketSession session) { // 获取参数 String user = String.valueOf(session.getAttributes().get("user")); userMap.put(user, session.getId()); sessionMap.put(session.getId(), session); } /** * 接收到消息会调用 */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { if (message instanceof TextMessage) { } else if (message instanceof BinaryMessage) { } else if (message instanceof PongMessage) { } else { System.out.println("Unexpected WebSocket message type: " + message); } } /** * 连接出错会调用 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { sessionMap.remove(session.getId()); } /** * 连接关闭会调用 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { sessionMap.remove(session.getId()); } @Override public boolean supportsPartialMessages() { return false; } /** * 后端发送消息 */ public void sendMessage(String user, String message) throws Exception{ String sessionId=userMap.get(user); WebSocketSession session = sessionMap.get(sessionId); session.sendMessage(new TextMessage(message)); } /** * 全部通知到 */ public void sendAllMessage(String message){ sessionMap.forEach((k,v)-> { try { v.sendMessage(new TextMessage(message)); } catch (IOException e) { e.printStackTrace(); } }); }
定时任务,自动发送消息
利用 springboot 自带定时,每隔一分钟给某个会话,发送消息
@Component @Slf4j public class WebTask { private MyWebSocketHandler myWebSocketHandler=new MyWebSocketHandler(); @Scheduled(cron = "0 0/1 * * * ? ") public void taskWeb(){ try { log.info("发送消息:{}"); myWebSocketHandler.sendMessage("tony","hello"); }catch (Exception e){ e.printStackTrace(); } } }
前端测试可以使用下面的网址测试:http://www.websocket-test.com/
如果在分布式系统中使用,可以采用服务调用的方式实现,以及完成 session 共享。或者借助消息队列完成通知,之后我们再研究这一块。