记一次消息推送业务的探索

记一次消息推送业务的探索

什么是服务端消息推送

服务端消息推送(Push Notification)是一种技术概念,指的是从服务端实时发送信息到客户端的过程。在移动互联网和Web应用中,服务端消息推送被广泛用于提升用户体验、增加用户粘性和活跃度。以下是服务端消息推送的详细解释:

定义

服务端消息推送,简称推送(Push),是指服务器主动向客户端发送信息,而无需客户端显式请求。这种方式使得信息能够实时地到达用户,无需用户手动刷新页面或应用。

实现方式

服务端消息推送的实现方式多种多样,主要包括以下几种:

  1. 短轮询(Short Polling)

    • 客户端定时向服务器发送请求,询问是否有新消息。
    • 优点:实现简单。
    • 缺点:实时性差,服务器资源消耗大。
  2. 长轮询(Long Polling)

    • 客户端向服务器发送请求后,服务器会保持连接,直到有新消息才返回响应并关闭连接。
    • 优点:相比短轮询,实时性更好,资源消耗更少。
    • 缺点:服务器需要保持大量连接,对服务器性能有一定要求。
  3. SSE(Server-Sent Events)

    • SSE是WebSocket的一种轻量级替代方案,使用HTTP协议。
    • 它是单向的,只能由服务器向客户端发送消息。
    • 优点:实现简单,兼容性好。
    • 缺点:不支持双向通信。
  4. WebSocket

    • WebSocket是一种在单个TCP连接上进行全双工通信的协议。
    • 它允许服务器和客户端之间进行双向通信,实时性强。
    • 优点:实时性好,支持双向通信。
    • 缺点:实现相对复杂,对服务器和客户端的性能有一定要求。

应用场景

服务端消息推送在多种应用场景中发挥着重要作用,包括但不限于:

  • 新闻客户端:推送热点新闻和资讯。
  • IM工具:推送聊天消息和通知。
  • 电商应用:推送促销信息和订单状态更新。
  • 企业应用:推送通知和审批流程。

为什么需要服务端消息推送

  • 实时数据更新:在许多应用场景中,用户希望看到的数据能够实时更新,例如实时新闻、股票行情、聊天应用中的消息、社交媒体通知等。消息推送允许服务器在数据发生变化时立即通知客户端,而无需客户端不断轮询服务器以检查更新。
  • 提高用户体验:通过实时推送消息,应用可以提供更加流畅和即时的用户体验。例如,在聊天应用中,用户能够立即收到新消息的通知,而无需手动刷新页面或等待轮询请求的结果。
  • 减少服务器负载:传统的轮询机制(如Ajax轮询)需要客户端定期向服务器发送请求以检查是否有更新。这种方式不仅增加了服务器的处理负担,还可能导致网络资源的浪费。通过消息推送,客户端只在有更新时才接收数据,从而减少了不必要的请求和服务器负载。
  • 支持离线消息:消息推送系统能够处理用户离线时的消息。当用户重新上线时,可以接收到之前错过的所有消息,确保用户不会错过任何重要信息。
  • 提升应用互动性:消息推送允许应用主动与用户互动,例如发送通知、提醒、优惠券等。这种互动性可以增强用户对应用的粘性,提高用户留存率和活跃度。

如何实现服务端消息推送

场景

现有一客户响应平台,需要接入AI能力,在客服客户聊天过程中即时的对用户输入分析处理,推送给客服合适的话术与可能的解决方案。

技术选型

由于用户输入实际为获取的企业微信回调消息,选用与页面双工通信的WebSocket并不合适,且WebSocket在身份认证方面在前端实现有一定难度,遂决定使用SSE技术。

代码实现

SpringMVC已经集成了SSE协议,只需导入SseEmitter类,构造该类对象并在Mapping方法中返回即可,示例代码如下:

@GetMapping("/sse")
    public SseEmitter sseEndpoint(@RequestParam("sessionId")final String sessionId){
        log.info("客户端请求建立SSE连接 session: {}",sessionId);
        final SseEmitter sseEmitter =
                new SseEmitter(TimeUnit.MINUTES.toMillis(30));
        this.sseHandler.handler(sessionId,sseEmitter);
        return sseEmitter;
    }

我这里使用了含timeOut参数的有参构造,设置一个响应写出的超时时间,及时释放连接。
为方便后续扩展,定义了ISSEHandler接口,其包含一个抽象方法handler(final String sessionId,final SseEmitter sseEmitter),对SSEEmitter的具体处理挪至ISSEHandler的实现类中。
此外,考虑到逻辑解耦,需要引入消息队列,于是定义了队列服务接口

public interface IQueueService<T> {
    /**
     * 向队列推入数据
     * @param topic 主题
     * @param t 消息
     * @return
     */
    Boolean push(String topic,T t);

    /**
     * 从队尾取出数据
     * @param topic 主题
     * @return
     */
    T pop(String topic);
}

ISSEHandler简单实现类示例

@Component
@Slf4j
public class SSEHandlerImpl implements ISSEHandler {

    IQueueService<SseEmitter.SseEventBuilder> queueService;
    
    @Autowired
    public void setQueueService(IQueueService<SseEmitter.SseEventBuilder> queueService) {
        this.queueService = queueService;
    }
    @Override
    public void handler(final String sessionId,final SseEmitter sseEmitter) {
        new Thread(()->{
            int count = 0;
            while (true){
                if (count%600 ==0){
                    try {
                        sseEmitter
                                .send(SseEmitter.event()
                                        .name(SSEEventNameEnum.CHAT_STATUS.value)
                                        .comment("keep alive"));
                    } catch (IOException e) {
                        log.error("与客户端连接中断, sessionId:{},线程关闭",sessionId);
                        return;
                    }
                }
                count++;
                final SseEmitter.SseEventBuilder pop = queueService.pop(sessionId);
                if (Objects.isNull(pop)){
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        log.info(e.getMessage());
                    }
                    continue;
                }
                try {
                    sseEmitter.send(pop);
                } catch (IOException e) {
                    log.error("与客户端连接中断, sessionId:{},线程关闭",sessionId);
                    return;
                }
            }
        }).start();
    }
}

IQueueService<T>简单实现类实例

@Service
@Slf4j
public class MemoryQueueServiceImpl implements IQueueService<SseEmitter.SseEventBuilder> {

    //topic做key 队列做value
    private final static ConcurrentHashMap<String, BlockingQueue<SseEmitter.SseEventBuilder>> exchange =
            new ConcurrentHashMap<>(1<<10);
    /**
     * 向队列推入数据
     *
     * @param topic
     * @param sseEventBuilder
     * @return
     */
    @Override
    public Boolean push(String topic, SseEmitter.SseEventBuilder sseEventBuilder) {
        BlockingQueue<SseEmitter.SseEventBuilder> q = this.exchange.get(topic);
        if (Objects.isNull(q)){
            q = new ArrayBlockingQueue<>(1<<9);
            exchange.put(topic, q);
        }
        return q.offer(sseEventBuilder);
    }

    /**
     * 从队尾取出数据
     *
     * @param topic
     * @return
     */
    @Override
    public SseEmitter.SseEventBuilder pop(String topic) {
        final BlockingQueue<SseEmitter.SseEventBuilder> q = exchange.get(topic);
        if (Objects.isNull(q)){
            return null;
        }
        return q.poll();
    }
}

代码分析

上述的代码实现,其特点就在于简单,是最直接的实现方式,连接绑定线程,线程消费内存队列消息再将消息写出至连接。针对我们当前的业务场景特点:单体服务,对实时性要求高、内测用户少(不超过100人)、推送LOSS率有一定容忍度,这个方案已经够用了。
但是,若产品持续推行,扩大用户规模,上述实现就不再适用了,熟悉分布式服务的小伙伴应该一眼就能瞧出其中端倪,SSE处理线程与连接绑定,在用户量增多的情况下会创建过多的线程,无论是线程堆栈的内存消耗,还是上下文切换的cpu消耗都是不利于服务健康运行的,且内存阻塞队列虽然用起来很舒坦,但其在进程间不能共享,且不具备持久化特性,既增大了服务水平扩展的难度,又增加了消息丢失的概率

如何提高消息推送的性能和降低消息丢失的概率

技术选型

一条连接绑定一条线程,这个问题是否很熟悉,没错,就是Linux内核 select,poll,epoll 这些io多路复用技术解决的问题。这个问题可以通过io多路复用来解决。

消息队列,这个问题在市面上有多种开源的解决方案,Kafka、RocketMQ、Redis,由于前两种消息队列解决方案需要引入新的中间件,我们这里选用Redis,在我之前的文章曾写过,Redis 5.0新增加了对标消息队列中间件的结构 Stream,具体就不在这里详谈。

代码实现

基于Redis Stream的队列接口实现
public class RedisQueueServiceImpl implements IQueueService<Map<String,String>> {
    @Value("${mq.sse.group}")
    private String group;
    @Value("${mq.sse.consumer}")
    private String consumer;
    @Value("${mq.sse.batchSize}")
    private Integer batchSize;
    @Value("${mq.sse.timeoutSec}")
    private Integer timeoutSec;

    private final StringRedisTemplate redisTemplate;

    @Autowired
    public RedisQueueServiceImpl (StringRedisTemplate redisTemplate){
        this.redisTemplate = redisTemplate;
    }

    /**
     * 向队列推入数据
     *
     * @param topic
     * @return
     */

    public Boolean push(String topic, Map<String,String> msg) {
        final RecordId add = this.redisTemplate.<String,String>opsForStream().add(topic,msg);
        return Objects.nonNull(add);
    }

    /**
     * 从队尾取出数据
     *
     * @param topic
     * @return
     */

    public Map<String, String> pop(String topic) {
        final List<MapRecord<String, String, String>> read =
                this.redisTemplate
                        .<String,String>opsForStream()
                .read(Consumer.from(this.group,this.consumer),
                        StreamReadOptions.empty().count(this.batchSize)
                                .block(Duration.ofSeconds(this.timeoutSec)),
                        StreamOffset.create(topic, ReadOffset.lastConsumed()));
        final MapRecord<String, String, String> entries = read.get(0);
        return entries.getValue();
    }
}
SSE多路复用器实现
public class SSEMultiplexer implements ISSEHandler , CommandLineRunner {

    private static final String READY_QUEUE = "ready_queue";
    private static final String SESSION_ID_KEY = "sessionId";

    private ConcurrentHashMap<String,SseEmitter> exchange = new ConcurrentHashMap<>();

    private IQueueService<Map<String,String>> queueService;

    @Autowired
    @Qualifier("redisQueueServiceImpl")
    public void setQueueService(IQueueService<Map<String, String>> queueService) {
        this.queueService = queueService;
    }

    @Override
    public void handler(String sessionId, SseEmitter sseEmitter) {
        this.exchange.put(sessionId,sseEmitter);
    }

    @Override
    public void run(String... args) throws Exception {
        new Thread(()->{
           while(true){
               //监听就绪队列
               final Map<String, String> pop = this.queueService.pop(READY_QUEUE);
               //结果为null说明block超时,重新监听
                if (Objects.isNull(pop)){
                    continue;
                }
                final String sessionId = pop.get(SESSION_ID_KEY);
                //如果当前服务实例存在该sessionId对应的发射器则发送给客户端
                this.exchange.computeIfPresent(sessionId,(k,emitter)->{
                    try {
                        emitter.send(SseEmitter.event().data(pop.get("data")).name(pop.get("name")).build());
                    } catch (IOException e) {
                        e.printStackTrace();
                        return null;
                    }
                    return emitter;
                });
           }
        });
    }
}

代码分析

通过使用Redis的Stream结构,我们把消息队列从进程内多线程安全优化为了服务间多进程安全,依靠Redis Stream的消费组-消费者机制,令订阅同一消息队列的不同服务可以消费到完全一致的消息,依靠服务的SSE多路复用器自行甄别消费后是否执行消息推送逻辑。不过经常做服务性能优化,或内存管理的同学应该能看出来,上述多路复用器示例具有内存泄漏的风险,需要增加一块定时释放SSE中断连接的异步处理逻辑,这里就不多做赘述。

Read more

RocketMQ消息的文件组织形式

RocketMQ消息的文件组织形式

RocketMQ文件的组织形式主要围绕消息的高效存储与检索设计,主要包括CommitLog、ConsumeQueue和IndexFile三类文件。以下是对这三类文件组织形式的详细阐述: 1. CommitLog文件 * 作用:CommitLog是消息存储的主体文件,用于存储Producer端写入的消息主体内容。 * 组织形式: * 所有topic的消息都存储在同一个CommitLog文件中,确保消息发送时按顺序写文件,以追求极致的消息存储性能和高吞吐量。 * 单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量。例如,第一个文件名为00000000000000000000,代表起始偏移量为0,文件大小为1G。当第一个文件写满后,第二个文件名为00000000001073741824,以此类推。 * 存储内容:消息内容不是定长的,每条消息在CommitLog中的存储结构包括消息长度、消息体、消息属性等。 2. ConsumeQueue文件 * 作用:ConsumeQueue是消息消费队列文件,主要用于提高消息消费的性

By Zhewen Cao
Redis Stream:构建高效、可靠的消息队列新选择

Redis Stream:构建高效、可靠的消息队列新选择

引言 随着分布式系统的日益复杂,消息队列作为一种重要的中间件,在解决系统间异步通信、负载均衡、数据缓冲等方面发挥着不可替代的作用。Redis,作为一个高性能的键值存储系统,在5.0版本中引入了Stream这一新的数据结构,为构建高效、可靠的消息队列提供了新的选择。本文将深入探讨Redis Stream的架构、特性及其在消息队列中的应用。 Redis Stream概述 Redis Stream是Redis 5.0版本引入的一种新的数据结构,它提供了一种持久化的、可查询的、可扩展的消息队列服务。Stream类型的数据结构类似于一个日志系统,数据被添加到Stream的末尾,并且每个数据都会被分配一个唯一的序列号(Entry ID),这个序列号是按照时间顺序递增的。这使得Stream类型非常适合用于实现消息队列、事件驱动的系统、数据流处理等场景。 Stream的底层结构 Redis Stream的底层结构主要由基数树(Radix Tree)和Listpack组成。基数树用于索引Listpack,而Listpack用于存储Stream Entry。每个Stream Ent

By Zhewen Cao
MQTT协议帧结构解析

MQTT协议帧结构解析

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模式的轻量级消息传输协议,广泛应用于物联网(IoT)、移动应用等领域。MQTT的报文帧结构是其通信的基础,主要由三部分组成:固定报头(Fixed Header)、可变报头(Variable Header)和有效载荷(Payload)。以下是对这三部分的详细解析: 1. 固定报头(Fixed Header) 固定报头是MQTT报文帧的开始部分,每个MQTT报文都必须包含固定报头。它占据报文帧的前两个字节,具体结构如下: * 报文类型(4位):第一个字节的前4位(7-4位)用于标识报文类型,MQTT协议定义了16种报文类型,但并非所有类型都已被使用或定义。常见的报文类型包括CONNECT(连接服务器)、CONNACK(连接确认)、PUBLISH(发布消息)、PUBACK(发布确认)、SUBSCRIBE(订阅主题)、SUBACK(订阅确认)等。 * 标志位(

By Zhewen Cao
MQTT协议技术解析与应用场景探索

MQTT协议技术解析与应用场景探索

引言 在物联网(IoT)和机器对机器(M2M)通信日益普及的今天,MQTT(Message Queuing Telemetry Transport)协议以其轻量级、高效和可靠的特点,成为了连接远程设备、实现实时消息传输的重要工具。本文将深入解析MQTT协议的核心特性,并探讨其在多个领域的应用场景。 MQTT协议概述 MQTT(消息队列遥测传输)是一种基于发布/订阅模式的轻量级通信协议,由IBM在1999年发布。它构建在TCP/IP协议之上,特别适用于硬件性能有限、网络状况不佳的远程设备。MQTT的协议设计简单、数据包头部小,这使得它在低带宽、高延迟的网络环境中运行效率极高。 核心特性 1. 发布/订阅模式:MQTT采用发布/订阅模式,消息不是直接由发送者发送到接收者,而是通过MQTT服务器(也称为MQTT代理或Broker)分发。这种模式解除了应用程序之间的耦合,提高了系统的灵活性和可扩展性。 2. 低开销与高效:MQTT协议的数据包头部非常小(固定长度为2字节),并且协议交换最小化,这

By Zhewen Cao