记一次消息推送业务的探索
什么是服务端消息推送
服务端消息推送(Push Notification)是一种技术概念,指的是从服务端实时发送信息到客户端的过程。在移动互联网和Web应用中,服务端消息推送被广泛用于提升用户体验、增加用户粘性和活跃度。以下是服务端消息推送的详细解释:
定义
服务端消息推送,简称推送(Push),是指服务器主动向客户端发送信息,而无需客户端显式请求。这种方式使得信息能够实时地到达用户,无需用户手动刷新页面或应用。
实现方式
服务端消息推送的实现方式多种多样,主要包括以下几种:
-
短轮询(Short Polling):
- 客户端定时向服务器发送请求,询问是否有新消息。
- 优点:实现简单。
- 缺点:实时性差,服务器资源消耗大。
-
长轮询(Long Polling):
- 客户端向服务器发送请求后,服务器会保持连接,直到有新消息才返回响应并关闭连接。
- 优点:相比短轮询,实时性更好,资源消耗更少。
- 缺点:服务器需要保持大量连接,对服务器性能有一定要求。
-
SSE(Server-Sent Events):
- SSE是WebSocket的一种轻量级替代方案,使用HTTP协议。
- 它是单向的,只能由服务器向客户端发送消息。
- 优点:实现简单,兼容性好。
- 缺点:不支持双向通信。
-
WebSocket:
- WebSocket是一种在单个TCP连接上进行全双工通信的协议。
- 它允许服务器和客户端之间进行双向通信,实时性强。
- 优点:实时性好,支持双向通信。
- 缺点:实现相对复杂,对服务器和客户端的性能有一定要求。
应用场景
服务端消息推送在多种应用场景中发挥着重要作用,包括但不限于:
- 新闻客户端:推送热点新闻和资讯。
- IM工具:推送聊天消息和通知。
- 电商应用:推送促销信息和订单状态更新。
- 企业应用:推送通知和审批流程。
为什么需要服务端消息推送
- 实时数据更新:在许多应用场景中,用户希望看到的数据能够实时更新,例如实时新闻、股票行情、聊天应用中的消息、社交媒体通知等。消息推送允许服务器在数据发生变化时立即通知客户端,而无需客户端不断轮询服务器以检查更新。
- 提高用户体验:通过实时推送消息,应用可以提供更加流畅和即时的用户体验。例如,在聊天应用中,用户能够立即收到新消息的通知,而无需手动刷新页面或等待轮询请求的结果。
- 减少服务器负载:传统的轮询机制(如Ajax轮询)需要客户端定期向服务器发送请求以检查是否有更新。这种方式不仅增加了服务器的处理负担,还可能导致网络资源的浪费。通过消息推送,客户端只在有更新时才接收数据,从而减少了不必要的请求和服务器负载。
- 支持离线消息:消息推送系统能够处理用户离线时的消息。当用户重新上线时,可以接收到之前错过的所有消息,确保用户不会错过任何重要信息。
- 提升应用互动性:消息推送允许应用主动与用户互动,例如发送通知、提醒、优惠券等。这种互动性可以增强用户对应用的粘性,提高用户留存率和活跃度。
如何实现服务端消息推送
场景
现有一客户响应平台,需要接入AI能力,在客服与客户聊天过程中即时的对用户输入分析处理,推送给客服合适的话术与可能的解决方案。
技术选型
由于用户输入实际为获取的企业微信回调消息,选用与页面双工通信的WebSocket并不合适,且WebSocket在身份认证方面在前端实现有一定难度,遂决定使用SSE技术。
代码实现
SpringMVC已经集成了SSE协议,只需导入SseEmitter
类,构造该类对象并在Mapping方法中返回即可,示例代码如下:
@GetMapping("/sse")
public SseEmitter sseEndpoint(@RequestParam("sessionId")final String sessionId){
log.info("客户端请求建立SSE连接 session: {}",sessionId);
final SseEmitter sseEmitter =
new SseEmitter(TimeUnit.MINUTES.toMillis(30));
this.sseHandler.handler(sessionId,sseEmitter);
return sseEmitter;
}
我这里使用了含timeOut参数的有参构造,设置一个响应写出的超时时间,及时释放连接。
为方便后续扩展,定义了ISSEHandler
接口,其包含一个抽象方法handler(final String sessionId,final SseEmitter sseEmitter)
,对SSEEmitter的具体处理挪至ISSEHandler
的实现类中。
此外,考虑到逻辑解耦,需要引入消息队列,于是定义了队列服务接口
public interface IQueueService<T> {
/**
* 向队列推入数据
* @param topic 主题
* @param t 消息
* @return
*/
Boolean push(String topic,T t);
/**
* 从队尾取出数据
* @param topic 主题
* @return
*/
T pop(String topic);
}
ISSEHandler
简单实现类示例
@Component
@Slf4j
public class SSEHandlerImpl implements ISSEHandler {
IQueueService<SseEmitter.SseEventBuilder> queueService;
@Autowired
public void setQueueService(IQueueService<SseEmitter.SseEventBuilder> queueService) {
this.queueService = queueService;
}
@Override
public void handler(final String sessionId,final SseEmitter sseEmitter) {
new Thread(()->{
int count = 0;
while (true){
if (count%600 ==0){
try {
sseEmitter
.send(SseEmitter.event()
.name(SSEEventNameEnum.CHAT_STATUS.value)
.comment("keep alive"));
} catch (IOException e) {
log.error("与客户端连接中断, sessionId:{},线程关闭",sessionId);
return;
}
}
count++;
final SseEmitter.SseEventBuilder pop = queueService.pop(sessionId);
if (Objects.isNull(pop)){
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
log.info(e.getMessage());
}
continue;
}
try {
sseEmitter.send(pop);
} catch (IOException e) {
log.error("与客户端连接中断, sessionId:{},线程关闭",sessionId);
return;
}
}
}).start();
}
}
IQueueService<T>
简单实现类实例
@Service
@Slf4j
public class MemoryQueueServiceImpl implements IQueueService<SseEmitter.SseEventBuilder> {
//topic做key 队列做value
private final static ConcurrentHashMap<String, BlockingQueue<SseEmitter.SseEventBuilder>> exchange =
new ConcurrentHashMap<>(1<<10);
/**
* 向队列推入数据
*
* @param topic
* @param sseEventBuilder
* @return
*/
@Override
public Boolean push(String topic, SseEmitter.SseEventBuilder sseEventBuilder) {
BlockingQueue<SseEmitter.SseEventBuilder> q = this.exchange.get(topic);
if (Objects.isNull(q)){
q = new ArrayBlockingQueue<>(1<<9);
exchange.put(topic, q);
}
return q.offer(sseEventBuilder);
}
/**
* 从队尾取出数据
*
* @param topic
* @return
*/
@Override
public SseEmitter.SseEventBuilder pop(String topic) {
final BlockingQueue<SseEmitter.SseEventBuilder> q = exchange.get(topic);
if (Objects.isNull(q)){
return null;
}
return q.poll();
}
}
代码分析
上述的代码实现,其特点就在于简单,是最直接的实现方式,连接绑定线程,线程消费内存队列消息再将消息写出至连接。针对我们当前的业务场景特点:单体服务,对实时性要求高、内测用户少(不超过100人)、推送LOSS率有一定容忍度,这个方案已经够用了。
但是,若产品持续推行,扩大用户规模,上述实现就不再适用了,熟悉分布式服务的小伙伴应该一眼就能瞧出其中端倪,SSE处理线程与连接绑定,在用户量增多的情况下会创建过多的线程,无论是线程堆栈的内存消耗,还是上下文切换的cpu消耗都是不利于服务健康运行的,且内存阻塞队列虽然用起来很舒坦,但其在进程间不能共享,且不具备持久化特性,既增大了服务水平扩展的难度,又增加了消息丢失的概率
如何提高消息推送的性能和降低消息丢失的概率
技术选型
一条连接绑定一条线程,这个问题是否很熟悉,没错,就是Linux内核 select,poll,epoll 这些io多路复用技术解决的问题。这个问题可以通过io多路复用来解决。
消息队列,这个问题在市面上有多种开源的解决方案,Kafka、RocketMQ、Redis,由于前两种消息队列解决方案需要引入新的中间件,我们这里选用Redis,在我之前的文章曾写过,Redis 5.0新增加了对标消息队列中间件的结构 Stream,具体就不在这里详谈。
代码实现
基于Redis Stream的队列接口实现
public class RedisQueueServiceImpl implements IQueueService<Map<String,String>> {
@Value("${mq.sse.group}")
private String group;
@Value("${mq.sse.consumer}")
private String consumer;
@Value("${mq.sse.batchSize}")
private Integer batchSize;
@Value("${mq.sse.timeoutSec}")
private Integer timeoutSec;
private final StringRedisTemplate redisTemplate;
@Autowired
public RedisQueueServiceImpl (StringRedisTemplate redisTemplate){
this.redisTemplate = redisTemplate;
}
/**
* 向队列推入数据
*
* @param topic
* @return
*/
public Boolean push(String topic, Map<String,String> msg) {
final RecordId add = this.redisTemplate.<String,String>opsForStream().add(topic,msg);
return Objects.nonNull(add);
}
/**
* 从队尾取出数据
*
* @param topic
* @return
*/
public Map<String, String> pop(String topic) {
final List<MapRecord<String, String, String>> read =
this.redisTemplate
.<String,String>opsForStream()
.read(Consumer.from(this.group,this.consumer),
StreamReadOptions.empty().count(this.batchSize)
.block(Duration.ofSeconds(this.timeoutSec)),
StreamOffset.create(topic, ReadOffset.lastConsumed()));
final MapRecord<String, String, String> entries = read.get(0);
return entries.getValue();
}
}
SSE多路复用器实现
public class SSEMultiplexer implements ISSEHandler , CommandLineRunner {
private static final String READY_QUEUE = "ready_queue";
private static final String SESSION_ID_KEY = "sessionId";
private ConcurrentHashMap<String,SseEmitter> exchange = new ConcurrentHashMap<>();
private IQueueService<Map<String,String>> queueService;
@Autowired
@Qualifier("redisQueueServiceImpl")
public void setQueueService(IQueueService<Map<String, String>> queueService) {
this.queueService = queueService;
}
@Override
public void handler(String sessionId, SseEmitter sseEmitter) {
this.exchange.put(sessionId,sseEmitter);
}
@Override
public void run(String... args) throws Exception {
new Thread(()->{
while(true){
//监听就绪队列
final Map<String, String> pop = this.queueService.pop(READY_QUEUE);
//结果为null说明block超时,重新监听
if (Objects.isNull(pop)){
continue;
}
final String sessionId = pop.get(SESSION_ID_KEY);
//如果当前服务实例存在该sessionId对应的发射器则发送给客户端
this.exchange.computeIfPresent(sessionId,(k,emitter)->{
try {
emitter.send(SseEmitter.event().data(pop.get("data")).name(pop.get("name")).build());
} catch (IOException e) {
e.printStackTrace();
return null;
}
return emitter;
});
}
});
}
}
代码分析
通过使用Redis的Stream结构,我们把消息队列从进程内多线程安全优化为了服务间多进程安全,依靠Redis Stream的消费组-消费者机制,令订阅同一消息队列的不同服务可以消费到完全一致的消息,依靠服务的SSE多路复用器自行甄别消费后是否执行消息推送逻辑。不过经常做服务性能优化,或内存管理的同学应该能看出来,上述多路复用器示例具有内存泄漏的风险,需要增加一块定时释放SSE中断连接的异步处理逻辑,这里就不多做赘述。