添加pom依赖

<dependency>
   <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

详细代码

@Component
@ServerEndpoint("/websocket/broadcast/{userName}")
@Slf4j
public class BroadcastWebSocketHandler {


    /**
     * 如果不允许相同用户重复推送此处key 改为 userid
     */
    private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, String> userNameMap = new ConcurrentHashMap<>();



    @PostConstruct
    public void init() {

        log.info("BroadcastWebSocketHandler 加载完毕");
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        sessionMap.put(session.getId(), session);
        userNameMap.put(session.getId(), userName.toLowerCase());
        log.info("新用户上线,sessionId:{} userName:{},当前登陆人数:{}", session.getId(),userName, sessionMap.size());
        sendMessageOnOpen(session, userName);
    }

    @OnClose
    public void onClose(Session session, @PathParam("userName") String userName) {
        sessionMap.remove(session.getId());
        userNameMap.remove(session.getId());
        log.info("用户下线,sessionId:{} userName:{},当前登陆人数:{}", session.getId(),userName, sessionMap.size());
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        log.error(throwable.getMessage(), throwable);
    }

    /**
     * 建接收到的消息广播给客户端
     *
     * @param session
     * @param message
     * @throws IOException
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        initWenGetMessage();
        //心跳消息不广播
        if(isHeartBeat(message)){
            sendMessage(session,backPing());
        }
        else{
            log.info("发送一条数据,{}", message);
            sendMessage(session,message);
        }
    }


    private boolean isHeartBeat(String message){
        Gson gson = BeanUtilExt.newGsonInstance();
        if(StringHelper.isJSONType(message)){
            MessageDto messageDto = gson.fromJson(message,MessageDto.class);
            return ObjectUtil.equals("ping",messageDto.getType());
        }
        return false;
    }

    private MessageDto backPing(){
        MessageDto messageDto = new MessageDto();
        messageDto.setType("pong");
        return messageDto;
    }
    private void initWenGetMessage() {
        redisUtils = (RedisUtils) SpringBeanUtils.getBean("redisUtils");
        redisUtils.delete(MessageConstants.REDIS_KEY_NOTIFIED_USERS);

    }

    /**
     * @param session
     * @param userId
     */
    private void sendMessageOnOpen(Session session, String userId) {
    }


    /**
     * 发送消息给指定的客户端
     */

    private static boolean sendMessage(Session session, String message) {
        try {
//            log.info("发送一条数据,{}", message);
            session.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return false;
        }
        return true;
    }




    private static boolean sendMessage(Session session, MessageDto messageDto) {

        Gson gson = BeanUtilExt.newGsonInstance();
        return sendMessage(session, gson.toJson(messageDto));
    }

    public static boolean sendMessages(List<MessageDto> messageDtos){
        for (MessageDto messageDto : messageDtos) {
            String sendUserName = messageDto.getUserName();
            sessionMap.forEach((k, v) -> {
                String userName= userNameMap.get(k);
                if(ObjectUtil.equals(sendUserName,userName)){
                    sendMessage(v,messageDto);
                }
            });
        }
        return true;
    }

    /**
     * 消息广播
     *
     * @param messageDto
     */
    public static void broadcast(MessageDto messageDto) {

        List<String> userIdList = new ArrayList<>();
        sessionMap.forEach((k, v) -> {
            String userId= userNameMap.get(k);
            //首次
            messageDto.setSendFlag("0");
            boolean b = sendMessage(v, messageDto);
            if (b && ObjectUtil.isNotEmpty(userId)) {
                userIdList.add(userId);
            }
        });

        /**
         * 记录已发送的用户
         */
        if (CollUtil.isNotEmpty(userIdList)) {
            redisUtils.setList(MessageConstants.REDIS_KEY_NOTIFIED_USERS, userIdList.toArray());
        }

    }


}


@Configuration
@EnableWebSocket
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * 通信文本消息和二进制缓存区大小
     * 避免对接 第三方 报文过大时,Websocket 1009 错误
     * @return
     */

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        // 在此处设置bufferSize
        container.setMaxTextMessageBufferSize(10240000);
        container.setMaxBinaryMessageBufferSize(10240000);
        //8小时工作制
        container.setMaxSessionIdleTimeout(8* 60 * 60000L);
        return container;
    }



}


@Slf4j
@Configuration
public class RedisMessageListenerConfig   {

    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    private final int poolSize = 20;

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, RedisProperties redisProperties,
                                                   @Qualifier("messageReceiver") MessageListenerAdapter listenerAdapter

    ) {
        log.info("初始化消息监听RedisMessageListenerContainer");
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);


        container.addMessageListener(listenerAdapter, new PatternTopic( redisProperties.getKeyPrefix()+MessageConstants.EASAYCODE_MSG_PATTERN_TOPIC));



        return container;
    }


    @Bean("messageReceiver")
    public MessageListenerAdapter listenerAdapter( @Qualifier("receiverRedisMessage") ReceiverRedisMessage receiver){

        return new MessageListenerAdapter(receiver,"onMessage");

    }


    @Bean
    public ThreadPoolTaskScheduler initMessageScheduler(){
        if(threadPoolTaskScheduler != null){
            return threadPoolTaskScheduler;
        }

        threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize( poolSize );
        threadPoolTaskScheduler.setRejectedExecutionHandler( new MessageTaskRejectedPolicy() );
        threadPoolTaskScheduler.setThreadFactory( new MessageTaskThreadFactory() );

        return threadPoolTaskScheduler;
    }



}


@Component
@Slf4j
public class ReceiverRedisMessage implements MessageListener {
@Autowired
RedisUtils redisUtils;
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("监听到redis广播消息:{}", message);
        byte[] body = message.getBody();
        if (body.length > 0) {
            MessageDto messageDto = str2MessageDto(body);
            //缓存消息
            redisUtils.setWithExpireTime(MessageConstants.REDIS_KEY_MSG, messageDto, 60 * 60 * 24 * 7 * 2);

            BroadcastWebSocketHandler.broadcast(messageDto);

        }

    }

    private MessageDto str2MessageDto(byte[] body){
        String msg =  StringHelper.removeQuotes(StringEscapeUtils.unescapeJava(new String(body, StandardCharsets.UTF_8)));
        log.info("广播消息msg:{}",msg);
        MessageDto messageDto = null;
        Gson gson = BeanUtilExt.newGsonInstance();
        if(StringHelper.isJSONType(msg)){
            messageDto = gson.fromJson(msg,MessageDto.class);
        } else {
            messageDto = new MessageDto();
            messageDto.setData(msg);
            messageDto.setSendFlag("0");
        }
        return messageDto;
    }






}




@Data
public class MessageDto {

    private String userId;

    private String data;
    /**
     * 是否已发送过
     */
    private String sendFlag ;
    private String type;
    private String message;
    private String userName;
}