添加pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
详细代码
@Component
@ServerEndpoint("/websocket/broadcast/{userName}")
@Slf4j
public class BroadcastWebSocketHandler {
/**
* 如果不允许相同用户重复推送此处key 改为 userid
*/
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, String> userNameMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
log.info("BroadcastWebSocketHandler 加载完毕");
}
@OnOpen
public void onOpen(Session session, @PathParam("userName") String userName) {
sessionMap.put(session.getId(), session);
userNameMap.put(session.getId(), userName.toLowerCase());
log.info("新用户上线,sessionId:{} userName:{},当前登陆人数:{}", session.getId(),userName, sessionMap.size());
sendMessageOnOpen(session, userName);
}
@OnClose
public void onClose(Session session, @PathParam("userName") String userName) {
sessionMap.remove(session.getId());
userNameMap.remove(session.getId());
log.info("用户下线,sessionId:{} userName:{},当前登陆人数:{}", session.getId(),userName, sessionMap.size());
}
@OnError
public void onError(Session session, Throwable throwable) {
log.error(throwable.getMessage(), throwable);
}
/**
* 建接收到的消息广播给客户端
*
* @param session
* @param message
* @throws IOException
*/
@OnMessage
public void onMessage(Session session, String message) {
initWenGetMessage();
//心跳消息不广播
if(isHeartBeat(message)){
sendMessage(session,backPing());
}
else{
log.info("发送一条数据,{}", message);
sendMessage(session,message);
}
}
private boolean isHeartBeat(String message){
Gson gson = BeanUtilExt.newGsonInstance();
if(StringHelper.isJSONType(message)){
MessageDto messageDto = gson.fromJson(message,MessageDto.class);
return ObjectUtil.equals("ping",messageDto.getType());
}
return false;
}
private MessageDto backPing(){
MessageDto messageDto = new MessageDto();
messageDto.setType("pong");
return messageDto;
}
private void initWenGetMessage() {
redisUtils = (RedisUtils) SpringBeanUtils.getBean("redisUtils");
redisUtils.delete(MessageConstants.REDIS_KEY_NOTIFIED_USERS);
}
/**
* @param session
* @param userId
*/
private void sendMessageOnOpen(Session session, String userId) {
}
/**
* 发送消息给指定的客户端
*/
private static boolean sendMessage(Session session, String message) {
try {
// log.info("发送一条数据,{}", message);
session.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
return true;
}
private static boolean sendMessage(Session session, MessageDto messageDto) {
Gson gson = BeanUtilExt.newGsonInstance();
return sendMessage(session, gson.toJson(messageDto));
}
public static boolean sendMessages(List<MessageDto> messageDtos){
for (MessageDto messageDto : messageDtos) {
String sendUserName = messageDto.getUserName();
sessionMap.forEach((k, v) -> {
String userName= userNameMap.get(k);
if(ObjectUtil.equals(sendUserName,userName)){
sendMessage(v,messageDto);
}
});
}
return true;
}
/**
* 消息广播
*
* @param messageDto
*/
public static void broadcast(MessageDto messageDto) {
List<String> userIdList = new ArrayList<>();
sessionMap.forEach((k, v) -> {
String userId= userNameMap.get(k);
//首次
messageDto.setSendFlag("0");
boolean b = sendMessage(v, messageDto);
if (b && ObjectUtil.isNotEmpty(userId)) {
userIdList.add(userId);
}
});
/**
* 记录已发送的用户
*/
if (CollUtil.isNotEmpty(userIdList)) {
redisUtils.setList(MessageConstants.REDIS_KEY_NOTIFIED_USERS, userIdList.toArray());
}
}
}
@Configuration
@EnableWebSocket
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
/**
* 通信文本消息和二进制缓存区大小
* 避免对接 第三方 报文过大时,Websocket 1009 错误
* @return
*/
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 在此处设置bufferSize
container.setMaxTextMessageBufferSize(10240000);
container.setMaxBinaryMessageBufferSize(10240000);
//8小时工作制
container.setMaxSessionIdleTimeout(8* 60 * 60000L);
return container;
}
}
@Slf4j
@Configuration
public class RedisMessageListenerConfig {
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private final int poolSize = 20;
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, RedisProperties redisProperties,
@Qualifier("messageReceiver") MessageListenerAdapter listenerAdapter
) {
log.info("初始化消息监听RedisMessageListenerContainer");
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic( redisProperties.getKeyPrefix()+MessageConstants.EASAYCODE_MSG_PATTERN_TOPIC));
return container;
}
@Bean("messageReceiver")
public MessageListenerAdapter listenerAdapter( @Qualifier("receiverRedisMessage") ReceiverRedisMessage receiver){
return new MessageListenerAdapter(receiver,"onMessage");
}
@Bean
public ThreadPoolTaskScheduler initMessageScheduler(){
if(threadPoolTaskScheduler != null){
return threadPoolTaskScheduler;
}
threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize( poolSize );
threadPoolTaskScheduler.setRejectedExecutionHandler( new MessageTaskRejectedPolicy() );
threadPoolTaskScheduler.setThreadFactory( new MessageTaskThreadFactory() );
return threadPoolTaskScheduler;
}
}
@Component
@Slf4j
public class ReceiverRedisMessage implements MessageListener {
@Autowired
RedisUtils redisUtils;
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("监听到redis广播消息:{}", message);
byte[] body = message.getBody();
if (body.length > 0) {
MessageDto messageDto = str2MessageDto(body);
//缓存消息
redisUtils.setWithExpireTime(MessageConstants.REDIS_KEY_MSG, messageDto, 60 * 60 * 24 * 7 * 2);
BroadcastWebSocketHandler.broadcast(messageDto);
}
}
private MessageDto str2MessageDto(byte[] body){
String msg = StringHelper.removeQuotes(StringEscapeUtils.unescapeJava(new String(body, StandardCharsets.UTF_8)));
log.info("广播消息msg:{}",msg);
MessageDto messageDto = null;
Gson gson = BeanUtilExt.newGsonInstance();
if(StringHelper.isJSONType(msg)){
messageDto = gson.fromJson(msg,MessageDto.class);
} else {
messageDto = new MessageDto();
messageDto.setData(msg);
messageDto.setSendFlag("0");
}
return messageDto;
}
}
@Data
public class MessageDto {
private String userId;
private String data;
/**
* 是否已发送过
*/
private String sendFlag ;
private String type;
private String message;
private String userName;
}