diff --git a/spring-boot-demo-websocket-socketio/pom.xml b/spring-boot-demo-websocket-socketio/pom.xml index 96513e2..31e5221 100644 --- a/spring-boot-demo-websocket-socketio/pom.xml +++ b/spring-boot-demo-websocket-socketio/pom.xml @@ -20,7 +20,7 @@ UTF-8 UTF-8 1.8 - 1.7.12 + 1.7.16 @@ -35,11 +35,28 @@ spring-boot-starter-web + + org.springframework.boot + spring-boot-configuration-processor + true + + org.springframework.boot spring-boot-starter-test test + + + cn.hutool + hutool-all + + + + org.projectlombok + lombok + true + diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/DbTemplate.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/DbTemplate.java new file mode 100644 index 0000000..225187a --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/DbTemplate.java @@ -0,0 +1,69 @@ +package com.xkcoding.websocket.socketio.config; + +import cn.hutool.core.collection.CollUtil; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +/** + *

+ * 模拟数据库 + *

+ * + * @package: com.xkcoding.websocket.socketio.config + * @description: 模拟数据库 + * @author: yangkai.shen + * @date: Created in 2018-12-18 19:12 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Component +public class DbTemplate { + /** + * 模拟数据库存储 user_id <-> session_id 的关系 + */ + public static final ConcurrentHashMap DB = new ConcurrentHashMap<>(); + + /** + * 获取所有SessionId + * + * @return SessionId列表 + */ + public List findAll() { + return CollUtil.newArrayList(DB.values()); + } + + /** + * 根据UserId查询SessionId + * + * @param userId 用户id + * @return SessionId + */ + public Optional findByUserId(String userId) { + return Optional.ofNullable(DB.get(userId)); + } + + /** + * 保存/更新 user_id <-> session_id 的关系 + * + * @param userId 用户id + * @param sessionId SessionId + */ + public void save(String userId, UUID sessionId) { + DB.put(userId, sessionId); + } + + /** + * 删除 user_id <-> session_id 的关系 + * + * @param userId 用户id + */ + public void deleteByUserId(String userId) { + DB.remove(userId); + } + +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/Event.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/Event.java new file mode 100644 index 0000000..99ee6a7 --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/Event.java @@ -0,0 +1,47 @@ +package com.xkcoding.websocket.socketio.config; + +/** + *

+ * 事件常量 + *

+ * + * @package: com.xkcoding.websocket.socketio.config + * @description: 事件常量 + * @author: yangkai.shen + * @date: Created in 2018-12-18 19:36 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +public interface Event { + /** + * 聊天事件 + */ + String CHAT = "chat" ; + + /** + * 收到消息 + */ + String CHAT_RECEIVED = "chat_received" ; + + /** + * 拒收消息 + */ + String CHAT_REFUSED = "chat_refused" ; + + /** + * 广播消息 + */ + String BROADCAST = "broadcast" ; + + /** + * 群聊 + */ + String GROUP = "group" ; + + /** + * 加入群聊 + */ + String JOIN = "join" ; + +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/ServerConfig.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/ServerConfig.java new file mode 100644 index 0000000..d61be0d --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/ServerConfig.java @@ -0,0 +1,53 @@ +package com.xkcoding.websocket.socketio.config; + +import cn.hutool.core.util.StrUtil; +import com.corundumstudio.socketio.SocketIOServer; +import com.corundumstudio.socketio.annotation.SpringAnnotationScanner; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + *

+ * 服务器配置 + *

+ * + * @package: com.xkcoding.websocket.socketio.config + * @description: 服务器配置 + * @author: yangkai.shen + * @date: Created in 2018-12-18 16:42 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Configuration +@EnableConfigurationProperties({WsConfig.class}) +public class ServerConfig { + + @Bean + public SocketIOServer server(WsConfig wsConfig) { + com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); + config.setHostname(wsConfig.getHost()); + config.setPort(wsConfig.getPort()); + + //这个listener可以用来进行身份验证 + config.setAuthorizationListener(data -> { + // http://localhost:8081?token=xxxxxxx + // 例如果使用上面的链接进行connect,可以使用如下代码获取用户密码信息,本文不做身份验证 + String token = data.getSingleUrlParam("token"); + // 校验token的合法性,实际业务需要校验token是否过期等等,参考 spring-boot-demo-rbac-security 里的 JwtUtil + // 如果认证不通过会返回一个 Socket.EVENT_CONNECT_ERROR 事件 + return StrUtil.isNotBlank(token); + }); + + return new SocketIOServer(config); + } + + /** + * Spring 扫描自定义注解 + */ + @Bean + public SpringAnnotationScanner springAnnotationScanner(SocketIOServer server) { + return new SpringAnnotationScanner(server); + } +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/WsConfig.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/WsConfig.java new file mode 100644 index 0000000..a94505c --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/config/WsConfig.java @@ -0,0 +1,31 @@ +package com.xkcoding.websocket.socketio.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + *

+ * WebSocket配置类 + *

+ * + * @package: com.xkcoding.websocket.socketio.config + * @description: WebSocket配置类 + * @author: yangkai.shen + * @date: Created in 2018-12-18 16:41 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@ConfigurationProperties(prefix = "ws.server") +@Data +public class WsConfig { + /** + * 端口号 + */ + private Integer port; + + /** + * host + */ + private String host; +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/controller/MessageController.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/controller/MessageController.java new file mode 100644 index 0000000..432430b --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/controller/MessageController.java @@ -0,0 +1,68 @@ +package com.xkcoding.websocket.socketio.controller; + +import cn.hutool.core.lang.Dict; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import com.xkcoding.websocket.socketio.handler.MessageEventHandler; +import com.xkcoding.websocket.socketio.payload.BroadcastMessageRequest; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.lang.reflect.Field; + +/** + *

+ * 消息发送Controller + *

+ * + * @package: com.xkcoding.websocket.socketio.controller + * @description: 消息发送Controller + * @author: yangkai.shen + * @date: Created in 2018-12-18 19:50 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@RestController +@RequestMapping("/send") +@Slf4j +public class MessageController { + @Autowired + private MessageEventHandler messageHandler; + + @PostMapping("/broadcast") + public Dict broadcast(BroadcastMessageRequest message) { + if (isBlank(message)) { + return Dict.create().set("flag", false).set("code", 400).set("message", "参数为空"); + } + messageHandler.sendToBroadcast(message); + return Dict.create().set("flag", true).set("code", 200).set("message", "发送成功"); + } + + /** + * 判断Bean是否为空对象或者空白字符串,空对象表示本身为null或者所有属性都为null + * + * @param bean Bean对象 + * @return 是否为空,true - 空 / false - 非空 + * @since 4.1.10 + */ + private boolean isBlank(Object bean) { + if (null != bean) { + for (Field field : ReflectUtil.getFields(bean.getClass())) { + Object fieldValue = ReflectUtil.getFieldValue(bean, field); + if (null != fieldValue) { + if (fieldValue instanceof String && StrUtil.isNotBlank((String) fieldValue)) { + return false; + } else if (!(fieldValue instanceof String)) { + return false; + } + } + } + } + return true; + } + +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/handler/MessageEventHandler.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/handler/MessageEventHandler.java new file mode 100644 index 0000000..c24ba22 --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/handler/MessageEventHandler.java @@ -0,0 +1,143 @@ +package com.xkcoding.websocket.socketio.handler; + +import com.corundumstudio.socketio.AckRequest; +import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.SocketIOServer; +import com.corundumstudio.socketio.annotation.OnConnect; +import com.corundumstudio.socketio.annotation.OnDisconnect; +import com.corundumstudio.socketio.annotation.OnEvent; +import com.xkcoding.websocket.socketio.config.DbTemplate; +import com.xkcoding.websocket.socketio.config.Event; +import com.xkcoding.websocket.socketio.payload.BroadcastMessageRequest; +import com.xkcoding.websocket.socketio.payload.GroupMessageRequest; +import com.xkcoding.websocket.socketio.payload.SingleMessageRequest; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Optional; +import java.util.UUID; + +/** + *

+ * 消息事件处理 + *

+ * + * @package: com.xkcoding.websocket.socketio.handler + * @description: 消息事件处理 + * @author: yangkai.shen + * @date: Created in 2018-12-18 18:57 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Component +@Slf4j +public class MessageEventHandler { + @Autowired + private SocketIOServer server; + + @Autowired + private DbTemplate dbTemplate; + + /** + * 添加connect事件,当客户端发起连接时调用 + * + * @param client 客户端对象 + */ + @OnConnect + public void onConnect(SocketIOClient client) { + if (client != null) { + String token = client.getHandshakeData().getSingleUrlParam("token"); + // 模拟用户id 和token一致 + String userId = client.getHandshakeData().getSingleUrlParam("token"); + UUID sessionId = client.getSessionId(); + + dbTemplate.save(userId, sessionId); + log.info("连接成功,【token】= {},【sessionId】= {}", token, sessionId); + } else { + log.error("客户端为空"); + } + } + + /** + * 添加disconnect事件,客户端断开连接时调用,刷新客户端信息 + * + * @param client 客户端对象 + */ + @OnDisconnect + public void onDisconnect(SocketIOClient client) { + if (client != null) { + String token = client.getHandshakeData().getSingleUrlParam("token"); + // 模拟用户id 和token一致 + String userId = client.getHandshakeData().getSingleUrlParam("token"); + UUID sessionId = client.getSessionId(); + + dbTemplate.deleteByUserId(userId); + log.info("客户端断开连接,【token】= {},【sessionId】= {}", token, sessionId); + client.disconnect(); + } else { + log.error("客户端为空"); + } + } + + /** + * 加入群聊 + * + * @param client 客户端 + * @param request 请求 + * @param roomId 群聊号 + */ + @OnEvent(value = Event.JOIN) + public void onJoinEvent(SocketIOClient client, AckRequest request, String roomId) { + // 模拟用户id 和token一致 + String userId = client.getHandshakeData().getSingleUrlParam("token"); + log.info("用户:{} 已加入群聊:{}", userId, roomId); + client.joinRoom(roomId); + } + + + @OnEvent(value = Event.CHAT) + public void onChatEvent(SocketIOClient client, AckRequest request, SingleMessageRequest data) { + Optional toUser = dbTemplate.findByUserId(data.getToUid()); + if (toUser.isPresent()) { + log.info("用户 {} 刚刚私信了用户 {}:{}", data.getFromUid(), data.getToUid(), data.getMessage()); + sendToSingle(toUser.get(), data); + } else { + client.sendEvent(Event.CHAT_REFUSED, "对方不在线"); + } + } + + @OnEvent(value = Event.CHAT) + public void onGroupEvent(SocketIOClient client, AckRequest request, GroupMessageRequest data) { + log.info("群号 {} 收到来自 {} 的群聊消息:{}", data.getGroupId(), data.getFromUid(), data.getMessage()); + sendToGroup(data); + } + + /** + * 单聊 + */ + public void sendToSingle(UUID sessionId, SingleMessageRequest message) { + server.getClient(sessionId).sendEvent(Event.CHAT_RECEIVED, message); + } + + /** + * 广播 + */ + public void sendToBroadcast(BroadcastMessageRequest message) { + log.info("系统紧急广播一条通知:{}", message.getMessage()); + for (UUID clientId : dbTemplate.findAll()) { + if (server.getClient(clientId) == null) { + continue; + } + server.getClient(clientId).sendEvent(Event.BROADCAST, message); + } + } + + /** + * 群聊 + */ + public void sendToGroup(GroupMessageRequest message) { + server.getRoomOperations(message.getGroupId()).sendEvent(Event.GROUP, message); + } +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/init/ServerRunner.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/init/ServerRunner.java new file mode 100644 index 0000000..cb548b5 --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/init/ServerRunner.java @@ -0,0 +1,33 @@ +package com.xkcoding.websocket.socketio.init; + +import com.corundumstudio.socketio.SocketIOServer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +/** + *

+ * websocket服务器启动 + *

+ * + * @package: com.xkcoding.websocket.socketio.init + * @description: websocket服务器启动 + * @author: yangkai.shen + * @date: Created in 2018-12-18 17:07 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Component +@Slf4j +public class ServerRunner implements CommandLineRunner { + @Autowired + private SocketIOServer server; + + @Override + public void run(String... args) { + server.start(); + log.info("websocket 服务器启动成功。。。"); + } +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/BroadcastMessageRequest.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/BroadcastMessageRequest.java new file mode 100644 index 0000000..7fe9bb3 --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/BroadcastMessageRequest.java @@ -0,0 +1,24 @@ +package com.xkcoding.websocket.socketio.payload; + +import lombok.Data; + +/** + *

+ * 广播消息载荷 + *

+ * + * @package: com.xkcoding.websocket.socketio.payload + * @description: 广播消息载荷 + * @author: yangkai.shen + * @date: Created in 2018-12-18 20:01 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Data +public class BroadcastMessageRequest { + /** + * 消息内容 + */ + private String message; +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/GroupMessageRequest.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/GroupMessageRequest.java new file mode 100644 index 0000000..5670b41 --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/GroupMessageRequest.java @@ -0,0 +1,34 @@ +package com.xkcoding.websocket.socketio.payload; + +import lombok.Data; + +/** + *

+ * 群聊消息载荷 + *

+ * + * @package: com.xkcoding.websocket.socketio.payload + * @description: 群聊消息载荷 + * @author: yangkai.shen + * @date: Created in 2018-12-18 16:59 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Data +public class GroupMessageRequest { + /** + * 消息发送方用户id + */ + private String fromUid; + + /** + * 群组id + */ + private String groupId; + + /** + * 消息内容 + */ + private String message; +} diff --git a/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/SingleMessageRequest.java b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/SingleMessageRequest.java new file mode 100644 index 0000000..5998b83 --- /dev/null +++ b/spring-boot-demo-websocket-socketio/src/main/java/com/xkcoding/websocket/socketio/payload/SingleMessageRequest.java @@ -0,0 +1,34 @@ +package com.xkcoding.websocket.socketio.payload; + +import lombok.Data; + +/** + *

+ * 私聊消息载荷 + *

+ * + * @package: com.xkcoding.websocket.socketio.payload + * @description: 私聊消息载荷 + * @author: yangkai.shen + * @date: Created in 2018-12-18 17:02 + * @copyright: Copyright (c) 2018 + * @version: V1.0 + * @modified: yangkai.shen + */ +@Data +public class SingleMessageRequest { + /** + * 消息发送方用户id + */ + private String fromUid; + + /** + * 消息接收方用户id + */ + private String toUid; + + /** + * 消息内容 + */ + private String message; +} diff --git a/spring-boot-demo-websocket-socketio/src/main/resources/application.yml b/spring-boot-demo-websocket-socketio/src/main/resources/application.yml index a02fbde..42ccdec 100644 --- a/spring-boot-demo-websocket-socketio/src/main/resources/application.yml +++ b/spring-boot-demo-websocket-socketio/src/main/resources/application.yml @@ -1,4 +1,8 @@ server: port: 8080 servlet: - context-path: /demo \ No newline at end of file + context-path: /demo +ws: + server: + port: 8081 + host: localhost \ No newline at end of file