项目中的需求是这样的:

一个页面实时刷新的功能,页面上的数据状态可能会随着操作实时改变,所以每个用户在使用的时候都希望能看到数据的最新状态。

我想到了两种解决方法:1.轮循,2.WebSocket

我们这里采用的是WebSocket来解决问题

然而在解决的过程中又发现了其他的问题

WebSocket在建立连接后,如果不是人为操作的话,他不会主动地进行断开,这样会导致数据安全问题。

下面是我解决问题的代码:

import javax.websocket.Session;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;@Componentpublic class WebSocketTokenExpirationCheckTask {// 保存session和过期时间的Mapprivate static final Map<Session, Long> tokenExpirationMap = new ConcurrentHashMap<>();/** * 添加token和过期时间(也可以将token代替session作为参数,验证token有效性,来判断是否过期) */public static void addSessionExpiration(Session session, Long expirationTime) {tokenExpirationMap.put(session, expirationTime);}public static void removeSessionExpiration(Session session) {tokenExpirationMap.remove(session);}/** * 检查token是否过期 */@Scheduled(fixedDelay = 60000) // 每60秒执行一次public void checkTokenExpiration() {long currentTimeMillis = System.currentTimeMillis();for (Session session : tokenExpirationMap.keySet()) {Long expirationTime = tokenExpirationMap.get(session);if (expirationTime != null && currentTimeMillis > expirationTime) {WebSocketServerPool.removeBySession(session); // 移除对应的连接(也可以将token作为参数,验证token有效性,来判断是否过期)}}}}

这部分是提供了一个定时任务,每60秒去看一下,我们的连接是否过期了。

public class WebSocketServerPool {/** * WebSocket连接池 */private static ConcurrentMap<Session, String> dataConnect = new ConcurrentHashMap<>();private static ConcurrentMap<Session, String> dataMessage = new ConcurrentHashMap<>();private static ConcurrentMap<Session, String> dataScene = new ConcurrentHashMap<>();/** * 将websocket连接,放入连接池 * * @param session websocket连接 */public static void addDataConnect(Session session, String screen) {dataConnect.put(session, screen);Iterator<Map.Entry<Session, String>> iterator = dataConnect.entrySet().iterator();synchronized (iterator) {//移除失效连接while (iterator.hasNext()) {Map.Entry<Session, String> entry = iterator.next();Session sessionNew = entry.getKey();Map<String, Object> userProperties = sessionNew.getUserProperties();if (null != userProperties && null != userProperties.get("ReadyState") && "0" != String.valueOf(userProperties.get("ReadyState"))) {iterator.remove();}}}}public static void setDataMessage(Session session, String message) {dataMessage.put(session, message);}public static void setDataScene(Session session, String scene) {dataScene.put(session, scene);}/** * 将websocket连接从连接池中移除 * * @param session websocket连接 */public static void removeConnect(Session session) {Iterator<Map.Entry<Session, String>> iterator = dataConnect.entrySet().iterator();synchronized (iterator) {//主动移除连接while (iterator.hasNext()) {if (session.equals(iterator.next().getKey())) {iterator.remove();}}}}/** * 获取连接池中所有连接 * * @return 连接池所有数据 */public static ConcurrentMap<Session, String> getDataConnect() {return dataConnect;}/** * 获取消息信息 * * @return */public static ConcurrentMap<Session, String> getDataMessage() {return dataMessage;}/** * 获取数据场景 * * @return */public static ConcurrentMap<Session, String> getDataScene() {return dataScene;}/** * Websocket消息推送 * * @param session 连接 * @param message 消息主体 * @throws IOException I/O异常 */public static void sendMessage(Session session, String message) throws IOException {session.getBasicRemote().sendText(message);}public static void removeBySession(Session session) {//for (Session session : dataConnect.keySet()) {//String sessionToken = dataConnect.get(token);//if (sessionToken != null && sessionToken.equals(token)) {//try {//session.close(); // 主动关闭websocket连接//} catch (IOException e) {//e.printStackTrace();//}//removeConnect(session);//break;//}//}try {session.close(); // 主动关闭websocket连接} catch (IOException e) {e.printStackTrace();}removeConnect(session);WebSocketTokenExpirationCheckTask.removeSessionExpiration(session); // 从检查任务中移除该session(token)}}

最后就是WebSocket服务代码了,这部分代码网上很多,核心就是在连接时(onOpen)将session和过期时间存入Map集合中,你也可以将token作为参数传递进来。

import com.zzyc.web.config.WebSocketServerPool;import com.zzyc.web.service.IOrbitsService;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import javax.annotation.Resource;import javax.websocket.*;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang.StringUtils;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.core.task.AsyncTaskExecutor;import org.springframework.stereotype.Component;@ServerEndpoint("/webSocket/{userId}")@Component@Slf4jpublic class WebSocketServer {/** * 建立连接成功调用 (Session + 场景ID) */@OnOpenpublic void onOpen(Session session,@PathParam("userId") String screen) throws IOException {log.info("[onOpen][session({}) 接入, [screen: {}]", session, screen);WebSocketTokenExpirationCheckTask.addSessionExpiration(session, System.currentTimeMillis() + 30 * 60 * 1000); // 过期时间为30分钟WebSocketServerPool.setDataScene(session,"1");WebSocketServerPool.addDataConnect(session,screen);}/** * 关闭连接时调用 * @param session 连接 */@OnClosepublic void onClose(Session session, CloseReason closeReason) {log.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);WebSocketServerPool.removeConnect(session);}/** * 错误时调用 * @param session 连接 * @param throwable 异常 */@OnErrorpublic void onError(Session session, Throwable throwable) {log.info("[onClose][session({}) 发生异常]", session, throwable);WebSocketServerPool.removeConnect(session);}/** * 收到客户端信息后,根据接收到的信息进行处理 * @param session 连接 * @param message 数据消息 */@OnMessagepublic void onMessage(Session session, String message) {WebSocketServerPool.setDataMessage(session, message);log.info("[onOpen][session({}) 接收到一条消息({})]", session, message);// TODO: 2023/04/11 对于客户端发送的指令信息,解析后进行对应的逻辑处理}private void sendMessage(Session session, String message) {try {session.getBasicRemote().sendText(message);} catch (Exception e) {System.out.println("Error sending message to session " + session.getId() + ": " + e.getMessage());}}}

这样就可以将WebSocket连接进行关闭了