您现在的位置是:首页 > 技术文章 > 详情<<文章列表阅读 实现一个关于队列的伪需求是一种怎样的体验 队列 rabbitmq Spring 过滤器 wjyuian 2019-10-18 374 1 最近花了一天的时间,在实现一个关于队列扩展的伪需求。就是当队列消息有积累的时候,如果对队列中的消息进行去重,或者说在一定范围内去重。 ### 场景 比如,有一个用于通知``搜索引擎进行职位索引更新``的消息队列,消息内容就是职位主键`positionId`,当职位数据更新频繁的时候,在队列中积累了100个消息,其中有30个消息都是关于同一个职位A的。那么,我的需求就是如何在这些消息被消费前,将其根据职位主键进行去重,也就是说,职位A的索引更新,我只想执行一次,而不是30次。 我之所以把这个需求称为伪需求,因为队列本身就是为了有序进行任务的一个数据结构,即先进先出。而经过去重,本质上就是对于同一个主键,都只执行一次,因此顺序是不能严格保证的,不过在主键上还是保留了大方向上的有序性。 即使是伪需求,对于我们目前的情况来说,还是很有必要的。 #### 需求针对的数据范围 从目前的索引更新日志分析看来,任务高峰时期,在几十毫秒内会有十来个相同的消息(每个消息都是一批职位主键)连续从队列中被消费。 **我自己定义的数据范围的概念就是:当队列中消息积累的某个时刻,针对这些积累的数据进行去重,这些积累的数据就是数据范围,这是一个动态的数据范围。每当进来一个消息,就会针对当前的数据范围进行去重,保证当前数据范围不会存在重复数据。** 发生这种重复的现实原因就是,索引更新队列的通知服务是开放的,公司内部很多其他服务都会通知搜索引擎进行数据更新。比如职位服务在发布、更新职位时,算法服务在进行职位匹配后,数据统计服务在统计职位数据后等等。而且很多时候,由于功能的先后接入以及缺乏相关良好的规划,甚至会出现一些重复通知的情况,在一个调用链中,上下游可能会重复通知索引更新队列很多次。 综上所述,在现阶段的实际情况下,实现前面提出的伪需求,应该是解决这一问题的最快速有效的方法。 ### 思考 内事不决问媳妇,外事不决问百度。 遇到这种情况,我的第一选择当然是去搜索,看看java车库中有没有类似功能的轮子。不知是我搜索的关键字不对还是这个需求过于奇葩,百度、谷歌一圈,居然没有发现轮子。怎么办?既然没有搜到轮子,当然就撸起袖子,问老铁了。 #### 加同步锁 我问的第一位老铁就是,有着30年交情的发小,某IT互联网公司技术大拿。上至九十九,下到刚会走...... 啊不,上至前端交互`java`、`go`,下到网络运维`DBA`,乃至世界上最好的语言`PHP`,无一不是精通。 当我想他描述了现状,表达了诉求,果然,他立马给出方案:针对职位主键级别添加同步锁。通过redis实现分布式锁,锁竞争失败就忽略这条消息。好像这确实解决了并发的问题,因为锁是职位主键级别,所以不管几个线程还是几个服务器都没关系,永远都不存在同一时间对同一个职位执行多次更新服务。 不过,我思考了下,似乎还是存在以下问题: 1. 虽然通过同步锁控制了同一职位发并发更新,但是会带来脏数据问题。比如A、B两个消息是针对同一个职位,当A获得锁并进行数据更新的时候,B消息还未产生,或者说B消息对应的DB更新还未发生。当A对应的更新完成了数据查询阶段,B消息进入队列并被消费,但是由于无法获得同步锁,所以B的数据修改被搜索引擎忽略。当A消费结束之后,搜索引擎的数据是A对应版本,DB的数据可能是B或者之后的版本,因此造成数据不一致问题。 2. 实际上并没有实现我想要的效果,就是在消费前进行去重,压根就不让重复数据在一定的范围内被重复消费。 #### 需求的本质 回过头来再看看这个需求。``在数据范围内去重``,这个数据范围中的数据代表什么?他们代表DB已持久化完毕,所以在数据范围内去重是不会有数据一致性问题的。`加同步锁`会有一致性问题是因为A和B不一定是一个数据范围。 **所以,要实现这个需求,就必须在这个`数据范围`内按规则去重,即使这个`数据范围`是动态的,也要时刻保持在某个时间点的内是去重后的。** ### 分析源码 有了思路,接下来就是先了解目前队列的工作过程,至少要了解``消息到达本地``以及``消息推送给消费者线程``的过程。很简单,看下我们自己封装的mq相关配置文件,发现使用的消息监听器是`org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer`。这个类主要封装了队列消费者相关配置,比如并发消费者数量`concurrentConsumers`、消息预取数量`prefetchCount`、ack模式`acknowledgeMode`等。不过了解这些内容并不是我们此行的目的,所以略过,我们要找到消息是如何到达本地以及如何推送给消费者的。 打开编辑器的`Outline`信息框,忽略那些public方法,直接关注`protected`或者`private`方法。  发现里面有个方法很可疑`doReceiveAndExecute`,点进去一看,没错,就是它了: ```java private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { Channel channel = consumer.getChannel(); for (int i = 0; i < txSize; i++) { logger.trace("Waiting for message from consumer."); // 从指定consumer中获取一个message,然后传递给messageListener // 注意,这里的consumer不是我们定义的队列消费者,而是mq内部的与服务器保持通讯的消费者 Message message = consumer.nextMessage(receiveTimeout); if (message == null) { break; } try { executeListener(channel, message); } catch (ImmediateAcknowledgeAmqpException e) { break; } catch (Throwable ex) { consumer.rollbackOnExceptionIfNecessary(ex); throw ex; } } // 在必要的是,进行commit或者ack return consumer.commitIfNecessary(isChannelLocallyTransacted(channel)); } ``` 从这个方法的入参可以看到,这个`org.springframework.amqp.rabbit.listener.BlockingQueueConsumer`类型的consumer是spring封装的,从命名也可以大致才出来是一个阻塞队列相关的消费者。 > 这是一个内部专用的封装了与服务器连接细节的消费者,它有自己的生命周期,比如开始与结束。 #### BlockingQueueConsumer 既然说到`BlockingQueueConsumer`,那么先来看看这个类在`SimpleMessageListenerContainer`中是做什么用的。 前面有提到在构建`SimpleMessageListenerContainer`对象时,有一个参数是`concurrentConsumers`,它是指定队列的最小并发消费者数量。这个参数指定的是consumer数量,这个consumer指的应该是`BlockingQueueConsumer`的数量没错了。那去看下这个参数注入方法`setConcurrentConsumers(final int concurrentConsumers)`。 ```Java public void setConcurrentConsumers(final int concurrentConsumers) { Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); // 限制:独占模式下,消费者并发数只能是1 Assert.isTrue(!this.exclusive || concurrentConsumers == 1, "When the consumer is exclusive, the concurrency must be 1"); if (this.maxConcurrentConsumers != null) { Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers, "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'"); } // 上面都是一些参数验证 synchronized(consumersMonitor) { if (logger.isDebugEnabled()) { logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers); } // 根据指定的并发数以及当前已设置的并发数,判断是增加还是减少 int delta = this.concurrentConsumers - concurrentConsumers; // 更新设置值 this.concurrentConsumers = concurrentConsumers; // 判断当前container是否是有效,未被shutdown if (isActive() && this.consumers != null) { // 如果delta大于零,则表示本次设置的并发数更小,需要减少并发数,就是加多余的consumer置为无效,即false if (delta > 0) { // consumers参数记录了当前,已经创建的consumer以及他们的状态 // 到这里应该就可以确认,concurrentConsumers代表的就是BlockingQueueConsumer实例的数量 Iterator> entryIterator = consumers.entrySet() .iterator(); while (entryIterator.hasNext() && delta > 0) { Entry entry = entryIterator.next(); if (entry.getValue()) { BlockingQueueConsumer consumer = entry.getKey(); consumer.basicCancel(); // 设置为无效 this.consumers.put(consumer, false); delta--; } } } else { // 如果是负数,则表示要增加并发,也就是BlockingQueueConsumer实例数 addAndStartConsumers(-delta); } } } } ``` 接下来看下`addAndStartConsumers(int delta)`是如何创建`BlockingQueueConsumer`实例的: ```java protected void addAndStartConsumers(int delta) { synchronized (this.consumersMonitor) { if (this.consumers != null) { for (int i = 0; i < delta; i++) { // 标记重点1:创建消费者实例,这个方法跟我们的伪需求有关 BlockingQueueConsumer consumer = createBlockingQueueConsumer(); this.consumers.put(consumer, true); /* 标记重点2:这个类实现了Runnable接口,负责维护消费者的状态,以及 * 由消费者状态影响整个container中维护的消费者并发数,从而决定是否需要对消费者连接池进行扩容或缩减 */ AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); if (logger.isDebugEnabled()) { logger.debug("Starting a new consumer: " + consumer); } this.taskExecutor.execute(processor); // ...... 省略了一些错误处理 } } } } ``` 先来看看重点2,`AsyncMessageProcessingConsumer.run()`的方法实现: ```java @Override public void run() { // 省略了一些代码,队列等数据的声明,队列事务是否开启的处理 boolean continuable = false; while (isActive(this.consumer) || this.consumer.hasDelivery() || continuable) { try { // 重点3,这个就是具体消费消息的方法,只有队列已经消费为空,才会返回false continuable = receiveAndExecute(this.consumer) && !isChannelTransacted(); // 进行一轮消费之后,需要对队列进行维护,只有配置了最大并发数的情况下才会进行 if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { if (continuable) { // 当前消费者的队列不为空,则会考虑是否需要增加消费者实例来提升消费速度 if (isActive(this.consumer)) { consecutiveIdles = 0; if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) { considerAddingAConsumer(); consecutiveMessages = 0; } } } else { // 否则会考虑,是否需要空置消费者实例来节省资源 consecutiveMessages = 0; if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) { considerStoppingAConsumer(this.consumer); consecutiveIdles = 0; } } } } } // ...... 省略一些错误处理 } ``` 上面标注的重点3,对应的实际执行的方法就是一开始提到的`doReceiveAndExecute(BlockingQueueConsumer consumer)`。 所以可以看出来: + 重点1负责创建消费者`createBlockingQueueConsumer`实例 + 重点2创建消费者实例的包装类,负责启动异步线程进行消费者状态维护 + 重点3负责从消费者队列中获取消息,然后进行消费 现在再来看重点1,就是负责初始化消费者实例对象的: ```java protected BlockingQueueConsumer createBlockingQueueConsumer() { BlockingQueueConsumer consumer; String[] queues = getRequiredQueueNames(); // 由于一轮消费的数量是txSize指定的,所以预取数量必须要大于txSize才有效,所以实际的预取数量取了一个较大值 int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize; // 创建一个 BlockingQueueConsumer 实例对象 consumer = new BlockingQueueConsumer(getConnectionFactory(), this.messagePropertiesConverter, cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected, this.consumerArgs, this.exclusive, queues); // 设置相关参数,后面关于伪需求的扩展,需要在下面进行设置 if (this.declarationRetries != null) { consumer.setDeclarationRetries(this.declarationRetries); } if (this.failedDeclarationRetryInterval != null) { consumer.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval); } if (this.retryDeclarationInterval != null) { consumer.setRetryDeclarationInterval(this.retryDeclarationInterval); } return consumer; } ``` 最后再来看看这个`BlockingQueueConsumer`类,其实从这个类的命名上也能发现些什么,它似乎与java中的阻塞队`BlockingQueue`列命名很像。没错,它持有的属性中最重要的一个就是`BlockingQueue`类型的变量`queue`,不过这个queue的实际上是有序的阻塞队列`LinkedBlockingQueue`类型的。queue负责存储服务端推送过来的消息,然后给重点3中的`consumer.nextMessage`方法提供返回值。 ### 实现 我们再回头看看“需求的本质”那一节提到的内容: > **所以,要实现这个需求,就必须在这个`数据范围`内按规则去重,即使这个`数据范围`是动态的,也要时刻保持在某个时间点的内是去重后的。** 划重点就是: 1. 在`数据范围`内去重 2. 时刻保持`数据范围`内是动态去重的 经过源码分析可以确定,这个`数据范围`就是重点1中提到的`BlockingQueueConsumer`实例,准确的说是它持有的`queue`对象中的数据。因为这是服务器推送的消息的第一个落脚点。 要保证`数据范围`的动态去重,只要保证队列`queue`的入队列(在重点2提到的consumer的内部类`InternalConsumer`中)和出队列(重点3)都进行去重操作即可。 #### 编码扩展 根据上面的讨论,发现需要修改的类主要有`BlockingQueueConsumer`和`SimpleMessageListenerContainer`两个。 考虑一下几点实际情况: 1. 这两个都是`spring`封装的类,我不宜做修改,万一出什么问题,影响太大。 2. 类中有许多final的属性和方法,也不适合进行继承来实现。 3. 为这个伪需求提供可选择性,因为很多场景并没有需要这个功能。 4. 为这个伪需求提供可扩展性,不同类型的消费队列去重的逻辑都不同,所以交由扩展的接口实现来决定。 所以,我直接复制了这两个类,重新命名为`HanldedBlockingQueueConsumer`和`HandledMessageListenerContainer`。并提供了两个接口`IHandledMessageBodyConsumer`和`IBlockingQueueDeliveryFilter`。 当用户自定义的队列消费者需要使用这个过滤功能时,只要实现`IHandledMessageBodyConsumer`接口即可。系统在加载mq配置文件并注入队列消费者的时候,根据其是否实现`IHandledMessageBodyConsumer`接口来判断创建`SimpleMessageListenerContainer`实例还是`HandledMessageListenerContainer`实例。 #### 扩展接口 下面说说这个两个接口的定义。 IHandledMessageBodyConsumer.java,它负责标记是否需要这个伪需求,如果需要,则实现这个接口,返回扩展的container,这个container持有去重逻辑的处理类`IBlockingQueueDeliveryFilter`的实现: ```java public interface IHandledMessageBodyConsumer { // 实现这个接口,需要返回HandledMessageListenerContainer实例,这个实例持有一个 IBlockingQueueDeliveryFilter 接口的实现类 AbstractMessageListenerContainer getHandledMessageListernerContainer(); } ``` 重点说说`IBlockingQueueDeliveryFilter`这个接口类。 ```java public interface IBlockingQueueDeliveryFilter { /** * 在消息数据进入本地阻塞队列之前,进行body数据分析、过滤、去重, * 这个body就是消息实体对象的二进制数组; * * 我们对body数组进行发序列化,然后根据instanceof判断类型,根据具体需求,进行对象转换 * 获得生产者那边的对象之后,在根据具体属性,比如说是我前面提到的职位ID,进行记录, * 只要创建一个JVM级别的静态且线程安全的Set即可 * 这个方法不能返回null,因为body数组是null的消息会被过滤;我们根据Set返回新增的职位ID, * 然后更新对象内容,重新序列化并返回。 **/ byte[] beforeIntoBlockingQueue(byte[] body); // 当这个消息被取出之后,被消费之前,进行数据操作,过滤、去重 // 还是对body进行反序列化并对象转换,将即将消费的职位ID,从Set移除,从而保证Set的正确性 void afterTakeFromBlockingQueue(byte[] body); } ``` 我在`HandledMessageListenerContainer`中新增了一个构造方法,用于: ```java public HandledMessageListenerContainer(IBlockingQueueDeliveryFilter filter) { this.filter = filter; } ``` 这个类持有`IBlockingQueueDeliveryFilter`对象的目的,是在初始化(重点1)`BlockingQueueConsumer`的时候,设置其过滤方法: ```java protected HanldedBlockingQueueConsumer createBlockingQueueConsumer() { HanldedBlockingQueueConsumer consumer; String[] queues = getRequiredQueueNames(); int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize; // 创建了我们自定义的 HanldedBlockingQueueConsumer 实例对象 consumer = new HanldedBlockingQueueConsumer(getConnectionFactory(), this.messagePropertiesConverter, cancellationLock, getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected, this.consumerArgs, this.exclusive, queues); // 。。。。。。省略了常规设置 // 进行filter设置 if(this.filter != null) { consumer.setFilter(filter); } return consumer; } ``` 接下来看下`HanldedBlockingQueueConsumer`中创建`InternalConsumer`的地方,将这个filter传递给`InternalConsumer`实例,因为它(handleDelivery方法)负责从服务器接收消息并put到阻塞队列: ```java // HanldedBlockingQueueConsumer.start() 方法中: this.consumer = new InternalConsumer(channel); // 设置filter this.consumer.setDeliveryFilter(filter); ``` 以及`InternalConsumer.handleDelivery`方法: ```java @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if (logger.isDebugEnabled()) { logger.debug("Storing delivery for " + HanldedBlockingQueueConsumer.this); } try { if(deliveryFilter != null) { body = deliveryFilter.beforeIntoBlockingQueue(body); // 这里只做过滤,不做拦截 } queue.put(new Delivery(consumerTag, envelope, properties, body)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } ``` 再看看从队列中返回消息的地方,我们增加一个后置处理方法,调用`com.hunteron.mqkit.consumer.handled.filter.IBlockingQueueDeliveryFilter.afterTakeFromBlockingQueue(byte[])`方法: ```java public Message nextMessage() throws InterruptedException, ShutdownSignalException { logger.trace("Retrieving delivery for " + this); Delivery delivery = queue.take(); processAfter(delivery); return handle(delivery); } public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException { if (logger.isDebugEnabled()) { logger.debug("Retrieving delivery for " + this); } checkShutdown(); if (this.missingQueues.size() > 0) { checkMissingQueues(); } Delivery delivery = queue.poll(timeout, TimeUnit.MILLISECONDS); processAfter(delivery); Message message = handle(delivery); if (message == null && cancelReceived.get()) { throw new ConsumerCancelledException(); } return message; } private void processAfter(Delivery delivery) { if(delivery == null || filter == null) { return ; } try { // 当消息从queue中获取之后,进行处理 filter.afterTakeFromBlockingQueue(delivery.getBody()); } catch (Exception e) { e.printStackTrace(); } } ``` #### 如何使用 首先,按照业务逻辑,实现一下过滤器: ```java // 我的需求还是,基于职位ID进行 数据范围 内去重 public class DefaultBlockingQueueDeliveryFilter implements IBlockingQueueDeliveryFilter { private static final Set filter = Sets.newConcurrentHashSet(); @Override public byte[] beforeIntoBlockingQueue(byte[] body) { try { // 将消息数据进行目标对象反序列化 MessageBody msg = createFromBody(body); // 转换失败,则原路返回 if(msg == null) { return body; } IndexParameter parameter = msg.getBody(); if(!isPositionIndex(parameter)) { return body; } // 去重复 List after = filterAndReturnNotExistIds(parameter.getIds()); // 更新参数值 parameter.setIds(after); // 序列化之后返回 return SerializationUtils.serialize(msg); } catch (Exception e) { return body; } } @SuppressWarnings("unchecked") private MessageBody createFromBody(byte[] body) { if(body == null || body.length == 0) { return null; } Object object = SerializationUtils.deserialize(body); try { MessageBody msg = (MessageBody) object; return msg; } catch (Exception e) { return null; } } @Override public void afterTakeFromBlockingQueue(byte[] body) { try { MessageBody messageBody = createFromBody(body); if(messageBody == null) { return ; } IndexParameter parameter = messageBody.getBody(); if(parameter == null) { return ; } List ids = parameter.getIds(); // 从Set中删除将会被消费的职位ID clearIds(ids); } catch (Exception e) { } } private boolean isPositionIndex(IndexParameter p) { // ...... 省略代码;判断是否是需要去重的消息 return true; } // 进行ID过滤 private List filterAndReturnNotExistIds(List ids) { if(CollectionUtils.isEmpty(ids)) { return ids; } List rs = new ArrayList<>(ids.size() / 2); for(String id : ids) { if(filter.contains(id)) { continue; } rs.add(id); filter.add(id); } return rs; } private void clearIds(List ids) { if(CollectionUtils.isEmpty(ids)) { return ; } filter.removeAll(ids); } } ``` 然后,让队列消费者实现我们的`IHandledMessageBodyConsumer`接口: ```java @Service("searchIndexConsumer") public class SearchIndexConsumer implements IHandledMessageBodyConsumer, IMessageBodyConsumer { // 伪需求对应的container private AbstractMessageListenerContainer container = new HandledMessageListenerContainer(new DefaultBlockingQueueDeliveryFilter()); // 消费者入口 @Override public Object handle(MessageBody messageBody) { // ...... 省略消费者具体业务代码 return null; } @Override public AbstractMessageListenerContainer getHandledMessageListernerContainer() { // 返回 container return container; } } ``` ### 总结 至此,我们的这轮伪需求已经成功实现了。不过它是JVM中,同一个消费者级别的去重,所以我配置了少量的并发,较多的fetechCount,力求在控制并发的前提下,增大我们的`数据范围`,以便获得较好的去重效果,尤其是消息积累的时候。 不过,我觉得其中可能会存在一些隐患或者纰漏,只不过是我没发现。 相关文章 关于搭建MongoDB Replica Set(副本集) MongoDB关于authSchema的认证版本问题 SpringBoot2从零开始(三)—— rabbit MQ 从零开发参数同步框架(五)—— Spring集成 Java网络编程之Netty学习(二)—— 简单RPC实现 Docker学习——创建一个JDK+Tomcat的Solr服务镜像 开发一个简单的集成编译、打包、服务检测、依赖发布的模块 搜索引擎入门——启动第一个Solr应用 搜索引擎入门——启动第一个ElasticSearch单机节点 栏目导航 关于我 不止技术 工程化应用(23) 技术学习/探索(32) 自娱自乐(2) 还有生活 随便写写(1) 娱乐/放松(1) 点击排行 SpringBoot2从零开始(二)——多数据源配置 搜索引擎进阶——IK扩展之动态加载与同义词 从零开发参数同步框架(二)—— 前期准备之工具类 Nginx的nginx.conf配置部分解释 springMVC中controller参数拦截问题处理 Maven项目一键打包、上传、重启服务器 微信小程序深入踩坑总结 微信小程序的搜索高亮、自定义导航条等踩坑记录 标签云 Java(19) 搜索引擎(13) Solr(7) 参数同步(6) SpringBoot(4) ES(3) ElasticSearch(3) JVM(3) Netty(3) Spring(3) mongoDB(3) 设计模式(3) Curator(2) Docker(2) Dubbo(2) 大家推荐 魔神重返战场!厄祭战争的巴巴托斯:第四形态 搜索引擎入门——Solr查询参数详解以及如何使用Java完成对接 来聊一聊这个被淘汰的图片验证码 搜索引擎入门——聊聊schema.xml配置 搜索引擎入门——启动第一个Solr应用 君子性非异也,善假于物也——功能强大的Postman 择其善而从之——我为什么开始学习ElasticSearch 实现一个关于队列的伪需求是一种怎样的体验