首页 > 基础资料 博客日记

WebSocket 的使用

2025-12-25 10:07:06 · 基础资料 · 围观 2

Java资料网向大家推荐《WebSocket 的使用》这篇文章,欢迎收藏 Java资料网,享受知识的乐趣。

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,允许服务器和客户端之间进行实时双向通信。

基本使用

1. 创建 WebSocket 连接

// Open a WebSocket connection using the unencrypted ws:// scheme.
const socket = new WebSocket('ws://localhost:8080');

// Alternatively, open a connection over the secure wss:// scheme.
const secureSocket = new WebSocket('wss://example.com/socket');

2. WebSocket 事件

// open: the connection is ready, so greet the server right away.
socket.onopen = () => {
    console.log('连接已建立');
    socket.send('Hello Server!');
};

// message: the payload sent by the server is carried in event.data.
socket.onmessage = ({ data }) => {
    console.log('收到消息:', data);
    // Handle the received data here.
};

// error: log whatever the browser reports.
socket.onerror = (error) => {
    console.error('WebSocket 错误:', error);
};

// close: log the close code and reason.
socket.onclose = ({ code, reason }) => {
    console.log('连接关闭', code, reason);
    // A reconnect attempt could be started from here.
};

完整示例

客户端示例

<!DOCTYPE html>
<html>
<head>
    <title>WebSocket 示例</title>
</head>
<body>
    <div>
        <input type="text" id="messageInput" placeholder="输入消息">
        <button onclick="sendMessage()">发送</button>
    </div>
    <div id="messages"></div>

    <script>
        // Open the connection and look up the element that displays the chat log.
        const socket = new WebSocket('ws://localhost:8080');
        const messagesDiv = document.getElementById('messages');

        // Connection established.
        socket.onopen = function() {
            addMessage('系统', '连接成功!');
        };

        // Incoming message. Frames are expected to be JSON objects of the form
        // { sender, message }; anything else is shown verbatim as a system message.
        socket.onmessage = function(event) {
            let data;
            try {
                data = JSON.parse(event.data);
            } catch (e) {
                // Not JSON at all: display the raw text.
                addMessage('系统', event.data);
                return;
            }

            // Valid JSON that is not an object (e.g. "123" or "null") has no
            // sender/message fields, so fall back to the raw text as well.
            if (data === null || typeof data !== 'object') {
                addMessage('系统', event.data);
                return;
            }

            // Frames without a sender (such as the server's welcome message) are
            // attributed to the system instead of being rendered as "undefined".
            const sender = data.sender == null ? '系统' : data.sender;
            const text = data.message == null ? '' : data.message;
            addMessage(sender, text);
        };

        // Error reporting.
        socket.onerror = function() {
            addMessage('系统', '连接错误');
        };

        // Connection closed.
        socket.onclose = function() {
            addMessage('系统', '连接已关闭');
        };
        
        /**
         * Sends the trimmed content of the #messageInput field to the server as a
         * JSON frame of the form { type, content, timestamp }.
         *
         * Nothing is sent when the input is blank. When the socket is not OPEN the
         * user is told so and the draft is kept in the input, because calling
         * send() on a socket that is still connecting throws.
         */
        function sendMessage() {
            const input = document.getElementById('messageInput');
            const message = input.value.trim();

            if (!message) {
                return;
            }

            if (socket.readyState !== WebSocket.OPEN) {
                addMessage('系统', '连接未建立,消息未发送');
                return;
            }

            socket.send(JSON.stringify({
                type: 'message',
                content: message,
                timestamp: new Date().toISOString()
            }));
            // Clear the field only after the frame has been handed to the socket.
            input.value = '';
        }
        
        /**
         * Appends one chat line ("sender: text") to the message list and scrolls
         * the list to the bottom.
         *
         * Both values can originate from other clients, so they are inserted as
         * plain text nodes rather than through innerHTML; markup contained in a
         * message is displayed literally instead of being interpreted.
         *
         * @param {string} sender - Name shown in bold before the message.
         * @param {string} text - Message body.
         */
        function addMessage(sender, text) {
            const msgElement = document.createElement('div');

            const senderElement = document.createElement('strong');
            senderElement.textContent = `${sender}:`;

            msgElement.appendChild(senderElement);
            msgElement.appendChild(document.createTextNode(` ${text}`));

            messagesDiv.appendChild(msgElement);
            messagesDiv.scrollTop = messagesDiv.scrollHeight;
        }
        
        // When the page is being unloaded, close an open connection with the
        // normal-closure code 1000 and a short reason.
        window.addEventListener('beforeunload', () => {
            if (socket.readyState !== WebSocket.OPEN) {
                return;
            }
            socket.close(1000, '用户离开页面');
        });
    </script>
</body>
</html>

Node.js 服务器端示例

// 使用 ws 库
const WebSocket = require('ws');

// Create the WebSocket server on port 8080.
const wss = new WebSocket.Server({ port: 8080 });

console.log('WebSocket 服务器启动在 ws://localhost:8080');

// Per-connection setup: greet the client, then relay its messages to everyone else.
wss.on('connection', function connection(ws) {
    console.log('新客户端连接');

    // Welcome message for the newly connected client only.
    ws.send(JSON.stringify({
        type: 'system',
        message: '欢迎连接到服务器!'
    }));

    // Message received from this client.
    ws.on('message', function incoming(message) {
        // Recent versions of ws deliver the payload as a Buffer rather than a
        // string, so decode it once for both logging and parsing.
        const text = message.toString();
        console.log('收到消息:', text);

        try {
            const data = JSON.parse(text);

            // The outgoing frame is identical for every recipient, so build it
            // once instead of once per client.
            const outgoing = JSON.stringify({
                type: 'message',
                sender: '用户',
                message: data.content,
                timestamp: new Date().toISOString()
            });

            // Relay to every other client whose connection is open.
            wss.clients.forEach(function each(client) {
                if (client !== ws && client.readyState === WebSocket.OPEN) {
                    client.send(outgoing);
                }
            });
        } catch (error) {
            // Malformed input must not take the server down; log and drop it.
            console.error('消息解析错误:', error);
        }
    });

    // Connection closed.
    ws.on('close', function() {
        console.log('客户端断开连接');
    });

    // Connection-level error.
    ws.on('error', function(error) {
        console.error('WebSocket 错误:', error);
    });
});

WebSocket 状态

// Log a description of the connection's current state. The four readyState
// constants are CONNECTING (0), OPEN (1), CLOSING (2) and CLOSED (3); any other
// value logs nothing.
const stateLabels = new Map([
    [WebSocket.CONNECTING, '连接中...'],
    [WebSocket.OPEN, '已连接'],
    [WebSocket.CLOSING, '正在关闭...'],
    [WebSocket.CLOSED, '已关闭'],
]);

const currentState = socket.readyState;
if (stateLabels.has(currentState)) {
    console.log(stateLabels.get(currentState));
}

高级特性

1. 心跳检测

// Heartbeat: handle of the running ping timer, kept so it can be cancelled later.
let heartbeatInterval;

socket.onopen = () => {
    console.log('连接建立');

    // Every 30 seconds send a ping frame, skipping ticks where the socket is not open.
    heartbeatInterval = setInterval(() => {
        if (socket.readyState !== WebSocket.OPEN) {
            return;
        }
        socket.send(JSON.stringify({ type: 'ping' }));
    }, 30000);
};

// Stop pinging once the connection has closed.
socket.onclose = () => {
    clearInterval(heartbeatInterval);
};

2. 重连机制

/**
 * WebSocket wrapper that reconnects automatically with exponential backoff.
 *
 * After an unexpected close it retries up to `maxReconnectAttempts` times,
 * waiting `reconnectDelay`, then 2x, 4x, ... between attempts. A successful
 * connection resets the attempt counter. Calling `close()` ends the connection
 * for good and cancels any pending retry.
 */
class WebSocketClient {
    /**
     * @param {string} url - WebSocket URL (ws:// or wss://) to connect to.
     */
    constructor(url) {
        this.url = url;
        this.socket = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        // Handle of the pending reconnect timer, or null when none is scheduled.
        this.reconnectTimer = null;
        // True once close() was called, so the resulting close event is not
        // mistaken for a dropped connection.
        this.closedByUser = false;
    }

    /** Opens a new connection and installs the lifecycle handlers. */
    connect() {
        this.closedByUser = false;
        this.socket = new WebSocket(this.url);

        this.socket.onopen = () => {
            console.log('连接成功');
            this.reconnectAttempts = 0;
        };

        this.socket.onclose = () => {
            if (this.closedByUser) {
                // Deliberate shutdown: do not reconnect.
                return;
            }
            console.log('连接断开,尝试重连...');
            this.reconnect();
        };

        this.socket.onerror = (error) => {
            console.error('连接错误:', error);
        };
    }

    /** Schedules the next reconnect attempt, or gives up once the limit is reached. */
    reconnect() {
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
            console.error('重连次数已达上限');
            return;
        }

        // Compute the delay before incrementing so the first retry waits exactly
        // reconnectDelay rather than twice that.
        const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts);
        this.reconnectAttempts++;

        this.reconnectTimer = setTimeout(() => {
            this.reconnectTimer = null;
            console.log(`第 ${this.reconnectAttempts} 次重连`);
            this.connect();
        }, delay);
    }

    /**
     * Sends data if the connection is open.
     *
     * @param {string|ArrayBuffer|Blob} data - Payload passed to WebSocket#send.
     * @returns {boolean} true if the data was handed to the socket, false if it
     *     was dropped because there is no open connection.
     */
    send(data) {
        if (this.socket !== null && this.socket.readyState === WebSocket.OPEN) {
            this.socket.send(data);
            return true;
        }
        return false;
    }

    /**
     * Closes the connection permanently and cancels any pending reconnect.
     *
     * @param {number} [code=1000] - Close code passed to WebSocket#close.
     * @param {string} [reason=''] - Close reason passed to WebSocket#close.
     */
    close(code = 1000, reason = '') {
        this.closedByUser = true;

        if (this.reconnectTimer !== null) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = null;
        }

        if (this.socket !== null) {
            this.socket.close(code, reason);
        }
    }
}

3. 二进制数据传输

// Sending binary data once the connection is open.
socket.onopen = () => {
    // An ArrayBuffer holding the four bytes 1, 2, 3, 4.
    const bytes = Uint8Array.of(1, 2, 3, 4);
    socket.send(bytes.buffer);

    // A Blob containing plain text.
    socket.send(new Blob(['Hello'], { type: 'text/plain' }));
};

// Receiving binary data: ask for ArrayBuffer payloads ('blob' is the alternative).
socket.binaryType = 'arraybuffer';

socket.onmessage = ({ data }) => {
    if (!(data instanceof ArrayBuffer)) {
        // Text frame.
        console.log('收到文本数据:', data);
        return;
    }

    // Binary frame: view the buffer as bytes.
    console.log('收到二进制数据:', new Uint8Array(data));
};

Spring Boot 中使用 WebSocket

添加依赖

<!-- Spring Boot WebSocket starter. No <version> is declared here; presumably it is
     managed by the Spring Boot parent POM / BOM of the project (confirm). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

基础配置类

/**
 * STOMP-over-WebSocket configuration: broker destinations, the endpoints clients
 * connect to, and transport limits.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Maximum size of an incoming message: 128 KB. */
    private static final int MESSAGE_SIZE_LIMIT_BYTES = 128 * 1024;

    /** Maximum time allowed for sending a message: 20 seconds. */
    private static final int SEND_TIME_LIMIT_MILLIS = 20 * 1000;

    /** Maximum amount of buffered outbound data: 512 KB. */
    private static final int SEND_BUFFER_SIZE_LIMIT_BYTES = 512 * 1024;

    /**
     * Declares the destination prefixes: "/topic" and "/queue" go to the simple
     * broker, "/app" to application handlers, and "/user" is the prefix for
     * per-user (one-to-one) destinations.
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic", "/queue");
        config.setApplicationDestinationPrefixes("/app");
        config.setUserDestinationPrefix("/user");
    }

    /**
     * Registers two endpoints: "/ws" with the SockJS fallback enabled and
     * "/ws-native" without it.
     *
     * NOTE(review): both endpoints accept every origin ("*"); confirm that this is
     * intended outside of local development.
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();

        registry.addEndpoint("/ws-native")
                .setAllowedOriginPatterns("*");
    }

    /** Applies the message-size, send-time and send-buffer limits declared above. */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(MESSAGE_SIZE_LIMIT_BYTES);
        registration.setSendTimeLimit(SEND_TIME_LIMIT_MILLIS);
        registration.setSendBufferSizeLimit(SEND_BUFFER_SIZE_LIMIT_BYTES);
    }
}

控制器示例

@Controller
public class WebSocketController {
    
    // 注入消息模板
    @Autowired
    private SimpMessagingTemplate messagingTemplate;
    
    /**
     * 处理客户端发送的消息
     * 目的地:/app/chat
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage handleMessage(ChatMessage message) {
        message.setTimestamp(new Date());
        System.out.println("收到消息: " + message.getContent());
        return message;
    }
    
    /**
     * 发送广播消息
     */
    @GetMapping("/broadcast")
    public void broadcast(String content) {
        ChatMessage message = new ChatMessage();
        message.setContent(content);
        message.setSender("系统");
        message.setTimestamp(new Date());
        
        // 发送到 /topic/messages
        messagingTemplate.convertAndSend("/topic/messages", message);
    }
    
    /**
     * 发送点对点消息
     */
    @GetMapping("/sendToUser")
    public void sendToUser(String userId, String content) {
        ChatMessage message = new ChatMessage();
        message.setContent(content);
        message.setSender("管理员");
        message.setTimestamp(new Date());
        
        // 发送给指定用户:/user/{userId}/queue/messages
        messagingTemplate.convertAndSendToUser(
            userId, 
            "/queue/messages", 
            message
        );
    }
}

// Chat message entity. Accessors and both constructors are generated by the
// Lombok annotations below rather than written by hand.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {
    // Display name of whoever sent the message.
    private String sender;
    // Message text.
    private String content;
    // Time associated with the message; the controller sets it to the current time.
    private Date timestamp;
}

连接拦截器

/**
 * Channel interceptor that logs STOMP CONNECT and DISCONNECT frames.
 *
 * NOTE(review): ChannelInterceptorAdapter is deprecated in newer Spring versions
 * in favour of implementing ChannelInterceptor directly — confirm against the
 * Spring version in use. Nothing in the visible configuration registers this
 * interceptor on the inbound channel; verify that it is wired in elsewhere.
 */
@Component
public class WebSocketInterceptor extends ChannelInterceptorAdapter {

    /**
     * Inspects each message before it is sent on the channel and logs connects
     * and disconnects. The message is always passed through unchanged.
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor =
            MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        // getAccessor returns null when the message carries no accessor of the
        // requested type; such messages are passed through untouched.
        if (accessor == null) {
            return message;
        }

        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // Connection being established: read the client-supplied token header.
            String token = accessor.getFirstNativeHeader("token");
            // Token validation would go here; the value is currently unused.
            System.out.println("用户连接: " + accessor.getSessionId());
        } else if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            // Connection being torn down.
            System.out.println("用户断开: " + accessor.getSessionId());
        }

        return message;
    }
}

原生 Java WebSocket(JSR 356)

注解方式

/**
 * Annotation-driven (JSR 356) chat endpoint. The user id is taken from the
 * connection path; messages are JSON objects with a "content" field and an
 * optional "to" field that turns the message into a private one.
 */
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatEndpoint {

    // Open sessions keyed by session id. Static, so shared by every instance.
    private static final Map<String, Session> sessions = new ConcurrentHashMap<>();

    // User id -> id of the session most recently opened by that user.
    private static final Map<String, String> userSessionMap = new ConcurrentHashMap<>();

    /**
     * Called when a connection is established: records the session and
     * announces the user.
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        System.out.println("连接建立: " + session.getId() + ", 用户: " + userId);

        sessions.put(session.getId(), session);
        userSessionMap.put(userId, session.getId());

        // The announcement goes to every open session, including the new one.
        broadcast("系统", "用户 " + userId + " 上线了");
    }

    /**
     * Called for each text message. Routes it to a single user when the JSON
     * contains "to", otherwise to everyone. Parsing or routing failures are
     * logged and the message is dropped.
     */
    @OnMessage
    public void onMessage(String message, Session session,
                         @PathParam("userId") String userId) {
        System.out.println("收到消息: " + message + " from: " + userId);

        try {
            JSONObject json = new JSONObject(message);
            String content = json.getString("content");
            String toUserId = json.optString("to", null);

            if (toUserId != null) {
                sendToUser(userId, toUserId, content);
            } else {
                broadcast(userId, content);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Called when a connection closes: forgets the session and, if this was the
     * user's current session, announces that the user went offline.
     */
    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        System.out.println("连接关闭: " + session.getId());

        sessions.remove(session.getId());

        // Remove the user mapping only if it still points at THIS session. If the
        // user has already reconnected, the mapping belongs to the newer session
        // and must survive the late close of the old one.
        boolean wentOffline = userSessionMap.remove(userId, session.getId());

        if (wentOffline) {
            broadcast("系统", "用户 " + userId + " 下线了");
        }
    }

    /**
     * Called when an error occurs on a connection; logs it.
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.out.println("连接错误: " + session.getId());
        error.printStackTrace();
    }

    /**
     * Sends a "broadcast" message to every open session.
     */
    private void broadcast(String sender, String content) {
        // Identical for all recipients, so serialize once.
        String payload = buildPayload(sender, content, "broadcast");

        for (Session session : sessions.values()) {
            if (session.isOpen()) {
                try {
                    session.getAsyncRemote().sendText(payload);
                } catch (Exception e) {
                    // One failing session must not stop delivery to the others.
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sends a "private" message to the target user's current session. The
     * message is silently dropped when the user is unknown or not connected.
     */
    private void sendToUser(String fromUserId, String toUserId, String content) {
        String toSessionId = userSessionMap.get(toUserId);
        if (toSessionId == null) {
            return;
        }

        Session toSession = sessions.get(toSessionId);
        if (toSession == null || !toSession.isOpen()) {
            return;
        }

        try {
            toSession.getAsyncRemote().sendText(buildPayload(fromUserId, content, "private"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Serializes an outgoing message with sender, content, type and the current
     * time in milliseconds as "timestamp".
     */
    private static String buildPayload(String sender, String content, String type) {
        JSONObject message = new JSONObject();
        message.put("sender", sender);
        message.put("content", content);
        message.put("timestamp", System.currentTimeMillis());
        message.put("type", type);
        return message.toString();
    }
}

编程方式(继承 Endpoint 类)

/**
 * Programmatic game endpoint built on the Endpoint base class. Incoming JSON
 * messages carry a "type" field ("move" or "chat") that selects the handler.
 *
 * NOTE(review): this class both extends Endpoint and carries @ServerEndpoint.
 * Programmatic endpoints are normally registered through a ServerEndpointConfig
 * instead of the annotation — confirm how the target container deploys it.
 */
@ServerEndpoint("/game")
public class GameEndpoint extends Endpoint {

    // All currently open sessions, shared by every instance of this class.
    private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());

    /**
     * Registers the session, installs its text-message handler and sends a
     * welcome message containing the session id.
     */
    @Override
    public void onOpen(Session session, EndpointConfig config) {
        System.out.println("新连接: " + session.getId());
        sessions.add(session);

        // Handler for complete text messages from this session.
        session.addMessageHandler(new MessageHandler.Whole<String>() {
            @Override
            public void onMessage(String message) {
                System.out.println("收到: " + message);
                handleGameMessage(session, message);
            }
        });

        try {
            JSONObject welcome = new JSONObject();
            welcome.put("type", "welcome");
            welcome.put("message", "欢迎加入游戏!");
            welcome.put("sessionId", session.getId());
            session.getBasicRemote().sendText(welcome.toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Forgets the session and tells the remaining players that it left.
     */
    @Override
    public void onClose(Session session, CloseReason closeReason) {
        System.out.println("连接关闭: " + session.getId());
        sessions.remove(session);

        broadcastPlayerLeft(session.getId());
    }

    /**
     * Logs an error that occurred on a session.
     */
    @Override
    public void onError(Session session, Throwable thr) {
        System.err.println("连接错误: " + session.getId());
        thr.printStackTrace();
    }

    /**
     * Parses an incoming message and dispatches on its "type". Unknown types
     * are logged; malformed messages are logged and dropped.
     */
    private void handleGameMessage(Session session, String message) {
        try {
            JSONObject json = new JSONObject(message);
            String type = json.getString("type");

            switch (type) {
                case "move":
                    handlePlayerMove(session, json);
                    break;
                case "chat":
                    handleChatMessage(session, json);
                    break;
                default:
                    System.out.println("未知消息类型: " + type);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles a player move. Game-specific logic belongs here; currently the
     * move is only forwarded to all players.
     */
    private void handlePlayerMove(Session session, JSONObject moveData) {
        broadcastGameUpdate(moveData);
    }

    /**
     * Re-broadcasts a chat message, using the session id as sender.
     */
    private void handleChatMessage(Session session, JSONObject chatData) {
        JSONObject broadcastMsg = new JSONObject();
        broadcastMsg.put("type", "chat");
        broadcastMsg.put("sender", session.getId());
        broadcastMsg.put("message", chatData.getString("message"));
        broadcastMsg.put("timestamp", System.currentTimeMillis());

        broadcast(broadcastMsg.toString());
    }

    /**
     * Forwards a move message to all players as received.
     *
     * This method was called but never defined in the original class.
     * NOTE(review): the payload is passed through unchanged; adjust if clients
     * expect a different update format.
     */
    private void broadcastGameUpdate(JSONObject moveData) {
        broadcast(moveData.toString());
    }

    /**
     * Tells all remaining players that the given session has left.
     *
     * This method was called but never defined in the original class.
     * NOTE(review): the "playerLeft" message shape is a placeholder; align it
     * with whatever the game client expects.
     */
    private void broadcastPlayerLeft(String sessionId) {
        JSONObject notice = new JSONObject();
        notice.put("type", "playerLeft");
        notice.put("sessionId", sessionId);
        notice.put("timestamp", System.currentTimeMillis());

        broadcast(notice.toString());
    }

    /**
     * Sends a text message to every open session.
     */
    private void broadcast(String message) {
        // Iterating a synchronizedSet requires holding its lock manually.
        synchronized (sessions) {
            for (Session s : sessions) {
                if (s.isOpen()) {
                    try {
                        s.getBasicRemote().sendText(message);
                    } catch (IOException e) {
                        // Keep delivering to the remaining sessions.
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

配置文件

application.yml 配置

# NOTE(review): `spring.websocket.enabled` does not appear to be a documented
# Spring Boot property — confirm that something actually reads it.
spring:
  websocket:
    # WebSocket settings
    enabled: true
    
server:
  # Embedded server settings
  port: 8080
  servlet:
    # Presumably prefixes every servlet path, so the WebSocket endpoints would be
    # reached under /api (e.g. /api/ws) — verify against the client URLs.
    context-path: /api
  
# Application-specific settings. None of the code shown reads these keys;
# presumably they are bound elsewhere (e.g. via @ConfigurationProperties) — verify.
websocket:
  max-sessions: 1000
  # Presumably milliseconds — TODO confirm
  heartbeat-interval: 30000
  max-message-size: 128KB

心跳检测和连接管理

/**
 * Publishes a heartbeat message to /topic/heartbeat every 30 seconds for as long
 * as the bean is alive.
 */
@Component
public class WebSocketHeartbeat {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    // Single-threaded scheduler that drives the periodic heartbeat.
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    /** Starts the heartbeat: first run immediately, then every 30 seconds. */
    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(this::publishHeartbeat, 0, 30, TimeUnit.SECONDS);
    }

    /** Stops scheduling further heartbeats when the bean is destroyed. */
    @PreDestroy
    public void destroy() {
        scheduler.shutdown();
    }

    /**
     * Sends one heartbeat carrying the current time. Failures are caught and
     * logged because an exception escaping a fixed-rate task would cancel all
     * subsequent runs.
     */
    private void publishHeartbeat() {
        try {
            messagingTemplate.convertAndSend("/topic/heartbeat",
                Map.of("timestamp", System.currentTimeMillis(),
                       "type", "heartbeat"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息编码器/解码器

/**
 * Message converter that maps ChatMessage objects to and from JSON bytes.
 *
 * Both conversion methods return null for inputs this converter does not
 * handle, leaving them to other converters.
 */
@Component
public class ChatMessageConverter implements MessageConverter {

    // One shared mapper instead of a new instance per message.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Serializes a ChatMessage payload into a message with a byte[] body.
     *
     * @param payload the object to convert
     * @param headers headers for the new message; may be null
     * @return the message, or null if the payload is not a ChatMessage
     */
    @Override
    public Message<?> toMessage(Object payload, MessageHeaders headers) {
        if (!(payload instanceof ChatMessage)) {
            return null;
        }

        byte[] bytes = serializeMessage((ChatMessage) payload);

        // createMessage needs headers; build a message without them when the
        // caller supplied none.
        if (headers == null) {
            return MessageBuilder.withPayload(bytes).build();
        }
        return MessageBuilder.createMessage(bytes, headers);
    }

    /**
     * Deserializes a byte[] payload into a ChatMessage.
     *
     * @return the ChatMessage, or null if the target class is not ChatMessage or
     *         the payload is not a byte array
     */
    @Override
    public Object fromMessage(Message<?> message, Class<?> targetClass) {
        if (targetClass != ChatMessage.class) {
            return null;
        }

        Object payload = message.getPayload();
        // Check the type instead of casting blindly, so an unexpected payload is
        // declined rather than failing with a ClassCastException.
        if (!(payload instanceof byte[])) {
            return null;
        }
        return deserializeMessage((byte[]) payload);
    }

    /** Writes the message as JSON bytes; wraps any failure in a RuntimeException. */
    private byte[] serializeMessage(ChatMessage message) {
        try {
            return OBJECT_MAPPER.writeValueAsBytes(message);
        } catch (Exception e) {
            throw new RuntimeException("序列化失败", e);
        }
    }

    /** Reads a message from JSON bytes; wraps any failure in a RuntimeException. */
    private ChatMessage deserializeMessage(byte[] bytes) {
        try {
            return OBJECT_MAPPER.readValue(bytes, ChatMessage.class);
        } catch (Exception e) {
            throw new RuntimeException("反序列化失败", e);
        }
    }
}

集群支持

/**
 * Redis beans for cluster support: a listener container for pub/sub and a
 * template with String keys and JSON-serialized values.
 */
@Configuration
@EnableRedisRepositories
public class RedisConfig {

    /**
     * Listener container bound to the given connection factory. No listeners or
     * topics are registered here.
     */
    @Bean
    public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        return listenerContainer;
    }

    /**
     * Template whose keys are serialized as strings and whose values are
     * serialized as JSON.
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redis = new RedisTemplate<>();
        redis.setKeySerializer(new StringRedisSerializer());
        redis.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        redis.setConnectionFactory(factory);
        return redis;
    }
}

// Publishes messages to Redis channels through the injected template.
@Component
public class RedisMessagePublisher {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * Publishes the message on the given Redis channel.
     *
     * @param channel name of the channel to publish to
     * @param message object to publish; passed to the template as-is
     */
    public void publish(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

/**
 * Redis listener that forwards every received message to WebSocket subscribers
 * of "/topic/{channel}".
 *
 * NOTE(review): nothing in the visible configuration subscribes this listener to
 * a channel; verify that it is added to the listener container elsewhere.
 */
@Component
public class RedisMessageSubscriber implements MessageListener {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Forwards the message body to the topic named after the channel it was
     * published on.
     *
     * @param message the Redis message (channel and body as raw bytes)
     * @param pattern the subscription pattern that matched, if any; not used,
     *                because it is not the concrete channel name and may be absent
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        // Take the channel from the message itself, and decode explicitly as
        // UTF-8 rather than with the platform default charset.
        String channel = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);

        messagingTemplate.convertAndSend("/topic/" + channel, msg);
    }
}

Spring Boot 的 STOMP 实现更加完整和易于使用,而原生 WebSocket 则更加灵活。


文章来源:https://www.cnblogs.com/syf0824/p/19368779
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云