首页 > 基础资料 博客日记
WebSocket 的使用
2025-12-25 10:07:06基础资料围观2次
Java资料网推荐WebSocket 的使用这篇文章给大家,欢迎收藏Java资料网享受知识的乐趣
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器和客户端之间进行实时双向通信。
基本使用
1. 创建 WebSocket 连接
// 创建 WebSocket 连接 const socket = new WebSocket('ws://localhost:8080'); // 或者使用安全连接 const secureSocket = new WebSocket('wss://example.com/socket');
2. WebSocket 事件
// 连接建立时触发 socket.onopen = function(event) { console.log('连接已建立'); socket.send('Hello Server!'); }; // 接收到消息时触发 socket.onmessage = function(event) { console.log('收到消息:', event.data); // 处理接收到的数据 }; // 发生错误时触发 socket.onerror = function(error) { console.error('WebSocket 错误:', error); }; // 连接关闭时触发 socket.onclose = function(event) { console.log('连接关闭', event.code, event.reason); // 可以在这里尝试重连 };
完整示例
客户端示例
<!DOCTYPE html> <html> <head> <title>WebSocket 示例</title> </head> <body> <div> <input type="text" id="messageInput" placeholder="输入消息"> <button onclick="sendMessage()">发送</button> </div> <div id="messages"></div> <script> // 创建 WebSocket 连接 const socket = new WebSocket('ws://localhost:8080'); const messagesDiv = document.getElementById('messages'); // 连接建立 socket.onopen = function() { addMessage('系统', '连接成功!'); }; // 接收消息 socket.onmessage = function(event) { try { const data = JSON.parse(event.data); addMessage(data.sender, data.message); } catch (e) { addMessage('系统', event.data); } }; // 错误处理 socket.onerror = function(error) { addMessage('系统', '连接错误'); }; // 连接关闭 socket.onclose = function() { addMessage('系统', '连接已关闭'); }; // 发送消息 function sendMessage() { const input = document.getElementById('messageInput'); const message = input.value.trim(); if (message) { socket.send(JSON.stringify({ type: 'message', content: message, timestamp: new Date().toISOString() })); input.value = ''; } } // 显示消息 function addMessage(sender, text) { const msgElement = document.createElement('div'); msgElement.innerHTML = `<strong>${sender}:</strong> ${text}`; messagesDiv.appendChild(msgElement); messagesDiv.scrollTop = messagesDiv.scrollHeight; } // 关闭连接(页面卸载时) window.addEventListener('beforeunload', function() { if (socket.readyState === WebSocket.OPEN) { socket.close(1000, '用户离开页面'); } }); </script> </body> </html>
Node.js 服务器端示例
// 使用 ws 库 const WebSocket = require('ws'); // 创建 WebSocket 服务器 const wss = new WebSocket.Server({ port: 8080 }); console.log('WebSocket 服务器启动在 ws://localhost:8080'); // 连接处理 wss.on('connection', function connection(ws) { console.log('新客户端连接'); // 发送欢迎消息 ws.send(JSON.stringify({ type: 'system', message: '欢迎连接到服务器!' })); // 接收客户端消息 ws.on('message', function incoming(message) { console.log('收到消息:', message); try { const data = JSON.parse(message); // 广播消息给所有客户端 wss.clients.forEach(function each(client) { if (client !== ws && client.readyState === WebSocket.OPEN) { client.send(JSON.stringify({ type: 'message', sender: '用户', message: data.content, timestamp: new Date().toISOString() })); } }); } catch (error) { console.error('消息解析错误:', error); } }); // 连接关闭 ws.on('close', function() { console.log('客户端断开连接'); }); // 错误处理 ws.on('error', function(error) { console.error('WebSocket 错误:', error); }); });
WebSocket 状态
// 检查连接状态 switch(socket.readyState) { case WebSocket.CONNECTING: // 0 - 连接中 console.log('连接中...'); break; case WebSocket.OPEN: // 1 - 已连接 console.log('已连接'); break; case WebSocket.CLOSING: // 2 - 关闭中 console.log('正在关闭...'); break; case WebSocket.CLOSED: // 3 - 已关闭 console.log('已关闭'); break; }
高级特性
1. 心跳检测
// 心跳检测 let heartbeatInterval; socket.onopen = function() { console.log('连接建立'); // 开始心跳 heartbeatInterval = setInterval(() => { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify({ type: 'ping' })); } }, 30000); }; socket.onclose = function() { // 清除心跳 clearInterval(heartbeatInterval); };
2. 重连机制
class WebSocketClient { constructor(url) { this.url = url; this.socket = null; this.reconnectAttempts = 0; this.maxReconnectAttempts = 5; this.reconnectDelay = 1000; } connect() { this.socket = new WebSocket(this.url); this.socket.onopen = () => { console.log('连接成功'); this.reconnectAttempts = 0; }; this.socket.onclose = (event) => { console.log('连接断开,尝试重连...'); this.reconnect(); }; this.socket.onerror = (error) => { console.error('连接错误:', error); }; } reconnect() { if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts); setTimeout(() => { console.log(`第 ${this.reconnectAttempts} 次重连`); this.connect(); }, delay); } else { console.error('重连次数已达上限'); } } send(data) { if (this.socket.readyState === WebSocket.OPEN) { this.socket.send(data); } } }
3. 二进制数据传输
// 发送二进制数据 socket.onopen = function() { // 发送 ArrayBuffer const buffer = new ArrayBuffer(4); const view = new Uint8Array(buffer); view[0] = 1; view[1] = 2; view[2] = 3; view[3] = 4; socket.send(buffer); // 发送 Blob const blob = new Blob(['Hello'], { type: 'text/plain' }); socket.send(blob); }; // 接收二进制数据 socket.binaryType = 'arraybuffer'; // 或 'blob' socket.onmessage = function(event) { if (event.data instanceof ArrayBuffer) { // 处理 ArrayBuffer const view = new Uint8Array(event.data); console.log('收到二进制数据:', view); } else { // 处理文本数据 console.log('收到文本数据:', event.data); } };
Spring Boot 中使用 WebSocket
添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
基础配置类
@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(MessageBrokerRegistry config) { // 消息代理前缀 config.enableSimpleBroker("/topic", "/queue"); // 应用目的地前缀 config.setApplicationDestinationPrefixes("/app"); // 用户目的地前缀(一对一消息) config.setUserDestinationPrefix("/user"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 注册 WebSocket 端点 registry.addEndpoint("/ws") .setAllowedOriginPatterns("*") .withSockJS(); // 支持 SockJS 降级 // 也可以添加多个端点 registry.addEndpoint("/ws-native") .setAllowedOriginPatterns("*"); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { // 配置传输限制 registration.setMessageSizeLimit(128 * 1024); // 消息大小限制 128KB registration.setSendTimeLimit(20 * 1000); // 发送超时 20秒 registration.setSendBufferSizeLimit(512 * 1024); // 发送缓冲区限制 512KB } }
控制器示例
@Controller public class WebSocketController { // 注入消息模板 @Autowired private SimpMessagingTemplate messagingTemplate; /** * 处理客户端发送的消息 * 目的地:/app/chat */ @MessageMapping("/chat") @SendTo("/topic/messages") public ChatMessage handleMessage(ChatMessage message) { message.setTimestamp(new Date()); System.out.println("收到消息: " + message.getContent()); return message; } /** * 发送广播消息 */ @GetMapping("/broadcast") public void broadcast(String content) { ChatMessage message = new ChatMessage(); message.setContent(content); message.setSender("系统"); message.setTimestamp(new Date()); // 发送到 /topic/messages messagingTemplate.convertAndSend("/topic/messages", message); } /** * 发送点对点消息 */ @GetMapping("/sendToUser") public void sendToUser(String userId, String content) { ChatMessage message = new ChatMessage(); message.setContent(content); message.setSender("管理员"); message.setTimestamp(new Date()); // 发送给指定用户:/user/{userId}/queue/messages messagingTemplate.convertAndSendToUser( userId, "/queue/messages", message ); } } // 消息实体类 @Data @AllArgsConstructor @NoArgsConstructor public class ChatMessage { private String sender; private String content; private Date timestamp; }
连接拦截器
@Component public class WebSocketInterceptor extends ChannelInterceptorAdapter { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { // 连接建立时处理 String token = accessor.getFirstNativeHeader("token"); // 验证 token... System.out.println("用户连接: " + accessor.getSessionId()); } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) { // 连接断开时处理 System.out.println("用户断开: " + accessor.getSessionId()); } return message; } }
原生 Java WebSocket(JSR 356)
注解方式
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatEndpoint {
// 存储所有连接
private static final Map<String, Session> sessions = new ConcurrentHashMap<>();
// 存储用户ID和session的映射
private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();
/**
* 连接建立时调用
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
System.out.println("连接建立: " + session.getId() + ", 用户: " + userId);
// 保存连接
sessions.put(session.getId(), session);
userSessionMap.put(userId, session.getId());
// 通知其他用户有新用户上线
broadcast("系统", "用户 " + userId + " 上线了");
}
/**
* 收到消息时调用
*/
@OnMessage
public void onMessage(String message, Session session,
@PathParam("userId") String userId) {
System.out.println("收到消息: " + message + " from: " + userId);
try {
// 解析消息
JSONObject json = new JSONObject(message);
String content = json.getString("content");
String toUserId = json.optString("to", null);
if (toUserId != null) {
// 私聊消息
sendToUser(userId, toUserId, content);
} else {
// 群发消息
broadcast(userId, content);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 连接关闭时调用
*/
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
System.out.println("连接关闭: " + session.getId());
// 移除连接
sessions.remove(session.getId());
userSessionMap.remove(userId);
// 通知其他用户
broadcast("系统", "用户 " + userId + " 下线了");
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
System.out.println("连接错误: " + session.getId());
error.printStackTrace();
}
/**
* 广播消息给所有用户
*/
private void broadcast(String sender, String content) {
JSONObject message = new JSONObject();
message.put("sender", sender);
message.put("content", content);
message.put("timestamp", System.currentTimeMillis());
message.put("type", "broadcast");
// 发送给所有连接的客户端
for (Session session : sessions.values()) {
if (session.isOpen()) {
try {
session.getAsyncRemote().sendText(message.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
/**
* 发送私聊消息
*/
private void sendToUser(String fromUserId, String toUserId, String content) {
String toSessionId = userSessionMap.get(toUserId);
if (toSessionId != null) {
Session toSession = sessions.get(toSessionId);
if (toSession != null && toSession.isOpen()) {
try {
JSONObject message = new JSONObject();
message.put("sender", fromUserId);
message.put("content", content);
message.put("timestamp", System.currentTimeMillis());
message.put("type", "private");
toSession.getAsyncRemote().sendText(message.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
编程方式(继承 Endpoint 类)
@ServerEndpoint("/game")
public class GameEndpoint extends Endpoint {
private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());
@Override
public void onOpen(Session session, EndpointConfig config) {
System.out.println("新连接: " + session.getId());
sessions.add(session);
// 添加消息处理器
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
System.out.println("收到: " + message);
// 处理游戏逻辑
handleGameMessage(session, message);
}
});
// 发送欢迎消息
try {
JSONObject welcome = new JSONObject();
welcome.put("type", "welcome");
welcome.put("message", "欢迎加入游戏!");
welcome.put("sessionId", session.getId());
session.getBasicRemote().sendText(welcome.toString());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onClose(Session session, CloseReason closeReason) {
System.out.println("连接关闭: " + session.getId());
sessions.remove(session);
// 通知其他玩家
broadcastPlayerLeft(session.getId());
}
@Override
public void onError(Session session, Throwable thr) {
System.err.println("连接错误: " + session.getId());
thr.printStackTrace();
}
private void handleGameMessage(Session session, String message) {
try {
JSONObject json = new JSONObject(message);
String type = json.getString("type");
switch (type) {
case "move":
// 处理移动
handlePlayerMove(session, json);
break;
case "chat":
// 处理聊天
handleChatMessage(session, json);
break;
default:
System.out.println("未知消息类型: " + type);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void handlePlayerMove(Session session, JSONObject moveData) {
// 处理玩家移动逻辑
// 广播给所有玩家
broadcastGameUpdate(moveData);
}
private void handleChatMessage(Session session, JSONObject chatData) {
// 广播聊天消息
JSONObject broadcastMsg = new JSONObject();
broadcastMsg.put("type", "chat");
broadcastMsg.put("sender", session.getId());
broadcastMsg.put("message", chatData.getString("message"));
broadcastMsg.put("timestamp", System.currentTimeMillis());
broadcast(broadcastMsg.toString());
}
private void broadcast(String message) {
synchronized (sessions) {
for (Session s : sessions) {
if (s.isOpen()) {
try {
s.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
配置文件
application.yml 配置
spring:
websocket:
# WebSocket 配置
enabled: true
server:
# 服务器配置
port: 8080
servlet:
context-path: /api
# 自定义配置
websocket:
max-sessions: 1000
heartbeat-interval: 30000
max-message-size: 128KB
心跳检测和连接管理
@Component public class WebSocketHeartbeat { @Autowired private SimpMessagingTemplate messagingTemplate; private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @PostConstruct public void init() { // 每30秒发送一次心跳 scheduler.scheduleAtFixedRate(() -> { try { messagingTemplate.convertAndSend("/topic/heartbeat", Map.of("timestamp", System.currentTimeMillis(), "type", "heartbeat")); } catch (Exception e) { e.printStackTrace(); } }, 0, 30, TimeUnit.SECONDS); } @PreDestroy public void destroy() { scheduler.shutdown(); } }
消息编码器/解码器
// 自定义消息编解码器 @Component public class ChatMessageConverter implements MessageConverter { @Override public Message<?> toMessage(Object payload, MessageHeaders headers) { if (payload instanceof ChatMessage) { ChatMessage msg = (ChatMessage) payload; byte[] bytes = serializeMessage(msg); return MessageBuilder.createMessage(bytes, headers); } return null; } @Override public Object fromMessage(Message<?> message, Class<?> targetClass) { if (targetClass == ChatMessage.class) { byte[] bytes = (byte[]) message.getPayload(); return deserializeMessage(bytes); } return null; } private byte[] serializeMessage(ChatMessage message) { try { return new ObjectMapper().writeValueAsBytes(message); } catch (Exception e) { throw new RuntimeException("序列化失败", e); } } private ChatMessage deserializeMessage(byte[] bytes) { try { return new ObjectMapper().readValue(bytes, ChatMessage.class); } catch (Exception e) { throw new RuntimeException("反序列化失败", e); } } }
集群支持
@Configuration @EnableRedisRepositories public class RedisConfig { @Bean public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(factory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class)); return template; } } // Redis 广播消息 @Component public class RedisMessagePublisher { @Autowired private RedisTemplate<String, Object> redisTemplate; public void publish(String channel, Object message) { redisTemplate.convertAndSend(channel, message); } } @Component public class RedisMessageSubscriber implements MessageListener { @Autowired private SimpMessagingTemplate messagingTemplate; @Override public void onMessage(Message message, byte[] pattern) { // 处理从 Redis 收到的消息 // 转发给 WebSocket 客户端 String channel = new String(pattern); String msg = new String(message.getBody()); messagingTemplate.convertAndSend("/topic/" + channel, msg); } }
Spring Boot 的 STOMP 实现更加完整和易于使用,而原生 WebSocket 则更加灵活。
文章来源:https://www.cnblogs.com/syf0824/p/19368779
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
上一篇:递归与分治算法
下一篇:剑指offer-52、正则表达式匹配

